月曜日, 1月 18, 2016

Spark - StatsCounter

StatsCounterを使えば簡単に統計用の基本的な値を計算できる.

import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by neko32 on 2016/01/18.
  */
object Stats {

  def main(args:Array[String]) = {
    val conf = new SparkConf().setMaster("local").setAppName("stats")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("/user/neko32/spark/study7/nums.txt")
    val stats = rdd.flatMap(l => l.split(":")(1).split(" ").map(_.toInt)).stats
    println(s"mean:${stats.mean}, max:${stats.max}, min:${stats.min}, var:${stats.variance}")
  }
}

Spark - パーティション単位でのマップ

PairRDDのpartitionBy()を使ってパーティション毎の要素の合計を計算をする例.
入力ファイルは以下のようなキーバリューの対とする.また,バリューはスペース区切りの整数の列とする.

lineA:73 44 58 62
lineAB:88 21 20
lineABC:50 21 99 82 25 62

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

/**
  * Created by neko32 on 2016/01/17.
  */
object CalcPart {

  def main(args:Array[String]) = {
    val conf = new SparkConf().setMaster("local").setAppName("Takoran")
    val sc = new SparkContext(conf)
    val data =  sc.textFile("/user/neko32/spark/study7/nums.txt")
          .map(l => {
            val splitted = l.split(":")
            (splitted(0), splitted(1).split(" "))
          })
    data.partitionBy(new OddEvenPartitioner(2)).persist

    data.foreach(elem => {
      val buf = new StringBuilder
      buf ++= "key[" + elem._1 + "]:"
      elem._2.foreach(n => buf ++= (n + " "))
      println(buf.toString)
    })

    val x = data.mapPartitions(iter => {
        val nums = iter.map(x => (x._1, x._2.foldLeft(0)(_ + _.toInt)))
        nums
      }
    )

    x foreach println
    for(x <- x collect) {
      println(s"${x._1}:${x._2}")
    }
  }
}

class OddEvenPartitioner(nums:Int) extends Partitioner {
  override def numPartitions: Int = nums

  override def getPartition(key: Any): Int = {
    val n = key.asInstanceOf[String].length
    n % 2
  }

  override def equals(others: Any) = others match {
    case o:OddEvenPartitioner => o.numPartitions == this.numPartitions
    case _ => false
  }
}

金曜日, 1月 15, 2016

Scala - JSONの作成

以下の例はPlay frameworkのJSONライブラリを使ってJSONを作った例.
Scalaの型から作られたJSONをJson.parse()を使ってまたJSON化もしている.

import play.api.libs.json._
/**
  * Created by neko32 on 2016/01/15.
  */
object JsonSample {

  def main(args:Array[String]) = {
    val mine = Json.obj("name" -> "tanuchan", "age" -> 5, "Nekoz" -> Json.arr("Chatora", "Mikeneko", "Snowball"))
    println(mine.toString)
  }

}

SBTには以下を追加.
libraryDependencies ++= Seq("com.typesafe.play" %% "play-json" % "2.4.6")

水曜日, 1月 13, 2016

Spark - JDBC経由でデータを取得

以下の例はSparkJDBCを使ってRDB(ここではMySQL. MySQLは使いやすくていいね!)からデータを取得しJDBCRDDを構築する.

import java.sql.{ResultSet, DriverManager}

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by neko32 on 2016/01/13.
  */
object MySQLSpark {

  def getConn() = {
   Class.forName("com.mysql.jdbc.Driver").newInstance
    DriverManager.getConnection("jdbc:mysql://localhost/neko32?user=takoneko888&password=XXXX")
}
  def main(args:Array[String]):Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("MySql")
    val sc = new SparkContext(conf)

    val data = new JdbcRDD(sc, getConn, "select * from test where id >= ? and id <= ?",
                            upperBound = 2,
                            lowerBound = 1,
                            numPartitions = 1,
                            mapRow = (r:ResultSet) => (r.getInt("id"), r.getString("name")))
    println(data.collect.toList)

  }
}

