Scala で Iteratee と Enumerator を書く練習

とりあえず使い方だけメモ

package com.github.cooldaemon.scalaz_test

import scalaz._

object TryIteratee {
  import Scalaz._
  import IterV._

  def run {
    implicit val ListEnum = new Enumerator[Seq] {
      def apply[E, A](e: Seq[E], i: IterV[E, A]): IterV[E, A] = {
        e match {
          case Seq() => i
          case x::xs => i.fold(
            done = (_, _) => i,
            cont = k => apply(xs, k(El(x)))
          )
        }
      }
    }

    implicit val StreamEnum = new Enumerator[Stream] {
      def apply[E, A](e: Stream[E], i: IterV[E, A]): IterV[E, A] = {
        e match {
          case Stream() => i
          case x #:: xs => i.fold(
            done = (_, _) => i,
            cont = k => apply(xs, k(El(x)))
          )
        }
      }
    }

    val seq = Seq.range(1, 5)
    
    head(seq).run assert_=== Some(1)
    
    val m = for {
      x <- head[Int]
      y <- head[Int]
    } yield (x, y)

    m(seq).run assert_=== (Some(1), Some(2))

    def dropAndPrintAll[E] :IterV[E, Unit] = {
      def step(s: Input[E]) :IterV[E, Unit] = {
        s(
          el    = {e =>
            println(e)
            Cont(step)
          },
          empty = Cont(step),
          eof   = Done((), EOF[E])
        )
      }
      Cont(step)
    }
    println("--<自作 Iteratee>--")
    dropAndPrintAll(seq).run
    
    def peekAndPrintAll[E] :IterV[E, Unit] = {
      def step(initS: Input[E])(s: Input[E]): IterV[E, Unit] = {
        s(
          el    = {e =>
            println(e)
            Cont(step(initS))
          },
          empty = Cont(step(initS)),
          eof   = Done((), initS)
        )
      }
      Cont({s => step(s)(s)})
    }

    val m2 = for {
      _ <- peekAndPrintAll[Int]
      x <- head[Int]
    } yield x

    println("--<Iteratee の合成>--")
    m2(seq).run assert_=== Some(1)
    
    println("--<Enumerator を後から追加>--")
    val m3 = dropAndPrintAll(seq)
    val m4 = m3(Stream.range(5, 10)) // Seq に Stream を追加しても良い
    m4.run
  }
}

解説は後で。例とは言え Iteratee の中に副作用が存在する所がダサい。

疑問点その1

下記を末尾再帰にするには…

      def apply[E, A](e: Seq[E], i: IterV[E, A]): IterV[E, A] = {
        e match {
          case Seq() => i
          case x::xs => i.fold(
            done = (_, _) => i,
            cont = k => apply(xs, k(El(x)))
          )
        }
      }

下記のようにする必要がある。何でだろー?

      @annotation.tailrec def apply[E, A](e: Seq[E], i: IterV[E, A]): IterV[E, A] = {
        val next: Option[(Seq[E], IterV[E, A])] = e match {
          case Seq() => None
          case x::xs => i.fold(
            done = (_, _) => None,
            cont = k => some((xs, k(El(x))))
          )
        }
        next match {
          case None => i
          case Some((es, is)) => apply(es, is)
        }
      }

疑問点その2

Haskell みたいに Enumerator 同士を合成する方法が解らない

Scalaz.concurrent

この記事は、Scala Advent Calendar JP 2011 の14日目です。無理を言って、二日間も枠を頂いてしまいました。

今回は、Scalaz 6.0.3 の concurrent について解説を行います。個人的には Scalaz.concurrent の主役は Promise だと思っているので、早速、その解説から行いたいのですが、Promise を理解する為には、Promise と同一パッケージ内に存在する Strategy, Effect, Actor の理解が不可欠であるため、その解説から順番に行っていきます。とは言え、これらは非常にコード量が少ないので簡単に理解できると思います。

御託はいいから、さっさとコード読ませろ!という方は、こちらをご確認ください。これを読んで意味が理解できる方は、以下の解説を読む必要はありません。

以降、掲載する動作確認用のコードには、全て下記が import されている事とします。説明の都合上、Scalaz.concurrent のコードも併記していきますが、そちらとは無関係です。

import scalaz._
import scalaz.concurrent._
import Scalaz._
import java.lang.Thread.currentThread

Strategy

まずは、Strategy の定義から眺めていきます。下記をご覧ください。

trait Strategy {
  def apply[A](a: => A): () => A
}

abstract class StrategyLow {
  /**
   * A strategy that evaluates its argument in the current thread.
   */
  implicit val Sequential: Strategy = new Strategy {
    def apply[A](a: => A) = {
      val v = a
      () => v
    }
  }

//..snip..

object Strategy extends StrategyLow {

//..snip..

scalaz.concurrent.Strategy のソースコードから Strategy Trait と、その実装の中から一番単純なものを抜粋しました。

Strategy Trait だけ眺めると意味が解りませんが、その実装である Sequential を眺めると、apply は、受け取った関数を評価し、その結果を返す無名関数を返すものだと解ります。

続けて同じソースコード内の他の Strategy Trait の実装も見てみましょう。

