Wednesday, 29 June 2016

Save a Spark RDD using mapPartitions with an iterator

I have some intermediate data that needs to be stored both in HDFS and on the local file system. I'm using Spark 1.6. In HDFS, the intermediate output ends up in /output/testDummy/part-00000 and /output/testDummy/part-00001. I want to save these partitions locally using Java/Scala, either merged into a single local file, /users/home/indexes/index.nt, or separately as /users/home/indexes/index-0000.nt and /home/indexes/index-0001.nt.
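
One way the per-partition naming described above could be sketched is a function with the shape that Spark's `mapPartitionsWithIndex` expects, `(Int, Iterator[T]) => Iterator[U]`. The object `PartitionFiles` and the helpers `partitionFileName` and `savePartition` are hypothetical names introduced only for illustration, and the sketch assumes the target directory already exists on whichever node runs the task.

```scala
import java.io.{BufferedWriter, File, FileWriter}

object PartitionFiles {
  // Hypothetical naming helper: partition 0 maps to index-0000.nt, partition 1 to index-0001.nt.
  def partitionFileName(dir: String, index: Int): String =
    new File(dir, f"index-$index%04d.nt").getPath

  // Writes every line of one partition to its own local file and
  // returns a one-element iterator describing what was written.
  def savePartition(dir: String)(index: Int, lines: Iterator[String]): Iterator[String] = {
    val path = partitionFileName(dir, index)
    val writer = new BufferedWriter(new FileWriter(path))
    var count = 0
    try {
      // Traverse the iterator exactly once, writing as we go.
      lines.foreach { line =>
        writer.write(line)
        writer.write("\n")
        count += 1
      }
    } finally {
      writer.close()
    }
    Iterator(s"$path: $count lines")
  }
}
```

With an RDD of strings, this could presumably be passed as `rdd.mapPartitionsWithIndex(PartitionFiles.savePartition(outputPath) _)` followed by an action such as `collect()`. On a YARN cluster each file would land on the local disk of the executor that processed the partition, not on the driver.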

Here is my code. Note: testDummy is the same as test, and the output has two partitions. I want to store them locally, either separately or combined into the index.nt file; I would prefer to store them separately on the two data nodes. I'm running on a cluster and submitting the Spark job on YARN. I have also added comments showing how many times each line is printed and what data I'm getting. How can I do this? Any help is appreciated.

 val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
 println("testDummy done")   //1 time print

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
    println("Inside savesData")                                 //  now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
    println("iter size"+iterator.size)                           //  2 735 2 735 values
    val filenamesWithExtension = outputPath + "/index.nt"
    println("filenamesWithExtension "+filenamesWithExtension.length)   //4 times
    var list = List[(String)]()

    val fileWritter = new FileWriter(filenamesWithExtension,true)
    val bufferWritter = new BufferedWriter(fileWritter)

     while (iterator.hasNext){                       //iterator.hasNext is false
       println("inside iterator")                    //0 times 
       val dat = iterator.next()
       println("datadata "+iterator.next())

       bufferWritter.write(dat + "\n")
       bufferWritter.flush()
       println("index files written")

       val dataElements = dat.split(" ")
       println("dataElements")                                    //0
       list = list.::(dataElements(0))
       list = list.::(dataElements(1))
       list = list.::(dataElements(2))
     }
    bufferWritter.close() //closing
    println("savesData method end")                         //4 times when coal=2
    list.iterator
}

println("before saving data into local")                              //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions)                               //2
println("testRDD size "+test.collect().length)                                //0
println("after saving data into local")   //1
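
A possible explanation for the "0 times" observations above, offered as a sketch, not a confirmed diagnosis: a Scala `Iterator` can be traversed only once, so calling `iterator.size` before the `while` loop consumes every element and leaves `hasNext` false. The loop also calls `iterator.next()` twice per pass, which would skip every second element. The object `IteratorOnce` and the helper `writeLines` below are hypothetical names; the helper materialises the lines with `toList`, which assumes one partition fits in memory.

```scala
import java.io.{BufferedWriter, FileWriter}

object IteratorOnce {
  // Hypothetical helper: appends each line to `path` and returns the lines it wrote.
  def writeLines(lines: Iterator[String], path: String): Iterator[String] = {
    val writer = new BufferedWriter(new FileWriter(path, true))
    try {
      // Materialise once; the original iterator is exhausted after this call.
      val buffered = lines.toList
      buffered.foreach { line =>
        writer.write(line)
        writer.write("\n")
      }
      // Hand back a fresh iterator over the same data.
      buffered.iterator
    } finally {
      writer.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val it = Iterator("a b c", "d e f")
    println(it.size)    // prints 2, and consumes the iterator
    println(it.hasNext) // prints false
  }
}
```

Dropping the `size` call (or counting inside the loop) and calling `next()` once per pass would be the smallest change to the original function along these lines.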

PS: I followed this and this, but they are not exactly what I'm looking for. I got something running, but nothing is written to index.nt.