Apache Spark - Add a sequence number column to a DataFrame using Scala
The logic below adds a sequence number column to a DataFrame. It works as expected when reading data from delimited files. Today I have a new task: read data from an Oracle table, add a sequence number, and process it further. I am facing an issue with this logic when I add the sequence number to a DataFrame read from the Oracle table.
oracleTableDF is the DataFrame.

// Creating the sequence number (seqnum) logic
val rowRDD = oracleTableDF.rdd.zipWithIndex().map(indexedRow => Row.fromSeq((indexedRow._2 + 1) +: indexedRow._1.toSeq))
// Creating a StructType that adds seqnum to the schema
val newStructure = StructType(Array(StructField("seqnum", LongType)).++(oracleTableDF.schema.fields))
// Creating a new DataFrame with seqnum
oracleTableDF = spark.createDataFrame(rowRDD, newStructure)

I am not able to locate the actual issue, because the logic works as expected on the cluster when I read from files. I face the issue only when I read from the Oracle table. It also works as expected in local mode.
Below is the error:
"ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, xxxx, executor 1): java.lang.NoClassDefFoundError: Could not initialize class oracledataprocess$"
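For reference, here is a self-contained sketch of the same zipWithIndex logic. A "Could not initialize class X$" error means the JVM failed to run the initializer of the Scala object X on that executor. One common cause (an assumption here, since the full oracledataprocess code is not shown) is that the object creates driver-only resources, such as a SparkSession or a JDBC connection, in its body. The sketch therefore keeps the row-mapping logic in a small object with no fields. The names SeqNumSketch, SeqNumDemo, and addSeqNum are hypothetical, and the sample data stands in for the Oracle table.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper object: it holds no fields, so initializing it on an
// executor does not touch any driver-only resources.
object SeqNumSketch {
  // Prepends a 1-based "seqnum" column to the given DataFrame.
  def addSeqNum(spark: SparkSession, df: DataFrame): DataFrame = {
    // zipWithIndex pairs every row with a 0-based Long index.
    val rowRDD = df.rdd.zipWithIndex().map { case (row, idx) =>
      Row.fromSeq((idx + 1L) +: row.toSeq)
    }
    // New schema: seqnum first, followed by the original fields.
    val schema = StructType(
      StructField("seqnum", LongType, nullable = false) +: df.schema.fields
    )
    spark.createDataFrame(rowRDD, schema)
  }
}

object SeqNumDemo {
  def main(args: Array[String]): Unit = {
    // Assumption: the session is created inside main, not at object level.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("seqnum-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the DataFrame read from the Oracle table.
    val sample = Seq("a", "b", "c").toDF("value")
    SeqNumSketch.addSeqNum(spark, sample).show()

    spark.stop()
  }
}
```

If the failure persists with this layout, the executor logs usually contain an earlier ExceptionInInitializerError that names the real cause.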
One option is to use monotonically_increasing_id() to create a new column with an incremental ID:
import org.apache.spark.sql.functions.monotonically_increasing_id
val dataframe = oracleTableDF.withColumn("incremental_id", monotonically_increasing_id())
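Keep in mind that monotonically_increasing_id() produces unique, increasing IDs, but they are not consecutive, because the partition ID is encoded in the upper bits. If you need a gap-free 1, 2, 3, ... sequence, one possible approach is to apply row_number() over a window ordered by that ID. The sketch below assumes a small local sample in place of the Oracle table; the names ConsecutiveSeqNum, addConsecutiveSeqNum, and the temporary column _mid are hypothetical. A window without a partitioning clause moves all rows to a single partition, so this may be slow on large tables.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

object ConsecutiveSeqNum {
  // Adds a consecutive, 1-based "seqnum" column (IntegerType) to the DataFrame.
  def addConsecutiveSeqNum(df: DataFrame): DataFrame = {
    // Step 1: attach a unique, increasing (but not consecutive) ID.
    val withId = df.withColumn("_mid", monotonically_increasing_id())
    // Step 2: number the rows in that order, then drop the helper column.
    val byId = Window.orderBy("_mid")
    withId.withColumn("seqnum", row_number().over(byId)).drop("_mid")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("row-number-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the DataFrame read from the Oracle table.
    val oracleTableDF = Seq("a", "b", "c").toDF("value")
    addConsecutiveSeqNum(oracleTableDF).show()

    spark.stop()
  }
}
```

Because this version uses only built-in column expressions, no user-defined closure is shipped to the executors.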