  implicit val Naive: Strategy = new Strategy {
    import scala.concurrent.ops.future
    def apply[A](a: => A) = future {a}
  }

受け取った関数を future 内で評価…即ち、毎回、新規に Thread を起動し、その上で関数を評価しています。

試しに使ってみましょう。

import Strategy.Sequential
val seq = Sequential {currentThread.getName}
println("seq = " + seq())

import Strategy.Naive
val fut = Naive {currentThread.getName}
println("fut = " + fut())

上記を REPL 等に入力すると、下記のような結果が得られます。

seq = run-main
fut = Thread-4

scalaz.concurrent.Strategy 内には、Sequential や Naive の他に、幾つか有益な Strategy Trait の実装があるので、是非、ご確認ください。ちなみに、一番お世話になるのは下記でしょうか。

  import java.util.concurrent.ExecutorService

  implicit def Executor(implicit s: ExecutorService) = new Strategy {
    import java.util.concurrent.Callable
    def apply[A](a: => A) = {
      val fut = s.submit(new Callable[A] {
        def call = a
      })
      () => fut.get
    }
  }

使う場合は下記の通りです。

import Strategy.Executor
import java.util.concurrent.{TimeUnit, Executors}

implicit val executorService = Executors.newFixedThreadPool(2)
val fut = Executor.apply {currentThread.getName}
println("fut = " + fut())

executorService.shutdown
executorService.awaitTermination(60L, TimeUnit.SECONDS)

出力結果は下記の通りです。

fut = pool-4-thread-1

Executor を利用する際、上記のような苦労をせずとも scalaz.concurrent.Strategy には下記の通り DefaultStrategy が定義されているので、通常はそちらを使う事になると思います。

object Strategy extends StrategyLow {
  lazy val DefaultExecutorService: ExecutorService = {
    import Executors._
    newFixedThreadPool(Runtime.getRuntime.availableProcessors, new ThreadFactory {
      def newThread(r: Runnable) = {
        val t = defaultThreadFactory.newThread(r)
        t.setDaemon(true)
        t
      }
    })
  }

