import org.apache.spark.{SparkContext, SparkConf} /** * Created by neko32 on 2016/01/18. */ object Stats { def main(args:Array[String]) = { val conf = new SparkConf().setMaster("local").setAppName("stats") val sc = new SparkContext(conf) val rdd = sc.textFile("/user/neko32/spark/study7/nums.txt") val stats = rdd.flatMap(l => l.split(":")(1).split(" ").map(_.toInt)).stats println(s"mean:${stats.mean}, max:${stats.max}, min:${stats.min}, var:${stats.variance}") } }
月曜日, 1月 18, 2016
Spark - StatsCounter
StatsCounterを使えば簡単に統計用の基本的な値を計算できる.
Spark - パーティション単位でのマップ
PairRDDのpartitionBy()を使ってパーティション毎の要素の合計を計算をする例.
入力ファイルは以下のようなキーバリューの対とする.また,バリューはスペース区切りの整数の列とする.
入力ファイルは以下のようなキーバリューの対とする.また,バリューはスペース区切りの整数の列とする.
lineA:73 44 58 62 lineAB:88 21 20 lineABC:50 21 99 82 25 62
import org.apache.spark.{Partitioner, SparkConf, SparkContext} /** * Created by neko32 on 2016/01/17. */ object CalcPart { def main(args:Array[String]) = { val conf = new SparkConf().setMaster("local").setAppName("Takoran") val sc = new SparkContext(conf) val data = sc.textFile("/user/neko32/spark/study7/nums.txt") .map(l => { val splitted = l.split(":") (splitted(0), splitted(1).split(" ")) }) data.partitionBy(new OddEvenPartitioner(2)).persist data.foreach(elem => { val buf = new StringBuilder buf ++= "key[" + elem._1 + "]:" elem._2.foreach(n => buf ++= (n + " ")) println(buf.toString) }) val x = data.mapPartitions(iter => { val nums = iter.map(x => (x._1, x._2.foldLeft(0)(_ + _.toInt))) nums } ) x foreach println for(x <- x collect) { println(s"${x._1}:${x._2}") } } } class OddEvenPartitioner(nums:Int) extends Partitioner { override def numPartitions: Int = nums override def getPartition(key: Any): Int = { val n = key.asInstanceOf[String].length n % 2 } override def equals(others: Any) = others match { case o:OddEvenPartitioner => o.numPartitions == this.numPartitions case _ => false } }
金曜日, 1月 15, 2016
Scala - JSONの作成
以下の例はPlay frameworkのJSONライブラリを使ってJSONを作った例.
Scalaの型から作られたJSONをJson.parse()を使ってまたJSON化もしている.
SBTには以下を追加.
Scalaの型から作られたJSONをJson.parse()を使ってまたJSON化もしている.
import play.api.libs.json._ /** * Created by neko32 on 2016/01/15. */ object JsonSample { def main(args:Array[String]) = { val mine = Json.obj("name" -> "tanuchan", "age" -> 5, "Nekoz" -> Json.arr("Chatora", "Mikeneko", "Snowball")) println(mine.toString) } }
SBTには以下を追加.
libraryDependencies ++= Seq("com.typesafe.play" %% "play-json" % "2.4.6")
水曜日, 1月 13, 2016
Spark - JDBC経由でデータを取得
以下の例はSparkJDBCを使ってRDB(ここではMySQL. MySQLは使いやすくていいね!)からデータを取得しJDBCRDDを構築する.
import java.sql.{ResultSet, DriverManager} import org.apache.spark.rdd.JdbcRDD import org.apache.spark.{SparkConf, SparkContext} /** * Created by neko32 on 2016/01/13. */ object MySQLSpark { def getConn() = { Class.forName("com.mysql.jdbc.Driver").newInstance DriverManager.getConnection("jdbc:mysql://localhost/neko32?user=takoneko888&password=XXXX") } def main(args:Array[String]):Unit = { val conf = new SparkConf().setMaster("local").setAppName("MySql") val sc = new SparkContext(conf) val data = new JdbcRDD(sc, getConn, "select * from test where id >= ? and id <= ?", upperBound = 2, lowerBound = 1, numPartitions = 1, mapRow = (r:ResultSet) => (r.getInt("id"), r.getString("name"))) println(data.collect.toList) } }
火曜日, 1月 12, 2016
Spark - CSVファイルの書き込み
以下の例はRDDの内容(Neko case class)をCSVファイルとしてHDFSに書き込む例.
package tanuneko import java.io.StringWriter import com.opencsv.CSVWriter import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.spark.{SparkConf, SparkContext} import scala.collection.JavaConverters._ /** * Created by neko32 on 2016/01/11. */ object CSVWrite { case class Neko(name:String, age:Int) def main(args:Array[String]):Unit = { val conf = new SparkConf().setMaster("local").setAppName("WriteJson") val sc = new SparkContext(conf) val hadoopConf = new Configuration val fs = FileSystem.get(hadoopConf) val outFile = "/user/neko32/spark/study4/csvwrite" val nekoz = sc.parallelize(List(Neko("Tora", 8), Neko("Mikeyo", 4))) val stred = nekoz.map(n => List(n.age.toString, n.name).toArray) val nekoz2 = stred.mapPartitions { n => val writer = new StringWriter val csvWriter = new CSVWriter(writer) csvWriter.writeAll(n.toList.asJava) Iterator(writer.toString) }.cache nekoz2 foreach println if(fs.exists(new Path(outFile))) { fs.delete(new Path(outFile), true) } nekoz2 saveAsTextFile(outFile) } }
日曜日, 1月 10, 2016
Spark - ファイル名をキー,内容をバリューとしてファイルを読み込み
SparkContextのwholeTextFiles()を使うと,ファイル名をキー,内容をバリューのRDDを作ることが出来る.以下の例では,HDFS上に空白区切りで数字が羅列されている複数のファイルを一括で読み込んでそれぞれの算術平均を求めている.
上記のコードをファイルに保存した後spark-shellから以下のコマンドでロード.
val in = sc.wholeTextFiles("/user/neko32/spark/study3") val rez = in mapValues { case d => val nums = d.trim.split(" ").map(_.toInt) nums.sum / nums.size } rez foreach println
上記のコードをファイルに保存した後spark-shellから以下のコマンドでロード.
:load ./avgcal.scala
木曜日, 1月 07, 2016
Spark メモ - CSVからkey - non-key ペア変換
HDFS上のCSV風テキストを最終的に主キー - 非キー別タプルに変換する例.
// read input file separated by comma // suppose this input file consists of 4 cols (tradeid, version, instrument and trader name) val myrdd = sc.textFile("/user/neko32/spark/study1/contract.txt") // transform string to list val lines = myrdd.map(x => x.split(",").toList) // transform line list to tuple val myIn = lines.map(x => (x(0), x(1), x(2), x(3))) // transform line as tupleN to key-val pairs val byKey = myIn.map{case (c,v,i,n) => (c,v) -> (i,n)} // reduce by key val reducedByKey = byKey.reduceByKey{ case (x,y) => (x._1 + y._1, x._2 + y._2) }
日曜日, 1月 03, 2016
scala - 外部コマンドの実行
Scalaの外部コマンド実行はJavaのProcessBuilderほぼ同じ.
しかしながらscalaはパイプを用意に実現できたりとより強力である.
/** * Created by neko32 on 2016/01/03. */ object RunCommand { def main(args:Array[String]):Unit = { import sys.process._ // execute external command val result = "javac -version".! println(result) // return stdout val result2 = "javac -version".!! println(result2) // via Process or Seq command can be triggered val result3 = Seq("javac", "-version").!! println(result3) val result4 = Process("javac -version").!! println(result4) // dump stderr val result5 = "javac -takorin".lineStream_!.mkString println(result5) // redirect output to a file val result6 = ("javac -version" #> new File("C:\\tmp\\javav")).! println(result6) // this step may not work on Windows assume(new File("C:\\tmp\\javav").exists() && Source.fromFile("C:\\tmp\\javav").mkString.length() > 0, "file should exists!") } }
しかしながらscalaはパイプを用意に実現できたりとより強力である.
scala> ("ls /cygdrive/c/tmp/" #| "grep -i rachmaninov").!!.trim res1: String = Rachmaninov_piano3.jpg Rachmaninov2.jpg
scala - CSVファイルの読み込み
CSVファイルを読み込んで多重配列に保存するコードのメモ..
import resource._ import scala.io.Source object CSVReadRunner extends CSVRead { def main(args:Array[String]):Unit = { val vals = read("C:\\tmp\\tankoneko.csv") println("done.") } } /** * Created by neko32 on 2016/01/03. */ trait CSVRead { def read(path:String):Array[Array[String]] = { val arr = Array.ofDim[String](4,2) for { out <- managed(Source.fromFile(path)) } { for{ (line,i) <- out.getLines.zipWithIndex} { println(s"($i)@ILOOP") for{ (elem,j) <- line.split(",").map(_.trim).zipWithIndex} { println(s"($i,$j)@JLOOP") arr(i)(j) = elem } } } arr } }
scala - try with resource相当
Scalaにおいてtry-with-resourceのようなARMを使うには,私の知っている限りscala-ARMを使う必要がある.
以下の例はmanagedを使ってwriterをmanagedを使って自動で閉じ,scala.io.Sourceで読みだした後,自前のcloseAfterのローンパターンで閉じている.(Alvin Alexander氏のScala Cookbookを参考とさせていただいた)(
Source.from([file])が返すBufferedSourceはcloseメソッドを持っているのでcloseAfterの型要件を満たしている.
scala-ARMを使うには以下の行をbuild.sbtに追加.
以下の例はmanagedを使ってwriterをmanagedを使って自動で閉じ,scala.io.Sourceで読みだした後,自前のcloseAfterのローンパターンで閉じている.(Alvin Alexander氏のScala Cookbookを参考とさせていただいた)(
Source.from([file])が返すBufferedSourceはcloseメソッドを持っているのでcloseAfterの型要件を満たしている.
import java.io.{BufferedWriter, File, FileWriter} import resource.managed import scala.io.Source /** * Created by neko32 on 2016/01/03. */ object FileIO { def main(args:Array[String]):Unit = { val path = "C:\\tmp\\testneko.txt" val file = new File(path) if(file.exists) file.delete for { out <- data-blogger-escaped-bufferedwriter="" data-blogger-escaped-closeafter="" data-blogger-escaped-file="" data-blogger-escaped-filewriter="" data-blogger-escaped-managed="" data-blogger-escaped-n="" data-blogger-escaped-new="" data-blogger-escaped-nmachiko="" data-blogger-escaped-nmikeyo="" data-blogger-escaped-npowder="" data-blogger-escaped-orachan="" data-blogger-escaped-ource.fromfile="" data-blogger-escaped-out.flush="" data-blogger-escaped-out.write="" data-blogger-escaped-path="" data-blogger-escaped-src=""> val texts = src.getLines() for((m,n) <- data-blogger-escaped-a="" data-blogger-escaped-b="" data-blogger-escaped-close="" data-blogger-escaped-closeafter="" data-blogger-escaped-closelater="" data-blogger-escaped-def="" data-blogger-escaped-f:="" data-blogger-escaped-m="" data-blogger-escaped-n="" data-blogger-escaped-nit="" data-blogger-escaped-object="" data-blogger-escaped-println="" data-blogger-escaped-resource:="" data-blogger-escaped-s="" data-blogger-escaped-texts.zipwithindex=""> B):B = { try { f(resource) } finally { resource.close() } } }
scala-ARMを使うには以下の行をbuild.sbtに追加.
libraryDependencies ++= Seq("org.specs2" %% "specs2-core" % "3.6.5" % "test")
Scala - マップのキー・バリュー交換
scalaでは,mapのキー・バリュースワップを容易に実現出来る.
val myMap = Map("Tora" -> "Chatora", "Mikeyo" -> "Mike", "Powder" -> "Mike") val reversed = for((x,y) <- myMap) yield(y,x) println(reversed)
登録:
投稿 (Atom)