scala - Akka Streams: reduce to a smaller stream
I have an ordered stream of data:
a a a b b c c c c ... (very long)
and I want to transform it into a stream of aggregates of the form (item, count):
(a, 3) (b, 2) (c, 4)
Which operators should I use in Akka Streams for this?
    Source.fromPublisher(publisher)
      .aggregateSomehow() // ?
      .runWith(sink)
I've looked at .groupBy, but it requires that I know the number of categories in advance, which I don't. I also believe it would keep all groups in memory, which I'd like to avoid. I should be able to discard (a, 3) after it has been processed and free the resources it consumes.
Edit: this question asks for similar functionality using subflows. However, using subflows doesn't seem to be required, because I have a solution using the statefulMapConcat combinator.
One option is to use the statefulMapConcat combinator:
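    import akka.stream.scaladsl.Source

    // Assumes an implicit ActorSystem (or Materializer) is in scope.
    // The trailing "" is an end marker that flushes the last group.
    Source(List("a", "a", "b", "b", "b", "c", "c", ""))
      .statefulMapConcat { () =>
        // State is created once per materialization of the stream.
        var lastChar = ""
        var count = 0
        char =>
          if (lastChar == char) {
            // Same run: count it and emit nothing yet.
            count += 1
            List.empty
          } else {
            // Run ended: emit the finished pair and start a new run.
            val charCount = (lastChar, count)
            lastChar = char
            count = 1
            List(charCount)
          }
      }
      .runForeach(println)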
However, this requires appending an extra element to the input stream to mark the end.
Output:
(,0) (a,2) (b,3) (c,2)
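The leading (,0) pair appears because the initial lastChar ("") differs from the first real element. A possible way to avoid both that pair and the hand-appended end marker is to wrap elements in Option and let the stream append None itself. This is only a sketch: it assumes Akka 2.6 or later (where an implicit ActorSystem supplies the materializer), and the names RunLengthExample and runLengths are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object RunLengthExample {

  // Hypothetical helper: collapses runs of equal consecutive elements
  // into (item, count) pairs. Only the current item and its count are kept.
  def runLengths[T, M](source: Source[T, M]): Source[(T, Int), M] =
    source
      .map[Option[T]](Some(_))
      // None is appended by the stream itself and acts as the end marker.
      .concat(Source.single[Option[T]](None))
      .statefulMapConcat[(T, Int)] { () =>
        var current: Option[T] = None
        var count = 0

        {
          // Same element as the current run: count it, emit nothing.
          case Some(elem) if current.contains(elem) =>
            count += 1
            Nil
          // New element or end marker: emit the finished run, if there is one.
          case next =>
            val finished = current.map(c => (c, count)).toList
            current = next
            count = 1
            finished
        }
      }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("run-length-example")
    try {
      val input = Source(List("a", "a", "a", "b", "b", "c", "c", "c", "c"))
      val result = Await.result(runLengths(input).runWith(Sink.seq), 5.seconds)
      println(result.mkString(" ")) // prints: (a,3) (b,2) (c,4)
    } finally system.terminate()
  }
}
```

Because a pair is emitted as soon as its run ends, nothing about earlier groups stays in memory, which matches the requirement in the question.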
Thanks to @chunjef for the suggestion in the comments.