java - RxJava. Observable.delay work strange (lacks some items at the end) -


i'm trying understand rxjava. test code is:

import rx.observable; import rx.subscriber; import rx.functions.action1;  import java.util.concurrent.timeunit;  public class hello {     public static void main(string[] args) {              observable<string> observable = observable.create(new observable.onsubscribe<string>() {             @override             public void call(subscriber<? super string> subscriber) {                 try {                     thread.sleep(1000);                     subscriber.onnext("a");                     thread.sleep(1000);                     subscriber.onnext("b");                     thread.sleep(1000);                     subscriber.onnext("c");                     thread.sleep(1000);                     subscriber.onnext("d");                     thread.sleep(1000);                     subscriber.onnext("e");                     thread.sleep(1000);                     subscriber.onnext("f");                     thread.sleep(1000);                     subscriber.onnext("g");                     thread.sleep(1000);                     subscriber.onnext("h");                 } catch (interruptedexception e) {                     subscriber.onerror(e);                 }             }         });          observable                 .delay(2, timeunit.seconds)                 .subscribe(new action1<string>() {                     @override                     public void call(string string) {                         system.out.println(string);                     }                 });     } } 

without .delay(2, timeunit.seconds) have output: b c d e f g h .delay(2, timeunit.seconds) output lacks "g" , "h": b c d e f

how can be? documentation says delay emits items emitted source observable shifted forward in time specified delay

the delay overload using schedules work on different thread , results in implicit race condition.all temporal operators (such delay, buffer, , window) need use scheduler schedule effect later , can result in unexpected race conditions if aren't aware of , use them carefully. in case delay operator schedules work downstream on separate thread pool. here order of execution (on main thread) in test.

  1. your observable subscribed , waits 1000 millis before onnext("a")
  2. next it's received delay. schedules downstream onnext 2 seconds later.
  3. control flow returns observable waits 1000 millis.
  4. observable onnext("b") delay. delay schedules onnext of "b" 2 seconds later.
  5. .... (repeat)
  6. when observable calls onnext("h") schedules work returns subscribe , terminates test (causing scheduled work disappear).

in order execute asynchronously you can schedule delay on trampoline scheduler implementation.

.delay(2, timeunit.seconds, schedulers.trampoline()) 

Comments

Popular posts from this blog

resizing Telegram inline keyboard -

command line - How can a Python program background itself? -

php - "cURL error 28: Resolving timed out" on Wordpress on Azure App Service on Linux -