火曜日, 1月 12, 2016

Spark - CSVファイルの書き込み

以下の例はRDDの内容(Neko case class)をCSVファイルとしてHDFSに書き込む例.

package tanuneko

import java.io.StringWriter

import com.opencsv.CSVWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConverters._

/**
  * Created by neko32 on 2016/01/11.
  */
/**
  * Example: write the contents of an RDD of [[Neko]] records to HDFS as CSV,
  * rendering each partition into one CSV string via opencsv.
  *
  * Created by neko32 on 2016/01/11.
  */
object CSVWrite {

  /** A cat record: name and age. Serialized as "age,name" per CSV row. */
  case class Neko(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    // App name matched to this object (was "WriteJson" — leftover from another example).
    val conf = new SparkConf().setMaster("local").setAppName("CSVWrite")
    val sc = new SparkContext(conf)
    val hadoopConf = new Configuration
    val fs = FileSystem.get(hadoopConf)
    val outFile = "/user/neko32/spark/study4/csvwrite"

    try {
      val nekoz = sc.parallelize(List(Neko("Tora", 8), Neko("Mikeyo", 4)))
      // One String[] per record: age first, then name (matches the original output order).
      val rows = nekoz.map(n => Array(n.age.toString, n.name))
      val csvLines = rows.mapPartitions { iter =>
        val writer = new StringWriter
        val csvWriter = new CSVWriter(writer)
        try {
          csvWriter.writeAll(iter.toList.asJava)
        } finally {
          // close() flushes opencsv's internal PrintWriter into the StringWriter;
          // without it the buffered CSV text may be incomplete.
          csvWriter.close()
        }
        Iterator(writer.toString)
      }.cache
      csvLines.foreach(println)
      // Overwrite semantics: remove any previous run's output directory first,
      // otherwise saveAsTextFile fails when the path already exists.
      val outPath = new Path(outFile)
      if (fs.exists(outPath)) {
        fs.delete(outPath, true)
      }
      csvLines.saveAsTextFile(outFile)
    } finally {
      // Release the local SparkContext even if the job fails.
      sc.stop()
    }
  }

}

0 件のコメント: