入力ファイルは以下のようなキーバリューの対とする.また,バリューはスペース区切りの整数の列とする.
lineA:73 44 58 62
lineAB:88 21 20
lineABC:50 21 99 82 25 62
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

/**
 * Created by neko32 on 2016/01/17.
 *
 * Small Spark study job: reads `key:v1 v2 v3 ...` records (one per line),
 * partitions them with [[OddEvenPartitioner]], prints every record, and then
 * prints the sum of the integer values of each key.
 */
object CalcPart {

  /** Location of the input file read by [[main]]. */
  private val InputPath = "/user/neko32/spark/study7/nums.txt"

  /**
   * Job entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("Takoran")
    val sc = new SparkContext(conf)
    try {
      val data = sc.textFile(InputPath).flatMap(line => parseLine(line).toList)

      // RDDs are immutable: partitionBy returns a NEW RDD, so its result must be
      // kept and used; calling it on `data` and discarding the result is a no-op.
      val partitioned = data.partitionBy(new OddEvenPartitioner(2)).persist()

      partitioned.foreach { case (key, values) =>
        // Each value is followed by a single space, including the last one.
        println(s"key[$key]:${values.map(v => s"$v ").mkString}")
      }

      // Keys are not modified here, so tell Spark to keep the partitioner.
      val sums = partitioned.mapPartitions(
        records => records.map { case (key, values) => (key, values.map(_.toInt).sum) },
        preservesPartitioning = true
      )

      sums.foreach(println)
      sums.collect().foreach { case (key, total) => println(s"$key:$total") }
    } finally {
      // Always release the context, even when a stage fails.
      sc.stop()
    }
  }

  /**
   * Splits one input line into its key and its value tokens.
   *
   * The text before the first ':' is the key and the text between the first
   * and second ':' holds the whitespace-separated values. Empty tokens caused
   * by repeated or trailing whitespace are dropped.
   *
   * @param line raw line of the input file
   * @return the (key, value tokens) pair, or `None` when the line has no
   *         "key:values" shape (for example a blank line)
   */
  private def parseLine(line: String): Option[(String, Array[String])] =
    line.split(":") match {
      case Array(key, values, _*) =>
        Some((key, values.trim.split("\\s+").filter(_.nonEmpty)))
      case _ =>
        None
    }
}

/**
 * Partitioner that routes a record by the parity of its key's length:
 * even-length keys go to partition 0 and odd-length keys to partition 1.
 *
 * @param nums number of partitions reported to Spark; must be positive
 */
class OddEvenPartitioner(nums: Int) extends Partitioner {
  require(nums > 0, s"Number of partitions ($nums) must be positive.")

  override def numPartitions: Int = nums

  /**
   * @param key record key; expected to be a `String`
   * @return the partition index, always within `[0, numPartitions)`
   * @throws ClassCastException if the key is neither `null` nor a `String`
   */
  override def getPartition(key: Any): Int = key match {
    case null => 0
    // The extra modulo keeps the index in range when only one partition exists.
    case s: String => (s.length % 2) % numPartitions
    case other =>
      throw new ClassCastException(
        s"OddEvenPartitioner expects String keys but got ${other.getClass.getName}")
  }

  override def equals(others: Any): Boolean = others match {
    case o: OddEvenPartitioner => o.numPartitions == this.numPartitions
    case _ => false
  }

  // Must agree with equals: Spark compares partitioners to decide whether a shuffle is needed.
  override def hashCode: Int = numPartitions
}
0 件のコメント:
コメントを投稿