入力ファイルは以下のようなキーバリューの対とする.また,バリューはスペース区切りの整数の列とする.
lineA:73 44 58 62
lineAB:88 21 20
lineABC:50 21 99 82 25 62
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
/**
 * Created by neko32 on 2016/01/17.
 *
 * Spark study job: reads `key:v1 v2 v3 ...` lines, partitions the pairs with
 * [[OddEvenPartitioner]], prints every pair, then prints the sum of the
 * integers belonging to each key.
 */
object CalcPart {

  /**
   * Entry point.
   *
   * Each input line must have the form `key:n1 n2 n3 ...`, where the values
   * are space-separated integers. A line without a `:` separator fails with
   * an ArrayIndexOutOfBoundsException and a non-integer value fails with a
   * NumberFormatException when the job runs.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("Takoran")
    val sc = new SparkContext(conf)
    try {
      // RDDs are immutable: partitionBy/persist return a NEW RDD. The result
      // has to be kept, otherwise the custom partitioner and the cache are
      // silently dropped and later actions run on the unpartitioned RDD.
      val data = sc.textFile("/user/neko32/spark/study7/nums.txt")
        .map { line =>
          val splitted = line.split(":")
          (splitted(0), splitted(1).split(" "))
        }
        .partitionBy(new OddEvenPartitioner(2))
        .persist()

      // Dump every pair as "key[<key>]:<v1> <v2> ... " (trailing space kept).
      data.foreach { case (key, values) =>
        println(s"key[$key]:${values.map(n => s"$n ").mkString}")
      }

      // Sum the values of each key partition by partition. Keys are left
      // untouched, so the partitioner of `data` remains valid for the result.
      val sums = data.mapPartitions(
        iter => iter.map { case (key, values) => (key, values.foldLeft(0)(_ + _.toInt)) },
        preservesPartitioning = true
      )

      sums.foreach(println)
      for ((key, total) <- sums.collect()) {
        println(s"$key:$total")
      }
    } finally {
      // Release the Spark context even when a stage fails.
      sc.stop()
    }
  }
}
/**
 * Partitioner that routes String keys by the parity of their length:
 * even-length keys go to partition 0 and odd-length keys to partition 1.
 *
 * @param nums number of partitions reported to Spark; must be at least 1.
 *             With a single partition every key is routed to partition 0.
 */
class OddEvenPartitioner(nums: Int) extends Partitioner {
  require(nums >= 1, s"Number of partitions ($nums) must be at least 1.")

  override def numPartitions: Int = nums

  /**
   * @param key the record key; expected to be a String
   * @return the partition index, always within `0 until numPartitions`
   * @throws ClassCastException if the key is neither null nor a String
   */
  override def getPartition(key: Any): Int = key match {
    // Null keys go to partition 0 instead of failing with a NullPointerException.
    case null => 0
    // The outer modulo keeps the index in range when only one partition exists;
    // for two or more partitions the result is simply length % 2.
    case s: String => (s.length % 2) % numPartitions
    case other =>
      throw new ClassCastException(
        s"OddEvenPartitioner supports only String keys but got ${other.getClass.getName}")
  }

  override def equals(others: Any): Boolean = others match {
    case o: OddEvenPartitioner => o.numPartitions == this.numPartitions
    case _ => false
  }

  // Must stay consistent with equals: equal partitioners need equal hash codes.
  override def hashCode: Int = numPartitions
}
0 件のコメント:
コメントを投稿