火曜日, 1月 12, 2016

Spark - CSVファイルの書き込み

以下の例はRDDの内容(Neko case class)をCSVファイルとしてHDFSに書き込む例.

package tanuneko

import java.io.StringWriter

import com.opencsv.CSVWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConverters._

/**
  * Created by neko32 on 2016/01/11.
  */
object CSVWrite {

  case class Neko(name:String, age:Int)

  def main(args:Array[String]):Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WriteJson")
    val sc = new SparkContext(conf)
    val hadoopConf = new Configuration
    val fs = FileSystem.get(hadoopConf)
    val outFile = "/user/neko32/spark/study4/csvwrite"

    val nekoz = sc.parallelize(List(Neko("Tora", 8), Neko("Mikeyo", 4)))
    val stred = nekoz.map(n => List(n.age.toString, n.name).toArray)
    val nekoz2 = stred.mapPartitions { n =>
      val writer = new StringWriter
      val csvWriter = new CSVWriter(writer)
      csvWriter.writeAll(n.toList.asJava)
      Iterator(writer.toString)
    }.cache
    nekoz2 foreach println
    if(fs.exists(new Path(outFile))) {
      fs.delete(new Path(outFile), true)
    }
    nekoz2 saveAsTextFile(outFile)
  }

}

日曜日, 1月 10, 2016

Spark - ファイル名をキー,内容をバリューとしてファイルを読み込み

SparkContextのwholeTextFiles()を使うと,ファイル名をキー,内容をバリューのRDDを作ることが出来る.以下の例では,HDFS上に空白区切りで数字が羅列されている複数のファイルを一括で読み込んでそれぞれの算術平均を求めている.

val in = sc.wholeTextFiles("/user/neko32/spark/study3")
val rez = in mapValues {
    case d =>
    val nums = d.trim.split(" ").map(_.toInt)
    nums.sum / nums.size
}
rez foreach println

上記のコードをファイルに保存した後spark-shellから以下のコマンドでロード.

:load ./avgcal.scala

木曜日, 1月 07, 2016

Spark メモ - CSVからkey - non-key ペア変換

HDFS上のCSV風テキストを最終的に主キー - 非キー別タプルに変換する例.


// read input file separated by comma
// suppose this input file consists of 4 cols (tradeid, version, instrument and trader name)
val myrdd = sc.textFile("/user/neko32/spark/study1/contract.txt")

// transform string to list
val lines = myrdd.map(x => x.split(",").toList)

// transform line list to tuple
val myIn = lines.map(x => (x(0), x(1), x(2), x(3)))

// transform line as tupleN to key-val pairs
val byKey = myIn.map{case (c,v,i,n) => (c,v) -> (i,n)}

// reduce by key
val reducedByKey = byKey.reduceByKey{ case (x,y) => (x._1 + y._1, x._2 + y._2) }

日曜日, 1月 03, 2016

scala - 外部コマンドの実行

Scalaの外部コマンド実行はJavaのProcessBuilderほぼ同じ.
/**
  * Created by neko32 on 2016/01/03.
  */
