Scalaz.concurrent

この記事は、Scala Advent Calendar JP 2011 の14日目です。無理を言って、二日間も枠を頂いてしまいました。

今回は、Scalaz 6.0.3 の concurrent について解説を行います。個人的には Scalaz.concurrent の主役は Promise だと思っているので、早速、その解説から行いたいのですが、Promise を理解する為には、Promise と同一パッケージ内に存在する Strategy, Effect, Actor の理解が不可欠であるため、その解説から順番に行っていきます。とは言え、これらは非常にコード量が少ないので簡単に理解できると思います。

御託はいいから、さっさとコード読ませろ!という方は、こちらをご確認ください。これを読んで意味が理解できる方は、以下の解説を読む必要はありません。

以降、掲載する動作確認用のコードには、全て下記が import されている事とします。説明の都合上、Scalaz.concurrent のコードも併記していきますが、そちらとは無関係です。

import scalaz._
import scalaz.concurrent._
import Scalaz._
import java.lang.Thread.currentThread

Strategy

まずは、Strategy の定義から眺めていきます。下記をご覧ください。

trait Strategy {
  def apply[A](a: => A): () => A
}

abstract class StrategyLow {
  /**
   * A strategy that evaluates its argument in the current thread.
   */
  implicit val Sequential: Strategy = new Strategy {
    def apply[A](a: => A) = {
      val v = a
      () => v
    }
  }

//..snip..

object Strategy extends StrategyLow {

//..snip..

scalaz.concurrent.Strategy のソースコードから Strategy Trait と、その実装の中から一番単純なものを抜粋しました。

Strategy Trait だけ眺めると意味が解りませんが、その実装である Sequential を眺めると、apply は、受け取った関数を評価し、その結果を返す無名関数を返すものだと解ります。

続けて同じソースコード内の他の Strategy Trait の実装も見てみましょう。

  implicit val Naive: Strategy = new Strategy {
    import scala.concurrent.ops.future
    def apply[A](a: => A) = future {a}
  }

受け取った関数を future 内で評価…即ち、毎回、新規に Thread を起動し、その上で関数を評価しています。

試しに使ってみましょう。

import Strategy.Sequential
val seq = Sequential {currentThread.getName}
println("seq = " + seq())

import Strategy.Naive
val fut = Naive {currentThread.getName}
println("fut = " + fut())

上記を REPL 等に入力すると、下記のような結果が得られます。

seq = run-main
fut = Thread-4

scalaz.concurrent.Strategy 内には、Sequential や Naive の他に、幾つか有益な Strategy Trait の実装があるので、是非、ご確認ください。ちなみに、一番お世話になるのは下記でしょうか。

  import java.util.concurrent.ExecutorService

  implicit def Executor(implicit s: ExecutorService) = new Strategy {
    import java.util.concurrent.Callable
    def apply[A](a: => A) = {
      val fut = s.submit(new Callable[A] {
        def call = a
      })
      () => fut.get
    }
  }

使う場合は下記の通りです。

import Strategy.Executor
import java.util.concurrent.{TimeUnit, Executors}

implicit val executorService = Executors.newFixedThreadPool(2)
val fut = Executor.apply {currentThread.getName}
println("fut = " + fut())

executorService.shutdown
executorService.awaitTermination(60L, TimeUnit.SECONDS)

出力結果は下記の通りです。

fut = pool-4-thread-1

Executor を利用する際、上記のような苦労をせずとも scalaz.concurrent.Strategy には下記の通り DefaultStrategy が定義されているので、通常はそちらを使う事になると思います。

object Strategy extends StrategyLow {
  lazy val DefaultExecutorService: ExecutorService = {
    import Executors._
    newFixedThreadPool(Runtime.getRuntime.availableProcessors, new ThreadFactory {
      def newThread(r: Runnable) = {
        val t = defaultThreadFactory.newThread(r)
        t.setDaemon(true)
        t
      }
    })
  }

