
Note: My grouping can contain up to 5-10K rows per group for the aggregation, so efficient code is highly desirable.

My Data

val df1 = sc.parallelize(Seq(
  ("user2", "iphone", "2017-12-23 16:58:08", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:12", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:20", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:25", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:35", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:45", "Success")
)).toDF("username", "device", "attempt_at", "stat")
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-23 16:58:08|Success|
|   user2|iphone|2017-12-23 16:58:12|Success|
|   user2|iphone|2017-12-23 16:58:20|Success|
|   user2|iphone|2017-12-23 16:58:25|Success|
|   user2|iphone|2017-12-23 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

What I want
A grouping by (username, device) that keeps the row with the latest attempt_at in each group, plus the timestamp of the attempt immediately before it.

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

Exceptions in desired output:
Since the result has to respect a specific time window: in the input dataset below, the last row has the latest timestamp, 23rd December. If I want a time window that goes back 1 day and gives me the last attempt, the 'previous_attempt_at' column will be null, since there are no events on the previous day, which would be 22nd December. It all depends on the input timestamp range.

//Initial Data
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-20 16:58:08|Success|
|   user2|iphone|2017-12-20 16:58:12|Success|
|   user2|iphone|2017-12-20 16:58:20|Success|
|   user2|iphone|2017-12-20 16:58:25|Success|
|   user2|iphone|2017-12-20 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

// Desired Output
The latest row per (username, device); previous_attempt_at is null because no attempt falls within the 1-day lookback window.

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:45|Success|               null|
+--------+------+-------------------+-------+-------------------+

What I Have

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").cast("long"))
  .rangeBetween(-3600, -1)  // look back 1 hour (3600 seconds), up to the previous row

val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:08|Success|               null|
|   user2|iphone|2017-12-23 16:58:12|Success|2017-12-23 16:58:08|
|   user2|iphone|2017-12-23 16:58:20|Success|2017-12-23 16:58:12|
|   user2|iphone|2017-12-23 16:58:25|Success|2017-12-23 16:58:20|
|   user2|iphone|2017-12-23 16:58:35|Success|2017-12-23 16:58:25|
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

Notes: The code I have computes the window for every row in each user grouping, which is highly inefficient at large scale, and it still doesn't isolate the latest attempt. I don't need any of the rows except the last one.
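For reference, a minimal sketch of cutting each group down to its latest row first. It uses row_number over the df1 defined above and does not compute previous_attempt_at by itself:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// keep only the row with the greatest attempt_at per (username, device)
val latest = Window.partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").desc)

val lastPerGroup = df1
  .withColumn("rn", row_number().over(latest))
  .filter(col("rn") === 1)
  .drop("rn")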


1 Answer


All you need is an additional groupBy and aggregation. Before that, you need the collect_list function to cumulatively collect the previous dates, a udf function to check whether the previous attempt_at is within the time limit, and a struct of the three columns ("attempt_at", "stat", "previous_attempt_at") so that max can select the last one:

import org.apache.spark.sql.functions._
import java.time._
import java.time.temporal._
import java.time.format._
def durationUdf = udf((actualtimestamp: String, timestamps: Seq[String]) => {
  val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val actualDateTime = LocalDateTime.parse(actualtimestamp, formatter)
  // init drops the current row's own timestamp; keep only the earlier
  // timestamps that fall within the limit (here: at most 1 day back)
  val diffDates = timestamps.init.filter(x =>
    LocalDateTime.parse(x, formatter).until(actualDateTime, ChronoUnit.DAYS) <= 1)
  if (diffDates.nonEmpty) diffDates.last else null
})

import org.apache.spark.sql.expressions._

// the ordered window makes collect_list gather all timestamps up to and including the current row
val w = Window.partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").cast("long"))

val df2 = df1
  .withColumn("previous_attempt_at", durationUdf(col("attempt_at"), collect_list("attempt_at").over(w)))
  // pack the three columns into a struct so max can pick the row with the
  // greatest attempt_at (struct comparison goes field by field, left to right)
  .withColumn("struct", struct(col("attempt_at").cast("timestamp").as("attempt_at"), col("stat"), col("previous_attempt_at")))
  .groupBy("username", "device").agg(max("struct").as("struct"))
  .select(col("username"), col("device"), col("struct.attempt_at"), col("struct.stat"), col("struct.previous_attempt_at"))

This should give you the following for the latter example

+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at           |stat   |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2   |iphone|2017-12-23 16:58:45.0|Success|null               |
+--------+------+---------------------+-------+-------------------+

and the following for the original input data

+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at           |stat   |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2   |iphone|2017-12-23 16:58:45.0|Success|2017-12-23 16:58:35|
+--------+------+---------------------+-------+-------------------+

You can change the logic for hours by changing the ChronoUnit.DAYS in the udf function to ChronoUnit.HOURS, and so on (see the sketch below).
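For instance, a 1-hour lookback changes only the filter inside the udf (a sketch reusing the names from the answer above):

// hypothetical variant: keep only timestamps at most 1 hour before the current one
val diffDates = timestamps.init.filter(x =>
  LocalDateTime.parse(x, formatter).until(actualDateTime, ChronoUnit.HOURS) <= 1)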

Ramesh Maharjan
  • Thanks, but what about the time window? Let's say I want to look back by 1 hour, 1 week, etc. For example, for the code above, if all the rows from 1-5 happened 2-3 hours ago, the result for row 6 would come out as null. If the window to look back is a week, we would get the result as given. – annonymous_guy Apr 28 '18 at 12:46
  • Thanks @Ramesh Maharjan. Please look at the new edit, its under "exceptions under desired output" – annonymous_guy Apr 28 '18 at 23:48
  • Thanks, would you please explain how the "max(struct)" selects the latest timestamp row from the array? It works but makes no sense to me. Also, if you could explain how the "reverse fat arrow" in the UDF works. – annonymous_guy Apr 29 '18 at 13:59
  • The struct has three elements, and the max function selects the maximum value from the struct: it first compares the first element, and in case of a tie selects the max from the second element, and so on. And that is not a reverse fat arrow, it's a less-than-or-equal-to sign (< and = combined) ;) thanks for the upvote – Ramesh Maharjan Apr 29 '18 at 14:06
  • Thanks to you! There is another issue though: what if I want hourly data per (user, device)? So it is the last attempt of the (user, device) at every hour/day/week, etc. Is this another question, or can you add something in addition to what you have? – annonymous_guy Apr 29 '18 at 14:18
  • In the last line of the answer I mentioned `you can change the logic for hours by changing the ChronoUnit.DAYS in udf function to ChronoUnit.HOURS and so on`; didn't that answer your query? – Ramesh Maharjan Apr 29 '18 at 14:23
  • Yes thanks, I will check that in a bit. For this one, I added some more text. Look under 'Additional Input Data For the solution below' – annonymous_guy Apr 29 '18 at 14:32
  • I can post it as another question if it's too complicated. – annonymous_guy Apr 29 '18 at 14:54
  • your other question https://stackoverflow.com/questions/50082260/select-latest-timestamp-record-after-a-window-operation-for-every-group-in-the-d covers that I guess please look at that – Ramesh Maharjan Apr 29 '18 at 14:58
  • and remove that additional part in this question please – Ramesh Maharjan Apr 29 '18 at 14:59
  • I did, thanks! Still having a hard time understanding struct: how does it choose a maximum value between two timestamps and a string? Can you share an elaborate example? Just trying to learn. Thanks – annonymous_guy Apr 29 '18 at 18:14
  • Well, what I know is that it does give you the maximum timestamp. You can test it on small datasets yourself; that would be the best way to learn. – Ramesh Maharjan Apr 30 '18 at 05:25
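To illustrate the struct ordering discussed in the comments above, here is a small sketch (not from the original thread; it assumes the same spark-shell session with implicits in scope). max over a struct compares the first field, then breaks ties on the second, and so on:

import org.apache.spark.sql.functions._

// illustrative only: max over a struct orders field by field, left to right
val demo = Seq(
  ("user2", "2017-12-23 16:58:35", "Success"),
  ("user2", "2017-12-23 16:58:45", "Success")
).toDF("username", "ts", "stat")

demo.groupBy("username")
  .agg(max(struct(col("ts").cast("timestamp").as("ts"), col("stat"))).as("s"))
  .select(col("username"), col("s.ts"), col("s.stat"))
  .show(false)
// keeps the row with the greatest ts: 2017-12-23 16:58:45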