scala - Akka Streams flattening Flows via
Flows can be connected with via, like:
    def aToB: Flow[A, B, NotUsed] = { ??? }
    def bToC: Flow[B, C, NotUsed] = { ??? }
    def aToC: Flow[A, C, NotUsed] = { aToB.via(bToC) }
I would like to do the equivalent of flatMap:
    def aToSomeB: Flow[A, Some[B], NotUsed] = { ??? }
    def aToSomeC: Flow[A, Some[C], NotUsed] = { aToSomeB.flatVia(bToC) }
Is there a built-in way to do flatVia? It seems like a common need for things like Option unwrapping and error flattening.
It depends on whether you are interested in keeping the Nones around, or whether you want to throw them away.
As you typed your flow as Flow[A, Some[C], NotUsed], it seems you are not interested in Nones at all. This means you can filter them out with collect, e.g.
    def aToSomeC: Flow[A, C, NotUsed] = {
      aToSomeB.collect { case Some(x) => x }.via(bToC)
    }
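To make the collect approach concrete, here is a minimal runnable sketch. The flows parse and double are hypothetical stand-ins for aToSomeB and bToC (they are not from the question), the first flow is typed with Option rather than Some so that Nones can actually occur, and the code assumes Akka 2.6 or later, where the materializer is derived from the implicit ActorSystem.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object CollectExample {
  // Hypothetical stand-in for aToSomeB: parsing may fail, yielding None.
  val parse: Flow[String, Option[Int], NotUsed] =
    Flow[String].map(s => Try(s.toInt).toOption)

  // Hypothetical stand-in for bToC: a plain flow on the unwrapped type.
  val double: Flow[Int, Int, NotUsed] =
    Flow[Int].map(_ * 2)

  // Drop the Nones, unwrap the Somes, then continue with the plain flow.
  val parseThenDouble: Flow[String, Int, NotUsed] =
    parse.collect { case Some(n) => n }.via(double)

  // Runs the combined flow over a small input and returns the collected output.
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6+, so no explicit materializer is created here.
    implicit val system: ActorSystem = ActorSystem("collect-example")
    try Await.result(
      Source(List("1", "x", "3")).via(parseThenDouble).runWith(Sink.seq),
      3.seconds
    )
    finally system.terminate()
  }
}
```

Here the unparseable element "x" is silently discarded before it reaches double. The same dropping behaviour could also be expressed with mapConcat(_.toList) in place of collect.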
If, otherwise, you need to keep track of the Nones (or the Lefts if you're dealing with Eithers), you'll need to write the "lifting" stage yourself. This can be written fairly generically. For example, it can be written as a function that takes any flow Flow[I, O, M] and returns another flow Flow[Either[E, I], Either[E, O], M]. Because it requires fan-out and fan-in stages, usage of the GraphDSL is required.
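    // Imports this helper needs; GraphDSL.Implicits._ provides the ~> operator.
    import akka.stream.{FlowShape, Graph}
    import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}
    import GraphDSL.Implicits._

    def liftEither[I, O, E, M](f: Flow[I, O, M]): Graph[FlowShape[Either[E, I], Either[E, O]], M] =
      Flow.fromGraph(GraphDSL.create(f) { implicit builder: GraphDSL.Builder[M] => f =>

        val fIn     = builder.add(Flow[Either[E, I]])
        // Route Lefts to output 0 and Rights to output 1.
        val p       = builder.add(Partition[Either[E, I]](2, _.fold(_ => 0, _ => 1)))
        val merge   = builder.add(Merge[Either[E, O]](2))
        val toRight = builder.add(Flow[O].map(Right(_)))

        // Lefts bypass f untouched.
        p.out(0).collect { case Left(x) => Left(x) } ~> merge
        fIn.out ~> p.in
        // Rights are unwrapped, passed through f, and wrapped again.
        p.out(1).collect { case Right(x) => x } ~> f ~> toRight ~> merge

        new FlowShape(fIn.in, merge.out)
      })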
This can be used as shown below:
    def aToSomeB: Flow[A, Either[Throwable, B], NotUsed] = ???
    def aToSomeC: Flow[A, Either[Throwable, C], NotUsed] = aToSomeB.via(liftEither(bToC))
Note that Options can easily be converted to Eithers to leverage the same helper function.
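As a rough sketch of that conversion, the standard library's Option.toRight already does the work. The wrapper name optionToEither and the object OptionLifting are hypothetical, introduced only for illustration, and the caller chooses what error value stands in for a None.

```scala
object OptionLifting {
  // Hypothetical helper, not part of Akka: turn an Option into an Either,
  // using onNone as the Left value when the Option is empty.
  def optionToEither[E, A](onNone: => E)(opt: Option[A]): Either[E, A] =
    opt.toRight(onNone)
}

// Example calls (plain Scala, no stream involved):
// OptionLifting.optionToEither("missing")(Some(1))           // Right(1)
// OptionLifting.optionToEither("missing")(Option.empty[Int]) // Left("missing")
```

In a stream, the same conversion could be applied as a map step on a flow that emits Options, placed just before the lifted flow, so that the lifted flow only ever sees Eithers.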