import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by neko32 on 2016/01/18.
*/
object Stats {
def main(args:Array[String]) = {
val conf = new SparkConf().setMaster("local").setAppName("stats")
val sc = new SparkContext(conf)
val rdd = sc.textFile("/user/neko32/spark/study7/nums.txt")
val stats = rdd.flatMap(l => l.split(":")(1).split(" ").map(_.toInt)).stats
println(s"mean:${stats.mean}, max:${stats.max}, min:${stats.min}, var:${stats.variance}")
}
}
月曜日, 1月 18, 2016
Spark - StatsCounter
StatsCounterを使えば簡単に統計用の基本的な値を計算できる.
Spark - パーティション単位でのマップ
PairRDDのpartitionBy()を使ってパーティション毎の要素の合計を計算をする例.
入力ファイルは以下のようなキーバリューの対とする.また,バリューはスペース区切りの整数の列とする.
入力ファイルは以下のようなキーバリューの対とする.また,バリューはスペース区切りの整数の列とする.
lineA:73 44 58 62 lineAB:88 21 20 lineABC:50 21 99 82 25 62
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
/**
* Created by neko32 on 2016/01/17.
*/
object CalcPart {
def main(args:Array[String]) = {
val conf = new SparkConf().setMaster("local").setAppName("Takoran")
val sc = new SparkContext(conf)
val data = sc.textFile("/user/neko32/spark/study7/nums.txt")
.map(l => {
val splitted = l.split(":")
(splitted(0), splitted(1).split(" "))
})
data.partitionBy(new OddEvenPartitioner(2)).persist
data.foreach(elem => {
val buf = new StringBuilder
buf ++= "key[" + elem._1 + "]:"
elem._2.foreach(n => buf ++= (n + " "))
println(buf.toString)
})
val x = data.mapPartitions(iter => {
val nums = iter.map(x => (x._1, x._2.foldLeft(0)(_ + _.toInt)))
nums
}
)
x foreach println
for(x <- x collect) {
println(s"${x._1}:${x._2}")
}
}
}
class OddEvenPartitioner(nums:Int) extends Partitioner {
override def numPartitions: Int = nums
override def getPartition(key: Any): Int = {
val n = key.asInstanceOf[String].length
n % 2
}
override def equals(others: Any) = others match {
case o:OddEvenPartitioner => o.numPartitions == this.numPartitions
case _ => false
}
}
金曜日, 1月 15, 2016
Scala - JSONの作成
以下の例はPlay frameworkのJSONライブラリを使ってJSONを作った例.
Scalaの型から作られたJSONをJson.parse()を使ってまたJSON化もしている.
SBTには以下を追加.
Scalaの型から作られたJSONをJson.parse()を使ってまたJSON化もしている.
import play.api.libs.json._
/**
* Created by neko32 on 2016/01/15.
*/
object JsonSample {
def main(args:Array[String]) = {
val mine = Json.obj("name" -> "tanuchan", "age" -> 5, "Nekoz" -> Json.arr("Chatora", "Mikeneko", "Snowball"))
println(mine.toString)
}
}
SBTには以下を追加.
libraryDependencies ++= Seq("com.typesafe.play" %% "play-json" % "2.4.6")
水曜日, 1月 13, 2016
Spark - JDBC経由でデータを取得
以下の例はSparkJDBCを使ってRDB(ここではMySQL. MySQLは使いやすくていいね!)からデータを取得しJDBCRDDを構築する.
import java.sql.{ResultSet, DriverManager}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by neko32 on 2016/01/13.
*/
object MySQLSpark {
def getConn() = {
Class.forName("com.mysql.jdbc.Driver").newInstance
DriverManager.getConnection("jdbc:mysql://localhost/neko32?user=takoneko888&password=XXXX")
}
def main(args:Array[String]):Unit = {
val conf = new SparkConf().setMaster("local").setAppName("MySql")
val sc = new SparkContext(conf)
val data = new JdbcRDD(sc, getConn, "select * from test where id >= ? and id <= ?",
upperBound = 2,
lowerBound = 1,
numPartitions = 1,
mapRow = (r:ResultSet) => (r.getInt("id"), r.getString("name")))
println(data.collect.toList)
}
}
火曜日, 1月 12, 2016
Spark - CSVファイルの書き込み
以下の例はRDDの内容(Neko case class)をCSVファイルとしてHDFSに書き込む例.
package tanuneko
import java.io.StringWriter
import com.opencsv.CSVWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._
/**
* Created by neko32 on 2016/01/11.
*/
object CSVWrite {
case class Neko(name:String, age:Int)
def main(args:Array[String]):Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WriteJson")
val sc = new SparkContext(conf)
val hadoopConf = new Configuration
val fs = FileSystem.get(hadoopConf)
val outFile = "/user/neko32/spark/study4/csvwrite"
val nekoz = sc.parallelize(List(Neko("Tora", 8), Neko("Mikeyo", 4)))
val stred = nekoz.map(n => List(n.age.toString, n.name).toArray)
val nekoz2 = stred.mapPartitions { n =>
val writer = new StringWriter
val csvWriter = new CSVWriter(writer)
csvWriter.writeAll(n.toList.asJava)
Iterator(writer.toString)
}.cache
nekoz2 foreach println
if(fs.exists(new Path(outFile))) {
fs.delete(new Path(outFile), true)
}
nekoz2 saveAsTextFile(outFile)
}
}
日曜日, 1月 10, 2016
Spark - ファイル名をキー,内容をバリューとしてファイルを読み込み
SparkContextのwholeTextFiles()を使うと,ファイル名をキー,内容をバリューのRDDを作ることが出来る.以下の例では,HDFS上に空白区切りで数字が羅列されている複数のファイルを一括で読み込んでそれぞれの算術平均を求めている.
上記のコードをファイルに保存した後spark-shellから以下のコマンドでロード.
val in = sc.wholeTextFiles("/user/neko32/spark/study3")
val rez = in mapValues {
case d =>
val nums = d.trim.split(" ").map(_.toInt)
nums.sum / nums.size
}
rez foreach println
上記のコードをファイルに保存した後spark-shellから以下のコマンドでロード.
:load ./avgcal.scala
木曜日, 1月 07, 2016
Spark メモ - CSVからkey - non-key ペア変換
HDFS上のCSV風テキストを最終的に主キー - 非キー別タプルに変換する例.
// read input file separated by comma
// suppose this input file consists of 4 cols (tradeid, version, instrument and trader name)
val myrdd = sc.textFile("/user/neko32/spark/study1/contract.txt")
// transform string to list
val lines = myrdd.map(x => x.split(",").toList)
// transform line list to tuple
val myIn = lines.map(x => (x(0), x(1), x(2), x(3)))
// transform line as tupleN to key-val pairs
val byKey = myIn.map{case (c,v,i,n) => (c,v) -> (i,n)}
// reduce by key
val reducedByKey = byKey.reduceByKey{ case (x,y) => (x._1 + y._1, x._2 + y._2) }
日曜日, 1月 03, 2016
scala - 外部コマンドの実行
Scalaの外部コマンド実行はJavaのProcessBuilderほぼ同じ.
しかしながらscalaはパイプを用意に実現できたりとより強力である.
/**
* Created by neko32 on 2016/01/03.
*/
object RunCommand {
def main(args:Array[String]):Unit = {
import sys.process._
// execute external command
val result = "javac -version".!
println(result)
// return stdout
val result2 = "javac -version".!!
println(result2)
// via Process or Seq command can be triggered
val result3 = Seq("javac", "-version").!!
println(result3)
val result4 = Process("javac -version").!!
println(result4)
// dump stderr
val result5 = "javac -takorin".lineStream_!.mkString
println(result5)
// redirect output to a file
val result6 = ("javac -version" #> new File("C:\\tmp\\javav")).!
println(result6)
// this step may not work on Windows
assume(new File("C:\\tmp\\javav").exists() &&
Source.fromFile("C:\\tmp\\javav").mkString.length() > 0, "file should exists!")
}
}
しかしながらscalaはパイプを用意に実現できたりとより強力である.
scala> ("ls /cygdrive/c/tmp/" #| "grep -i rachmaninov").!!.trim
res1: String =
Rachmaninov_piano3.jpg
Rachmaninov2.jpg
scala - CSVファイルの読み込み
CSVファイルを読み込んで多重配列に保存するコードのメモ..
import resource._
import scala.io.Source
object CSVReadRunner extends CSVRead {
def main(args:Array[String]):Unit = {
val vals = read("C:\\tmp\\tankoneko.csv")
println("done.")
}
}
/**
* Created by neko32 on 2016/01/03.
*/
trait CSVRead {
def read(path:String):Array[Array[String]] = {
val arr = Array.ofDim[String](4,2)
for {
out <- managed(Source.fromFile(path))
} {
for{ (line,i) <- out.getLines.zipWithIndex} {
println(s"($i)@ILOOP")
for{ (elem,j) <- line.split(",").map(_.trim).zipWithIndex} {
println(s"($i,$j)@JLOOP")
arr(i)(j) = elem
}
}
}
arr
}
}
scala - try with resource相当
Scalaにおいてtry-with-resourceのようなARMを使うには,私の知っている限りscala-ARMを使う必要がある.
以下の例はmanagedを使ってwriterをmanagedを使って自動で閉じ,scala.io.Sourceで読みだした後,自前のcloseAfterのローンパターンで閉じている.(Alvin Alexander氏のScala Cookbookを参考とさせていただいた)(
Source.from([file])が返すBufferedSourceはcloseメソッドを持っているのでcloseAfterの型要件を満たしている.
scala-ARMを使うには以下の行をbuild.sbtに追加.
以下の例はmanagedを使ってwriterをmanagedを使って自動で閉じ,scala.io.Sourceで読みだした後,自前のcloseAfterのローンパターンで閉じている.(Alvin Alexander氏のScala Cookbookを参考とさせていただいた)(
Source.from([file])が返すBufferedSourceはcloseメソッドを持っているのでcloseAfterの型要件を満たしている.
import java.io.{BufferedWriter, File, FileWriter}
import resource.managed
import scala.io.Source
/**
* Created by neko32 on 2016/01/03.
*/
object FileIO {
def main(args:Array[String]):Unit = {
val path = "C:\\tmp\\testneko.txt"
val file = new File(path)
if(file.exists) file.delete
for {
out <- data-blogger-escaped-bufferedwriter="" data-blogger-escaped-closeafter="" data-blogger-escaped-file="" data-blogger-escaped-filewriter="" data-blogger-escaped-managed="" data-blogger-escaped-n="" data-blogger-escaped-new="" data-blogger-escaped-nmachiko="" data-blogger-escaped-nmikeyo="" data-blogger-escaped-npowder="" data-blogger-escaped-orachan="" data-blogger-escaped-ource.fromfile="" data-blogger-escaped-out.flush="" data-blogger-escaped-out.write="" data-blogger-escaped-path="" data-blogger-escaped-src="">
val texts = src.getLines()
for((m,n) <- data-blogger-escaped-a="" data-blogger-escaped-b="" data-blogger-escaped-close="" data-blogger-escaped-closeafter="" data-blogger-escaped-closelater="" data-blogger-escaped-def="" data-blogger-escaped-f:="" data-blogger-escaped-m="" data-blogger-escaped-n="" data-blogger-escaped-nit="" data-blogger-escaped-object="" data-blogger-escaped-println="" data-blogger-escaped-resource:="" data-blogger-escaped-s="" data-blogger-escaped-texts.zipwithindex=""> B):B = {
try {
f(resource)
} finally {
resource.close()
}
}
}
scala-ARMを使うには以下の行をbuild.sbtに追加.
libraryDependencies ++= Seq("org.specs2" %% "specs2-core" % "3.6.5" % "test")
Scala - マップのキー・バリュー交換
scalaでは,mapのキー・バリュースワップを容易に実現出来る.
val myMap = Map("Tora" -> "Chatora", "Mikeyo" -> "Mike", "Powder" -> "Mike")
val reversed = for((x,y) <- myMap) yield(y,x)
println(reversed)
木曜日, 12月 31, 2015
Scala - Sequenceマッチ
Scalaのmatch文は極めて強力で,そのうちの一つのリスト要素へのパターンマッチも便利なものの一つだろう.以下の例はMyCatへのコンストラクタマッチとリスト要素へのシーケンスパターンマッチの組み合わせの例.もしパターンマッチ内でジェネリック型も含めた型パターンマッチ等をしていて,かつJavaからそのscalaコードから作られたクラスを利用する場合は,Javaの型消去の特性に気を付けるように.
object Matcher {
def main(args:Array[String]):Unit = {
val nekoz = genCats()
nekoz.foreach{ x =>
x match {
case MyCat(a,b) if a > 5 => println("Adult neko.")
case _ => println("..")
}
}
nekoz match {
case l @ List(MyCat(8, _), _*) => println(s"okay, starting with 8 year old cat - $l")
case _ => println("..")
}
}
def genCats() = {
MyCat(8, "Tora") :: MyCat(4, "Mikeyo") :: List()
}
}
case class MyCat(age:Int, name:String)
水曜日, 12月 30, 2015
Scala - ワードカウント
SparkではreduceByKeyで簡単にワードカウントができるが,scalaではfoldLeftで実装出来る.
val x = """kiji,saru,saru#saru,inu,kiji,saru#inu,kiji,kiji#kiji#inu,inu#saru"""
val v = x.split("#").flatMap(_.split(",")).map(s => (s,1))
val z = v.foldLeft(Map[String,Int]())((accm, elem) => {
if(accm.contains(elem._1))
accm.updated(elem._1, elem._2 + accm(elem._1))
else
accm + elem
})
火曜日, 12月 29, 2015
scala - ディレクトリのファイル操作
妻のパソコンの,とあるフォルダにある画像ファイルのタイムスタンプが何故かおかしくなってしまった.ファイルの数が膨大ということもあり,scalaでさっとコードを書いて修正した.
import java.io.File
import java.time.{LocalDateTime, ZoneId}
import java.util.Date
import org.apache.commons.io.FileUtils
/**
* Created by neko32 on 2015/12/28.
*/
class FileDateChanger(preProcessDir:String, postProcessDir:String) {
implicit def dateToLDate(d:Date):LocalDateTime = {
d.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime()
}
def process(): Unit = {
clean()
val files = new File(preProcessDir).listFiles().filter(f => dateToLDate(new Date(f.lastModified())).getYear() == 2007)
val totalNum = files.size
var at = 0
println(s"Total number of files - ${totalNum}")
files.foreach { f =>
var dat = new Date(f.lastModified())
var f2 = new File(postProcessDir + "\\" + f.getName())
println(s"processing .. ${f.getName()}[${new Date(f.lastModified())}]")
FileUtils.copyFile(f, f2)
f2.setLastModified(dat.plusYears(8).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli)
println(s"to ${new Date(f2.lastModified())}")
if(at % 10 == 0) println(s"processed ${at}")
at += 1
}
println(s"done. # of processed files [${at}]")
}
private def clean(): Unit = {
println(s"cleaning ${postProcessDir}")
new File(postProcessDir).listFiles().foreach(_.delete())
println("cleaning done.")
}
}
object Runner {
def main(args:Array[String]) = {
val pre = "C:\\tmp\\pre_process"
val post = "C:\\tmp\\post_process"
new FileDateChanger(pre, post).process()
}
}
日曜日, 12月 27, 2015
Scala - 乱数生成プロバイダを用いるstream
val r = new Random(System.currentTimeMillis())
def rand(max:Int):Stream[Int] = Stream.cons(r.nextInt(max), rand(max))
rand(100) take 10 foreach println
// assign index starting from 1. zipWithIndex()'s index starts from 0
rand(50) take 5 zip(Stream from 1) foreach println
金曜日, 12月 25, 2015
Scala - Enumeration
ScalaでEnumerationを継承してenumを定義する例.
object Weekdays extends Enumeration {
type WEEKDAY = Value
val MON, TUE, WED = Value
}
object MyMain {
def isMonday(w:Weekdays.Value) = {
w == Weekdays.MON
}
def main(args:Array[String]) = {
val w = Weekdays.MON
val w2 = Weekdays.WED
println(isMonday(w))
println(isMonday(w2))
}
}
木曜日, 12月 24, 2015
Scala - akka アクタ
Akka actorのサンプルメモ..
import akka.actor._
/**
* Created by neko32 on 2015/12/23.
*/
sealed abstract trait Message
case class Request(a:Int, b:Int, op:Operator) extends Message
case class Response(a:Int, b:Int, op:Operator, result:Int) extends Message
case class ExecStart(cmd:String) extends Message
case class ExecEnd(a:Any) extends Message
sealed abstract class Operator(a:Int, b:Int) {
def calc():Int
}
case class Plus(a:Int, b:Int) extends Operator(a,b) {
override def calc() = a + b
override def toString() = " + "
}
case class Minus(a:Int, b:Int) extends Operator(a,b) {
override def calc() = a - b
override def toString() = " - "
}
case class Multiply(a:Int, b:Int) extends Operator(a,b) {
override def calc() = a * b
override def toString() = " * "
}
case class Divide(a:Int, b:Int) extends Operator(a,b) {
require(b != 0, "zero divide not allowed")
override def calc() = a / b
override def toString() = " / "
}
class CalcWorker extends Actor {
override def receive = {
case Request(a,b,op) if op.isInstanceOf[Plus] => println("sending.."); sender ! Response(a, b, op, a + b)
case Request(a,b,op) if op.isInstanceOf[Minus] => sender ! Response(a, b, op, a - b)
case Request(a,b,op) if op.isInstanceOf[Multiply] => sender ! Response(a, b, op, a * b)
case Request(a,b,op) if op.isInstanceOf[Divide] => sender ! Response(a, b, op, a / b)
case ExecEnd => context.stop(self)
}
}
trait Requester extends Actor with ActorLogging {
override def receive = {
case Response(a, b, op, rez) =>
log.info(s"got result from calc agent .. ${a}${op}${b} = ${rez}")
case ExecStart(cmd) => run(cmd)
case ExecEnd => context.stop(self)
case _ => log.info("....")
}
def run(cmd:String)
}
class CalcRequester(calcAgent:ActorRef) extends Requester {
def toOp(vals:Array[String]):Operator = vals(2) match {
case s if s == "plus" => Plus(vals(0).toInt, vals(1).toInt)
case s if s == "minus" => Minus(vals(0).toInt, vals(1).toInt)
case s if s == "mult" => Multiply(vals(0).toInt, vals(1).toInt)
case s if s == "div" => Divide(vals(0).toInt, vals(1).toInt)
}
override def run(cmd:String) = {
// parse a,b
assume(cmd.count(_ == ',') == 2)
val vals = cmd.toLowerCase().split(",")
println("parsed input - " + vals(0) + "," + vals(1) + "," + vals(2))
val res = calcAgent ! Request(vals(0).toInt, vals(1).toInt, toOp(vals))
}
}
object MyApp {
def main(args:Array[String]) = {
val sys = ActorSystem("CalcSys")
val calcAgent = sys.actorOf(Props[CalcWorker], "CalcAgent")
val calcReq = sys.actorOf(Props(new CalcRequester(calcAgent)), "CalcReq")
calcReq ! ExecStart("3,8,plus")
Thread.sleep(3000)
calcReq ! ExecEnd
calcAgent ! ExecEnd
val f = sys.terminate()
}
}
水曜日, 12月 23, 2015
Scala - Mixin, traitそしてselfによる型間依存関係の表現
Scalaのmixinとtraitそしてselfアノテーションを利用することにより,Javaと比較してより明快に型間の依存関係を宣言出来る.Javaではミックスインが出来ない為,クラス間の依存関係はあるクラスのprivate or protectedフィールドとして表現されていることがご想像いただけるかと思う.
case class Neko(name:String,pattern:String, age:Int)
trait CatManage {
self: CatManage with DefNekoCheck =>
var nekoz:List[Neko]
def init():Unit
def lookupCat(name:String):Neko
}
trait DefNekoCheck {
def ? (neko:Neko) = neko != null
}
trait NekoCheck extends DefNekoCheck {
override def ? (neko:Neko) = neko.name == "NA"
}
trait NekozManage extends CatManage with NekoCheck {
override var nekoz = List[Neko]()
override def init():Unit = {
nekoz ::= Neko("Mikeyo", "Mike", 3)
nekoz ::= Neko("Tora", "Chatora", 8)
nekoz ::= Neko("Powder", "Mike", 8)
nekoz ::= Neko("Machiko", "Mike", 10)
}
override def lookupCat(name:String):Neko = {
val n = nekoz find(_.name == name)
n.getOrElse(Neko("NA", "NA", -1))
}
}
trait Act {
def start(f: => List[String])
}
trait App {
self: Act with CatManage =>
override def start(execPlan: => List[String]) = {
init
val l = execPlan
l.foreach{ name =>
lookupCat(name) match {
case x if x.name == "NA" => println(s"${name} not found..")
case x => println(x)
}
}
}
}
日曜日, 12月 20, 2015
Scala - visitorパターンのようなもの
ScalaではVisitorパターンを割合簡潔に書くことが出来る.以下の例はあるデータベース上で接続・検索を実行するモジュールで,この例では通常のRDB, HiveそしてインメモリDBをサポートしているとする.また,(乱暴ではあるが簡略のために)RDBとHiveserver2はコネクトとクエリを同一の手順,インメモリはコネクトを必要としないと仮に想定している.
package tanuneko
/**
* Created by neko32 on 2015/12/20.
*/
sealed abstract class DB(host:String, userId:String, passwd:String)
case class RegularRDB(host:String, userId:String, passwd:String) extends DB(host, userId, passwd)
case class InMemoryDB(userId:String, passwd:String) extends DB(null, userId, passwd)
case class HiveDB(host:String, userId:String, passwd:String) extends DB(host, userId, passwd)
case class ResultSet(data:String)
trait DBAction {
def connect():Unit = {}
def runQuery(str:String):ResultSet
}
class RegularDBAction(host:String, userId:String, passwd:String) extends DBAction {
override def connect(): Unit = {
println(s"connecting to ${host} with user[${userId}]")
}
override def runQuery(sql: String): ResultSet = {
println(s"send query[${sql} to Database[${host}]")
new ResultSet("returned data.")
}
}
class InMemoryDBAction(userId:String, passwd:String) extends DBAction {
override def runQuery(sql: String): ResultSet = {
println(s"[inmem] running query[${sql}]")
new ResultSet("returned data.")
}
}
object Runner {
def main(args:Array[String]):Unit = {
val userId = "neko"
val passwd = "miPasswd150X"
val host = "myhost.org:16500"
val sql = "select * from mydb.mytbl"
def runQueryAfterConnect = (db:DB) => db match {
case r:RegularRDB => {
val act = new RegularDBAction(r.host, r.userId, r.passwd)
act.connect()
println(s"result - ${act.runQuery(sql)}")
}
case i:InMemoryDB => {
val act = new InMemoryDBAction(i.userId, i.passwd)
println(s"[inmem] result - ${act.runQuery(sql)}")
}
case r:HiveDB => {
val act = new RegularDBAction(r.host, r.userId, r.passwd)
act.connect()
println(s"result - ${act.runQuery(sql)}")
}
}
val sybase = new RegularRDB(host, userId, passwd)
runQueryAfterConnect(sybase)
val inmem = new InMemoryDB(userId, passwd)
runQueryAfterConnect(inmem)
val hive = new HiveDB(host, userId, passwd)
runQueryAfterConnect(hive)
}
}
月曜日, 12月 14, 2015
scala - tailrecアノテーション
scalaではtailrecアノテーションを使って末尾再帰の最適化をすることができる.
関数は再起呼び出しで終わっている必要がある.
関数は再起呼び出しで終わっている必要がある.
@tailrec
def fact(i:BigInt, accm:BigInt):BigInt = {
i match {
case _ if i == 1 => accm
case _ => fact(i - 1, i * accm)
}
}
登録:
投稿 (Atom)