scala - Akka streams reduce to smaller stream -


i have ordered stream of data

a a b b c c c c ... (very long) 

and want transform stream of aggregates in form (item, count):

(a, 3) (b, 2) (c, 4) 

what operators use in akka streams this?

source.frompublisher(publisher)     .aggregatesomehow()  // ?     .runwith(sink) 

i've looked .groupby requires know number of categories in advance don't. believe keep groups in memory i'd avoid. should able discard (a, 3) after has been processed , free resources consumes.

edit: this question ask similar functionality using subflows. using subflows doesn't seem required because have solution using statefulmapconcat combinator.

one option use statefulmapconcat combinator:

source(list("a", "a", "b", "b", "b", "c", "c", ""))       .statefulmapconcat({ () =>         var lastchar = ""         var count = 0          char => if(lastchar == char) {             count += 1             list.empty           } else {             val charcount = (lastchar, count)             lastchar = char             count = 1             list(charcount)           }       })     .runforeach(println) 

however required appending element input stream mark end.

output:

(,0) (a,2) (b,3) (c,2) 

thanks @chunjef suggestion in comments


Comments

Popular posts from this blog

resizing Telegram inline keyboard -

command line - How can a Python program background itself? -

php - "cURL error 28: Resolving timed out" on Wordpress on Azure App Service on Linux -