
I wanted to ask what the best way is to achieve a per-key auto-incrementing position number after sorting, for example:

raw file:

 1,a,b,c,1,1
 1,a,b,d,0,0
 1,a,b,e,1,0
 2,a,e,c,0,0
 2,a,f,d,1,0

desired output (the last column is the position number after grouping on the first three fields and reverse-sorting on the last two values):

 1,a,b,c,1,1,1
 1,a,b,d,0,0,3
 1,a,b,e,1,0,2
 2,a,e,c,0,0,2
 2,a,f,d,1,0,1

I am using a solution based on groupByKey, but it is running into some issues (possibly a bug with PySpark/Spark?), so I am wondering if there is a better way to achieve this.

My solution:

 A = (sc.textFile("train.csv")
      .filter(lambda x: not isHeader(x))    # skip the header line
      .map(split)
      .map(parse_train)
      .filter(lambda x: x is not None))     # drop rows that failed to parse

 B = (A.map(lambda k: ((k.first_field, k.second_field, k.first_field, k.third_field), k[0:5]))
      .groupByKey())

 (B.map(sort_n_set_position)
      .flatMap(lambda line: line))

where sort_n_set_position iterates over each group's iterator, sorts it, and appends the position number as the last column.
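
A minimal sketch of the idea behind sort_n_set_position (the exact field layout is an assumption here; the last two values carried in each row are used as the descending sort key):

 def sort_n_set_position(kv):
     # kv is one (key, rows) pair produced by groupByKey
     key, rows = kv
     # reverse sort on the last two fields, then append a 1-based position
     ordered = sorted(rows, key=lambda r: (r[-2], r[-1]), reverse=True)
     return [tuple(r) + (pos,) for pos, r in enumerate(ordered, 1)]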

fahadtiger

2 Answers


Since you have big keys (the first three values), I'll assume you will not have a ton of rows per key. Given that, I would just use groupByKey([numTasks]) and then use normal code to sort and add your index to each row in the resulting iterables.
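
For illustration, a self-contained sketch of that approach on the sample rows from the question (assuming sc is an active SparkContext):

 rows = [("1","a","b","c","1","1"), ("1","a","b","d","0","0"),
         ("1","a","b","e","1","0"), ("2","a","e","c","0","0"),
         ("2","a","f","d","1","0")]

 indexed = (sc.parallelize(rows)
     .map(lambda r: (r[:3], r))                      # key on the first three fields
     .groupByKey()
     .mapValues(lambda rs: [x + (i,) for i, x in     # sort each group and index it
                            enumerate(sorted(rs, key=lambda x: (x[4], x[5]),
                                             reverse=True), 1)])
     .flatMap(lambda kv: kv[1]))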

Daniel Langdon
  • Thanks Daniel, I am already trying to use groupByKey in the code above and rely on the normal code inside sort_n_set_position, which is called right after, for the ordering/numeral generation. Please let me know if I am missing something - thanks again – fahadtiger Oct 19 '15 at 15:34
  • Sorry, I seem to have misread. Seems pretty much the sane way to do things to me. What are the "some issues (possibly bug" you are running into? – Daniel Langdon Oct 19 '15 at 17:26
  • Thanks, I get this message when doing groupByKey: `unpack requires a string argument of length 4` - interestingly, I am not doing any conversions (reading all fields as strings) and making sure to toss out any lines where the key fields aren't there or the line is empty. – fahadtiger Oct 19 '15 at 18:32
  • Mmm, seems to be specific to Python (I use Spark mostly with Scala). There are several questions for that error: http://stackoverflow.com/questions/2611858/struct-error-unpack-requires-a-string-argument-of-length-4 – Daniel Langdon Oct 19 '15 at 23:58
  • Thanks Daniel, the interesting thing is that everything else works if I don't use the groupByKey; further, I am not using any custom reads, just relying on sc.textFile and StringIO :) – fahadtiger Oct 21 '15 at 06:36
  • I would check this part: `lambda k:((k.first_field,k.second_field,k.first_field,k.third_field),(k[0:5])`. The error seems related to unpacking a set of fields (the wrong number of them), could be that the groupBy has nothing to do with it. Unless you can provide a better stack trace. – Daniel Langdon Oct 21 '15 at 12:34

A slightly different approach, combining spark-csv, DataFrames and window functions. For brevity I assume that the header line is x1,x2,x3,x4,x5,x6:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber, col

df = (sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("train.csv"))

w = (Window()
    .partitionBy(col("x1"), col("x2"), col("x3"))
    .orderBy(col("x5").desc(), col("x6").desc()))

df_with_rn = df.select(col("*"), rowNumber().over(w).alias("x7"))
df_with_rn.show()

## +---+---+---+---+---+---+---+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+---+---+---+---+---+
## |  2|  a|  e|  c|  0|  0|  1|
## |  2|  a|  f|  d|  1|  0|  1|
## |  1|  a|  b|  c|  1|  1|  1|
## |  1|  a|  b|  e|  1|  0|  2|
## |  1|  a|  b|  d|  0|  0|  3|
## +---+---+---+---+---+---+---+

If you want a plain RDD as output, you can simply map as follows:

df_with_rn.map(lambda r: r.asDict())
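
As a side note, in Spark 1.6+ the same window function is exposed as row_number, and from 2.0 onwards the CSV reader is built in, so a roughly equivalent sketch (assuming a SparkSession named spark) looks like:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# built-in CSV reader (Spark 2.0+); same options as spark-csv
df = spark.read.csv("train.csv", header=True, inferSchema=True)

w = Window.partitionBy("x1", "x2", "x3").orderBy(col("x5").desc(), col("x6").desc())

df_with_rn = df.withColumn("x7", row_number().over(w))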
zero323
  • Thanks, this is cool, I like the partitionBy, except I wasn't sure if all rows for the same partition would be on the same node, though I believe the built-in select/rowNumber would take care of it? Can you also maybe comment on the efficiency of this vs mine ^_^, thanks – fahadtiger Oct 21 '15 at 06:42
  • This is covered by the `partitionBy` clause. Regarding performance... Unless your data is already partitioned by the required values, any approach requires shuffling. There can be some gain from the optimizer, but the biggest benefit of using DataFrames is execution natively on the JVM. – zero323 Oct 21 '15 at 06:53
  • Thanks for answering the questions zero323! – fahadtiger Oct 22 '15 at 07:16