java - RxJava. Observable.delay work strange (lacks some items at the end) -
i'm trying understand rxjava. test code is:
import rx.observable; import rx.subscriber; import rx.functions.action1; import java.util.concurrent.timeunit; public class hello { public static void main(string[] args) { observable<string> observable = observable.create(new observable.onsubscribe<string>() { @override public void call(subscriber<? super string> subscriber) { try { thread.sleep(1000); subscriber.onnext("a"); thread.sleep(1000); subscriber.onnext("b"); thread.sleep(1000); subscriber.onnext("c"); thread.sleep(1000); subscriber.onnext("d"); thread.sleep(1000); subscriber.onnext("e"); thread.sleep(1000); subscriber.onnext("f"); thread.sleep(1000); subscriber.onnext("g"); thread.sleep(1000); subscriber.onnext("h"); } catch (interruptedexception e) { subscriber.onerror(e); } } }); observable .delay(2, timeunit.seconds) .subscribe(new action1<string>() { @override public void call(string string) { system.out.println(string); } }); } }
without .delay(2, timeunit.seconds)
have output: b c d e f g h .delay(2, timeunit.seconds)
output lacks "g" , "h": b c d e f
how can be? documentation says delay emits items emitted source observable shifted forward in time specified delay
the delay
overload using schedules work on different thread , results in implicit race condition.all temporal operators (such delay
, buffer
, , window
) need use scheduler schedule effect later , can result in unexpected race conditions if aren't aware of , use them carefully. in case delay operator schedules work downstream on separate thread pool. here order of execution (on main thread) in test.
- your observable subscribed , waits 1000 millis before
onnext("a")
- next it's received delay. schedules downstream onnext 2 seconds later.
- control flow returns observable waits 1000 millis.
- observable
onnext("b")
delay. delay schedules onnext of "b" 2 seconds later. - .... (repeat)
- when observable calls
onnext("h")
schedules work returns subscribe , terminates test (causing scheduled work disappear).
in order execute asynchronously you can schedule delay on trampoline scheduler implementation.
.delay(2, timeunit.seconds, schedulers.trampoline())
Comments
Post a Comment