scala - Akka streams flattening Flows via -
flows can connected via like:
def atob: flow[a, b, notused] = { ??? } def btoc: flow[b, c, notused] = { ??? } def atoc: flow[a, c, notused] = { atob.via(btoc) } i equivalent of flatmap:
def atosomeb: flow[a, some[b], notused] = { ??? } def atosomec: flow[a, some[c], notused] = { atosomeb.flatvia(btoc) } is there built-in way flatvia? seems common need things option unwrapping , error flattening.
it depends if interested in keeping nones around, or if want throw them away.
as typed flow flow[a, some[c], notused] seems not interested in nones @ all. means can filter them out collect, e.g.
def atosomec: flow[a, c, notused] = { atosomeb.collect{case some(x) ⇒ x}.via(btoc) } if, otherwise, need track nones (or lefts if you're dealing eithers), you'll need write "lifting" stage yourself. can written generically. example, can written function takes flow flow[i, o, m] , returns flow flow[either[e, i], either[e, o], m]. because requires fan-out , fan-in stages, usage of graphdsl required.
def lifteither[i, o, e, m](f: flow[i, o, m]): graph[flowshape[either[e, i], either[e, o]], m] = flow.fromgraph(graphdsl.create(f) { implicit builder: graphdsl.builder[m] => f => val fin = builder.add(flow[either[e, i]]) val p = builder.add(partition[either[e, i]](2, _.fold(_ ⇒ 0, _ ⇒ 1))) val merge = builder.add(merge[either[e, o]](2)) val toright = builder.add(flow[o].map(right(_))) p.out(0).collect{case left(x) ⇒ left(x)} ~> merge fin.out ~> p.in p.out(1).collect{case(right(x)) ⇒ x} ~> f ~> toright ~> merge new flowshape(fin.in, merge.out) }) this can used per below
def atosomeb: flow[a, either[throwable, b], notused] = ??? def atosomec: flow[a, either[throwable, c], notused] = atosomeb.via(lifteither(btoc)) note options can converted eithers leverage same helper function.
Comments
Post a Comment