Apache Spark - Add sequence number column in DataFrame using Scala


The logic below adds a sequence number column to a DataFrame, and it works as expected when reading data from delimited files. Today I have a new task: read data from an Oracle table, add a sequence number, and process it further. I am facing an issue with the same logic when adding the sequence number to a DataFrame read from the Oracle table.

Here, oracleTableDF is the DataFrame read from the Oracle table:

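    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Creating the sequence number logic: seqNum (zipWithIndex is 0-based, so add 1)
    val rowRDD = oracleTableDF.rdd.zipWithIndex().map(indexedRow =>
      Row.fromSeq((indexedRow._2 + 1) +: indexedRow._1.toSeq))

    // Creating a StructType that adds seqNum in front of the existing schema
    val newStructure = StructType(Array(StructField("seqNum", LongType)) ++ oracleTableDF.schema.fields)

    // Creating the new DataFrame with seqNum (oracleTableDF must be declared as a var)
    oracleTableDF = spark.createDataFrame(rowRDD, newStructure)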
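To make the index arithmetic above concrete without a cluster, here is a plain-Scala sketch of the same idea on local collections. It does not use Spark, and SeqNumSketch and addSeqNum are hypothetical names: each row is modeled as a Seq[Any], and a 1-based sequence number is prepended, as Row.fromSeq(... +: row.toSeq) does in the Spark version. Note that on an RDD, zipWithIndex orders indices first by partition index and then by position within each partition, and it triggers a Spark job when the RDD has more than one partition.

```scala
// Plain-Scala sketch (no Spark): prepend a 1-based sequence number to each row.
// SeqNumSketch and addSeqNum are hypothetical names used only for illustration.
object SeqNumSketch {
  def addSeqNum(rows: Seq[Seq[Any]]): Seq[Seq[Any]] =
    rows.zipWithIndex.map { case (row, idx) =>
      // zipWithIndex starts at 0, so add 1 to number rows from 1
      (idx.toLong + 1L) +: row
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Seq("a", 10), Seq("b", 20))
    addSeqNum(rows).foreach(println)
  }
}
```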

I am not able to locate the actual issue, because the same logic works as expected in the cluster when reading files. I face the issue only when reading the Oracle table, and it also works as expected in local mode.

Below is the error:

"ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, xxxx, executor 1): java.lang.NoClassDefFoundError: Could not initialize class oracledataprocess$"

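A hedged note on what this error means on the JVM: the trailing $ in oracledataprocess$ is how the Scala compiler names the class behind an object, and that class's static initializer creates the singleton and so runs the object's body. If the body throws, the first access fails with ExceptionInInitializerError, and every later access in the same JVM fails with NoClassDefFoundError: Could not initialize class ...$. A Spark task that touches the object on an executor (for example, through a closure defined inside it) can trigger that initialization there. The question does not show the object's body, so what fails is unknown; it may be worth searching the executor logs for an earlier ExceptionInInitializerError, whose cause is the underlying failure. The sketch below reproduces only the JVM behavior, with a hypothetical BrokenInit object and no Spark.

```scala
// Hypothetical object whose body throws during initialization, standing in for
// an application object whose initialization breaks on an executor.
object BrokenInit {
  val setting: String = throw new IllegalStateException("setting not available")
}

object InitFailureDemo {
  // Touch BrokenInit once and return the simple class name of the error raised.
  private def touch(): String =
    try BrokenInit.setting.length.toString
    catch { case e: LinkageError => e.getClass.getSimpleName }

  // First access runs the failing initializer; second access finds the class
  // already marked as failed. Memoized so the pair is computed only once.
  lazy val observed: (String, String) = {
    val first = touch()
    val second = touch()
    (first, second)
  }

  def main(args: Array[String]): Unit = println(observed)
}
```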
One option: you can use monotonically_increasing_id() to create a new column with an incremental ID. The generated IDs are unique and monotonically increasing, but not consecutive.

    import org.apache.spark.sql.functions.monotonically_increasing_id

    val dataFrame = oracleTableDF.withColumn("incremental_id", monotonically_increasing_id())
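The IDs jump between partitions because, per Spark's documentation for monotonically_increasing_id(), the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Scala sketch below computes IDs with that layout; it does not call Spark, and MonotonicIdSketch and idFor are hypothetical names. With this layout the second partition starts at 2^33 = 8589934592 instead of continuing from the first, so if gap-free numbers are required, the zipWithIndex approach from the question provides them.

```scala
// Plain-Scala sketch (no Spark) of the documented ID layout:
// partition ID in the upper 31 bits, record number within the partition in the lower 33 bits.
object MonotonicIdSketch {
  def idFor(partitionId: Int, recordInPartition: Long): Long =
    (partitionId.toLong << 33) + recordInPartition

  def main(args: Array[String]): Unit = {
    // Two partitions with two records each
    val ids = for { p <- 0 to 1; r <- 0L to 1L } yield idFor(p, r)
    println(ids.mkString(", ")) // prints 0, 1, 8589934592, 8589934593
  }
}
```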
