Monday, January 18, 2016

Spark - Mapping per Partition

An example that uses a pair RDD's partitionBy() with a custom partitioner and then sums up the values of each record with mapPartitions(), one partition at a time.
The input file consists of key-value pairs like the ones below; each value is a space-separated sequence of integers. (How these keys map to partitions is sketched right after the sample input.)

lineA:73 44 58 62
lineAB:88 21 20
lineABC:50 21 99 82 25 62
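
Because the OddEvenPartitioner defined at the bottom of the listing routes a key by the parity of its length, the three sample keys split across the two partitions as follows. This is a minimal sketch you can run in the plain Scala REPL; it only assumes the key-length rule used by the partitioner:

// Partition index = key length % 2, the same rule as OddEvenPartitioner.getPartition.
Seq("lineA", "lineAB", "lineABC").foreach { key =>
  println(s"$key -> partition ${key.length % 2}")
}
// lineA   -> partition 1  (length 5)
// lineAB  -> partition 0  (length 6)
// lineABC -> partition 1  (length 7)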

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

/**
  * Created by neko32 on 2016/01/17.
  */
object CalcPart {

  def main(args:Array[String]) = {
    val conf = new SparkConf().setMaster("local").setAppName("Takoran")
    val sc = new SparkContext(conf)
    // Parse each line into (key, array of value strings).
    val data = sc.textFile("/user/neko32/spark/study7/nums.txt")
          .map(l => {
            val splitted = l.split(":")
            (splitted(0), splitted(1).split(" "))
          })
    // partitionBy() returns a new RDD, so its result must be assigned and used
    // below; calling it without keeping the return value discards the partitioning.
    val partitioned = data.partitionBy(new OddEvenPartitioner(2)).persist()

    // Print each record's key followed by its raw values.
    partitioned.foreach(elem => {
      val buf = new StringBuilder
      buf ++= "key[" + elem._1 + "]:"
      elem._2.foreach(n => buf ++= (n + " "))
      println(buf.toString)
    })

    // Sum up the values of each record, one partition at a time.
    val x = partitioned.mapPartitions(iter =>
      iter.map { case (key, values) => (key, values.foldLeft(0)(_ + _.toInt)) }
    )

    // Print the per-key sums directly and again from a collected array.
    x.foreach(println)
    for ((key, sum) <- x.collect()) {
      println(s"$key:$sum")
    }
  }
}

class OddEvenPartitioner(nums:Int) extends Partitioner {
  override def numPartitions: Int = nums

  // Route each key to a partition by the parity of its length:
  // even-length keys go to partition 0, odd-length keys to partition 1.
  override def getPartition(key: Any): Int = {
    val n = key.asInstanceOf[String].length
    n % 2
  }

  // Spark compares partitioners with equals(); two OddEvenPartitioners with the
  // same partition count are considered equal (hashCode is kept consistent with this).
  override def equals(others: Any): Boolean = others match {
    case o: OddEvenPartitioner => o.numPartitions == this.numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
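
To double-check which partition each key actually landed in after partitionBy(), a mapPartitionsWithIndex() pass can be appended inside main(). This is a sketch that assumes the partitioned variable from the assignment above:

// Tag every record with the index of the partition that holds it,
// then pull the tags back to the driver and print them.
partitioned
  .mapPartitionsWithIndex((idx, iter) => iter.map { case (k, _) => s"partition $idx: $k" })
  .collect()
  .foreach(println)

With the sample input, lineAB should be reported in partition 0 and lineA and lineABC in partition 1, and the per-key sums printed by the mapPartitions() step are lineA:237, lineAB:129, and lineABC:339.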