  implicit lazy val DefaultStrategy: Strategy = Executor(DefaultExecutorService)
}

作成した Thread を Daemonize している所が特徴と言えば特徴です。

Strategy Trait の実装は簡単に定義できるので、例えば、Scala 標準の Remote Actor を利用し、物理的に離れたノード上で関数を実行する Strategy を新たに定義する事もできます。ご興味がある方はチャレンジしてみてください。

Effect

scalaz.concurrent.Effect のソースコードは、あまりにも短いので解説を省きます。

使い方は以下の通りです。

import Strategy.Naive
val ef1 = effect[String] { message =>
  println(currentThread.getName + " : " + message)
}
(1 to 3) foreach {n => ef1 ! "run my effect"}

implicit な Strategy が無ければ DefaultStrategy が使用されるのですが、確認用の出力が解り易いので Naive を利用しています。
出力結果は下記の通りです。

Thread-6 : run my effect
Thread-7 : run my effect
Thread-8 : run my effect

メッセージ送信の度に違う Thread 上で関数が評価されている事が解ると思います。

最後に Effect と関数の合成の例を挙げます。

val ef2 = ef1 contramap { (n: Int) =>
  println(currentThread.getName + " : " + n.toString)
  "run my effect " + n.toString
}
(1 to 3) foreach {n => ef2 ! n}

Effect[String] と Int => String を合成して Effect[Int] を作成しています。
出力結果は下記の通りです。

Thread-10 : 1
Thread-11 : 2
Thread-12 : run my effect 1
Thread-13 : run my effect 2
Thread-14 : 3
Thread-15 : run my effect 3

始めに Int => String が評価され、次に ef1 が評価されている事が解ると思います。

Actor

ちょっと息切れ*1してきたので scalaz.concurrent.Actor のソースコード解説は無しで…いきなり使い方の解説を行います。とは言え、scalaz.concurrent.Actor のソースコードも短いので、是非、ご一読ください。

使い方は下記の通りです。

import Strategy.Naive
val a1 = actor { (message: String) =>
  println(currentThread.getName + " : " + message)
}
(1 to 3) foreach {n => a1 ! "run my actor"}

出力結果は下記の通りです。

Thread-4 : run my actor
Thread-4 : run my actor
Thread-4 : run my actor

メッセージを一旦メールボックスに貯めているので、Naive を使用していてもタイミングによっては、同一 Thread 上で複数のメッセージが処理されます。

Scala 標準の Actor と異なり、リンクによる相互監視の機能等はありませんが、Actor 作成時にエラーハンドラを渡せます。

val a2 = actor(
  { (message: String) =>
    throw new Exception(currentThread.getName + " : " + message)
  },
  { (e: Throwable) =>
    println("exception: " + e.getMessage)
  }
)
(1 to 3) foreach {n => a2 ! "run my actor"}

出力結果は下記の通りです。

exception: Thread-11 : run my actor
exception: Thread-11 : run my actor
exception: Thread-11 : run my actor

Effect と同様に関数と合成が可能です。

val a3 = a1 contramap { (n: Int) =>
  println(currentThread.getName + " : " + n.toString)
  "run my actor " + n.toString
}
(1 to 3) foreach {n => a3 ! n}

出力結果は下記の通りです。

Thread-5 : 1
Thread-6 : run my actor 1
Thread-5 : 2
Thread-7 : run my actor 2
Thread-5 : 3
Thread-8 : run my actor 3

Promise

Promise は Scala 標準の Future と同じく未来の計算結果を表現するために使用するコンテナです。scalaz.concurrent.Promise のソースコード解説も無しで、使い方の解説だけ行います。

早速、下記をご覧下さい。相変わらず解りやすさ優先で Naive を使用します。

import Strategy.Naive

def m = {currentThread.getName}
val p = new Promise[String]()
p.fulfill(m)
println("p = " + p.get)

空の Promise を作成し、fulfill で Promise に関数を与え、get で fulfill で与えた関数の結果を取得しています。
get は、fulfill に与えた関数の評価が終了するまで処理をブロックします。

出力結果は下記の通りです。

p = Thread-4094

fulfill を使用せず、下記にように Promise 作成時に関数を与える事もできます。

val p1 = promise(m)
println("p1 = " + p1.get)

val p2 = m.pure[Promise]
println("p2 = " + p2.get)

出力結果は下記の通りです。

p1 = Thread-4095
p2 = Thread-4096

ここまで get で Main Thread をブロックしてきましたが、下記のように Effect や Actor に結果を引き渡す事もできます。*2

val a = actor[String] { message =>
  println("message = " + message + " : " + currentThread.getName)
}
promise(m) to a

出力結果は下記の通りです。

message = Thread-4096 : Thread-4098

to は、Main Thread をブロックしません。to は、以下で説明する map/flatMap の内部で利用されています。

さて、ここからが本番です。今までの解説は、全て以下を解説するために存在していたと言っても過言ではありません。

Promise の結合

Promise は、map/flatMap/join/filter 等が使えます。

map
val p = promise(m) map {message => message + " : " + currentThread.getName}
println("p = " + p.get)
p = Thread-13 : Thread-15
flatMap
val p = promise(m) flatMap {message => promise(message + " : " + currentThread.getName)}
println("p = " + p.get)
p = Thread-19 : Thread-21

ちょっと横道に逸れますが、関数は Promise を返す関数に変換できます。

def m(message: String) = {message + " : " + currentThread.getName}
val f = {m(_)}.promise
println("result = " + f("foo").get)
result = foo : Thread-24

それらを flatMap で繋ぐ事ができます。

val p = f("foo") flatMap f
println("p = " + p.get)
p = foo : Thread-25 : Thread-27

何か値を入れるとコンテナに包んで返す関数と言えば Kleisli ですよね。

val g = f >=> f >=> f
println("kleisli = " + g("foo").get)
Kleisli = foo : Thread-30 : Thread-33 : Thread-36
join
val p = promise(f("foo")).join
println("p = " + p.get)
p = foo : Thread-40
filter
val filterFun = {(message: String) =>
  println(message + " : " + currentThread.getName)
  message match {
    case "bar" => true
    case _ => false
  }
}

val p1 = promise("bar") filter filterFun
println("p1 = " + p1.get)

val p2 = promise("baz") filter filterFun
p2.break
try {p2.get} catch {case e => println("p2 = " + e.toString)}
bar : Thread-47
p1 = bar
baz : Thread-51
p2 = scalaz.concurrent.Promise$BrokenException
for

map/flatMap/filter が定義されているので for が使えます。

val p = for (
  m1 <- f("foo");
  m2 <- f(m1);
  m3 <- f(m2)
) yield m3
println("p = " + p.get)
p = foo : Thread-54 : Thread-56 : Thread-59

それぞれ別 Thread 上で評価されていますが、シーケンシャルに処理されている所がポイントです。例えば、f の処理時間が 1000ms であるなら、上記の for 全体の評価には 3000ms 必要となります。

applicative
val p = (f("foo") |@| f("bar") |@| f("baz")) {_ + " | " + _ + " | " + _}
println("p = " + p.get)
p = foo : Thread-69 | bar : Thread-70 | baz : Thread-71

for と異なり評価の順序を保証しないため、並行的に処理されます。例えば、f の処理時間が 1000ms であっても、上記の評価は 1000ms となります。*3
Promise にとって Applicative Style は、単に見た目の問題だけではなく、処理の効率を良くするためにも利用できるので、シーケンシャルな for*4 との使い分けをしっかり行う必要があります。

traverse
val seq: Seq[Int] = (1 to 3).toSeq

val p1 = seq traverse { n =>
  def m = {currentThread.getName + " : " + n.toString}
  promise(m)
}
println("p1 = " + p1.get)
p1 = List(Thread-92 : 1, Thread-96 : 2, Thread-106 : 3)

p1 の型が Promise[Seq[String]] となる所がポイントです。

Seq[Promise[String]] を Promise[Seq[String]] に変換する事もできます。

val ps: Seq[Promise[String]] = seq map {n =>
  def m = {currentThread.getName + " : " + n.toString}
  promise(m)
}
val p2 = ps.sequence
println("p2 = " + p2.get)
p2 = List(Thread-154 : 1, Thread-155 : 2, Thread-156 : 3)

traverse の亜種(?)で、渡す関数の戻り値の型が Promise でなくとも良い parMap もあります。

val p3 = seq parMap { n =>
  currentThread.getName + " : " + n.toString
}
println("p3 = " + p3.get)
p3 = List(Thread-218 : 1, Thread-217 : 2, Thread-216 : 3)

他に parBind や parZipWith もあります。

val p4 = seq parBind { n =>
  Seq(currentThread.getName + " : " + n.toString)
}
println("p4 = " + p4.get)
p4 = List(Thread-277 : 1, Thread-276 : 2, Thread-275 : 3)
val p5 = seq.parZipWith(seq) { (x, y) =>
  currentThread.getName + " : " + x.toString + " - " + y.toString
}
println("p5 = " + p5.get)
p5 = List(Thread-337 : 1 - 1, Thread-338 : 1 - 2, Thread-339 : 1 - 3, Thread-340 : 2 - 1, Thread-341 : 2 - 2, Thread-342 : 2 - 3, Thread-343 : 3 - 1, Thread-344 : 3 - 2, Thread-345 : 3 - 3)

いやー、本当に便利ですね。

その他

例の如く、突っ込み大歓迎です。

*1:早くモンハン3Gをプレイしたかったのです。ごめんなさい。

*2:to は、Effect 及び Actor を未来に送る…とも表現できますよね

*3:勿論、Strategy や様々な条件で、1000ms よりも遅くなります

*4:これはこれで、コールバック関数のネスト地獄を回避するために必要

Scala STM CommitBarrier

この記事は、Scala Advent Calendar JP 2011 の13日目です。

今回は、Scala STM 0.4 から導入された CommitBarrier の解説を行います。

CommitBarrier の基本

まずは、下記のコードをご覧ください。

import org.scalatest.fixture.FixtureFunSuite

import scala.concurrent.stm._
import scala.concurrent.ops._
import java.util.concurrent.CountDownLatch

class CommitBarrierSuite extends FixtureFunSuite {
  case class F(cb: CommitBarrier, ms: Seq[CommitBarrier.Member], rs: Seq[Ref[String]], gate: CountDownLatch)
  type FixtureParam = F

