Mapping an input file to different RDDs

I have a text file consisting of columns of integers. If the file has N columns, I need N-1 PairRDDs: each PairRDD uses one of columns 0 to N-2 as the key and the last column as the value. The number of columns varies between runs, so I don't know the number of RDDs in advance.

The code below throws a "Task not serializable" error.

val inputFile = sc.textFile(path).persist();
for (dim <- 0 to (numberOfColumns - 2)){            
  val temp = inputFile.map(line => {
    val lines = line.split(',')
    (lines(dim), lines(numberOfColumns - 1))
  })
}

I would appreciate any help in solving this issue.

2 answers

  • answered 2018-01-14 11:10 anuj saxena

    Move the operation you are performing out of the calling class and into a serializable class or an object:

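    import org.apache.spark.rdd.RDD

    class RDDOperation extends Serializable {

      // Returns one pair RDD per key column: (column `dim`, last column).
      def perform(inputFile: RDD[String], numberOfColumns: Int): Seq[RDD[(String, String)]] = {
        for (dim <- 0 to (numberOfColumns - 2)) yield {
          inputFile.map { line =>
            val columns = line.split(',')
            (columns(dim), columns(numberOfColumns - 1))
          }
        }
      }
    }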
    

    The exception occurs because an RDD's elements are partitioned across the nodes of the cluster. When map or flatMap runs on an RDD, the function passed to it is executed on multiple nodes, so Spark must serialize that function together with everything it references. If the function references a member of a non-serializable enclosing class, serialization fails. Moving the operation into a serializable class or an object avoids this.
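
    The capture behaviour described above can be reproduced without a cluster. The sketch below is only an approximation: it uses plain Java serialization (which is what Spark uses for closures by default), and the names `Driver`, `ColumnOps` and `ClosureCheck` are hypothetical, not part of the asker's code.

    ```scala
    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    // Hypothetical stand-in for the calling class; note that it is NOT Serializable.
    class Driver(val numberOfColumns: Int) {
      // Reads the field numberOfColumns, so the function keeps a reference to `this`.
      def lastColumn: String => String =
        line => line.split(',')(numberOfColumns - 1)
    }

    // Same logic moved into an object: the function captures only the Int argument.
    object ColumnOps {
      def lastColumn(numberOfColumns: Int): String => String =
        line => line.split(',')(numberOfColumns - 1)
    }

    object ClosureCheck {
      // True if plain Java serialization accepts the value.
      def isSerializable(value: AnyRef): Boolean =
        try {
          val out = new ObjectOutputStream(new ByteArrayOutputStream())
          try out.writeObject(value) finally out.close()
          true
        } catch {
          case _: NotSerializableException => false
        }
    }
    ```

    Under these assumptions, `ClosureCheck.isSerializable(new Driver(3).lastColumn)` should be false, because serializing the function drags in the `Driver` instance, while `ClosureCheck.isSerializable(ColumnOps.lastColumn(3))` should be true.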

    It is also preferable for Scala functions to return values, which is why I used yield here: the for comprehension returns a collection of pair RDDs.
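
    To make the effect of yield concrete, here is a small sketch that uses plain lists in place of RDDs. The sample rows and the object name `YieldDemo` are made up for illustration.

    ```scala
    object YieldDemo {
      // Two sample lines with three columns each (illustrative data only).
      val rows = List("1,2,9", "3,4,8")
      val numberOfColumns = 3

      // Without yield the loop evaluates to Unit, so each `temp` is discarded.
      val discarded: Unit = for (dim <- 0 to numberOfColumns - 2) {
        val temp = rows.map(_.split(',')(dim))
      }

      // With yield the loop returns one collection of (key, value) pairs per key column.
      val kept: Seq[List[(String, String)]] =
        for (dim <- 0 to numberOfColumns - 2) yield rows.map { line =>
          val columns = line.split(',')
          (columns(dim), columns(numberOfColumns - 1))
        }
    }
    ```

    Here `YieldDemo.kept(0)` holds the pairs keyed by column 0 and `YieldDemo.kept(1)` those keyed by column 1; the same indexing applies to the collection of pair RDDs.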

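    You can also refactor the perform method so that it doesn't depend on numberOfColumns. Note that this version returns a single RDD containing the pairs for all key columns rather than one RDD per column: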

    def perform(inputFile: RDD[String]): RDD[(String, String)] = {
      inputFile.flatMap { line =>
        val columns = line.split(',').toList

        columns.lastOption match {
          // Pair every column except the last with the last column.
          // Columns are selected by position, so a key that happens to
          // equal the last column's value is still kept.
          case Some(lastColumn) => columns.init.map(column => (column, lastColumn))
          case None => List.empty[(String, String)]
        }
      }
    }
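
    If one RDD per key column is still needed, one possible approach is to tag each pair with the position of its key column. The sketch below shows only the per-line step on plain strings; `IndexedPairs` and `pairsWithIndex` are hypothetical names.

    ```scala
    object IndexedPairs {
      // For each column except the last, emits (columnIndex, (key, lastColumn)).
      def pairsWithIndex(line: String): List[(Int, (String, String))] = {
        val columns = line.split(',').toList
        columns.lastOption match {
          case Some(last) =>
            columns.init.zipWithIndex.map { case (key, index) => (index, (key, last)) }
          case None => Nil
        }
      }
    }
    ```

    With Spark, `inputFile.flatMap(IndexedPairs.pairsWithIndex)` would give an `RDD[(Int, (String, String))]`, and filtering on the index recovers the pairs for a single column. Because the function lives in an object, it does not pull an enclosing class instance into the closure.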
    

  • answered 2018-01-14 11:10 Hoori M.

    My code referenced fields of the enclosing class, so Spark had to send the whole class instance to the executors to access those fields, and my class was not serializable.

    I copied those fields into local variables inside my method, so only the local variables were sent to the executors, and that solved the problem.
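
    Applied to the code in the question, the fix might look roughly like this. It is a sketch, not the original code: the enclosing class `ColumnPairs`, its constructor parameters and the local name `lastIndex` are assumptions.

    ```scala
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Hypothetical enclosing class; it is deliberately not Serializable.
    class ColumnPairs(sc: SparkContext, path: String, numberOfColumns: Int) {

      def build(): Seq[RDD[(String, String)]] = {
        // Local copy of the field: the closure below captures this Int, not `this`.
        val lastIndex = numberOfColumns - 1
        val inputFile = sc.textFile(path).persist()

        // One pair RDD per key column, keyed by column `dim`.
        (0 until lastIndex).map { dim =>
          inputFile.map { line =>
            val columns = line.split(',')
            (columns(dim), columns(lastIndex))
          }
        }
      }
    }
    ```

    The function passed to map refers only to `dim` and `lastIndex`, both local values, so the `ColumnPairs` instance itself never has to be serialized.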