
I am using spark-sql 2.4.1. I have code something like the below, and the following scenario.

val superDataset = ... // load the whole dataset of student marks records; assume 10 years of data
val selectedYrsDataset = superDataset.repartition(col("year")) // partition by year for the given years, i.e. 2010 and 2011

On the selectedYrsDataset I need to calculate the year-wise toppers overall, as well as country-wise, state-wise, and college-wise.

How can I implement this kind of use case? Is it possible to partition dynamically, i.e. at each new step of the logic add another partition column and repartition the already-partitioned dataset, so as to avoid a major shuffle? (A sketch of what I mean follows.)
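For concreteness, this is the kind of step-wise repartitioning I have in mind (just a sketch; whether chaining repartitions like this actually avoids re-shuffling is exactly my question):

import org.apache.spark.sql.functions.col

// step 1: partition by year
val byYear = selectedYrsDataset.repartition(col("year"))
// step 2: add country as a further partition column
val byYearCountry = byYear.repartition(col("year"), col("country"))
// step 3: add state as well
val byYearCountryState = byYearCountry.repartition(col("year"), col("country"), col("state"))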

BdEngineer

1 Answer


Sample DataFrame:

+----+-------+-----+-------+-----+
|year|country|state|college|marks|
+----+-------+-----+-------+-----+
|2019|  India|    A|     AC|   15|
|2019|  India|    A|     AC|   25|
|2019|  India|    A|     AC|   35|
|2019|  India|    A|     AD|   40|
|2019|  India|    B|     AC|   15|
|2019|  India|    B|     AC|   50|
|2019|  India|    B|     BC|   65|
|2019|    USA|    A|     UC|   15|
|2019|    USA|    A|     UC|   65|
|2019|    USA|    A|     UD|   45|
|2019|    USA|    B|     UC|   44|
|2019|    USA|    B|     MC|   88|
|2019|    USA|    B|     MC|   90|
|2020|  India|    A|     AC|   65|
|2020|  India|    A|     AC|   33|
|2020|  India|    A|     AC|   55|
|2020|  India|    A|     AD|   70|
|2020|  India|    B|     AC|   88|
|2020|  India|    B|     AC|   60|
|2020|  India|    B|     BC|   45|
|2020|    USA|    A|     UC|   85|
|2020|    USA|    A|     UC|   55|
|2020|    USA|    A|     UD|   32|
|2020|    USA|    B|     UC|   64|
|2020|    USA|    B|     MC|   78|
|2020|    USA|    B|     MC|   80|
+----+-------+-----+-------+-----+
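This sample can be built with something like the following (a sketch; a SparkSession named spark is assumed to be in scope, and only a few representative rows are written out):

import spark.implicits._

// Representative rows from the table above; the remaining rows follow the same pattern.
val df = Seq(
  (2019, "India", "A", "AC", 15),
  (2019, "India", "A", "AC", 25),
  (2019, "India", "B", "BC", 65),
  (2019, "USA",   "B", "MC", 90),
  (2020, "India", "B", "AC", 88),
  (2020, "USA",   "B", "MC", 80)
).toDF("year", "country", "state", "college", "marks")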

Multi-dimensional aggregation can be done in Spark in two ways: with grouping sets or with rollup (a grouping-sets sketch follows the result below). To read more about multi-dimensional aggregation, follow this link: Multi-Dimensional Aggregation

The solution using rollup:

import org.apache.spark.sql.functions.max

val ans_df = df.rollup("year", "country", "state", "college").agg(max("marks").as("Marks"))

The result (a null in a column marks a subtotal row in which that column has been aggregated away):

+----+-------+-----+-------+-----+
|year|country|state|college|Marks|
+----+-------+-----+-------+-----+
|2020|  India|    A|     AC|   65|
|2019|  India|    B|     BC|   65|
|2020|  India|    B|   null|   88|
|2019|    USA|    B|     UC|   44|
|2020|  India|    B|     AC|   88|
|2020|    USA| null|   null|   85|
|2019|  India|    A|     AC|   35|
|2019|    USA|    B|     MC|   90|
|2019|  India|    A|     AD|   40|
|2019|    USA|    A|     UD|   45|
|2019|    USA| null|   null|   90|
|2020|    USA|    A|     UD|   32|
|null|   null| null|   null|   90|
|2019|    USA|    B|   null|   90|
|2020|  India| null|   null|   88|
|2019|    USA|    A|   null|   65|
|2019|  India|    B|   null|   65|
|2019|    USA|    A|     UC|   65|
|2020|  India|    B|     BC|   45|
|2020|    USA|    B|     UC|   64|
+----+-------+-----+-------+-----+
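As an alternative, the same aggregation can be expressed with GROUPING SETS through the SQL interface. This is a sketch, assuming the sample DataFrame is registered as a temporary view (the view name marks is illustrative); the empty grouping set () reproduces rollup's grand-total row:

df.createOrReplaceTempView("marks")

val gsDf = spark.sql("""
  SELECT year, country, state, college, max(marks) AS Marks
  FROM marks
  GROUP BY year, country, state, college
  GROUPING SETS (
    (year, country, state, college),  -- college-wise toppers
    (year, country, state),           -- state-wise toppers
    (year, country),                  -- country-wise toppers
    (year),                           -- year-wise toppers
    ()                                -- overall topper
  )
""")
gsDf.show(false)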

Moreover, as asked, Spark takes care of performing this operation in an optimal manner and reuses the already-partitioned data when grouping by an additional column. For example, when grouping by the key (year, country, state, college), the data already grouped by (year, country, state) is reused, which avoids significant recomputation.
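You can inspect how Spark plans the rollup yourself; it typically appears as an Expand operator feeding a single aggregation rather than one shuffle per grouping level (the exact plan shape depends on the Spark version and the data):

// Print the parsed, analyzed, optimized, and physical plans;
// look for the Expand operator feeding the aggregate.
ans_df.explain(true)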

  • Regarding the first question: while calculating secondPartDs it will shuffle the already-partitioned data. Spark executes lazily, so it first creates an optimized plan and then executes it; per the comment, firstPartDs comes first and then secondPartDs. – Rohan Gupta Mar 05 '20 at 06:36
  • For learning about the DAG there are many online tutorials, and for tuning you can go through this link (https://spark.apache.org/docs/latest/tuning.html). Moreover, as I am a beginner myself, and tuning a Spark application takes a lot of factors into consideration, it's a case of the more you do, the better you will understand. – Rohan Gupta Mar 05 '20 at 06:45
  • Rollup is not like a window function; it is an extension of GROUPING SETS, available in SQL. Do mark the answer as solved if it helped, thanks. – Rohan Gupta Mar 05 '20 at 06:50
  • Can you tell me what is wrong with this broadcast variable access? https://stackoverflow.com/questions/64003697/spark-broadcast-variable-map-giving-null-value – BdEngineer Sep 22 '20 at 05:49