RxJava Observable.cache invalidate
Estoy tratando de aprender rxjava dentro del entorno de Android. Digamos que tengo un observable que emite el resultado de una llamada de red. Si he entendido correctamente, un enfoque muy común para hacer frente a los cambios de configuración es:
-
Almacenar el observable en un fragmento retenido / singleton / objeto de aplicación
-
Aplicar el operador de caché a la
-
Suscribirse / anular suscripción en los controladores de ciclo de vida adecuados
Haciendo esto, no perderíamos el resultado del observable que volverá a observarse una vez que la nueva configuración haya tenido lugar.
Ahora, mi pregunta es:
¿Hay una manera de forzar el observable a emitir un nuevo valor (e invalidar el en caché)? ¿Necesito crear un nuevo observable cada vez que quiero datos nuevos de la red (que no suena como una mala práctica en el mundo android porque haría que el gc hacer un trabajo extra)?
Muchas gracias,
Federico
- OnNext comience otro Observable
- ¿Cómo crear un Observable en Android?
- Cómo recibir la última emisión de secuencia al llamar a rx.Observable.sample ()?
- Cómo emitir elementos de una lista con retraso en RxJava?
- Retrofit / Rxjava y servicios basados en sesiones
- ¿Por qué Rxjava podría causar fugas de memoria?
- ¿Cómo puedo hacer que este zip rxjava funcione en paralelo?
- Limitar el rendimiento con RxJava
Realice una implementación personalizada de OnSubscribe
que haga lo que quiera:
public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> { private final AtomicBoolean refresh = new AtomicBoolean(true); private final Observable<T> source; private volatile Observable<T> current; public OnSubscribeRefreshingCache(Observable<T> source) { this.source = source; this.current = source; } public void reset() { refresh.set(true); } @Override public void call(Subscriber<? super T> subscriber) { if (refresh.compareAndSet(true, false)) { current = source.cache(); } current.unsafeSubscribe(subscriber); } }
Este bit de código demuestra el uso y muestra que la caché se está restableciendo esencialmente:
Observable<Integer> o = Observable.just(1) .doOnCompleted(() -> System.out.println("completed")); OnSubscribeRefreshingCache<Integer> cacher = new OnSubscribeRefreshingCache<Integer>(o); Observable<Integer> o2 = Observable.create(cacher); o2.subscribe(System.out::println); o2.subscribe(System.out::println); cacher.reset(); o2.subscribe(System.out::println);
Salida:
completed 1 1 completed 1
Por cierto, puede notar que .cache
no emite hasta la finalización. Este es un error que debe ser corregido por rxjava 1.0.14.
En términos de sus preocupaciones de la presión del GC, cada operador cuando se aplica a un observable crea un nuevo observable generalmente vía lift
o create
. El estado miembro base asociado con la creación de un nuevo Observable es la referencia a la función onSubscribe
. cache
es diferente de la mayoría en que mantiene el estado a través de suscripciones y esto tiene potencial para la presión del GC si tiene una gran cantidad de estado y se tira con frecuencia. Incluso si se utiliza la misma estructura de datos mutable para mantener el estado a través de reajustes GC todavía tendría que lidiar con el contenido de la estructura de datos cuando se borra por lo que no podría ganar mucho.
El operador de cache
RxJava se crea para varias suscripciones simultáneas. Usted puede probablemente imaginar una funcionalidad de restablecimiento podría resultar problemático para implementar. Por todos los medios plantear un problema en RxJava github si desea explorar más.
- Método onClick no funciona correctamente después de que NestedScrollView desplazado
- Cómo acelerar las creaciones de android ndk