0

I ran a count of attempts by (user,app) over a time window of day(86400). I want to extract the rows with latest timestamp with the count and remove unnecessary previous counts. Make sure your answer considers the time window. One user with 1 device can do make multiple attempts a day or a week, I wanna be able to retrieve those particular moments with the final count in every specific window.

My intial dataset is like this:

val df = sc.parallelize(Seq(
  ("user1", "iphone", "2017-12-22 10:06:18", "Success"),
  ("user1", "iphone", "2017-12-22 11:15:12",  "failed"),
  ("user1", "iphone", "2017-12-22 12:06:18", "Success"),
  ("user1", "iphone", "2017-12-22 09:15:12",  "failed"),
  ("user1", "iphone", "2017-12-20 10:06:18", "Success"),
  ("user1", "iphone", "2017-12-20 11:15:12",  "failed"),
  ("user1", "iphone", "2017-12-20 12:06:18", "Success"),
  ("user1", "iphone", "2017-12-20 09:15:12",  "failed"),
  ("user1", "android", "2017-12-20 09:25:20", "Success"),
  ("user1", "android", "2017-12-20 09:44:22", "Success"),
  ("user1", "android", "2017-12-20 09:58:22", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:20", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:25", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:35", "Success")
)).toDF("username", "device", "date_time", "status")

The code I ran and what I got.

// Basically I'm looking 1 day which is 86400 seconds
val w1 = Window.partitionBy("username", "device")
               .orderBy(col("date_time").cast("date_time").cast("long").desc)
               .rangeBetween(-86400, 0) 


val countEveryAttemptDF = df.withColumn("attempts", count("device").over(w1))

Now I have

// countEveryAttemptDF.show
+--------+--------------+---------------------+-------+--------+
|username|.       device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|   user1|       android|  2017-12-20 09:58:22|Success|       1|
|   user1|       android|  2017-12-20 09:44:22|Success|       2|
|   user1|       android|  2017-12-20 09:25:20|Success|       3|
|   user1|        iphone|  2017-12-22 12:06:18|Success|       1|
|   user1|        iphone|  2017-12-22 11:15:12| failed|       2|
|   user1|        iphone|  2017-12-22 10:06:18|Success|       3|
|   user1|        iphone|  2017-12-22 09:15:12| failed|       4|
|   user1|        iphone|  2017-12-20 16:44:35|Success|       1|
|   user1|        iphone|  2017-12-20 16:44:25|Success|       2|
|   user1|        iphone|  2017-12-20 16:44:20|Success|       3|
|   user1|        iphone|  2017-12-20 12:06:18|Success|       4|
|   user1|        iphone|  2017-12-20 11:15:12| failed|       5|
|   user1|        iphone|  2017-12-20 10:06:18|Success|       6|
|   user1|        iphone|  2017-12-20 09:15:12| failed|       7|
+--------+--------------+---------------------+-------+--------+

What I want. So I want the latest timestamp along with its count by making sure I'm in the same time window.

+--------+--------------+---------------------+-------+--------+
|username|.       device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|  user1     |       android    |  2017-12-20 09:25:20|Success|       3|
|  user1     |        iphone    |  2017-12-22 09:15:12| failed|       4|
|  user1     |        iphone    |  2017-12-20 09:15:12| failed|       7|
+--------+--------------+---------------------+-------+--------+**
  • if you partitonBy only "username", "application_id" you won't get the output you are getting. You should be using date value of transaction_date_time too to get the output you are getting . isn't that so? – Ramesh Maharjan Apr 29 '18 at 05:08
  • Possible duplicate of [How to select the first row of each group?](https://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group) – Alper t. Turker Apr 29 '18 at 10:59

1 Answers1

1

You are almost there. You have figured out the counts by looking at one day range. Now all you have to do is figure out the latest record in that one day range which can be done by using last on the same window function but with the range reversed.

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

def day(x: Int) = x * 86400

val w1 = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(-day(1), 0)
val w2 = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(0, day(1))

val countEveryAttemptDF = df.withColumn("attempts", count("application_id").over(w1))
                            .withColumn("att", last("attempts").over(w2))
                            .filter(col("attempts") === col("att"))
                            .drop("att")

which should give you

+--------+--------------+---------------------+-------+--------+
|username|        device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|user1   |android       |2017-12-20 09:25:20  |Success|3       |
|user1   |iphone        |2017-12-22 09:15:12  | Failed|4       |
|user1   |iphone        |2017-12-20 09:15:12  | Failed|7       |
+--------+--------------+---------------------+-------+--------+

similarly as stated in the comments below

There are 86400 seconds in 1 day. I wanted to look back 1 day. Similarly 3600 seconds is 1 hour. And 604,800 seconds in 1 week

you can change the day function to hours and weeks as below and use them in window rangeBetween

def hour(x: Int) = x * 3600
def week(x: Int) = x * 604800

I hope the answer is helpful

Ramesh Maharjan
  • 41,071
  • 6
  • 69
  • 97
  • wait, how do u define the time window of 1 hour/1 week?. Its need to be modular. Can you explain what Long.MinValue and 0 mean? – annonymous_guy Apr 29 '18 at 18:01
  • There are 86400 seconds in 1 day. I wanted to look back 1 day. Similarly 3600 seconds is 1 hour. And 604,800 seconds in 1 week. I wanna be able to look back up in time like that with the data. if your code does that lmk – annonymous_guy Apr 30 '18 at 14:54
  • @data-maniac, thanks for the clarification. :) I have updated my answer and I guess I have answered exactly as you've intended – Ramesh Maharjan Apr 30 '18 at 16:52