Spark Scala: CSV Input to Nested JSON
This is how the input data looks:

    20170101,2024270,1000,1000,1000,1000,1000,1000,1000,2000,2000
    20170101,2024333,1000,1000,1000,1000,1000,1000,1000,2000,2000
    20170101,2023709,1000,1000,1000,1000,1000,1000,1000,2000,2000
    20170201,1234709,1000,1000,1000,1000,1000,1000,1000,2000,2000

I want to convert it to a key-value RDD, where the key is an integer and the value is a JSON object, so that I can write it to Elasticsearch:
    (
      2024270,
      {
        "metrics": {
          "date" : 20170201,
          "style_id" : 1234709,
          "revenue" : 1000,
          "list_count" : 1000,
          "pdp_count" : 1000,
          "add_to_cart_count" : 1000
        }
      }
    )

In Python, I am able to do this with the code below:
    def format_metrics(line):
        tokens = line.split('^')
        try:
            return (tokens[1], {
                'metrics': {
                    'date': tokens[0],
                    'mrp': float(tokens[2]),
                    'revenue': float(tokens[3]),
                    'quantity': int(tokens[4]),
                    'product_discount': float(tokens[5]),
                    'coupon_discount': float(tokens[6]),
                    'total_discount': float(tokens[7]),
                    'list_count': int(tokens[8]),
                    'add_to_cart_count': int(tokens[9]),
                    'pdp_count': int(tokens[10])
                }
            }) if len(tokens) > 1 else ('', dict())
        except (ValueError, IndexError):
            # The except clause is missing from the post; returning the same
            # empty fallback here is an assumption.
            return ('', dict())

    # format_metrics must be defined before it is passed to map.
    metrics_rdd = sc.textFile('s3://myntra/scm-inbound/fifa/poc/size_curve_date_range_old/*').map(format_metrics)

But I cannot figure out how to achieve the same in Scala. I am new to Scala and have managed to get the output below, but I am not able to wrap the JSON in a "metrics" block. Any pointers would be helpful.
    ordersdf.withColumn("key", $"style_id")
      .withColumn("json", to_json(struct($"date", $"style_id", $"mrp")))
      .select("key", "json")
      .show(false)

    // Exiting paste mode, now interpreting.

    +-------+-------------------------------------------------+
    |key    |json                                             |
    +-------+-------------------------------------------------+
    |2024270|{"date":20170101,"style_id":2024270,"mrp":1000.0}|
    |2024333|{"date":20170101,"style_id":2024333,"mrp":1000.0}|
    |2023709|{"date":20170101,"style_id":2023709,"mrp":1000.0}|
    |1234709|{"date":20170201,"style_id":1234709,"mrp":1000.0}|
    +-------+-------------------------------------------------+
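One way to get the "metrics" wrapper with the DataFrame API is to nest one struct inside another before calling to_json. The following is a minimal sketch, not a definitive solution: the `sample` DataFrame is a hypothetical stand-in for `ordersdf` with only three of the columns, and the local SparkSession is created purely for illustration (in spark-shell, `spark` already exists).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, to_json}

// Local session for illustration only; spark-shell already provides `spark`.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("nested-json-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for ordersdf, with three columns from the question.
val sample = Seq(
  (20170101, 2024270, 1000.0),
  (20170201, 1234709, 1000.0)
).toDF("date", "style_id", "mrp")

// The inner struct is aliased "metrics"; the outer struct wraps it, so
// to_json emits {"metrics":{...}} instead of a flat object.
val keyed = sample.select(
  $"style_id".as("key"),
  to_json(struct(struct($"date", $"style_id", $"mrp").as("metrics"))).as("json")
)

// Key-value pairs of (Int, String), the shape described in the question.
val pairs = keyed.rdd.map(row => (row.getInt(0), row.getString(1)))
pairs.collect().foreach(println)
```

The remaining columns (revenue, quantity, and so on) would be added to the inner struct in the same way. The value here is a JSON string; whether a string or a map is more convenient depends on how the pairs are written out afterwards.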
I tried what @philantrovert suggested, and it worked.
    scala> val ordersdf = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallmetrics.csv")
    ordersdf: org.apache.spark.sql.DataFrame = [date: int, style_id: int ... 9 more fields]

    scala> :paste
    // Entering paste mode (ctrl-D to finish)

    ordersdf.withColumn("key", $"style_id")
      .withColumn("metrics", to_json(struct($"date", $"style_id", $"mrp")))
      .select("key", "metrics")
      .toJSON
      .show(false)

    // Exiting paste mode, now interpreting.

    +-----------------------------------------------------------------------------------+
    |value                                                                              |
    +-----------------------------------------------------------------------------------+
    |{"key":2024270,"metrics":"{\"date\":20170101,\"style_id\":2024270,\"mrp\":1000.0}"}|
    |{"key":2024333,"metrics":"{\"date\":20170101,\"style_id\":2024333,\"mrp\":1000.0}"}|
    |{"key":2023709,"metrics":"{\"date\":20170101,\"style_id\":2023709,\"mrp\":1000.0}"}|
    |{"key":1234709,"metrics":"{\"date\":20170201,\"style_id\":1234709,\"mrp\":1000.0}"}|
    +-----------------------------------------------------------------------------------+

I have also tried another way, using the json4s library, and it worked:
    import org.apache.spark.sql.Row
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    def convertrowtojson(row: Row) = {
      // As posted, date reads row(1) and several other fields reuse index 1 or 3,
      // which is why the output below repeats style_id; each field should read
      // its own column index.
      val json =
        ("metrics" ->
          ("date" -> row(1).toString) ~
          ("style_id" -> row.getInt(1)) ~
          ("mrp" -> row.getFloat(2)) ~
          ("revenue" -> row.getFloat(3)) ~
          ("quantity" -> row.getInt(1)) ~
          ("product_discount" -> row.getFloat(3)) ~
          ("coupon_discount" -> row.getFloat(3)) ~
          ("total_discount" -> row.getFloat(3)) ~
          ("list_count" -> row.getInt(1)) ~
          ("add_to_cart_count" -> row.getInt(1)) ~
          ("pdp_count" -> row.getInt(1))
        )
      (row.getInt(1), compact(render(json)).toString)
    }

    scala> val ordersdf = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallmetrics.csv").map(convertrowtojson)
    ordersdf: org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

    scala> ordersdf.show(false)
    +-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |_1     |_2                                                                                                                                                                                                                                                |
    +-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |2024270|{"metrics":{"date":"2024270","style_id":2024270,"mrp":1000.0,"revenue":1000.0,"quantity":2024270,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2024270,"add_to_cart_count":2024270,"pdp_count":2024270}}|
    |2024333|{"metrics":{"date":"2024333","style_id":2024333,"mrp":1000.0,"revenue":1000.0,"quantity":2024333,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2024333,"add_to_cart_count":2024333,"pdp_count":2024333}}|
    |2023709|{"metrics":{"date":"2023709","style_id":2023709,"mrp":1000.0,"revenue":1000.0,"quantity":2023709,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2023709,"add_to_cart_count":2023709,"pdp_count":2023709}}|
    |1234709|{"metrics":{"date":"1234709","style_id":1234709,"mrp":1000.0,"revenue":1000.0,"quantity":1234709,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":1234709,"add_to_cart_count":1234709,"pdp_count":1234709}}|
    +-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
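In the toJSON attempt earlier, "metrics" comes out as an escaped string because to_json had already serialized that column before toJSON serialized the whole row. A minimal sketch of one way around this, assuming the struct column is passed to toJSON directly: `sampleRows` is a hypothetical stand-in for `ordersdf` with three columns, and the local SparkSession is only for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

// Local session for illustration only; spark-shell already provides `spark`.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("tojson-struct-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for ordersdf, with three columns from the question.
val sampleRows = Seq(
  (20170101, 2024270, 1000.0),
  (20170201, 1234709, 1000.0)
).toDF("date", "style_id", "mrp")

// Keep "metrics" as a struct column (no to_json here), so toJSON
// serializes it once as a nested object rather than as a quoted string.
val nestedJson = sampleRows.select(
  $"style_id".as("key"),
  struct($"date", $"style_id", $"mrp").as("metrics")
).toJSON

nestedJson.show(false)
```

Under these assumptions each row should look like {"key":2024270,"metrics":{"date":20170101,"style_id":2024270,"mrp":1000.0}}, with no backslash escapes inside "metrics".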