  implicit lazy val DefaultStrategy: Strategy = Executor(DefaultExecutorService)
}

作成した Thread を Daemonize している所が特徴と言えば特徴です。

Strategy Trait の実装は簡単に定義できるので、例えば、Scala 標準の Remote Actor を利用し、物理的に離れたノード上で関数を実行する Strategy を新たに定義する事もできます。ご興味がある方はチャレンジしてみてください。

Effect

scalaz.concurrent.Effect のソースコードは、あまりにも短いので解説を省きます。

使い方は以下の通りです。

import Strategy.Naive
val ef1 = effect[String] { message =>
  println(currentThread.getName + " : " + message)
}
(1 to 3) foreach {n => ef1 ! "run my effect"}

implicit な Strategy が無ければ DefaultStrategy が使用されるのですが、確認用の出力が解り易いので Naive を利用しています。
出力結果は下記の通りです。

Thread-6 : run my effect
Thread-7 : run my effect
Thread-8 : run my effect

メッセージ送信の度に違う Thread 上で関数が評価されている事が解ると思います。

最後に Effect と関数の合成の例を挙げます。

val ef2 = ef1 contramap { (n: Int) =>
  println(currentThread.getName + " : " + n.toString)
  "run my effect " + n.toString
}
(1 to 3) foreach {n => ef2 ! n}

Effect[String] と Int => String を合成して Effect[Int] を作成しています。
出力結果は下記の通りです。

Thread-10 : 1
Thread-11 : 2
Thread-12 : run my effect 1
Thread-13 : run my effect 2
Thread-14 : 3
Thread-15 : run my effect 3

始めに Int => String が評価され、次に ef1 が評価されている事が解ると思います。

Actor

ちょっと息切れ*1してきたので scalaz.concurrent.Actor のソースコード解説は無しで…いきなり使い方の解説を行います。とは言え、scalaz.concurrent.Actor のソースコードも短いので、是非、ご一読ください。

使い方は下記の通りです。

import Strategy.Naive
val a1 = actor { (message: String) =>
  println(currentThread.getName + " : " + message)
}
(1 to 3) foreach {n => a1 ! "run my actor"}

出力結果は下記の通りです。

Thread-4 : run my actor
Thread-4 : run my actor
Thread-4 : run my actor

メッセージを一旦メールボックスに貯めているので、Naive を使用していてもタイミングによっては、同一 Thread 上で複数のメッセージが処理されます。

Scala 標準の Actor と異なり、リンクによる相互監視の機能等はありませんが、Actor 作成時にエラーハンドラを渡せます。

val a2 = actor(
  { (message: String) =>
    throw new Exception(currentThread.getName + " : " + message)
  },
  { (e: Throwable) =>
    println("exception: " + e.getMessage)
  }
)
(1 to 3) foreach {n => a2 ! "run my actor"}

出力結果は下記の通りです。

exception: Thread-11 : run my actor
exception: Thread-11 : run my actor
exception: Thread-11 : run my actor

Effect と同様に関数と合成が可能です。

val a3 = a1 contramap { (n: Int) =>
  println(currentThread.getName + " : " + n.toString)
  "run my actor " + n.toString
}
(1 to 3) foreach {n => a3 ! n}

出力結果は下記の通りです。

Thread-5 : 1
Thread-6 : run my actor 1
Thread-5 : 2
Thread-7 : run my actor 2
Thread-5 : 3
Thread-8 : run my actor 3

Promise

Promise は Scala 標準の Future と同じく未来の計算結果を表現するために使用するコンテナです。scalaz.concurrent.Promise のソースコード解説も無しで、使い方の解説だけ行います。

早速、下記をご覧下さい。相変わらず解りやすさ優先で Naive を使用します。

import Strategy.Naive

def m = {currentThread.getName}
val p = new Promise[String]()
p.fulfill(m)
println("p = " + p.get)

空の Promise を作成し、fulfill で Promise に関数を与え、get で fulfill で与えた関数の結果を取得しています。
get は、fulfill に与えた関数の評価が終了するまで処理をブロックします。

出力結果は下記の通りです。

p = Thread-4094

fulfill を使用せず、下記にように Promise 作成時に関数を与える事もできます。

val p1 = promise(m)
println("p1 = " + p1.get)

val p2 = m.pure[Promise]
println("p2 = " + p2.get)

出力結果は下記の通りです。

p1 = Thread-4095
p2 = Thread-4096

ここまで get で Main Thread をブロックしてきましたが、下記のように Effect や Actor に結果を引き渡す事もできます。*2

val a = actor[String] { message =>
  println("message = " + message + " : " + currentThread.getName)
}
promise(m) to a

出力結果は下記の通りです。

message = Thread-4096 : Thread-4098

to は、Main Thread をブロックしません。to は、以下で説明する map/flatMap の内部で利用されています。

さて、ここからが本番です。今までの解説は、全て以下を解説するために存在していたと言っても過言ではありません。

Promise の結合

Promise は、map/flatMap/join/filter 等が使えます。

map
val p = promise(m) map {message => message + " : " + currentThread.getName}
println("p = " + p.get)
p = Thread-13 : Thread-15
flatMap
val p = promise(m) flatMap {message => promise(message + " : " + currentThread.getName)}
println("p = " + p.get)
p = Thread-19 : Thread-21

ちょっと横道に逸れますが、関数は Promise を返す関数に変換できます。

def m(message: String) = {message + " : " + currentThread.getName}
val f = {m(_)}.promise
println("result = " + f("foo").get)
result = foo : Thread-24

それらを flatMap で繋ぐ事ができます。

val p = f("foo") flatMap f
println("p = " + p.get)
p = foo : Thread-25 : Thread-27

何か値を入れるとコンテナに包んで返す関数と言えば Kleisli ですよね。

val g = f >=> f >=> f
println("kleisli = " + g("foo").get)
Kleisli = foo : Thread-30 : Thread-33 : Thread-36
join
val p = promise(f("foo")).join
println("p = " + p.get)
p = foo : Thread-40
filter
val filterFun = {(message: String) =>
  println(message + " : " + currentThread.getName)
  message match {
    case "bar" => true
    case _ => false
  }
}

val p1 = promise("bar") filter filterFun
println("p1 = " + p1.get)

val p2 = promise("baz") filter filterFun
p2.break
try {p2.get} catch {case e => println("p2 = " + e.toString)}
bar : Thread-47
p1 = bar
baz : Thread-51
p2 = scalaz.concurrent.Promise$BrokenException
for

map/flatMap/filter が定義されているので for が使えます。

val p = for (
  m1 <- f("foo");
  m2 <- f(m1);
  m3 <- f(m2)
) yield m3
println("p = " + p.get)
p = foo : Thread-54 : Thread-56 : Thread-59

それぞれ別 Thread 上で評価されていますが、シーケンシャルに処理されている所がポイントです。例えば、f の処理時間が 1000ms であるなら、上記の for 全体の評価には 3000ms 必要となります。

applicative
val p = (f("foo") |@| f("bar") |@| f("baz")) {_ + " | " + _ + " | " + _}
println("p = " + p.get)
p = foo : Thread-69 | bar : Thread-70 | baz : Thread-71

for と異なり評価の順序を保証しないため、並行的に処理されます。例えば、f の処理時間が 1000ms であっても、上記の評価は 1000ms となります。*3
Promise にとって Applicative Style は、単に見た目の問題だけではなく、処理の効率を良くするためにも利用できるので、シーケンシャルな for*4 との使い分けをしっかり行う必要があります。

traverse
val seq: Seq[Int] = (1 to 3).toSeq

val p1 = seq traverse { n =>
  def m = {currentThread.getName + " : " + n.toString}
  promise(m)
}
println("p1 = " + p1.get)
p1 = List(Thread-92 : 1, Thread-96 : 2, Thread-106 : 3)

p1 の型が Promise[Seq[String]] となる所がポイントです。

Seq[Promise[String]] を Promise[Seq[String]] に変換する事もできます。

val ps: Seq[Promise[String]] = seq map {n =>
  def m = {currentThread.getName + " : " + n.toString}
  promise(m)
}
val p2 = ps.sequence
println("p2 = " + p2.get)
p2 = List(Thread-154 : 1, Thread-155 : 2, Thread-156 : 3)

traverse の亜種(?)で、渡す関数の戻り値の型が Promise でなくとも良い parMap もあります。

val p3 = seq parMap { n =>
  currentThread.getName + " : " + n.toString
}
println("p3 = " + p3.get)
p3 = List(Thread-218 : 1, Thread-217 : 2, Thread-216 : 3)

他に parBind や parZipWith もあります。

val p4 = seq parBind { n =>
  Seq(currentThread.getName + " : " + n.toString)
}
println("p4 = " + p4.get)
p4 = List(Thread-277 : 1, Thread-276 : 2, Thread-275 : 3)
val p5 = seq.parZipWith(seq) { (x, y) =>
  currentThread.getName + " : " + x.toString + " - " + y.toString
}
println("p5 = " + p5.get)
p5 = List(Thread-337 : 1 - 1, Thread-338 : 1 - 2, Thread-339 : 1 - 3, Thread-340 : 2 - 1, Thread-341 : 2 - 2, Thread-342 : 2 - 3, Thread-343 : 3 - 1, Thread-344 : 3 - 2, Thread-345 : 3 - 3)

いやー、本当に便利ですね。

その他

例の如く、突っ込み大歓迎です。

*1:早くモンハン3Gをプレイしたかったのです。ごめんなさい。

*2:to は、Effect 及び Actor を未来に送る…とも表現できますよね

*3:勿論、Strategy や様々な条件で、1000ms よりも遅くなります

*4:これはこれで、コールバック関数のネスト地獄を回避するために必要