java - RxJava: Observable.delay works strangely (some items are missing at the end)


I'm trying to understand RxJava. My test code is:

    import rx.Observable;
    import rx.Subscriber;
    import rx.functions.Action1;

    import java.util.concurrent.TimeUnit;

    public class Hello {
        public static void main(String[] args) {

            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    try {
                        Thread.sleep(1000);
                        subscriber.onNext("a");
                        Thread.sleep(1000);
                        subscriber.onNext("b");
                        Thread.sleep(1000);
                        subscriber.onNext("c");
                        Thread.sleep(1000);
                        subscriber.onNext("d");
                        Thread.sleep(1000);
                        subscriber.onNext("e");
                        Thread.sleep(1000);
                        subscriber.onNext("f");
                        Thread.sleep(1000);
                        subscriber.onNext("g");
                        Thread.sleep(1000);
                        subscriber.onNext("h");
                    } catch (InterruptedException e) {
                        subscriber.onError(e);
                    }
                }
            });

            observable
                    .delay(2, TimeUnit.SECONDS)
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String string) {
                            System.out.println(string);
                        }
                    });
        }
    }

Without .delay(2, TimeUnit.SECONDS) I get the output: a b c d e f g h. With .delay(2, TimeUnit.SECONDS) the output lacks "g" and "h": a b c d e f

How can that be? The documentation says that delay emits the items emitted by the source Observable, shifted forward in time by a specified delay.

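The delay overload you are using schedules its work on a different thread, which results in an implicit race condition. All temporal operators (such as delay, buffer, and window) need to use a scheduler to schedule their effect for later, and this can result in unexpected race conditions if you aren't aware of it and don't use them carefully. In this case the delay operator schedules the downstream work on a separate thread pool (in RxJava 1.x the default is the computation scheduler, whose worker threads are daemon threads and so do not keep the JVM alive). Here is the order of execution (on the main thread) in your test: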

  1. Your Observable is subscribed to and waits 1000 ms before calling onNext("a").
  2. Next, "a" is received by delay, which schedules the downstream onNext for 2 seconds later.
  3. Control flow returns to your Observable, which waits another 1000 ms.
  4. The Observable calls onNext("b") on delay, and delay schedules the onNext of "b" for 2 seconds later.
  5. ... (repeat)
  6. When the Observable calls onNext("h"), delay schedules the work, control returns from subscribe, and main finishes, which terminates the test (causing the scheduled work to disappear).
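The steps above can be modelled without RxJava. The following is a minimal JDK-only sketch, not RxJava's implementation: the class name DelayRaceSketch, the scaled-down timings (100 ms for the 1000 ms sleep, 200 ms for the 2 second delay) and the use of shutdownNow() to stand in for the JVM exiting are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// JDK-only model of the steps above; this is NOT how RxJava is implemented.
class DelayRaceSketch {
    static final long GAP_MS = 100;   // stands in for Thread.sleep(1000)
    static final long DELAY_MS = 200; // stands in for delay(2, TimeUnit.SECONDS)

    // Returns the items that were delivered by the time the caller moved on.
    static List<String> run(boolean waitForScheduledWork) {
        List<String> delivered = new CopyOnWriteArrayList<>();
        // Daemon worker thread: pending tasks cannot keep the JVM alive.
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "delay-worker");
                    t.setDaemon(true);
                    return t;
                });
        try {
            for (String item : new String[] {"a", "b", "c", "d", "e", "f", "g", "h"}) {
                Thread.sleep(GAP_MS); // the producer waits on the calling thread
                // Delivery is only scheduled here; schedule() returns immediately.
                scheduler.schedule(() -> { delivered.add(item); },
                        DELAY_MS, TimeUnit.MILLISECONDS);
            }
            if (waitForScheduledWork) {
                // Already scheduled delayed tasks still run after shutdown() by default.
                scheduler.shutdown();
                scheduler.awaitTermination(5, TimeUnit.SECONDS);
            } else {
                // Mimics main returning: deliveries still pending are dropped.
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        System.out.println("caller returns at once: " + run(false));
        System.out.println("caller waits:           " + run(true));
    }
}
```

When the caller does not wait, the last items are still sitting in the scheduler's queue when the producer loop ends, so they never reach the list. Exactly how many are lost depends on timing, which is the race described above.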

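To make the delayed work run synchronously on the calling thread, so that main does not finish before the scheduled items are delivered, you can schedule the delay on the trampoline scheduler implementation: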

    .delay(2, TimeUnit.SECONDS, Schedulers.trampoline())
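To see why running the delay on the caller's thread avoids the lost items, here is a small JDK-only model. It is only a sketch of the idea and not RxJava's trampoline scheduler: the names TrampolineSketch and scheduleOnCallerThread are hypothetical, and the timings are scaled down.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only model of a trampoline-style delay; this is NOT RxJava's implementation.
class TrampolineSketch {
    // Hypothetical helper: runs the "scheduled" action on the calling thread
    // after sleeping for the delay, so the caller blocks until delivery is done.
    static void scheduleOnCallerThread(Runnable action, long delayMillis) {
        sleep(delayMillis);
        action.run();
    }

    static List<String> run(long gapMillis, long delayMillis) {
        List<String> delivered = new ArrayList<>();
        for (String item : new String[] {"a", "b", "c", "d", "e", "f", "g", "h"}) {
            sleep(gapMillis); // the producer waits, as in the question
            // Does not return until the item has been delivered.
            scheduleOnCallerThread(() -> delivered.add(item), delayMillis);
        }
        return delivered; // every item has been delivered by this point
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(100, 200));
    }
}
```

One consequence of this design is that the producer itself is slowed down: in this model each item arrives one gap plus one delay after the previous one, instead of one gap apart. If that matters, an alternative (not covered by the answer above) would be to leave the delay on its default scheduler and keep the main thread alive until the work is done.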
