Spark RDD.foreach re-initializing external object
I think I am missing a basic Spark concept. I am trying to convert an RDD of integers into a comma-separated string. Currently I do this by collecting the RDD into a list and iterating over it. However, profiling the JVM shows that this brings all the work into a single thread, which is not efficient. Hence, I am trying to call the foreach method on the RDD itself, but it behaves strangely. Below is my unit test:
JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3));
StringBuilder sb = new StringBuilder("");
rdd.foreach(t -> {
    System.out.println(String.valueOf(t));
    if (sb.length() > 0) sb.append(",");
    sb.append(String.valueOf(t));
    System.out.println(sb);
});
System.out.println(sb.length());

The output:
1
3
2
2
3
1
0

Apparently the StringBuilder gets re-instantiated on each call. Is there a way around this?
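A minimal pure-Java sketch of the effect in the question, assuming (as a simplification) that each task receives its own copy of the captured variables because the closure is serialized and deserialized before it runs. Here a Java serialization round trip stands in for that step, and one element per task mimics three single-element partitions. ClosureCopyDemo, serializedCopy and runTasks are hypothetical names; no Spark classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ClosureCopyDemo {

    // Round-trips an object through Java serialization, producing an independent copy.
    // Stand-in (assumption) for shipping a closure and its captured variables to a task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T serializedCopy(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Simulates foreach with one element per task: each task appends to its own copy of sb.
    static List<String> runTasks(StringBuilder sb, List<Integer> elements) {
        List<String> seenByTasks = new ArrayList<>();
        for (int t : elements) {
            StringBuilder taskCopy = serializedCopy(sb);
            if (taskCopy.length() > 0) taskCopy.append(",");
            taskCopy.append(t);
            seenByTasks.add(taskCopy.toString());
        }
        return seenByTasks;
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder(); // the "driver" copy
        System.out.println(runTasks(sb, Arrays.asList(1, 2, 3))); // [1, 2, 3]
        System.out.println(sb.length());                          // 0
    }
}
```

Each simulated task only ever sees its own element, and the original builder stays empty, which matches the 0 printed at the end of the unit test.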
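The StringBuilder is not so much re-instantiated as copied: Spark serializes the function passed to foreach, together with everything it captures (here sb), and each task runs against its own deserialized copy. The appends happen on those copies, never on the driver's sb, which is why its length is still 0 at the end.

To build the string in parallel, you can use mapPartitions instead. That way each partition does its work in parallel, and you collect the per-partition strings at the end.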
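The mapPartitions idea can be illustrated without a cluster. In this sketch each inner List stands in for one partition (a hypothetical, hand-picked split of 1 to 7 into 5 partitions), a parallel stream plays the role of the executors, and Collectors.joining plays the role of the final collect-and-join on the driver. MapPartitionsSketch, joinPartition and joinAll are illustrative names, not Spark API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapPartitionsSketch {

    // Stand-in for the mapPartitions function: one partition becomes one comma-separated string.
    static String joinPartition(List<Integer> partition) {
        return partition.stream().map(String::valueOf).collect(Collectors.joining(","));
    }

    // Partitions are processed in parallel; the per-partition strings are then
    // gathered in partition order and joined, like collect() followed by a join.
    static String joinAll(List<List<Integer>> partitions) {
        return partitions.parallelStream()
                .map(MapPartitionsSketch::joinPartition)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Hypothetical partitioning of 1..7 into 5 non-empty partitions.
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1), Arrays.asList(2, 3), Arrays.asList(4),
                Arrays.asList(5, 6), Arrays.asList(7));
        System.out.println(joinAll(partitions)); // 1,2,3,4,5,6,7
    }
}
```

Because the source list is ordered, the joined result keeps partition order even though the partitions are processed in parallel, much as collect returns partition results in order.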
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7), 5) // 5 partitions
// build one comma-separated string per partition ("" if the partition is empty)
val rdd3 = rdd1.mapPartitions(x => {
  val str = x.mkString(",")
  List(str).iterator
})
// drop the "" strings from empty partitions, then join the per-partition strings
val test1 = rdd3.collect.filterNot(x => x.equals("")).mkString(",")

filterNot is required because the number of partitions can be greater than the number of elements in the sequence (for example, when it is taken from the spark.default.parallelism property), and partitions with no elements produce "" strings.

For Java, you can try the code below:
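import java.util.ArrayList;
import java.util.Iterator;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<Integer> rdd1 = jsc.parallelize(list);
JavaRDD<String> collection = rdd1.mapPartitions((Iterator<Integer> iter) -> {
    ArrayList<String> out = new ArrayList<String>();
    StringBuffer strbf = new StringBuffer("");
    while (iter.hasNext()) {
        Integer current = iter.next();
        if (strbf.length() > 0) strbf.append(",");  // comma between numbers in a partition
        strbf.append(current);
    }
    out.add(strbf.toString());
    return out.iterator();  // Spark 2.x+; on Spark 1.x mapPartitions expects an Iterable (return out)
});
StringBuffer strbffinal = new StringBuffer("");
collection.collect().forEach(item -> {
    if (!"".equals(item)) {  // skip strings from empty partitions
        if (strbffinal.length() > 0) strbffinal.append(",");  // comma between partitions
        strbffinal.append(item);
    }
});

strbffinal now holds the comma-separated list of numbers.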
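The empty-string check matters when there are more partitions than elements. The sketch below assumes an even slicing rule (partition i takes the indices from i*n/k up to, but not including, (i+1)*n/k); that rule is an assumption made here for illustration rather than a description of Spark's partitioner, and the class and method names are hypothetical. Splitting three elements into five partitions leaves two of them empty, and those produce "" strings that have to be filtered out before joining.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class EmptyPartitionsSketch {

    // Assumed even slicing: partition i holds data[i*n/k, (i+1)*n/k).
    static List<List<Integer>> slice(List<Integer> data, int numSlices) {
        List<List<Integer>> slices = new ArrayList<>();
        int n = data.size();
        for (int i = 0; i < numSlices; i++) {
            int start = (int) ((long) i * n / numSlices);
            int end = (int) ((long) (i + 1) * n / numSlices);
            slices.add(new ArrayList<>(data.subList(start, end)));
        }
        return slices;
    }

    // One comma-separated string per partition; an empty partition yields "".
    static List<String> perPartition(List<List<Integer>> slices) {
        return slices.stream()
                .map(p -> p.stream().map(String::valueOf).collect(Collectors.joining(",")))
                .collect(Collectors.toList());
    }

    // Equivalent of the "" check (or filterNot), followed by the final join.
    static String joinNonEmpty(List<String> parts) {
        return parts.stream().filter(s -> !s.isEmpty()).collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        List<String> parts = perPartition(slice(Arrays.asList(1, 2, 3), 5));
        System.out.println(parts);               // [, 1, , 2, 3]
        System.out.println(joinNonEmpty(parts)); // 1,2,3
    }
}
```

Without the filter, joining the five strings directly would give ",1,,2,3", with stray commas where the empty partitions were.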