Window timeseries with step in Spark/Scala
I have this input:

    timestamp,user
    1,a
    2,b
    5,c
    9,e
    12,f

The result I want:

    timestamprange,userlist
    1 2,[a,b]
    3 4,[] or null
    5 6,[c]
    7 8,[] or null
    9 10,[e]
    11 12,[f]

I tried using a window, but the problem is that it doesn't include the empty timestamp ranges.
Any hints would be helpful.
I don't know whether window functions can cover the gaps between ranges, but you can take the following approach.
First define a DataFrame of ranges, df_ranges:
    // toDF needs spark.implicits._ (imported automatically in spark-shell)
    val ranges = List((1,2), (3,4), (5,6), (7,8), (9,10))
    val df_ranges = sc.parallelize(ranges).toDF("start", "end")

    +-----+---+
    |start|end|
    +-----+---+
    |    1|  2|
    |    3|  4|
    |    5|  6|
    |    7|  8|
    |    9| 10|
    +-----+---+

The data, with its timestamp column, goes in df_data:
    val data = List((1,"a"), (2,"b"), (5,"c"), (9,"e"))
    val df_data = sc.parallelize(data).toDF("timestamp", "user")

    +---------+----+
    |timestamp|user|
    +---------+----+
    |        1|   a|
    |        2|   b|
    |        5|   c|
    |        9|   e|
    +---------+----+

Join the two DataFrames on the start, end, and timestamp columns, using a left join so that ranges with no matching timestamp are kept:
    val df_joined = df_ranges.join(
      df_data,
      df_ranges.col("start").equalTo(df_data.col("timestamp"))
        .or(df_ranges.col("end").equalTo(df_data.col("timestamp"))),
      "left")

    +-----+---+---------+----+
    |start|end|timestamp|user|
    +-----+---+---------+----+
    |    1|  2|        1|   a|
    |    1|  2|        2|   b|
    |    5|  6|        5|   c|
    |    9| 10|        9|   e|
    |    3|  4|     null|null|
    |    7|  8|     null|null|
    +-----+---+---------+----+

Now do a simple aggregation with the collect_list function. It skips nulls, so ranges with no matching row end up with an empty list:

    import org.apache.spark.sql.functions.collect_list

    df_joined.groupBy("start", "end").agg(collect_list("user")).orderBy("start")

    +-----+---+------------------+
    |start|end|collect_list(user)|
    +-----+---+------------------+
    |    1|  2|            [a, b]|
    |    3|  4|                []|
    |    5|  6|               [c]|
    |    7|  8|                []|
    |    9| 10|               [e]|
    +-----+---+------------------+
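The ranges above are written out by hand and matched only on their two endpoints, which works for a step of 2. As a rough sketch of how the same left-join idea might be generalized to an arbitrary step, the ranges could be generated with spark.range and matched with between. The names WindowWithStep, bucketize, first, last and step are hypothetical, the events DataFrame is assumed to have no columns named start or end, and the bounds are passed in rather than computed from the data:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list}

object WindowWithStep {
  // Hypothetical helper: one output row per [start, end] bucket of width `step`,
  // covering first..last, including buckets that contain no events.
  def bucketize(spark: SparkSession, events: DataFrame,
                first: Long, last: Long, step: Long): DataFrame = {
    // spark.range yields a single "id" column: first, first + step, ...
    val ranges = spark.range(first, last + 1, step)
      .select(col("id").as("start"), (col("id") + step - 1).as("end"))

    ranges
      // Left join keeps buckets without any matching timestamp.
      .join(events, col("timestamp").between(col("start"), col("end")), "left")
      .groupBy("start", "end")
      // collect_list ignores nulls, so unmatched buckets get an empty list.
      .agg(collect_list("user").as("userlist"))
      .orderBy("start")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("window-with-step")
      .getOrCreate()
    import spark.implicits._

    // Input from the question, including the (12, f) row.
    val events = Seq((1L, "a"), (2L, "b"), (5L, "c"), (9L, "e"), (12L, "f"))
      .toDF("timestamp", "user")

    bucketize(spark, events, first = 1L, last = 12L, step = 2L).show()
    spark.stop()
  }
}
```

Two caveats: the order of users inside each list is not guaranteed, and a between join is a non-equi join, so it is probably only reasonable when the ranges DataFrame is small.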