Spark Scala: CSV input to nested JSON


This is what the input data looks like:

    20170101,2024270,1000,1000,1000,1000,1000,1000,1000,2000,2000
    20170101,2024333,1000,1000,1000,1000,1000,1000,1000,2000,2000
    20170101,2023709,1000,1000,1000,1000,1000,1000,1000,2000,2000
    20170201,1234709,1000,1000,1000,1000,1000,1000,1000,2000,2000

I want to convert it into a key-value RDD, where the key is an integer and the value is a JSON object, so that I can write it to Elasticsearch. Each pair should look like this:

    (2024270, {
      "metrics": {
        "date": 20170201,
        "style_id": 1234709,
        "revenue": 1000,
        "list_count": 1000,
        "pdp_count": 1000,
        "add_to_cart_count": 1000
      }
    })

In Python, I can do this with the following code:

    def format_metrics(line):
        tokens = line.split('^')
        if len(tokens) <= 1:
            return ('', dict())
        try:
            return (tokens[1], {
                'metrics': {
                    'date': tokens[0],
                    'mrp': float(tokens[2]),
                    'revenue': float(tokens[3]),
                    'quantity': int(tokens[4]),
                    'product_discount': float(tokens[5]),
                    'coupon_discount': float(tokens[6]),
                    'total_discount': float(tokens[7]),
                    'list_count': int(tokens[8]),
                    'add_to_cart_count': int(tokens[9]),
                    'pdp_count': int(tokens[10])
                }
            })
        except (IndexError, ValueError):
            # Lines with missing or non-numeric fields map to an empty placeholder pair
            return ('', dict())

    # format_metrics must be defined before it is passed to map()
    metrics_rdd = sc.textFile('s3://myntra/scm-inbound/fifa/poc/size_curve_date_range_old/*').map(format_metrics)

But I can't figure out how to achieve the same in Scala. I'm new to Scala; so far I have managed to produce the output below, but I can't wrap the JSON in a "metrics" block. Any pointers would be helpful.

    ordersdf.withColumn("key", $"style_id")
            .withColumn("json", to_json(struct($"date", $"style_id", $"mrp")))
            .select("key", "json")
            .show(false)

    // Exiting paste mode, now interpreting.

    +-------+-------------------------------------------------+
    |key    |json                                             |
    +-------+-------------------------------------------------+
    |2024270|{"date":20170101,"style_id":2024270,"mrp":1000.0}|
    |2024333|{"date":20170101,"style_id":2024333,"mrp":1000.0}|
    |2023709|{"date":20170101,"style_id":2023709,"mrp":1000.0}|
    |1234709|{"date":20170201,"style_id":1234709,"mrp":1000.0}|
    +-------+-------------------------------------------------+

I tried what @philantrovert suggested, and it worked:

    scala> val ordersdf = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallmetrics.csv")
    ordersdf: org.apache.spark.sql.DataFrame = [date: int, style_id: int ... 9 more fields]

    scala> :paste
    // Entering paste mode (ctrl-D to finish)

    ordersdf.withColumn("key", $"style_id")
            .withColumn("metrics", to_json(struct($"date", $"style_id", $"mrp")))
            .select("key", "metrics")
            .toJSON
            .show(false)

    // Exiting paste mode, now interpreting.

    +-----------------------------------------------------------------------------------+
    |value                                                                              |
    +-----------------------------------------------------------------------------------+
    |{"key":2024270,"metrics":"{\"date\":20170101,\"style_id\":2024270,\"mrp\":1000.0}"}|
    |{"key":2024333,"metrics":"{\"date\":20170101,\"style_id\":2024333,\"mrp\":1000.0}"}|
    |{"key":2023709,"metrics":"{\"date\":20170101,\"style_id\":2023709,\"mrp\":1000.0}"}|
    |{"key":1234709,"metrics":"{\"date\":20170201,\"style_id\":1234709,\"mrp\":1000.0}"}|
    +-----------------------------------------------------------------------------------+
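In the output above, `metrics` is an escaped string rather than a nested object: `to_json` has already turned the struct into a string column, and `toJSON` then serializes that string like any other field. A minimal sketch of one way to get a real nested object, assuming Spark 2.1 or later (where `functions.to_json` exists) and the column names from `revenue_schema`: build the inner `struct`, alias it `metrics`, wrap it in an outer `struct`, and call `to_json` only once. The object `NestedMetricsJson` and the helper `withNestedMetrics` are hypothetical names.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct, to_json}

object NestedMetricsJson {
  /**
   * Returns a DataFrame with two columns:
   *   key  - the value of keyCol
   *   json - a string of the form {"metrics":{...metricCols...}}
   */
  def withNestedMetrics(df: DataFrame, keyCol: String, metricCols: Seq[String]): DataFrame = {
    // Inner struct: becomes the object stored under the "metrics" field.
    val metrics = struct(metricCols.map(col): _*).as("metrics")
    // Outer struct with a single field, so one to_json call emits
    // {"metrics":{...}} instead of a JSON string nested inside a string.
    df.select(col(keyCol).as("key"), to_json(struct(metrics)).as("json"))
  }
}
```

For example, `NestedMetricsJson.withNestedMetrics(ordersdf, "style_id", Seq("date", "style_id", "mrp"))` should give a `json` column such as `{"metrics":{"date":20170101,"style_id":2024270,"mrp":1000.0}}`. With `spark.implicits._` in scope, appending `.as[(Int, String)].rdd` turns the result into an `RDD[(Int, String)]` of key-value pairs, since `as` maps tuple fields to columns by position.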

I have also tried another approach using the json4s library, which worked as well:

    import org.apache.spark.sql.Row
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    // Builds (style_id, {"metrics": {...}}) from one row. Column positions follow
    // revenue_schema: date, style_id, mrp, revenue, quantity, product_discount,
    // coupon_discount, total_discount, list_count, add_to_cart_count, pdp_count.
    // The Int/Float types are assumed to match the conversions in the Python version.
    def convertRowToJson(row: Row): (Int, String) = {
      val json =
        ("metrics" ->
          ("date" -> row.getInt(0)) ~
          ("style_id" -> row.getInt(1)) ~
          ("mrp" -> row.getFloat(2)) ~
          ("revenue" -> row.getFloat(3)) ~
          ("quantity" -> row.getInt(4)) ~
          ("product_discount" -> row.getFloat(5)) ~
          ("coupon_discount" -> row.getFloat(6)) ~
          ("total_discount" -> row.getFloat(7)) ~
          ("list_count" -> row.getInt(8)) ~
          ("add_to_cart_count" -> row.getInt(9)) ~
          ("pdp_count" -> row.getInt(10)))
      (row.getInt(1), compact(render(json)))
    }

    scala> val ordersdf = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallmetrics.csv").map(convertRowToJson)
    ordersdf: org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Note that the output below was produced with date read as row(1).toString and quantity, list_count, add_to_cart_count and pdp_count read with getInt(1), which is why those fields repeat the style_id:

    scala> ordersdf.show(false)
    +-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |_1     |_2                                                                                                                                                                                                                                                |
    +-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |2024270|{"metrics":{"date":"2024270","style_id":2024270,"mrp":1000.0,"revenue":1000.0,"quantity":2024270,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2024270,"add_to_cart_count":2024270,"pdp_count":2024270}}|
    |2024333|{"metrics":{"date":"2024333","style_id":2024333,"mrp":1000.0,"revenue":1000.0,"quantity":2024333,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2024333,"add_to_cart_count":2024333,"pdp_count":2024333}}|
    |2023709|{"metrics":{"date":"2023709","style_id":2023709,"mrp":1000.0,"revenue":1000.0,"quantity":2023709,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2023709,"add_to_cart_count":2023709,"pdp_count":2023709}}|
    |1234709|{"metrics":{"date":"1234709","style_id":1234709,"mrp":1000.0,"revenue":1000.0,"quantity":1234709,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":1234709,"add_to_cart_count":1234709,"pdp_count":1234709}}|
    +-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
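Another option, closer to the Python `format_metrics`, is to parse the text lines directly without a DataFrame schema. A minimal Spark-free sketch, assuming the field order used by `format_metrics`, that date, quantity and the count fields are integers while the price and discount fields are decimals, and comma-delimited lines as in the sample data (the Python code splits on `^`, so the delimiter is a parameter here). `MetricsFormat` and `formatMetrics` are hypothetical names. Malformed lines become `None` instead of `('', dict())`, and the JSON is assembled by hand, which is safe only because every value is numeric; arbitrary strings would need a JSON library for escaping.

```scala
object MetricsFormat {
  // Field names in input order (assumed from the Python format_metrics).
  private val fieldNames = Vector(
    "date", "style_id", "mrp", "revenue", "quantity", "product_discount",
    "coupon_discount", "total_discount", "list_count", "add_to_cart_count", "pdp_count")

  // Fields parsed as decimals; every other field is parsed as an integer.
  private val decimalFields =
    Set("mrp", "revenue", "product_discount", "coupon_discount", "total_discount")

  /** Turns one line into (style_id, {"metrics":{...}}), or None if it is malformed. */
  def formatMetrics(line: String, delimiter: Char = ','): Option[(Int, String)] = {
    // StringOps.split(Char) treats the delimiter literally, so '^' is safe too.
    val tokens = line.trim.split(delimiter)
    if (tokens.length != fieldNames.length) None
    else scala.util.Try {
      val pairs = fieldNames.zip(tokens).map { case (name, raw) =>
        // Parsing validates the token; toString re-renders it as a JSON number.
        val value =
          if (decimalFields(name)) raw.trim.toDouble.toString
          else raw.trim.toInt.toString
        "\"" + name + "\":" + value
      }
      (tokens(1).trim.toInt, pairs.mkString("{\"metrics\":{", ",", "}}"))
    }.toOption // NumberFormatException on a non-numeric token yields None
  }
}
```

In Spark, something like `sc.textFile(...).flatMap(line => MetricsFormat.formatMetrics(line))` would then give an `RDD[(Int, String)]` with malformed lines dropped.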
