
I am using spark-sql 2.4.1 with Java 8. I have a scenario where I need to perform certain operations only if specific columns are present in the given DataFrame's column list.

I have a sample DataFrame as below; its columns differ depending on the external query executed on the database table.

val data = List(
  ("20", "score", "school", "2018-03-31", 14 , 12 , 20),
  ("21", "score", "school", "2018-03-31", 13 , 13 , 21),
  ("22", "rate", "school", "2018-03-31", 11 , 14, 22),
  ("21", "rate", "school", "2018-03-31", 13 , 12, 23)
 )

val df = data.toDF("id", "code", "entity", "date", "column1", "column2" ,"column3"..."columnN")

As shown above, the columns of the DataFrame "df" are not fixed; they vary and may include "column1", "column2", "column3" ... "columnN".

So, depending on which columns are available, I need to perform some operations. For this I am trying to use a "when" clause: when a column is present, I have to perform a certain operation on that column; otherwise I move on to the next operation.

I am trying the two approaches below using the "when" clause.

First way:

 Dataset<Row> resultDs =  df.withColumn("column1_avg", 
                     when( df.schema().fieldNames().contains(col("column1")) , avg(col("column1"))))
                     )
 

Second way:

  Dataset<Row> resultDs =  df.withColumn("column2_sum", 
                     when( df.columns().contains(col("column2")) , sum(col("column1"))))
                     )

Error:

Cannot invoke contains(Column) on the array type String[]

So how can I handle this scenario in Java 8 code?

BdEngineer
  • pls show expected output – thebluephantom Aug 17 '20 at 12:10
  • @thebluephantom The expected output is dynamic and depends on the columns, i.e. if "column1" exists I will do avg on column1, if "column2" exists I will do sum on that column, etc. The tricky part is that the operation should not fail when "column1" is missing, hence I need to check whether the column exists. – BdEngineer Aug 17 '20 at 12:26
  • The functions `sum` and `avg` are *aggregate* functions. They would normally return a single row. So it's imperative that you **edit your question**, and show examples of what you expect as the output. You can do two examples - one when column1 exists and one when it doesn't. Show both input ds and output ds. – RealSkeptic Aug 19 '20 at 09:24

1 Answer


You can create a column holding all the column names, then check whether a given column is present and process it only if it is available:

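 import org.apache.spark.sql.functions._

 // Store every column name of df as an array of string literals
 df.withColumn("columns_available", array(df.columns.map(lit): _*))
       // Copy column1 only when its name is in the array; otherwise the value is null
       .withColumn("column1_org",
         when(array_contains(col("columns_available"), "column1"), col("column1")))
       // x stays null when column4 is not among the column names
       .withColumn("x",
         when(array_contains(col("columns_available"), "column4"), col("column1")))
       .withColumn("column2_new",
         when(array_contains(col("columns_available"), "column2"), sqrt("column2")))
       .show(false)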
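On the Java 8 side, the compile error in the question comes from calling `contains` on a `String[]` (arrays have no such method) and from passing a `Column` where a plain `String` name is needed. A minimal sketch of the name check in plain Java follows; the class `ColumnCheck`, the helper names, and the `plan` map are hypothetical and only illustrate the idea, and the array literal stands in for what `df.columns()` returns. It assumes the decision is made on the driver with ordinary Java logic, so a missing column is never referenced at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Spark API.
class ColumnCheck {

    // A Java array has no contains(); wrap it in a List and compare String names.
    static boolean hasColumn(String[] columns, String name) {
        return Arrays.asList(columns).contains(name);
    }

    // Keep only the planned operations whose input column is present.
    // Keys are column names, values are operation labels (illustrative only).
    static List<String> applicable(String[] columns, Map<String, String> plan) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : plan.entrySet()) {
            if (hasColumn(columns, e.getKey())) {
                result.add(e.getValue() + "(" + e.getKey() + ")");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for df.columns(), which is a String[] in the Java API.
        String[] columns = {"id", "code", "entity", "date", "column1", "column2"};

        Map<String, String> plan = new LinkedHashMap<>();
        plan.put("column1", "avg");
        plan.put("column2", "sum");
        plan.put("column4", "sqrt"); // not present, so it is skipped

        System.out.println(applicable(columns, plan)); // prints [avg(column1), sum(column2)]
    }
}
```

With a real `Dataset<Row>`, the same `hasColumn(df.columns(), "column1")` test could guard each `withColumn` or aggregation call in an ordinary `if`, which also means no extra column is created when the input column is absent.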
Som
  • Thank you so much. One small doubt: if the specified column does not exist, as in `df.withColumn("columns_available", array(df.columns.map(lit): _*)) .withColumn("column1_org", when( array_contains(col("columns_available"),"columnN") , col("column2"))) .show(false)`, the output should not contain that column at all, i.e. "column1_org". How can I achieve that? – BdEngineer Aug 17 '20 at 12:40
  • You can't achieve that with `withColumn`. Why are you calling `withColumn` in the first place if the column is not available? Can you explain the motive behind this rather odd requirement? – Som Aug 17 '20 at 15:21
  • My requirement is to perform certain operations depending on which columns are available. If a column is not available, its operation should not be performed, since each operation is specific to a column. For some columns the value can legitimately be null when the data is invalid, so if I add null here it might be treated as invalid data, which is not the case. – BdEngineer Aug 18 '20 at 06:16
  • What does `df.columns.map(lit): _*` do here? When I do it in Java with `array(Arrays.asList(df.columns()).stream().map(s -> new Column(s)).toArray(Column[]::new))`, it actually fetches the column values instead of the column names. – BdEngineer Aug 18 '20 at 07:50
  • can you tell me what is wrong with this broadcast variable accessing ? https://stackoverflow.com/questions/64003697/spark-broadcast-variable-map-giving-null-value – BdEngineer Sep 22 '20 at 05:47