I am using spark-sql 2.4.1 with Java 8. I have a scenario where column names are passed to me as a list/Seq, and I need to perform certain operations (sum, avg, percentages, etc.) on only those columns.

Say I have the columns column1, column2, and column3. On the first run, I pass the name column1, select the "column1" data, and perform some operations based on "column1".

On the second run, I pass the name column2. This time column1 is not selected, so my dataset does not contain "column1", and the earlier conditions break with the error "AnalysisException: cannot resolve 'column1' given input columns".

Hence I need to check which columns exist: if a column exists, perform the operations related to it; otherwise skip those operations.

How can I do this in Spark?

Sample data which is in database.

val data = List(
  ("20", "score", "school", "2018-03-31", 14, 12, 20),
  ("21", "score", "school", "2018-03-31", 13, 13, 21),
  ("22", "rate", "school", "2018-03-31", 11, 14, 22),
  ("21", "rate", "school", "2018-03-31", 13, 12, 23)
)
val df = data.toDF("id", "code", "entity", "date", "column1", "column2", "column3")
  .select("id", "code", "entity", "date", "column2") // these are passed for each run; this set keeps changing



Dataset<Row> enrichedDs = df
    .withColumn("column1_org", col("column1"))
    .withColumn("column1",
        when(col("column1").isNotNull(),
            functions.callUDF("lookUpData", col("column1").cast(DataTypes.StringType))));

The above logic only works when "column1" is among the selected columns. It fails for the second set because "column1" is not selected. I would like to understand why it only works when "column1" is selected, and I need some logic that skips it when the column is absent.

BdEngineer
  • Cannot understand the meaning of "pull column data". What is your test case, and what's wrong? – Lamanus Aug 13 '20 at 13:27
  • @Lamanus My data is in a DB table; "pull" here means selecting that data. My select is not a blind select: the selected columns differ on each run, but the logic written for the non-selected columns stays in the code. When the first set of columns is selected, the logic for the second set fails because those columns were not selected. Before executing the logic, I need to check whether those columns exist in the dataframe, and otherwise skip the logic that depends on them. Let me know if you need any more details. – BdEngineer Aug 13 '20 at 13:42

2 Answers

Check if this is helpful.

You can filter the requested columns down to the ones that exist in the dataframe and process only those:

df.show(false)
    /**
      * +---+-----+------+----------+-------+-------+-------+
      * |id |code |entity|date      |column1|column2|column3|
      * +---+-----+------+----------+-------+-------+-------+
      * |20 |score|school|2018-03-31|14     |12     |20     |
      * |21 |score|school|2018-03-31|13     |13     |21     |
      * |22 |rate |school|2018-03-31|11     |14     |22     |
      * |21 |rate |school|2018-03-31|13     |12     |23     |
      * +---+-----+------+----------+-------+-------+-------+
      */
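    // Requested columns; "column4" is not in df, so the filter below drops it
    val cols = Seq("column1", "column2", "column3", "column4")
    // Keep only the columns present in df, then apply sqrt (org.apache.spark.sql.functions.sqrt) to each
    val processColumns = cols.filter(df.columns.contains).map(sqrt)
    df.select(processColumns: _*).show(false)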

    /**
      * +------------------+------------------+-----------------+
      * |SQRT(column1)     |SQRT(column2)     |SQRT(column3)    |
      * +------------------+------------------+-----------------+
      * |3.7416573867739413|3.4641016151377544|4.47213595499958 |
      * |3.605551275463989 |3.605551275463989 |4.58257569495584 |
      * |3.3166247903554   |3.7416573867739413|4.69041575982343 |
      * |3.605551275463989 |3.4641016151377544|4.795831523312719|
      * +------------------+------------------+-----------------+
      */
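
Since the question uses Java 8, the same filtering idea can be sketched in Java. There, df.columns() returns a String[], which has no contains method, so the sketch below copies the names into a set first. It is a minimal, Spark-free illustration under stated assumptions: the class ColumnFilter, the method presentColumns, and the hard-coded available array (standing in for df.columns()) are hypothetical names, not part of the Spark API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class ColumnFilter {

    // Returns the requested names that also occur in `available`,
    // preserving the order of `requested`. Matching is case-sensitive.
    static List<String> presentColumns(String[] available, List<String> requested) {
        Set<String> have = new HashSet<>(Arrays.asList(available));
        return requested.stream()
                .filter(have::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumption: stand-in for df.columns() after the second run's select.
        String[] available = {"id", "code", "entity", "date", "column2"};
        List<String> requested = Arrays.asList("column1", "column2", "column3", "column4");
        System.out.println(presentColumns(available, requested)); // prints [column2]
    }
}
```

With a real Dataset<Row>, the first argument would be df.columns(), and the surviving names could then be mapped to Column objects (for example with functions.col) before being passed to select.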
Som
  • Thank you for the prompt answer. I updated the question again (the last paragraphs); can you please have a look at it? – BdEngineer Aug 13 '20 at 16:07
  • This has some useful material, https://www.thetopsites.net/article/58775028.shtml, but it's in Scala. – BdEngineer Aug 17 '20 at 07:49
  • When I do the following: df.withColumn("score_org", when( df.schema().fieldNames().contains(col("score")) , col("score"))) I get the error "Cannot invoke contains(Column) on the array type String[]". – BdEngineer Aug 17 '20 at 10:32
  • The issue is paraphrased here; can you advise me on it? https://stackoverflow.com/questions/63450135/applying-when-condition-only-when-column-exists-in-the-dataframe – BdEngineer Aug 17 '20 at 11:49

Not sure if I fully understand your requirement, but are you simply trying to perform some conditional operation depending on which columns are available in your dataframe, which is not known prior to execution?

If so, DataFrame.columns returns the list of column names, which you can inspect and select from accordingly.

For example:

df.columns.foreach { println }
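
Building on the idea of inspecting the column names, one way to skip a step when its column is missing is a small guard helper. The sketch below is Spark-free so that it can run on its own: ConditionalStep and applyIfPresent are hypothetical names, the columns array stands in for df.columns(), and a plain String stands in for the dataset.

```java
import java.util.Arrays;
import java.util.function.UnaryOperator;

final class ConditionalStep {

    // Applies `step` to `data` only when `required` is among `columns`;
    // otherwise returns `data` unchanged.
    static <T> T applyIfPresent(T data, String[] columns, String required, UnaryOperator<T> step) {
        return Arrays.asList(columns).contains(required) ? step.apply(data) : data;
    }

    public static void main(String[] args) {
        // Assumption: stand-in for df.columns() when only column2 was selected.
        String[] columns = {"id", "code", "entity", "date", "column2"};
        String plan = "base";
        plan = applyIfPresent(plan, columns, "column1", p -> p + " + column1 lookup"); // skipped
        plan = applyIfPresent(plan, columns, "column2", p -> p + " + column2 sum");    // applied
        System.out.println(plan); // prints base + column2 sum
    }
}
```

With Spark, T would be Dataset<Row>, and a call could look like applyIfPresent(df, df.columns(), "column1", d -> d.withColumn("column1_org", col("column1"))), so that column1-specific logic runs only when that column was selected.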
Chris
  • Thanks Chris, but this won't help me much. Can you check the "Sample data which is in database." section once? – BdEngineer Aug 17 '20 at 07:26
  • The issue is paraphrased here; can you advise me on it? https://stackoverflow.com/questions/63450135/applying-when-condition-only-when-column-exists-in-the-dataframe – BdEngineer Aug 17 '20 at 11:49