
I am using Spark SQL 2.4.1 with Java 8. I need to calculate percentiles such as the 25th, 75th, and 90th for some given data.

I tried using percentile_approx() from Spark SQL to do this, but its results do not match the fractional percentiles produced by Excel's PERCENTILE.INC() function.

Hence, I'm wondering how to fix or adjust percentile_approx(). Is there any way to override it, or to write a custom version of percentile_approx() that calculates fractional percentiles correctly?

Given dataset:

import spark.implicits._  // assumes the SparkSession is available as `spark`

val df = Seq(
    (10, "1/15/2018", 0.010680705, 10, 0.619875458, "east"),
    (10, "1/15/2018", 0.006628853,  4, 0.16039063,  "west"),
    (10, "1/15/2018", 0.01378215,  20, 0.082049528, "east"),
    (10, "1/15/2018", 0.810680705,  6, 0.819875458, "west"),
    (10, "1/15/2018", 0.702228853, 30, 0.916039063, "east"))
  .toDF("id", "date", "revenue", "con_dist_1", "con_dist_2", "zone")


val percentiles = Seq(0.25, 0.75,0.90)  // Which percentiles to calculate
val cols = Seq("con_dist_1", "con_dist_2")  // The columns to use
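
For reference, this is roughly the shape of the percentile_approx() attempt (a minimal sketch; the exact invocation in the original job may have differed):

import org.apache.spark.sql.functions.expr

// percentile_approx always returns one of the actual column values
// (it does not interpolate), so fractional results such as 4.5 can
// never be produced. This is why it disagrees with PERCENTILE.INC.
df.groupBy("zone")
  .agg(expr("percentile_approx(con_dist_1, array(0.25, 0.75, 0.90))").as("approx_quantiles"))
  .show(false)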

I need to calculate the given percentiles for each zone for the given columns. How can this be achieved?

Expected results:

+---+---------+-----------+----+------------+-------------+-------------+------------+
| id|     date|    revenue|zone|perctile_col|quantile_0.25|quantile_0.75|quantile_0.9|
+---+---------+-----------+----+------------+-------------+-------------+------------+
| 10|1/15/2018|0.006628853|west|  con_dist_1|          4.5|          5.5|         5.8|
| 10|1/15/2018|0.810680705|west|  con_dist_1|          4.5|          5.5|         5.8|
| 10|1/15/2018|0.010680705|east|  con_dist_1|           15|           25|        28.0|
| 10|1/15/2018| 0.01378215|east|  con_dist_1|           15|           25|        28.0|
| 10|1/15/2018|0.702228853|east|  con_dist_1|           15|           25|        28.0|
| 10|1/15/2018|0.006628853|west|  con_dist_2|  0.325261837|  0.655004251|0.7539269752|
| 10|1/15/2018|0.810680705|west|  con_dist_2|  0.325261837|  0.655004251|0.7539269752|
| 10|1/15/2018|0.010680705|east|  con_dist_2|  0.350962493| 0.4990442955| 0.749241156|
| 10|1/15/2018| 0.01378215|east|  con_dist_2|  0.350962493| 0.4990442955| 0.749241156|
| 10|1/15/2018|0.702228853|east|  con_dist_2|  0.350962493| 0.4990442955| 0.749241156|
+---+---------+-----------+----+------------+-------------+-------------+------------+

You can verify the results against "definition 2" at this URL: https://www.translatorscafe.com/unit-converter/en-US/calculator/percentile/
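
For reference, "definition 2" (linear interpolation between the two closest ranks, the same method as Excel's PERCENTILE.INC) can be sketched in plain Scala. Note that percentileInc is just an illustrative helper name used here, not an existing function:

// Reference implementation of "definition 2" percentiles
// (linear interpolation between the closest ranks).
def percentileInc(values: Seq[Double], p: Double): Double = {
  val sorted = values.sorted
  val rank  = p * (sorted.length - 1)  // fractional rank in [0, n-1]
  val lower = math.floor(rank).toInt   // index just below the rank
  val upper = math.ceil(rank).toInt    // index just above the rank
  sorted(lower) + (rank - lower) * (sorted(upper) - sorted(lower))
}

percentileInc(Seq(4.0, 6.0), 0.25)  // 4.5, matching quantile_0.25 for zone west, con_dist_1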

1 Answer

A naive way of solving this with Spark is to manually find the two values closest to the specified percentile; the fractional part can then easily be calculated.

In Scala this can be done as follows:

First, we compute the rank of each row within its zone and divide by the maximum rank of each group, giving every row a relative rank between 0 and 1.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Relative rank in [0, 1] within each zone (rows ordered by date).
val w = Window.partitionBy($"zone").orderBy($"date")
val df_zone = df.withColumn("zone_rn", row_number().over(w) - 1)
  .withColumn("zone_rn", $"zone_rn" / max($"zone_rn").over(w))

This gives:

+---+---------+-----------+----------+-----------+----+-------+
|id |date     |revenue    |con_dist_1|con_dist_2 |zone|zone_rn|
+---+---------+-----------+----------+-----------+----+-------+
|10 |1/15/2018|0.006628853|4         |0.16039063 |west|0.0    |
|10 |1/15/2018|0.810680705|6         |0.819875458|west|1.0    |
|10 |1/15/2018|0.010680705|10        |0.619875458|east|0.0    |
|10 |1/15/2018|0.01378215 |20        |0.082049528|east|0.5    |
|10 |1/15/2018|0.702228853|30        |0.916039063|east|1.0    |
+---+---------+-----------+----------+-----------+----+-------+

We loop over the columns to consider and, for each, do a foldLeft over the percentiles to add the lower and upper rank bounds for each percentile (zone_lower and zone_upper) together with the corresponding column values (lower_val and upper_val). We compute the interpolation fraction at the same time and then the quantile value as lower_val + fraction * (upper_val - lower_val).

Finally, since we looped over the columns, we use reduce(_.union(_)) to bring everything back to a single dataframe.
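
As a concrete example, take zone east, column con_dist_1 and p = 0.9: the relative ranks are 0.0, 0.5 and 1.0, so zone_lower = 0.5 (lower_val = 20) and zone_upper = 1.0 (upper_val = 30). The fraction is (0.9 - 0.5) / (1.0 - 0.5) = 0.8, giving quantile_0.9 = 20 + 0.8 * (30 - 20) = 28.0, which matches the expected output.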

val percentiles = Seq(0.25, 0.75, 0.90)     // Which percentiles to calculate
val cols = Seq("con_dist_1", "con_dist_2")  // The columns to use

val df_percentiles = cols.map{ c =>
    percentiles.foldLeft(df_zone){ case (df, p) =>
      df.withColumn("perctile_col", lit(c))
        // Largest relative rank <= p and smallest relative rank >= p ...
        .withColumn("zone_lower", max(when($"zone_rn" <= p, $"zone_rn")).over(w))
        .withColumn("zone_upper", min(when($"zone_rn" >= p, $"zone_rn")).over(w))
        // ... and the column values at those two ranks.
        .withColumn("lower_val", max(when($"zone_lower" === $"zone_rn", col(c))).over(w))
        .withColumn("upper_val", min(when($"zone_upper" === $"zone_rn", col(c))).over(w))
        // Linear interpolation between the two values. If p falls exactly on a
        // rank, zone_upper equals zone_lower and the division yields null, so we
        // fall back to 0 (lower_val and upper_val are then the same value anyway).
        .withColumn("fraction", coalesce((lit(p) - $"zone_lower") / ($"zone_upper" - $"zone_lower"), lit(0.0)))
        .withColumn(s"quantile_$p", $"lower_val" + $"fraction" * ($"upper_val" - $"lower_val"))
    }
    .drop((cols ++ Seq("zone_rn", "zone_lower", "zone_upper", "lower_val", "upper_val", "fraction")): _*)
  }.reduce(_.union(_))

Result:

+---+---------+-----------+----+------------+-------------+------------------+------------------+
| id|     date|    revenue|zone|perctile_col|quantile_0.25|     quantile_0.75|      quantile_0.9|
+---+---------+-----------+----+------------+-------------+------------------+------------------+
| 10|1/15/2018|0.006628853|west|  con_dist_1|          4.5|               5.5|               5.8|
| 10|1/15/2018|0.810680705|west|  con_dist_1|          4.5|               5.5|               5.8|
| 10|1/15/2018|0.010680705|east|  con_dist_1|         15.0|              25.0|              28.0|
| 10|1/15/2018| 0.01378215|east|  con_dist_1|         15.0|              25.0|              28.0|
| 10|1/15/2018|0.702228853|east|  con_dist_1|         15.0|              25.0|              28.0|
| 10|1/15/2018|0.006628853|west|  con_dist_2|  0.325261837|0.6550042509999999|      0.7539269752|
| 10|1/15/2018|0.810680705|west|  con_dist_2|  0.325261837|0.6550042509999999|      0.7539269752|
| 10|1/15/2018|0.010680705|east|  con_dist_2|  0.350962493|      0.4990442955|0.7492411560000001|
| 10|1/15/2018| 0.01378215|east|  con_dist_2|  0.350962493|      0.4990442955|0.7492411560000001|
| 10|1/15/2018|0.702228853|east|  con_dist_2|  0.350962493|      0.4990442955|0.7492411560000001|
+---+---------+-----------+----+------------+-------------+------------------+------------------+
  • @BdEngineer: That's great. I thought it wouldn't work since your comment said it does not work with floating types. Maybe you can add an answer here, since it should be simpler than my approach. – Shaido Apr 15 '20 at 09:46
  • @BdEngineer: `.reduce(_.union(_))` will union multiple dataframes into a single one. What I did above was to handle each column (con_dist_1 and con_dist_2) separately. This results in two dataframes, which are then merged into a single one with `.reduce(_.union(_))`. – Shaido Apr 15 '20 at 09:48
  • @BdEngineer: `df_zone` is created in the first code block. The fold is done on `percentiles` to add all the `quantile_$p` columns (p is one of the percentiles, e.g. 0.25). As for `case (df, p)`, this is how `foldLeft` works: the start value is `df_zone`, and in each iteration the dataframe from the previous iteration is used as `df` (p changes to the next percentile). All in all, the `foldLeft` adds a "quantile_" column for each p in percentiles. Inside the block you can make database calls if you want; there is no need to pass sparkSession, you can simply use it. – Shaido Apr 22 '20 at 01:13
  • @BdEngineer: There should be no need for you to pass sparkSession. You can just use it inside the foldLeft (it's not a dataframe operation, so it's allowed). If you want more variables for each specific percentile, you can zip those to the percentile list to get a list of tuples, for example `percentiles.zip(Seq(1,2,3)).foldLeft(df_zone){case (df, (p, value)) => ...}`. But if I understand your question correctly, there is no need to do this; simply use any variable you want inside the `foldLeft`. – Shaido Apr 22 '20 at 09:25
  • @BdEngineer: 1. The `df_zone` dataframe is shown in the second code box. This is the input dataframe, since we want to construct the new quantile columns based on it while keeping the values for id, date and revenue intact; the `foldLeft` just adds new columns to it. 2. This is due to the list being a list of tuples, so the first element is the tuple (p, value). We write it like this because of the signature of `foldLeft`, which is `foldLeft[B](z: B)(op: (B, A) => B): B`; as you can see, `op` takes two arguments (you can see it as a syntax restriction). – Shaido Apr 23 '20 at 01:23
  • Can you tell me what is wrong with this broadcast variable accessing? https://stackoverflow.com/questions/64003697/spark-broadcast-variable-map-giving-null-value – BdEngineer Sep 22 '20 at 05:49