  def withFixture(test: OneArgTest): Unit = {
    val cb = CommitBarrier(300)
    test(F(
      cb,
      Seq.tabulate(2) {n => cb.addMember()},
      Seq.tabulate(3) {n => Ref("start")},
      new CountDownLatch(1)
    ))
  }

  test("競合なし") { f =>
    val fut1 = future {
      f.gate.await()
      val result1 = f.ms(0).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f1"
        Thread.sleep(100)
        'finish
      }
      assert(result1 === Right('finish))
      'finish
    }

    val fut2 = future {
      f.gate.await()
      val startTime = System.currentTimeMillis
      val result2 = f.ms(1).atomic { implicit txn =>
        f.rs(1)() = f.rs(1)() + "-f2"
        'finish
      } // fut1 の f.ms(0) の atomic が終わるまで待つ
      val elapseTime = System.currentTimeMillis - startTime;
      assert(result2 === Right('finish))
      assert(100 < elapseTime)
      'finish
    }
    
    f.gate.countDown()

    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }
    
    assert(f.rs(0).single() === "start-f1")
    assert(f.rs(1).single() === "start-f2")

    val thrown = intercept[IllegalStateException] {
      f.cb.addMember()
    }
    assert(thrown.getMessage === "commit barrier has already committed")
  }
}

scalatest の詳細な説明は省きます。今後、上記の例にテストケースを追加していく形で CommitBarrier の説明を行っていきますが、withFixture 内で作られた case class F のインスタンスが各テストケースの引数として渡されるとだけ覚えておいてください。

という事で、始めのテストケースですが、これは、下記の事を確認しています。

  • 同一 CommitBarrier に属する Member の atomic 同士は、お互いの終了*1を待ち合わせる
  • CommitBarrier を用いた atomic の値は Either[CancelCause, "atomic に渡した関数の戻り値の型"] 型となる
  • 一度終了した CommitBarrier には、新たに Member を追加できない

f.ms(0).atomic には、終了するまで 100ms 必要とする関数を、f.ms(1).atomic には、すぐに終了する関数を渡していますが、f.ms(1) は、同一 CommitBarrier の他の全ての Member …即ち f.ms(0) の終了を待つので、引きずられて終了までに 100ms 以上の時間が掛かっています。

ちなみに、Member が一つでもブロックしてしまうと CommitBarrier 全体がブロックしてしまうので、下記のように CommitBarrier のインスタンスを得る際にタイムアウトの時間を指定できます。

    val cb = CommitBarrier(300)

上記のテストケースは、それぞれの atomic ブロックが異なる Ref を更新しているので、次は同じ Ref を更新してみます。下記のテストケースを追加してください。

  test("競合あり") { f =>
    val fut1 = future {
      f.gate.await()
      val result1 = f.ms(0).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f1"
      }
      assert(result1.left.get.isInstanceOf[CommitBarrier.MemberCycle])
      'finish
    }

    val fut2 = future {
      f.gate.await()
      val result2 = f.ms(1).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f2"
      }
      assert(result2.left.get.isInstanceOf[CommitBarrier.MemberCycle])
      'finish
    }
    
    f.gate.countDown()

    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }

    assert(f.rs(0).single() === "start")
    