object RunCommand {
  def main(args:Array[String]):Unit = {
    import sys.process._
    // execute external command
    val result = "javac -version".!
    println(result)
    // return stdout
    val result2 = "javac -version".!!
    println(result2)
    // via Process or Seq command can be triggered
    val result3 = Seq("javac", "-version").!!
    println(result3)
    val result4 = Process("javac -version").!!
    println(result4)
    // dump stderr
    val result5 = "javac -takorin".lineStream_!.mkString
    println(result5)
    // redirect output to a file
    val result6 = ("javac -version" #> new File("C:\\tmp\\javav")).!
    println(result6)
    // this step may not work on Windows
    assume(new File("C:\\tmp\\javav").exists() &&
           Source.fromFile("C:\\tmp\\javav").mkString.length() > 0, "file should exists!")
  }
}

しかしながらscalaはパイプを用意に実現できたりとより強力である.
scala> ("ls /cygdrive/c/tmp/" #| "grep -i rachmaninov").!!.trim
res1: String =
Rachmaninov_piano3.jpg
Rachmaninov2.jpg

scala - CSVファイルの読み込み

CSVファイルを読み込んで多重配列に保存するコードのメモ..

import resource._

import scala.io.Source

object CSVReadRunner extends CSVRead {
  def main(args:Array[String]):Unit = {
    val vals = read("C:\\tmp\\tankoneko.csv")
    println("done.")
  }
}

/**
  * Created by neko32 on 2016/01/03.
  */
trait CSVRead {
  def read(path:String):Array[Array[String]] = {
    val arr = Array.ofDim[String](4,2)
    for {
      out <- managed(Source.fromFile(path))
    } {
      for{ (line,i) <- out.getLines.zipWithIndex} {
        println(s"($i)@ILOOP")
        for{ (elem,j) <- line.split(",").map(_.trim).zipWithIndex} {
          println(s"($i,$j)@JLOOP")
          arr(i)(j) = elem
        }
      }
    }
    arr
  }
}

scala - try with resource相当

Scalaにおいてtry-with-resourceのようなARMを使うには,私の知っている限りscala-ARMを使う必要がある.
以下の例はmanagedを使ってwriterをmanagedを使って自動で閉じ,scala.io.Sourceで読みだした後,自前のcloseAfterのローンパターンで閉じている.(Alvin Alexander氏のScala Cookbookを参考とさせていただいた)(
Source.from([file])が返すBufferedSourceはcloseメソッドを持っているのでcloseAfterの型要件を満たしている.
import java.io.{BufferedWriter, File, FileWriter}

import resource.managed

import scala.io.Source

/**
  * Created by neko32 on 2016/01/03.
  */
object FileIO {
  def main(args:Array[String]):Unit = {
    val path = "C:\\tmp\\testneko.txt"
    val file = new File(path)
    if(file.exists) file.delete

    for {
      out <- data-blogger-escaped-bufferedwriter="" data-blogger-escaped-closeafter="" data-blogger-escaped-file="" data-blogger-escaped-filewriter="" data-blogger-escaped-managed="" data-blogger-escaped-n="" data-blogger-escaped-new="" data-blogger-escaped-nmachiko="" data-blogger-escaped-nmikeyo="" data-blogger-escaped-npowder="" data-blogger-escaped-orachan="" data-blogger-escaped-ource.fromfile="" data-blogger-escaped-out.flush="" data-blogger-escaped-out.write="" data-blogger-escaped-path="" data-blogger-escaped-src="">
        val texts = src.getLines()
        for((m,n) <- data-blogger-escaped-a="" data-blogger-escaped-b="" data-blogger-escaped-close="" data-blogger-escaped-closeafter="" data-blogger-escaped-closelater="" data-blogger-escaped-def="" data-blogger-escaped-f:="" data-blogger-escaped-m="" data-blogger-escaped-n="" data-blogger-escaped-nit="" data-blogger-escaped-object="" data-blogger-escaped-println="" data-blogger-escaped-resource:="" data-blogger-escaped-s="" data-blogger-escaped-texts.zipwithindex=""> B):B = {
    try {
      f(resource)
    } finally {
      resource.close()
    }
  }
}

scala-ARMを使うには以下の行をbuild.sbtに追加.
libraryDependencies ++= Seq("org.specs2" %% "specs2-core" % "3.6.5" % "test")

Scala - マップのキー・バリュー交換

scalaでは,mapのキー・バリュースワップを容易に実現出来る.


    val myMap = Map("Tora" -> "Chatora", "Mikeyo" -> "Mike", "Powder" -> "Mike")
    val reversed = for((x,y) <- myMap) yield(y,x)
    println(reversed)