scala - Akka streams: dealing with futures within graph stage -


within akka stream stage flowshape[a, b] , part of processing need on a's save/query datastore query built data. datastore driver query gives me future, , not sure how best deal (my main question here).

case class obj(a: string, b: int, c: string) case class foo(myobject: obj, name: string) case class bar(st: string) // class saveandgetid extends graphstage[flowshape[foo, bar]] {  val dao = new dbdao // dao async driver    override def createlogic(inheritedattributes: attributes) = new graphstagelogic(shape) {   sethandlers(in, out, new inhandler outhandler {    override def onpush() = {     val foo = grab(in)     val add = foo.record.value()     val result: future[string] = dao.saveandgetrecord(add.myobject)//saves , returns id string     //the naive approach     val record = await(result, duration.inf)     push(out, bar(record))// ***tests pass every time    //mapping future approach     result.map { x=>      push(out, bar(x))     } //***tests fail every time 

the next stage depends on id of db record returned query, want avoid await. not sure why mapping approach fails:

"it should work" in {   val source = source.single(foo(obj("hello", 1, "world")))   val probe = source     .via(new saveandgetid))     .runwith(testsink.probe)   probe    .request(1)    .expectbarwithid("one")//say know    .expectcomplete()  }  private implicit class richtestprobe(probe: probe[bar]) {   def expectbarwithid(expected: string): probe[bar] =     probe.expectnextchainingpf{     case r @ bar(str) if str == expected => r   }  } 

when run mapping future, failure:

should work ***failed*** java.lang.assertionerror: assertion failed: expected: message matching partial function got unexpected message oncomplete @ scala.predef$.assert(predef.scala:170) @ akka.testkit.testkitbase$class.expectmsgpf(testkit.scala:406) @ akka.testkit.testkit.expectmsgpf(testkit.scala:814) @ akka.stream.testkit.testsubscriber$manualprobe.expecteventpf(streamtestkit.scala:570) 

the async side channels example in docs has future in constructor of stage, opposed building future within stage, doesn't seem apply case.

i think graphstage unnecessarily overcomplicated. below flow performs same actions without need write custom stage:

val dao = new dbdao  val parallelism = 10 //number of parallel db queries  val saveandgetid : flow[foo, bar, _] =    flow[foo]     .map(foo => foo.record.value().myobject)     .mapasync(parallelism)(rec => dao.saveandgetrecord(rec))     .map(bar.apply) 

i try treat graphstage last resort, there idiomatic way of getting same flow using methods provided akka-stream library.


Comments

Popular posts from this blog

resizing Telegram inline keyboard -

command line - How can a Python program background itself? -

php - "cURL error 28: Resolving timed out" on Wordpress on Azure App Service on Linux -