Rx Observable emitiendo valores periódicamente

Tengo que consultar periódicamente algunos puntos finales RESTful para actualizar los datos de mi aplicación Android. También tengo que hacer una pausa y reanudarlo basado en la conectividad (si el teléfono está desconectado, no hay necesidad de intentarlo). Mi solución actual está funcionando, pero utiliza ScheduledExecutorService estándar de Java para realizar tareas periódicas, pero me gustaría permanecer en el paradigma de Rx.

Aquí está mi código actual, partes de las cuales se saltan por brevedad.

 userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() { @Override public void call(final Subscriber<? super UserProfile> subscriber) { final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); final Runnable runnable = new Runnable() { @Override public void run() { // making http request here } }; final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1); networkStatusObservable.subscribe(new Action1<Boolean>() { @Override public void call(Boolean networkAvailable) { if (!networkAvailable) { pause(); } else { pause(); futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS)); } } private void pause() { for (ScheduledFuture<?> future : futures) { future.cancel(true); } futures.clear(); } }); final Subscription subscription = new Subscription() { private boolean isUnsubscribed = false; @Override public void unsubscribe() { scheduledExecutorService.shutdownNow(); isUnsubscribed = true; } @Override public boolean isUnsubscribed() { return isUnsubscribed; } }; subscriber.add(subscription); } }).multicast(BehaviorSubject.create()).refCount(); 

networkStatusObservable es básicamente un receptor de difusión envuelto en Observable<Boolean> , lo que indica que el teléfono está conectado a la red.

Como he dicho, esta solución está funcionando, pero quiero usar Rx enfoque para la encuesta periódica y la emisión de nuevo UserProfile s, porque hay numerosos problemas con la programación de las cosas manualmente, que quiero evitar. Sé sobre Observable.timer y Observable.interval , pero no puedo averiguar cómo aplicarlos a esta tarea (y no estoy seguro si necesito usarlos en absoluto).

Hay algunos enfoques en este problema de GitHub que puede ser útil.

https://github.com/ReactiveX/RxJava/issues/448

Las tres implementaciones son:


Observable.interval

 Observable.interval(delay, TimeUnit.SECONDS).timeInterval() .flatMap(new Func1<Long, Observable<Notification<AppState>>>() { public Observable<Notification<AppState>> call(Long seconds) { return lyftApi.updateAppState(params).materialize(); } }); 

Scheduler.schedulePeriodically

 Observable.create({ observer -> Schedulers.newThread().schedulePeriodically({ observer.onNext("application-state-from-network"); }, 0, 1000, TimeUnit.MILLISECONDS); }).take(10).subscribe({ v -> println(v) }); 

Recursividad manual

 Observable.create(new OnSubscribeFunc<String>() { @Override public Subscription onSubscribe(final Observer<? super String> o) { return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() { @Override public Subscription call(Scheduler inner, Long t2) { o.onNext("data-from-polling"); return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS); } }); } }).toBlockingObservable().forEach(new Action1<String>() { @Override public void call(String v) { System.out.println("output: " + v); } }); 

Y la conclusión es que la recursión manual es el camino a seguir porque espera hasta que la operación se complete antes de programar la siguiente ejecución.

Una de las opciones es usar Observable.interval y comprobar el estado del usuario cuando se emiten los intervalos:

  Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS); //pulling the user data Observable<Observable<String>> userObservable = interval.map(new Func1<Long, Observable<String>>() { Random random = new Random(); @Override public Observable<String> call(Long tick) { //here you are pulling user data; you should do it asynchronously - rx way - because the interval is using Schedulers.computation which is not best suited for doing io operations switch(random.nextInt(10)){ case 0://suppose this is for cases when network in not available or exception happens return Observable.<String>just(null); case 1: case 2: return Observable.just("Alice"); default: return Observable.just("Bob"); } } }); Observable<String> flatUsers = userObservable.flatMap(new Func1<Observable<String>, Observable<? extends String>>() { @Override public Observable<? extends String> call(Observable<String> stringObservable) { return stringObservable; } }); //filter valid data Observable<String> usersWithoutErrors = flatUsers.filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s != null; } }); //publish only changes Observable<String> uniqueUsers = usersWithoutErrors.distinctUntilChanged(); 

Puede hacerlo aún más simple si su networkStatusObservable está emitiendo eventos al menos con la frecuencia que necesita para comprobar los datos de usuario

  networkStatusObservable.sample(1,TimeUnit.Seconds).filter(/*the best is to filter only connected state */).map(/*now start pulling the user data*/) 

Por último se puede crear observable que utiliza el planificador para emitir los estados de usuario periódicamente – consulte la documentación Schedulers para saber qué planificador se ajusta a que necesita lo mejor:

 public abstract class ScheduledOnSubscribe<T> implements Observable.OnSubscribe<T>{ private final Scheduler scheduler; private final long initialDelay; private final long period; private final TimeUnit unit; public ScheduledOnSubscribe(Scheduler scheduler, long initialDelay, long period, TimeUnit unit) { this.scheduler = scheduler; this.initialDelay = initialDelay; this.period = period; this.unit = unit; } abstract T next() throws Exception; @Override public void call(final Subscriber<? super T> subscriber) { final Scheduler.Worker worker = scheduler.createWorker(); subscriber.add(worker); worker.schedulePeriodically(new Action0() { @Override public void call() { try { subscriber.onNext(next()); } catch (Throwable e) { try { subscriber.onError(e); } finally { worker.unsubscribe(); } } } }, initialDelay, period, unit); } } //And here is the sample usage Observable<String> usersObservable = Observable.create(new ScheduledOnSubscribe(Schedulers.io(), 1, 1, TimeUnit.SECONDS ){ Random random = new Random(); @Override String next() throws Exception { //if you use Schedulers.io, you can call the remote service synchronously switch(random.nextInt(10)){ case 0: return null; case 1: case 2: return "Alice"; default: return "Bob"; } } }); 

