Scala - Akka Streams: dealing with futures within a graph stage
Within an Akka Streams stage of shape FlowShape[A, B], part of the processing I need to do on the A's is to save to and query a datastore, using a query built from A's data. The datastore driver's query gives me a Future, and I am not sure how best to deal with it (this is my main question).
    case class Obj(a: String, b: Int, c: String)
    case class Foo(myObject: Obj, name: String)
    case class Bar(st: String)
    //
    class SaveAndGetId extends GraphStage[FlowShape[Foo, Bar]] {
      val dao = new DbDao // DAO with an async driver
      override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
        setHandlers(in, out, new InHandler with OutHandler {
          override def onPush() = {
            val foo = grab(in)
            val add = foo.record.value()
            // saves and returns the ID as a String
            val result: Future[String] = dao.saveAndGetRecord(add.myObject)

            // the naive approach
            val record = Await.result(result, Duration.Inf)
            push(out, Bar(record)) // *** tests pass every time

            // the mapping-the-future approach
            result.map { x => push(out, Bar(x)) } // *** tests fail every time
          }
        })
      }
    }
The next stage depends on the ID of the DB record returned by the query, but I want to avoid Await. I am not sure why the mapping approach fails:
    "it should work" in {
      val source = Source.single(Foo(Obj("hello", 1, "world")))
      val probe = source
        .via(new SaveAndGetId)
        .runWith(TestSink.probe)
      probe
        .request(1)
        .expectBarWithId("one") // say we know the ID
        .expectComplete()
    }

    private implicit class RichTestProbe(probe: Probe[Bar]) {
      def expectBarWithId(expected: String): Probe[Bar] =
        probe.expectNextChainingPF {
          case r @ Bar(str) if str == expected => r
        }
    }
When I run the test with the mapping-the-future approach, I get this failure:
    it should work *** FAILED ***
    java.lang.AssertionError: assertion failed: expected: message matching partial function but got unexpected message OnComplete
      at scala.Predef$.assert(Predef.scala:170)
      at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:406)
      at akka.testkit.TestKit.expectMsgPF(TestKit.scala:814)
      at akka.stream.testkit.TestSubscriber$ManualProbe.expectEventPF(StreamTestKit.scala:570)
The async side channels example in the docs has the future in the constructor of the stage, as opposed to building the future within the stage, so it doesn't seem to apply to my case.
I think your GraphStage is unnecessarily overcomplicated. The Flow below performs the same actions without the need to write a custom stage:
    val dao = new DbDao
    val parallelism = 10 // number of parallel DB queries

    val saveAndGetId: Flow[Foo, Bar, _] =
      Flow[Foo]
        .map(foo => foo.record.value().myObject)
        .mapAsync(parallelism)(rec => dao.saveAndGetRecord(rec))
        .map(Bar.apply)
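To try this flow in isolation, here is a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem also supplies the materializer). StubDao, SaveFlow and runOnce are hypothetical names introduced only for illustration, the stub returns a fixed ID instead of talking to a database, and Foo is read through myObject directly rather than through foo.record.value():

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

final case class Obj(a: String, b: Int, c: String)
final case class Foo(myObject: Obj, name: String)
final case class Bar(st: String)

// Hypothetical stand-in for the asynchronous DAO from the question.
class StubDao {
  def saveAndGetRecord(obj: Obj): Future[String] = Future.successful("one")
}

object SaveFlow {
  // mapAsync waits for each Future and emits its result downstream,
  // keeping at most `parallelism` futures in flight and preserving order.
  def saveAndGetId(dao: StubDao, parallelism: Int): Flow[Foo, Bar, NotUsed] =
    Flow[Foo]
      .mapAsync(parallelism)(foo => dao.saveAndGetRecord(foo.myObject))
      .map(Bar.apply)

  def runOnce(): Seq[Bar] = {
    implicit val system: ActorSystem = ActorSystem("save-flow-demo")
    try {
      val result = Source
        .single(Foo(Obj("hello", 1, "world"), "example"))
        .via(saveAndGetId(new StubDao, parallelism = 4))
        .runWith(Sink.seq[Bar])
      // Blocking here only ends the demo; the flow itself never blocks.
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

Since the stub always answers "one", SaveFlow.runOnce() should yield a single Bar("one"). In a test, the same flow can be attached to TestSink.probe in place of Sink.seq.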
I try to treat GraphStage as a last resort; there is usually an idiomatic way of getting the same Flow by using the methods provided by the akka-stream library.
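As for why the mapping approach in the question fails: the callback passed to result.map runs on another thread, outside the stage's handlers, so calling push from it is not safe. In addition, Source.single finishes right after emitting its element, and the default onUpstreamFinish completes the stage before the future has produced anything, which matches the unexpected OnComplete in the test. If a custom stage really is required, the usual mechanism is getAsyncCallback. Below is a minimal sketch, assuming Akka 2.6 or later; the save parameter is a hypothetical stand-in for dao.saveAndGetRecord, and SaveAndGetIdDemo exists only for illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

final case class Obj(a: String, b: Int, c: String)
final case class Foo(myObject: Obj, name: String)
final case class Bar(st: String)

// `save` is a hypothetical stand-in for dao.saveAndGetRecord.
class SaveAndGetId(save: Obj => Future[String]) extends GraphStage[FlowShape[Foo, Bar]] {
  val in: Inlet[Foo] = Inlet[Foo]("SaveAndGetId.in")
  val out: Outlet[Bar] = Outlet[Bar]("SaveAndGetId.out")
  override val shape: FlowShape[Foo, Bar] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // True while a save has been started but its result has not arrived.
      private var inFlight = false

      // The handler body runs inside the stage, so push/completeStage are safe here.
      private val onResult = getAsyncCallback[Try[String]] {
        case Success(id) =>
          inFlight = false
          push(out, Bar(id))
          // Upstream may have finished while we were waiting.
          if (isClosed(in)) completeStage()
        case Failure(ex) =>
          failStage(ex)
      }

      override def onPush(): Unit = {
        inFlight = true
        // Hand the result back to the stage instead of pushing from the future's thread.
        save(grab(in).myObject).onComplete(onResult.invoke)(materializer.executionContext)
      }

      override def onPull(): Unit = pull(in)

      // Defer completion until the pending result has been pushed.
      override def onUpstreamFinish(): Unit =
        if (!inFlight) completeStage()

      setHandlers(in, out, this)
    }
}

object SaveAndGetIdDemo {
  def runOnce(): Seq[Bar] = {
    implicit val system: ActorSystem = ActorSystem("save-and-get-id-demo")
    try {
      val result = Source
        .single(Foo(Obj("hello", 1, "world"), "example"))
        .via(new SaveAndGetId(_ => Future.successful("one")))
        .runWith(Sink.seq[Bar])
      Await.result(result, 5.seconds) // blocking only to end the demo
    } finally system.terminate()
  }
}
```

This sketch handles one element at a time, which is roughly what mapAsync(1) already provides, so it mainly illustrates the mechanism rather than replacing the Flow shown above.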