Rx Observable emitiendo valores periódicamente
Tengo que consultar periódicamente algunos puntos finales RESTful para actualizar los datos de mi aplicación Android. También tengo que hacer una pausa y reanudarlo basado en la conectividad (si el teléfono está desconectado, no hay necesidad de intentarlo). Mi solución actual está funcionando, pero utiliza ScheduledExecutorService
estándar de Java para realizar tareas periódicas, pero me gustaría permanecer en el paradigma de Rx.
Aquí está mi código actual, partes de las cuales se saltan por brevedad.
- RxJava para Android: Exponer Excepción y Reintentar (con retraso)
- RxAndroid textview eventos llamados automáticamente antes de cambios de texto eventos
- RXJava - buffer observable 1 hasta que se pueda observar 2 emite un elemento
- Cancelar la suscripción de un rx.Single en RxJava
- RxBindings para Spinner?
userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() { @Override public void call(final Subscriber<? super UserProfile> subscriber) { final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); final Runnable runnable = new Runnable() { @Override public void run() { // making http request here } }; final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1); networkStatusObservable.subscribe(new Action1<Boolean>() { @Override public void call(Boolean networkAvailable) { if (!networkAvailable) { pause(); } else { pause(); futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS)); } } private void pause() { for (ScheduledFuture<?> future : futures) { future.cancel(true); } futures.clear(); } }); final Subscription subscription = new Subscription() { private boolean isUnsubscribed = false; @Override public void unsubscribe() { scheduledExecutorService.shutdownNow(); isUnsubscribed = true; } @Override public boolean isUnsubscribed() { return isUnsubscribed; } }; subscriber.add(subscription); } }).multicast(BehaviorSubject.create()).refCount();
networkStatusObservable
es básicamente un receptor de difusión envuelto en Observable<Boolean>
, lo que indica que el teléfono está conectado a la red.
Como he dicho, esta solución está funcionando, pero quiero usar Rx enfoque para la encuesta periódica y la emisión de nuevo UserProfile
s, porque hay numerosos problemas con la programación de las cosas manualmente, que quiero evitar. Sé sobre Observable.timer
y Observable.interval
, pero no puedo averiguar cómo aplicarlos a esta tarea (y no estoy seguro si necesito usarlos en absoluto).
- Cómo manejar correctamente onError dentro de RxJava (Android)?
- Cómo observar los cambios de red en RxAndroid
- InterruptedException en el subproceso de caché RxJava al depurar en Android
- Robolectric Test no llama a textWatcher.onTextChanged
- Manejar errores en Retrofit 2 RX
- ¿Cómo crear un observable de Hashmap?
- Realizar N llamadas de api secuenciales usando RxJava y Retrofit
- Realm, RxJava, asObservable () y doOnUnsubscribe ()
Hay algunos enfoques en este problema de GitHub que puede ser útil.
https://github.com/ReactiveX/RxJava/issues/448
Las tres implementaciones son:
Observable.interval
Observable.interval(delay, TimeUnit.SECONDS).timeInterval() .flatMap(new Func1<Long, Observable<Notification<AppState>>>() { public Observable<Notification<AppState>> call(Long seconds) { return lyftApi.updateAppState(params).materialize(); } });
Scheduler.schedulePeriodically
Observable.create({ observer -> Schedulers.newThread().schedulePeriodically({ observer.onNext("application-state-from-network"); }, 0, 1000, TimeUnit.MILLISECONDS); }).take(10).subscribe({ v -> println(v) });
Recursividad manual
Observable.create(new OnSubscribeFunc<String>() { @Override public Subscription onSubscribe(final Observer<? super String> o) { return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() { @Override public Subscription call(Scheduler inner, Long t2) { o.onNext("data-from-polling"); return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS); } }); } }).toBlockingObservable().forEach(new Action1<String>() { @Override public void call(String v) { System.out.println("output: " + v); } });
Y la conclusión es que la recursión manual es el camino a seguir porque espera hasta que la operación se complete antes de programar la siguiente ejecución.
Una de las opciones es usar Observable.interval y comprobar el estado del usuario cuando se emiten los intervalos:
Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS); //pulling the user data Observable<Observable<String>> userObservable = interval.map(new Func1<Long, Observable<String>>() { Random random = new Random(); @Override public Observable<String> call(Long tick) { //here you are pulling user data; you should do it asynchronously - rx way - because the interval is using Schedulers.computation which is not best suited for doing io operations switch(random.nextInt(10)){ case 0://suppose this is for cases when network in not available or exception happens return Observable.<String>just(null); case 1: case 2: return Observable.just("Alice"); default: return Observable.just("Bob"); } } }); Observable<String> flatUsers = userObservable.flatMap(new Func1<Observable<String>, Observable<? extends String>>() { @Override public Observable<? extends String> call(Observable<String> stringObservable) { return stringObservable; } }); //filter valid data Observable<String> usersWithoutErrors = flatUsers.filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s != null; } }); //publish only changes Observable<String> uniqueUsers = usersWithoutErrors.distinctUntilChanged();
Puede hacerlo aún más simple si su networkStatusObservable está emitiendo eventos al menos con la frecuencia que necesita para comprobar los datos de usuario
networkStatusObservable.sample(1,TimeUnit.Seconds).filter(/*the best is to filter only connected state */).map(/*now start pulling the user data*/)
Por último se puede crear observable que utiliza el planificador para emitir los estados de usuario periódicamente – consulte la documentación Schedulers para saber qué planificador se ajusta a que necesita lo mejor:
public abstract class ScheduledOnSubscribe<T> implements Observable.OnSubscribe<T>{ private final Scheduler scheduler; private final long initialDelay; private final long period; private final TimeUnit unit; public ScheduledOnSubscribe(Scheduler scheduler, long initialDelay, long period, TimeUnit unit) { this.scheduler = scheduler; this.initialDelay = initialDelay; this.period = period; this.unit = unit; } abstract T next() throws Exception; @Override public void call(final Subscriber<? super T> subscriber) { final Scheduler.Worker worker = scheduler.createWorker(); subscriber.add(worker); worker.schedulePeriodically(new Action0() { @Override public void call() { try { subscriber.onNext(next()); } catch (Throwable e) { try { subscriber.onError(e); } finally { worker.unsubscribe(); } } } }, initialDelay, period, unit); } } //And here is the sample usage Observable<String> usersObservable = Observable.create(new ScheduledOnSubscribe(Schedulers.io(), 1, 1, TimeUnit.SECONDS ){ Random random = new Random(); @Override String next() throws Exception { //if you use Schedulers.io, you can call the remote service synchronously switch(random.nextInt(10)){ case 0: return null; case 1: case 2: return "Alice"; default: return "Bob"; } } });
Bueno, voy a publicar mi propia solución, tal vez alguien se beneficiará de ella. Sólo publicaré la parte relacionada con la pregunta, omitiendo el HTTP y las cosas en caché. Así es como lo hago:
private ConnectableObservable<Long> createNetworkBoundHeartbeatObservable(final Observable<Boolean> networkStatusObservable, final Observable<Boolean> pauseResumeObservable) { final Observable<Boolean> pausableHeartbeatObservable = Observable.combineLatest(networkStatusObservable, pauseResumeObservable, new Func2<Boolean, Boolean, Boolean>() { @Override public Boolean call(Boolean networkAvailable, Boolean mustPause) { return mustPause && networkAvailable; } } ).distinctUntilChanged(); final Observable<Boolean> hasToResumeObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() { @Override public Boolean call(Boolean networkAvailable) { return networkAvailable; } }); final Observable<Boolean> hasToStopObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() { @Override public Boolean call(Boolean networkAvailable) { return !networkAvailable; } }); return pausableHeartbeatObservable.concatMap(new Func1<Boolean, Observable<Long>>() { @Override public Observable<Long> call(Boolean shouldResumeRequests) { if (shouldResumeRequests) { long timeToUpdate; final Date oldestModifiedExpiresAt = cache.oldestModifiedExpiresAt(); timeToUpdate = Math.max(0, oldestModifiedExpiresAt.getTime() - System.currentTimeMillis()); Log.d(TAG, String.format("Have to restart updates, %d seconds till next update", timeToUpdate / SECOND_IN_MILLIS)); return Observable .timer(timeToUpdate, SECONDS_TO_EXPIRE * SECOND_IN_MILLIS, TimeUnit.MILLISECONDS) .takeUntil(hasToStopObservable); } else { Log.d(TAG, "Have to pause updates"); return Observable.<Long>never().takeUntil(hasToResumeObservable); } } }).multicast(PublishSubject.<Long>create()); }
Como puedes ver, las condiciones para pausar o reanudar las actualizaciones se vuelven un poco más complicadas, con un nuevo Observable añadido para apoyar la pausa cuando la aplicación va al fondo.
Entonces en el núcleo de la solución es la operación concatMap
que emite los Observables
secuencialmente (de ahí concatMap, no flatMap, vea esta pregunta: ¿Cuál es la diferencia entre concatMap y flatMap en RxJava ). Emite un interval
o never
Observables
, dependiendo de si las actualizaciones deben continuar o pausarse. Entonces cada Observable
emitido es takenUntil
un Observable
contrario 'emite nuevo valor.
ConnectableObservable
se devuelve porque el Observable
creado es caliente y todos los suscriptores deseados tienen que suscribirse a él antes de que comience a emitir algo, de lo contrario se podrían perder los eventos iniciales. Llamo a connect
más tarde.
Acepto mi u otra respuesta basada en votos, en su caso.
Hay una manera más sencilla de hacerlo utilizando interval (). He probado este código y funciona. Pero primero, debe encapsular el trabajo que desea ejecutar periódicamente en una subclase de Action1.
class Act<T> implements Action1<T> { public Service service; public String data; public void call(T t){ service.log(data); //the periodic job } }
(He mantenido los campos públicos por brevedad, pero eso no es aconsejable). Ahora puede programarlo de la siguiente manera:
Act<Long> act=new Act<>(); act.data="dummy data"; act.service=this; Observable.interval(0l, period, TimeUnit.SECONDS).subscribeOn(Schedulers.from(Executors.newFixedThreadPool(10))).subscribe((Action1<Long>)act);
Esto no bloqueará los hilos en ningún lugar, a diferencia del enfoque dado en la otra respuesta. Este enfoque nos permite pasar una variable como un tipo de almacenamiento mutable dentro de la Acción que podría ser útil en las invocaciones posteriores. Además, de esta manera usted podría suscribir su llamada en su propio grupo de hilos.
- ¿Qué es el Panel Terminal de Android Studio?
- Animación del tamaño de texto de TextView de Android y no de TextView