Note: each group in the aggregation can contain 5-10K rows, so efficient code is highly desirable.
My Data
val df1 = sc.parallelize(Seq(
  ("user2", "iphone", "2017-12-23 16:58:08", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:12", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:20", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:25", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:35", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:45", "Success")
)).toDF("username", "device", "attempt_at", "stat")
+--------+------+-------------------+-------+
|username|device| attempt_at| stat|
+--------+------+-------------------+-------+
| user2|iphone|2017-12-23 16:58:08|Success|
| user2|iphone|2017-12-23 16:58:12|Success|
| user2|iphone|2017-12-23 16:58:20|Success|
| user2|iphone|2017-12-23 16:58:25|Success|
| user2|iphone|2017-12-23 16:58:35|Success|
| user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+
What I want
A grouping by (username, device) that keeps only the latest event per group, together with the timestamp of the attempt just before it:
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+
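For reference, a minimal sketch of one way this output shape could be produced, pairing the one-hour range window from "What I Have" below with a row_number filter that keeps only the newest row per group (note this still evaluates the window for every row, the very concern raised in the Notes at the end):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Events in the hour before the current row, excluding the row itself.
val rangeW = Window
  .partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").cast("long"))
  .rangeBetween(-3600, -1)

// Newest attempt first within each group.
val newestFirst = Window
  .partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").desc)

val latestPerGroup = df1
  .withColumn("previous_attempt_at", last("attempt_at").over(rangeW))
  .withColumn("rn", row_number().over(newestFirst))
  .where(col("rn") === 1)
  .drop("rn")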
Exceptions in desired output:
As mentioned, the lookup has to stay within a specific time window. In the input dataset below, the last row has the latest timestamp, 23rd December. If the window goes back only 1 day from that attempt, the 'previous_attempt_at' column must be null, since there are no events on the previous day, the 22nd of December. It all depends on the input timestamp range. A sketch of this variant follows the desired-output table below.
// Initial Data
+--------+------+-------------------+-------+
|username|device| attempt_at| stat|
+--------+------+-------------------+-------+
| user2|iphone|2017-12-20 16:58:08|Success|
| user2|iphone|2017-12-20 16:58:12|Success|
| user2|iphone|2017-12-20 16:58:20|Success|
| user2|iphone|2017-12-20 16:58:25|Success|
| user2|iphone|2017-12-20 16:58:35|Success|
| user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+
// Desired Output
Again a grouping by (username, device) for the latest event, but nothing falls within the 1-day window, so previous_attempt_at is null:
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:45|Success| null|
+--------+------+-------------------+-------+-------------------+
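A minimal sketch of the window part of that variant, assuming the same approach as in "What I Have" below but with the frame widened from one hour to one day (86400 seconds); reducing to the latest row per group then works as in the sketch further up:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Same range window, widened to look back one day (86400 seconds),
// still excluding the current row. For the data above, no event falls
// within the day before 2017-12-23 16:58:45, so that row gets null.
val oneDayBack = Window
  .partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").cast("long"))
  .rangeBetween(-86400, -1)

val withPrev = df1.withColumn(
  "previous_attempt_at", last("attempt_at").over(oneDayBack))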
What I Have
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Look back up to one hour (3600 seconds), excluding the current row.
val w = Window
  .partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").cast("long"))
  .rangeBetween(-3600, -1)

val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:08|Success| null|
| user2|iphone|2017-12-23 16:58:12|Success|2017-12-23 16:58:08|
| user2|iphone|2017-12-23 16:58:20|Success|2017-12-23 16:58:12|
| user2|iphone|2017-12-23 16:58:25|Success|2017-12-23 16:58:20|
| user2|iphone|2017-12-23 16:58:35|Success|2017-12-23 16:58:25|
| user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+
Notes: the code I have evaluates the window for every row in each user grouping, which is highly inefficient at large scale, and it still doesn't isolate the latest attempt. I don't need any of the rows except the last one per group.
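For comparison, a minimal sketch of a window-free alternative, assuming the 1-day lookback from the exception case above (`windowSeconds` is a hypothetical parameter). It reduces each group with two hash aggregations and a left join, so no per-row window evaluation is needed and only one row per group survives:

import org.apache.spark.sql.functions._

val windowSeconds = 86400L  // hypothetical: look back one day

val withTs = df1.withColumn(
  "ts", col("attempt_at").cast("timestamp").cast("long"))

// Step 1: one aggregation picks each group's latest row. Spark compares
// structs field by field, so max(struct(ts, ...)) selects the row with
// the largest ts while carrying the other columns along.
val latest = withTs
  .groupBy("username", "device")
  .agg(max(struct(col("ts"), col("attempt_at"), col("stat"))).as("last"))
  .select(col("username"), col("device"),
          col("last.attempt_at").as("attempt_at"),
          col("last.stat").as("stat"),
          col("last.ts").as("last_ts"))

// Step 2: a second aggregation finds, per group, the latest earlier
// attempt inside the window; groups with none drop out here and the
// left join below turns that into a null.
val previous = withTs
  .join(latest.select("username", "device", "last_ts"),
        Seq("username", "device"))
  .where(col("ts") < col("last_ts") &&
         col("ts") >= col("last_ts") - windowSeconds)
  .groupBy("username", "device")
  .agg(max(struct(col("ts"), col("attempt_at"))).as("prev"))
  .select(col("username"), col("device"),
          col("prev.attempt_at").as("previous_attempt_at"))

val result = latest
  .join(previous, Seq("username", "device"), "left")
  .drop("last_ts")

Whether this beats the window version depends on the data, but with 5-10K rows per group it avoids sorting every partition just to discard all rows except the last.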