Operate within a group by and populate additional columns

I have a dataframe as below:

+------+------+---+------+
|field1|field2|id |Amount|
+------+------+---+------+
|A     |B     |002|10.0  |
|A     |B     |003|12.0  |
|A     |B     |005|15.0  |
|C     |B     |002|20.0  |
|C     |B     |003|22.0  |
|C     |B     |005|25.0  |
+------+------+---+------+
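
For reference, this dataframe can be built as follows (a minimal sketch, assuming a SparkSession in scope named spark):

    import spark.implicits._

    // sample data matching the table above
    val df = Seq(
      ("A", "B", "002", 10.0),
      ("A", "B", "003", 12.0),
      ("A", "B", "005", 15.0),
      ("C", "B", "002", 20.0),
      ("C", "B", "003", 22.0),
      ("C", "B", "005", 25.0)
    ).toDF("field1", "field2", "id", "Amount")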

I need to convert it to:

+------+------+---+-------+---+-------+---+-------+
|field1|field2|002|002_Amt|003|003_Amt|005|005_Amt|
+------+------+---+-------+---+-------+---+-------+
|A     |B     |002|10.0   |003|12.0   |005|15.0   |
|C     |B     |002|20.0   |003|22.0   |005|25.0   |
+------+------+---+-------+---+-------+---+-------+

Please advise!

1 answer

  • answered 2017-12-10 20:29 Ramesh Maharjan

    The columns of the final dataframe depend on the id column, so you first need to collect the distinct ids into a separate array.

    import org.apache.spark.sql.functions._
    import spark.implicits._          // for the $"..." column syntax (assumes a SparkSession named spark)
    import scala.collection.mutable

    // collect every id into a single array on the driver, then deduplicate it
    val distinctIds = df.select(collect_list("id")).rdd.first().get(0).asInstanceOf[mutable.WrappedArray[String]].distinct
    

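    An equivalent way to build distinctIds (a sketch, assuming Spark 2.x with import spark.implicits._ in scope for the String encoder) deduplicates in Spark before collecting, which avoids the asInstanceOf cast:

    // deduplicate on the executors, then bring the ids back to the driver;
    // sorted keeps the resulting columns in a deterministic order
    val distinctIds = df.select("id").distinct().as[String].collect().sorted.toSeq
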
    The next step is to filter the dataframe on each of the distinctIds, rename its id and Amount columns, and join the results:

    // seed the result with the rows for the first id
    val first = distinctIds.head
    var finalDF = df.filter($"id" === first).withColumnRenamed("id", first).withColumnRenamed("Amount", first+"_Amt")
    // left-join the renamed rows for each remaining id on the grouping columns
    for(str <- distinctIds.tail){
      val tempDF = df.filter($"id" === str).withColumnRenamed("id", str).withColumnRenamed("Amount", str+"_Amt")
      finalDF = finalDF.join(tempDF, Seq("field1", "field2"), "left")
    }
    finalDF.show(false)
    

    You should get the desired output:

    +------+------+---+-------+---+-------+---+-------+
    |field1|field2|002|002_Amt|003|003_Amt|005|005_Amt|
    +------+------+---+-------+---+-------+---+-------+
    |A     |B     |002|10.0   |003|12.0   |005|15.0   |
    |C     |B     |002|20.0   |003|22.0   |005|25.0   |
    +------+------+---+-------+---+-------+---+-------+
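
    Note that each iteration of the loop adds another join to the query plan, so with many distinct ids the plan grows quickly; caching df before the loop can help.

    For this shape of problem you could also reach for groupBy with pivot (a sketch, assuming Spark 2.x; pivoted and renamed are illustrative names). It yields the per-id Amount columns directly, though the duplicated literal id columns (002, 003, ...) would still have to be added back if you need the exact layout above:

    // one Amount column per distinct id value
    val pivoted = df.groupBy("field1", "field2").pivot("id").agg(first("Amount"))
    // rename the pivoted columns to the <id>_Amt convention
    val renamed = distinctIds.foldLeft(pivoted)((acc, id) => acc.withColumnRenamed(id, id + "_Amt"))
    renamed.show(false)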
    

    Using var is discouraged in Scala, so you can express the same logic as a recursive function:

    import org.apache.spark.sql.DataFrame

    // walks the list of ids, accumulating the joined result in tdf
    def getFinalDF(first: Boolean, array: List[String], df: DataFrame, tdf: DataFrame) : DataFrame = array match {
      case head :: tail => {
        if(first) {
          // first id: seed the accumulator with its renamed columns
          getFinalDF(false, tail, df, df.filter($"id" === head).withColumnRenamed("id", head).withColumnRenamed("Amount", head + "_Amt"))
        }
        else{
          // remaining ids: left-join each one onto the accumulator
          val tempDF = df.filter($"id" === head).withColumnRenamed("id", head).withColumnRenamed("Amount", head+"_Amt")
          getFinalDF(false, tail, df, tdf.join(tempDF, Seq("field1", "field2"), "left"))
        }
      }
      case Nil => tdf
    }
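
    As a side note, both recursive calls are in tail position, so you can mark getFinalDF with @tailrec (from scala.annotation.tailrec) and the compiler will verify that it runs in constant stack space.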
    

    and call the recursive function as:

    getFinalDF(true, distinctIds.toList, df, df).show(false)
    

    You should get the same output.