    val thrown = intercept[IllegalStateException] {
      f.cb.addMember()
    }
    assert(thrown.getMessage === "commit barrier has already rolled back")
  }

このテストケースは、同一 CommitBarrier に属する Member 間で競合が発生した際、Ref の更新は行われず、atomic の値は Left となる事を確認しています。

では、もう少し複雑にしてみましょう。

  test("競合あり2") { f =>
    val fut1 = future {
      f.gate.await()
      val result1 = f.ms(0).atomic { implicit txn =>
        Thread.sleep(100) // ここがポイント!!
        f.rs(0)() = f.rs(0)() + "-f1"
        'finish
      }
      assert(result1 === Right('finish))
      'finish
    }

    val fut2 = future {
      f.gate.await()
      val result2 = f.ms(1).atomic { implicit txn =>
        f.rs(1)() = f.rs(0)() + "-f2"
        'finish
      }
      assert(result2 === Right('finish))
      'finish
    }
    
    f.gate.countDown()

    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }

    assert(f.rs(0).single() === "start-f1")
    assert(f.rs(1).single() === "start-f2")
  }

このテストケースでは、f.rs(0) を元に、f.rs(0) 自身と f.rs(1) の更新を行っていますが、先に f.rs(1) を更新して f.ms(1) を Commited 状態にしてしまい、後から f.rs(0) を更新して f.ms(0) を Commited 状態にしています。

更に続けて、次のテストケースも追加してみましょう。

  test("競合あり3") { f =>
    val fut1 = future {
      f.gate.await()
      val result1 = f.ms(0).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f1"
        'finish
      }
      assert(result1.left.get.isInstanceOf[CommitBarrier.MemberCycle])
      'finish
    }

    val fut2 = future {
      f.gate.await()
      val result2 = f.ms(1).atomic { implicit txn =>
        Thread.sleep(100) // ここがポイント!!
        f.rs(1)() = f.rs(0)() + "-f2"
        'finish
      }
      assert(result2.left.get.isInstanceOf[CommitBarrier.MemberCycle])
      'finish
    }
    
    f.gate.countDown()

    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }

    assert(f.rs(0).single() === "start")
    assert(f.rs(1).single() === "start")
  }

このテストケースは、一つ前のテストケースと Thread.sleep の位置が異なり、先に f.rs(0) を更新してから、f.rs(0) を元に f.rs(1) の更新を行います。このようにすると、f.ms(1).atomic 開始時と Thread.sleep 後の f.rs(0) の値が異なるため、f.ms(1).atomic は失敗し、結果、f.ms(0).atomic も失敗となります。

ちなみに、概ね f.ms(0).atomic と f.ms(0).atomic の値は Left(CommitBarrier.MemberCycle) となりますが、何度かテストを繰り返していると Left(Timeout) になる事があります。これは、f.ms(1).atomic に渡された関数の再評価回数が異なり、場合によっては、始めに設定したタイムアウト 300ms を超えてしまうためなのですが、何故、再評価の回数が異なるのか?そもそも、何故、再評価されてしまうのか?*2という所は調査中です。

キャンセル

CommitBarrier の基本が明かになった所で、キャンセルの説明を行います。説明の簡易化のため、テストスイートを新たに作成します。

import org.scalatest.fixture.FixtureFunSuite

import scala.concurrent.stm._
import scala.concurrent.ops._

class CancelSuite extends FixtureFunSuite {
  case class F(cb: CommitBarrier, ref: Ref[String])
  type FixtureParam = F

  def withFixture(test: OneArgTest): Unit = test(F(
    CommitBarrier(60000),
    Ref("start")
  ))

  test("キャンセル") { f =>
    val startingGate = new java.util.concurrent.CountDownLatch(1)
    val ms = Seq.tabulate(2) {n => f.cb.addMember()}

    val fut1 = future {
      startingGate.await()
      val result1 = ms(0).atomic { implicit txn =>
        f.ref() = f.ref() + "-f1"
        Thread.sleep(100)
        ms(0).cancel(CommitBarrier.UserCancel("cancel"))
        'finish
      }
      assert(result1 === Left(CommitBarrier.UserCancel("cancel")))
      'finish
    }

    val r2 = Ref("start")
    val fut2 = future {
      startingGate.await()
      val result2 = ms(1).atomic { implicit txn =>
        r2() = r2() + "-f2"
        'finish
      } // ms(0) の待ち合わせ
      assert(result2 === Right('finish))
      'finish
    }
 
    startingGate.countDown()

    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }

    assert(f.ref.single() === "start")
    assert(r2.single() === "start-f2")
  }
}

簡略化と言いつつ、Fixture が減った影響で見難くなってますが…後のテストケースの為に我慢してください。このテストケースは、下記の事を確認しています。

  • CommitBarrier の Member は、atomic 内で処理をキャンセルできる
  • キャンセルされた Member は、他の Member の commit に影響を与えない

競合や例外が発生した場合、前述した通り、同じ CommitBarrier に属する他の Member に影響を及ぼしますが、キャンセルの場合は、影響を及ぼしません。

次に、下記のテストケースを追加してください。

  test("CommitBarrier の再利用") { f =>
    val m = f.cb.addMember()

    val result1 = m.atomic { implicit txn =>
      f.ref() = "atomic1"
      m.cancel(CommitBarrier.UserCancel("cancel"))
      'finish
    }
 
    assert(result1 === Left(CommitBarrier.UserCancel("cancel")))
    assert(f.ref.single() === "start")

    val m2 = f.cb.addMember()
    val result2 = m2.atomic { implicit txn =>
      f.ref() = "atomic2"
      'finish
    }

    assert(result2 === Right('finish))
    assert(f.ref.single() === "atomic2")
    
    val thrown = intercept[IllegalStateException] {
      val m3 = f.cb.addMember()
    }
    assert(thrown.getMessage === "commit barrier has already committed")
  }
 

このテストケースでは、CommitBarrier に属している Member が全てキャンセルされた場合、その CommitBarrier は、まだ処理が終わっていないと見なされるため addMember が可能である事を確認しています。

では、atomic ブロックがネストしている場合は、どうなるでしょうか?下記のテストケースを追加してください。

  test("atomic のネスト(m.atomic が内側)") { f =>
    val m = f.cb.addMember()
    
    atomic { implicit txn =>
      f.ref() = "outer"
      m.atomic { implicit txn =>
        f.ref() = f.ref() + "-inner"
        m.cancel(CommitBarrier.UserCancel("cancel"))
      }
    }

    assert(f.ref.single() === "outer")
  }

このテストケースは、通常の atomic の内で CommitBarrier の Member をキャンセルした場合、通常の外側の atomic で操作した Ref のみ commit される事を確認しています。

次に、ネストしている atomic を反対にしてみましょう。下記のテストケースを追加してください。

  test("atomic のネスト(m.atomic が外側)") { f =>
    val m = f.cb.addMember()
    
    m.atomic { implicit txn =>
      f.ref() = "outer"
      atomic { implicit txn =>
        f.ref() = f.ref() + "-inner"
        m.cancel(CommitBarrier.UserCancel("cancel"))
      }
    }

    assert(f.ref.single() === "start")
  }

このテストケースでは、内側にある通常の atomic の中から外側の Member をキャンセルした場合、Ref の操作が全て無効となる事を確認しています。

その他

  • CommitBarrier に対する操作…例えば addMember 等は、CommitBarrier 毎に synchronized されているので注意してください
  • Thread と Member は、一対一で利用される事を想定してるようです
  • 上記で試した以外に、例外が発生した場合にどうなるのか?等は、こちらでご確認ください

例の如く、突っ込み大歓迎です。

*1:正常終了とは限らない。Cancel や例外でも atomic は終了する。

*2:f.rs(0) を Read した時点で、commit できない事は解りきっているので、即失敗で良いんじゃないかと思っている

scalaz.Bifunctor と Either

前回メモし忘れていた Either の小ネタを紹介。
例の如く scalaz の import は必須。

import scalaz._
import Scalaz._

Either を取得した後、普通は、処理の分岐を下記のように記述する。

1.right[String] match {
  case Right(n :Int) => n + 1
  case Left(s: String) => "[" + s + "]"
}

scalaz には Bifunctor があるので、下記のようにも書ける。

val fr: PartialFunction[Int, Int] = {case n => n + 1}
val fl: PartialFunction[String, String] = {case s => "[" + s + "]"}
fl <-: 1.right[String] :-> fr

別に PartialFunction である必要はなく Function でも良い。
下記に、幾つか例を示す。

(fl <-: 1.right[String] :-> fr) assert_=== 2.right[String]
(fl <-: 1.right[String] :-> fr :-> fr) assert_=== 3.right[String]
(fl <-: "foo".left[Int] :-> fr) assert_=== "[foo]".left[Int]
(fl <-: fl <-: "foo".left[Int] :-> fr) assert_=== "[[foo]]".left[Int]

scalaz.Bifunctor は、Either だけではなく Tuple2 や Validation でも使える。

(fl <-: 1.success[String] :-> fr) assert_=== 2.success[String]
(fl <-: ("foo", 1) :-> fr) assert_=== ("[foo]", 2)

Tuple2 の時は両方評価される。ちょっと Allow の *** に似てるかも。

Either と scalaz.Validation

下記で取り上げられているネタについてメモを残す。

Either と Scalaz

Either は flatMap メソッドを持たないので for 式では使えない。for 式内で Right で処理を進めたいなら right メソッドで、Left で処理を進めたいなら left メソッドで *Projection を取得する必要がある。

def r(n: Int): Either[String, Int] = Right(n)
for {
  x <- r(1).right;
  y <- r(x).right
} yield x+y

とはいえ、多くの場合 Right で処理を進めたい場合が多く、Left で処理を進めたい事は稀である。心情的には、下記のように書きたい。

def r(n: Int): Either[String, Int] = Right(n)
for {x <- r(1); y <- r(x)} yield x+y

そこで scalaz の出番となる。下記は、問題なく動作する。

import scalaz._
import Scalaz._
def r(n: Int): Either[String, Int] = Right(n)
for {
  x <- r(1);
  y <- r(x)
} yield x+y

以降のサンプルコードは、下記のように scalaz が import 済みである事を必須とする。

import scalaz._
import Scalaz._

scalaz を使っているので、下記のようにも書ける。

for {
  x <- 1.right[String];
  y <- x.right[String]
} yield x+y

これを、flatMap と map を使った形式に書き換えると下記の通り。

1.right[String] flatMap {x => x.right[String] map {y => x + y}}

scalaz では下記のエイリアスも使える。

1.right[String] >>= {x => x.right[String] &#8728; {y => x + y}}

&#8728; は ∘ です

scalaz.Validation

実は、for 式の中で使う分には、Either と変わらない。

(for {x <- 1.success[String]; y <- x.success[String]} yield x+y) assert_=== 2.success[String]
(for {x <- "foo".fail[Int];   y <- x.success[String]} yield x+y) assert_=== "foo".fail[Int]

scalaz.Validation の flatMap と map のソースコードは、現在の所、下記の通りとなっているので、上記は当たり前。

//..snip..
  def map[B](f: A => B): Validation[E, B] = this match {
    case Success(a) => Success(f(a))
    case Failure(e) => Failure(e)
  }
//..snip..
  def flatMap[EE >: E, B](f: A => Validation[EE, B]): Validation[EE, B] = this match {
    case Success(a) => f(a)
    case Failure(e) => Failure(e)
  }
//..snip..

では、Either と Validation の違いは何なのか?下記のようなコードを記述した際に違いが現れる。

(1.success[String] <*> (1.success[String] map ((_: Int) + (_: Int)).curried)) assert_=== 2.success[String]
("foo".fail[Int] <*> (1.success[String] <*> ("bar".fail[Int] map ((_: Int) + (_: Int) + (_: Int)).curried))) assert_=== "barfoo".fail[Int]

全てが Success の際には map で繋いだ関数が評価され、Failure の場合は内包されている値が積み重なっている。bar が前にあるが、評価順は左から右。
Validation は、Monad であるならば Applicative でもあるので Validation[A => B] を引数に取る <*> メソッドを持つ。
実際に Failure を積み重ねている箇所は Apply.scala 内の下記。

  implicit def ValidationApply[X: Semigroup]: Apply[({type λ[α]=Validation[X, α]})#λ] = new Apply[({type λ[α]=Validation[X, α]})#λ] {
    def apply[A, B](f: Validation[X, A => B], a: Validation[X, A]) = (f, a) match {
      case (Success(f), Success(a)) => success(f(a))
      case (Success(_), Failure(e)) => failure(e)
      case (Failure(e), Success(_)) => failure(e)
      case (Failure(e1), Failure(e2)) => failure(e1 &#8889; e2)
    }
  }

&#8889; は ⊹ です
Failure側(左側)の X を半群(Semigroup)で束縛した新しい型コンストラクタを型引数として与えた Apply を new している。半群なので、当然、二項演算 ⊹(|+| でも良い) を持っている。
ちなみに「({type λ[α]=Foo[X, α]})#λ」は、X を束縛した新しい無名型コンストラクタを作る定型文として覚えると scalaz のコードを読みやすくなる。

幾つか、書き換えの例を示す。

("foo".fail[Int].<***>(1.success[String], "bar".fail[Int]) {_ + _ + _}) assert_=== "foobar".fail[Int]
(("foo".fail[Int] |@| 1.success[String] |@| "bar".fail[Int]) {_ + _ + _}) assert_=== "foobar".fail[Int]
(("foo".fail[Int] &#8859; 1.success[String] &#8859; "bar".fail[Int]) {_ + _ + _}) assert_=== "foobar".fail[Int]

&#8859; は ⊛ です
上記の場合、評価順は <*> 同様に左から右。結果は <*> と逆順。こちらの方が見た目も結果も解りやすい。何故、こような結果になるかは… MA.scala の定義を参照の事(手抜き)。
このようなコードの書き方(Applicative スタイルと言う)は、下記の注意点がある。

  • 連結した式全てが評価される
  • 式の評価結果を次の式へ渡せない

これらの事から Validation は、正に Validation 処理に適していると言える(w;

蛇足だが、Either の Apply は、下記の通り。

//..snip..
trait Applys {
  def FunctorBindApply[Z[_]](implicit t: Functor[Z], b: Bind[Z]) = new Apply[Z] {
    def apply[A, B](f: Z[A => B], a: Z[A]): Z[B] = {
      lazy val fv = f
      lazy val fa = a
      b.bind(fv, (g: A => B) => t.fmap(fa, g(_: A)))
    }
  }
}
//..snip..
object Apply extends ApplyLow {
//..snip..
  implicit def EitherApply[X]: Apply[({type λ[α]=Either[X, α]})#λ] = FunctorBindApply[({type λ[α]=Either[X, α]})#λ]
//..snip..
}

Either の fmap は下記の通り。

implicit def EitherFunctor[X]: Functor[({type λ[α]=Either[X, α]})#λ] = new Functor[({type λ[α]=Either[X, α]})#λ] {
  def fmap[A, B](r: Either[X, A], f: A => B) = r match {
    case Left(a) => Left(a)
    case Right(a) => Right(f(a))
  }
}

Either の bind は、下記の通り。

implicit def EitherBind[X]: Bind[({type λ[α]=Either[X, α]})#λ] = new Bind[({type λ[α]=Either[X, α]})#λ] {
  def bind[A, B](r: Either[X, A], f: A => Either[X, B]) = r.fold(Left(_), f)
}

って事で(何が?)下記の通りとなる(手抜き)。

("foo".left[Int] <*> (1.right[String] <*> ("bar".left[Int] map ((_: Int) + (_: Int) + (_: Int)).curried))) assert_=== "bar".left[Int]
(("foo".left[Int] |@| 1.right[String] |@| "bar".left[Int]) {_ + _ + _}) assert_=== "foo".left[Int]

for 式で使った場合と異なり、左から右へ全ての式が評価された後に、結果がまとめられる。

記号について

scalaz ではメソッド名に記号が用いられているが、はてダ に投稿する際に &#8728; のように変換されてしまう事がある。これ、どうにかならないかな?
どうやらコードを貼付けた箇所のみ、そうなる様子

scalaz.StateT で scalaz.State と Option を組み合わせる

scalaz の ExampleState に StateT のサンプルが無かったので試してみた。
気が向いたら、そもそも State モナドって何よ?という所から説明書くかも。

package com.github.cooldaemon.try_scalaz

import scalaz._

object TryState {
  import Scalaz._

  def run {
    def initO = stateT[Option, Int, Int]((s: Int) =>
      if (s % 2 == 0) Some((s, s)) else None
    )

    initO ! 1 assert_=== None
    initO ! 2 assert_=== Some(2)

    def putO(s: Int) = stateT[Option, Int, Unit](_ =>
      if (s % 2 == 0) Some((s, ())) else None
    )

    putO(1) ~> 0 assert_=== None
    putO(2) ~> 0 assert_=== Some(2)

    val res1 = for {
      x <- initO;
      _ <- putO(4)
    } yield x

    res1 ! 1  assert_=== None
    res1 ! 2  assert_=== Some(2)
    res1 ~> 2 assert_=== Some(4)

    def modifyO(f: Int => Int) =
      initO flatMap (s => stateT[Option, Int, Unit](_ =>
        if (s % 2 == 0) Some((f(s), ())) else None
      ))

    modifyO(s => s + 1) ~> 1 assert_=== None
    modifyO(s => s + 1) ~> 2 assert_=== Some(3)

    def getsO = stateT[Option, Int, Int]((s: Int) =>
      Some((s, s))
    )

    val res2 = for {
      _ <- modifyO(s => s + 2);
      _ <- modifyO(s => s + 1);
      x <- getsO
    } yield x

    res2 ! 1 assert_=== None
    res2 ! 2 assert_=== Some(5)

    val res3 = for {
      _ <- res1;
      x <- res2
    } yield x

    res3 ! 1 assert_=== None
    res3 ! 2 assert_=== Some(7)
  }
}

続:gen_server のコールバックモジュール内で badarith が発生すると supervisor ごと落とされる

二年半前に gen_server のコールバックモジュール内で badarith が発生すると supervisor ごと落とされる というメモを残したにも関わらず、すっかり内容を忘れてしまい、変な Process Design の poolboy*1 の poolboy_sup が安全か検証をするのに時間が掛かったのでメモを残す。

もともと、Erlang ML で下記のようなやり取りがあった。
erlang-questions: Supervisor does not restart Gen Server
ざっくり要約すると Erlang Shell が停止してしまい、道連れで Supervisor が停止している。回避策は try catch …という内容。
これをすっかり忘れており「badarith 例外発生 = 全 Supervisor 停止」と勘違いして覚えていたのが、そもそもの勘違いの発端。
実際は、badarith が発生すると、"何故か" Erlang Shell が停止してしまい、Erlang Shell と link 関係にある Supervisor が "何故か" 停止している…が正しい。

下記に試した内容を列挙する。

badarith を発生させるプロセスの起動方法 結果
Erlang Shell -> Supervisor -> Worker(gen_server) Supervisor が Worker を再起動した後、erlang:apply/2 が停止し、link 先の Supervisor が道連れに停止
Erlang Shell -> Application -> Supervisor -> Worker(gen_server) Worker 再起動
Erlang Shell (spawn) -> システムプロセス(trap_exit = true) -> worker(gen_server) システムプロセスが Worker 停止のメッセージ受信
Erlang Shell (spawn_link) -> システムプロセス(trap_exit = true) -> worker(gen_server) システムプロセスが Worker 停止と erlang:apply/2 停止のメッセージ受信

Supervisor には、start は無く start_link しかない。Erlang Shell 上から start_link を実行すると、起動した Supervisor は Erlang Shell と link 関係になってしまう。この辺りが、関係しているようにも思えるが…下記の疑問が残る。

  • worker(gen_server) と直接 link 関係には無い Erlang Shell が、何故、worker(gen_server) の badarith を補足しているのか?
  • Supervisor ではないシステムプロセスは、Erlang Shell の EXIT メッセージを補足するのに、Supervisor の EXIT を補足する handle_info/2 は、何故、評価されていないのか?(print debug で確認済み)

疑問は残るが、とりあえず Erlang Shell が停止しなければ、Worker で badarith が発生しても平気であるため、下記の回避策が考えられる(おすすめ順)。

  • Application 化する(一番、おすすめ)
  • ML 推奨の try catch(throw や exit を補足するならともかく、error を補足するのは、ちょっと…)
  • Erlang Shell 上で catch_exception(true). を評価しておく(これの副作用は、現在、調査中)

既に Erlang 歴 3 年目なのに、こんな所で躓いているから、いつまでも Erlang 界隈の底辺なんだろうなぁ…

*1:後日、poolboy の紹介を書く予定