Bueno, voy a publicar mi propia solución, tal vez alguien se beneficiará de ella. Sólo publicaré la parte relacionada con la pregunta, omitiendo el HTTP y las cosas en caché. Así es como lo hago:

 private ConnectableObservable<Long> createNetworkBoundHeartbeatObservable(final Observable<Boolean> networkStatusObservable, final Observable<Boolean> pauseResumeObservable) { final Observable<Boolean> pausableHeartbeatObservable = Observable.combineLatest(networkStatusObservable, pauseResumeObservable, new Func2<Boolean, Boolean, Boolean>() { @Override public Boolean call(Boolean networkAvailable, Boolean mustPause) { return mustPause && networkAvailable; } } ).distinctUntilChanged(); final Observable<Boolean> hasToResumeObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() { @Override public Boolean call(Boolean networkAvailable) { return networkAvailable; } }); final Observable<Boolean> hasToStopObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() { @Override public Boolean call(Boolean networkAvailable) { return !networkAvailable; } }); return pausableHeartbeatObservable.concatMap(new Func1<Boolean, Observable<Long>>() { @Override public Observable<Long> call(Boolean shouldResumeRequests) { if (shouldResumeRequests) { long timeToUpdate; final Date oldestModifiedExpiresAt = cache.oldestModifiedExpiresAt(); timeToUpdate = Math.max(0, oldestModifiedExpiresAt.getTime() - System.currentTimeMillis()); Log.d(TAG, String.format("Have to restart updates, %d seconds till next update", timeToUpdate / SECOND_IN_MILLIS)); return Observable .timer(timeToUpdate, SECONDS_TO_EXPIRE * SECOND_IN_MILLIS, TimeUnit.MILLISECONDS) .takeUntil(hasToStopObservable); } else { Log.d(TAG, "Have to pause updates"); return Observable.<Long>never().takeUntil(hasToResumeObservable); } } }).multicast(PublishSubject.<Long>create()); } 

Como puedes ver, las condiciones para pausar o reanudar las actualizaciones se vuelven un poco más complicadas, con un nuevo Observable añadido para apoyar la pausa cuando la aplicación va al fondo.

Entonces en el núcleo de la solución es la operación concatMap que emite los Observables secuencialmente (de ahí concatMap, no flatMap, vea esta pregunta: ¿Cuál es la diferencia entre concatMap y flatMap en RxJava ). Emite un interval o never Observables , dependiendo de si las actualizaciones deben continuar o pausarse. Entonces cada Observable emitido es takenUntil un Observable contrario 'emite nuevo valor.

ConnectableObservable se devuelve porque el Observable creado es caliente y todos los suscriptores deseados tienen que suscribirse a él antes de que comience a emitir algo, de lo contrario se podrían perder los eventos iniciales. Llamo a connect más tarde.

Acepto mi u otra respuesta basada en votos, en su caso.

Hay una manera más sencilla de hacerlo utilizando interval (). He probado este código y funciona. Pero primero, debe encapsular el trabajo que desea ejecutar periódicamente en una subclase de Action1.

 class Act<T> implements Action1<T> { public Service service; public String data; public void call(T t){ service.log(data); //the periodic job } } 

(He mantenido los campos públicos por brevedad, pero eso no es aconsejable). Ahora puede programarlo de la siguiente manera:

 Act<Long> act=new Act<>(); act.data="dummy data"; act.service=this; Observable.interval(0l, period, TimeUnit.SECONDS).subscribeOn(Schedulers.from(Executors.newFixedThreadPool(10))).subscribe((Action1<Long>)act); 

Esto no bloqueará los hilos en ningún lugar, a diferencia del enfoque dado en la otra respuesta. Este enfoque nos permite pasar una variable como un tipo de almacenamiento mutable dentro de la Acción que podría ser útil en las invocaciones posteriores. Además, de esta manera usted podría suscribir su llamada en su propio grupo de hilos.

  • RxJava Break Chain en Condicional
  • Android Pros y Contras: Event Bus y RxJava
  • Cómo anular los Observables compartidos, infinitos con un retraso después de que el último suscriptor no se haya suscrito
  • RxJava y Retrofit - Aumento de excepciones personalizadas en función de la respuesta del servidor
  • No vuelva a ejecutar la llamada de Retrofit si todavía está en curso con RxJava 2
  • ¿Cómo usar un rxjava observable después de que se complete?
  • ¿Cuándo se debe usar RxJava Observable y cuándo Callback simple en Android?
  • Retrofit 2.0 + RxJava + Error JSON cuerpo
  • Combinación de llamadas API con RX Java
  • No se puede verificar la llamada de método de simulación desde el suscriptor de RxJava
  • Android: Sondeo de un servidor con Retrofit
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.