
I know you can forward/backward fill missing values with the last non-missing value, using the last function combined with a window function.
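For example, a plain forward fill (a minimal sketch, assuming a PySpark DataFrame df with the columns shown below) only carries the last known value forward:

    import pyspark.sql.functions as f
    from pyspark.sql import Window

    # carry the last non-null Population forward within each Area
    w = Window.partitionBy('Area').orderBy('Date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
    df.withColumn('Population', f.last('Population', ignorenulls=True).over(w))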

But my data looks like this:

Area,Date,Population
A, 1/1/2000, 10000
A, 2/1/2000, 
A, 3/1/2000, 
A, 4/1/2000, 10030
A, 5/1/2000, 

In this example, for the May population I would like to fill in 10030, which is easy. But for Feb and Mar, the value I would like to fill in is the mean of 10000 and 10030, not 10000 or 10030.

Do you know how to implement this?

Thanks,

Yi Du
    Does this answer your question? [Groupby fill missing values in dataframe based on average of previous values available and next value available](https://stackoverflow.com/questions/63095958/groupby-fill-missing-values-in-dataframe-based-on-average-of-previous-values-ava) – Som Aug 20 '20 at 13:58

2 Answers


Get the next and previous non-null values and compute their mean, as below:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // sample data from the question; Population is a nullable integer
    val df2 = Seq(
      ("A", "1/1/2000", Some(10000)),
      ("A", "2/1/2000", None),
      ("A", "3/1/2000", None),
      ("A", "4/1/2000", Some(10030)),
      ("A", "5/1/2000", None)
    ).toDF("Area", "Date", "Population")

    df2.show(false)
    df2.printSchema()
    /**
      * +----+--------+----------+
      * |Area|Date    |Population|
      * +----+--------+----------+
      * |A   |1/1/2000|10000     |
      * |A   |2/1/2000|null      |
      * |A   |3/1/2000|null      |
      * |A   |4/1/2000|10030     |
      * |A   |5/1/2000|null      |
      * +----+--------+----------+
      *
      * root
      * |-- Area: string (nullable = true)
      * |-- Date: string (nullable = true)
      * |-- Population: integer (nullable = true)
      */

    // w1 looks backward from the current row; w2 looks forward
    val w1 = Window.partitionBy("Area").orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    val w2 = Window.partitionBy("Area").orderBy("Date").rowsBetween(Window.currentRow, Window.unboundedFollowing)
    df2.withColumn("previous", last("Population", ignoreNulls = true).over(w1)) // last non-null at or before this row
      .withColumn("next", first("Population", ignoreNulls = true).over(w2)) // first non-null at or after this row
      .withColumn("new_Population", (coalesce($"previous", $"next") + coalesce($"next", $"previous")) / 2)
      .drop("next", "previous")
      .show(false)

    /**
      * +----+--------+----------+--------------+
      * |Area|Date    |Population|new_Population|
      * +----+--------+----------+--------------+
      * |A   |1/1/2000|10000     |10000.0       |
      * |A   |2/1/2000|null      |10015.0       |
      * |A   |3/1/2000|null      |10015.0       |
      * |A   |4/1/2000|10030     |10030.0       |
      * |A   |5/1/2000|null      |10030.0       |
      * +----+--------+----------+--------------+
      */
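The coalesce pair is what handles the edges: for the May row there is no following non-null value, so both operands fall back to the previous value and the result is a plain forward fill (and symmetrically at the start of a partition). Rows that already have a value get themselves back, since both windows include the current row, making each non-null row its own nearest neighbour in both directions.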
Som

Here is my attempt.

w1 and w2 build running counts of the non-null values, which are used as partition keys; w3 and w4 then pick out the preceding and following non-null values within those partitions. For the sample data, check is 1,0,0,1,0, so partition1 is 1,1,1,2,2 and partition2 is 2,1,1,1,0, and the first row of each resulting group (in its window's ordering) is the non-null value when one exists. After that, a condition decides how to fill Population: the mean when both neighbours exist, otherwise whichever one does.

import pyspark.sql.functions as f
from pyspark.sql import Window

# running counts of non-null values: backward-looking (w1) and forward-looking (w2)
w1 = Window.partitionBy('Area').orderBy('Date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w2 = Window.partitionBy('Area').orderBy('Date').rowsBetween(Window.currentRow, Window.unboundedFollowing)
# within each count group, order so that the non-null row comes first
w3 = Window.partitionBy('Area', 'partition1').orderBy('Date')
w4 = Window.partitionBy('Area', 'partition2').orderBy(f.desc('Date'))

(df.withColumn('check', f.col('Population').isNotNull().cast('int'))
   .withColumn('partition1', f.sum('check').over(w1))
   .withColumn('partition2', f.sum('check').over(w2))
   # the first row of each group carries the neighbouring non-null value
   .withColumn('first', f.first('Population').over(w3))
   .withColumn('last', f.first('Population').over(w4))
   # mean when both neighbours exist, otherwise whichever one does
   .withColumn('fill', f.when(f.col('first').isNotNull() & f.col('last').isNotNull(),
                              (f.col('first') + f.col('last')) / 2)
                        .otherwise(f.coalesce('first', 'last')))
   .withColumn('Population', f.coalesce('Population', 'fill'))
   .orderBy('Date')
   .select(*df.columns)
   .show(10, False))

+----+--------+----------+
|Area|Date    |Population|
+----+--------+----------+
|A   |1/1/2000|10000.0   |
|A   |2/1/2000|10015.0   |
|A   |3/1/2000|10015.0   |
|A   |4/1/2000|10030.0   |
|A   |5/1/2000|10030.0   |
+----+--------+----------+
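
One caveat for both answers: Date is a string here, and lexicographic ordering only happens to match chronological ordering for this sample (e.g. 10/1/2000 would sort before 2/1/2000). If the real data spans double-digit months, parse the column first — a minimal sketch, assuming the same df and imports as above and that the format is month/day/year:

    df = df.withColumn('Date', f.to_date('Date', 'M/d/yyyy'))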
Lamanus