Scala STM を試してみる

ScalaSTM を試してみました。正直、ライブラリ内でどのように STM を実現しているか理解していませんが、とりあえずプリントデバックしてみました。

準備

面倒なので sbt, maven 共に使いませんでした。

% curl -O http://scala-tools.org/repo-releases/org/scala-tools/scala-stm_2.8.1/0.2/scala-stm_2.8.1-0.2.jar
% fsc-2.8 -classpath ./scala-stm_2.8.1-0.2.jar SampleForSTM.scala
% scala-2.8 -classpath .:./scala-stm_2.8.1-0.2.jar SampleForSTM

上記は、OSX + MacPorts の例であるため、適宜、ご自身の環境に読み替えてください。

サンプルコード

object SampleForSTM {
  import scala.concurrent.stm._
  import scala.actors.Actor
  import scala.actors.Actor.{actor, react, reply}

  def main (args: Array[String]) {
    doTest(
      "case1" -> case1,
      "case2" -> case2,
      "case3" -> case3,
      "case4" -> case4
    )
  }

  def doTest(testCases: (String, Ref[Boolean] => Unit)*) {
    testCases foreach { testCase =>
      println("--<%s start>--" format testCase._1)
      val refLock = Ref(false)
      testCase._2(refLock)
      println("--<%s end>--" format testCase._1)
    }
  }

  def ensureLockActor(refLock: Ref[Boolean]): Actor = actor {
    Thread.sleep(100)

    atomic { implicit txn =>
      println("actor2: Try take the lock.")
      val lock = refLock()
      if (lock) retry
      refLock() = true
    }

    println("actor2: Take the lock.")
    Thread.sleep(500)

    atomic { implicit txn =>
       refLock() = false
    }

    println("actor2: Release the lock.")
    react {case finish => reply(finish)}
  }

  def waitFinish(actors: Actor*) {
    actors foreach { actor =>
      actor !? 'finish
    }
  }

  def case1(refLock: Ref[Boolean]) {
    val actor1 = actor {
      atomic { implicit txn =>
        println("actor1: Try take the lock.") // side effects
        val lock = refLock()
        if (lock) {
          println("actor1: Retry take the lock.")
          retry
        }
        Thread.sleep(500)
        refLock() = true
        println("actor1: Maybe take the lock.")
      }

      println("actor1: Take the lock.")

      atomic { implicit txn =>
        refLock() = false
      }

      println("actor1: Release the lock.")
      react {case finish => reply(finish)}
    }

    val actor2 = ensureLockActor(refLock)
    waitFinish(actor1, actor2)
  }

  def case2(refLock: Ref[Boolean]) {
    val actor1 = actor {
      atomic { implicit txn =>
        println("actor1: Try take the lock.")
        val lock = refLock()
        if (lock) {
          println("actor1: Retry take the lock.")
          retry
        }
        refLock() = true
        println("actor1: Maybe take the lock.")
        Thread.sleep(500)
      }

      println("actor1: Take the lock.")

      atomic { implicit txn =>
        refLock() = false
      }

      println("actor1: Release the lock.")
      react {case finish => reply(finish)}
    }

    val actor2 = ensureLockActor(refLock)
    waitFinish(actor1, actor2)
  }

  def case3(refLock1: Ref[Boolean]) {
    val refLock2 = Ref(false)

    val actor1 = actor {
      atomic { implicit txn =>
        println("actor1: Try take the lock1.")
        val lock1 = refLock1()
        if (lock1) retry
        refLock1() = true

        atomic { implicit txn =>
          println("actor1: Try take the lock2.")
          val lock2 = refLock2()
          if (lock2) retry
          Thread.sleep(500)
          refLock2() = true
        }
      }

      println("actor1: Take the lock1 and lock2.")

      atomic { implicit txn =>
        refLock1() = false
        refLock2() = false
      }

      println("actor1: Release the lock1 and lock2.")
      react {case finish => reply(finish)}
    }

    val actor2 = actor {
      Thread.sleep(100)

      atomic { implicit txn =>
        println("actor2: Try take the lock1 and lock2.")
        val lock1 = refLock1()
        val lock2 = refLock2()
        if (lock1 || lock2) retry
        refLock1() = true
        refLock2() = true
      }

      println("actor2: Take the lock1 and lock2.")
      Thread.sleep(500)

      atomic { implicit txn =>
        refLock1() = false
        refLock2() = false
      }

      println("actor2: Release the lock1 and lock2.")
      react {case finish => reply(finish)}
    }

    waitFinish(actor1, actor2)
  }

  def case4(refLock: Ref[Boolean]) {
    val actor1 = actor {
      val Some(whichBlock) = atomic { implicit txn =>
        println("actor1: Try take the lock.")
        val lock = refLock()
        if (lock) {       // (3)
          println("actor1: Retry take the lock.")
          retry
        }
        Thread.sleep(500) // (1)
        println("actor1: Before update ref to true.")
        refLock() = true  // (2)
        println("actor1: After update ref to true.")
        Some("atomic1")
      } orAtomic { implicit txn =>
        println("actor1: Could not take the lock.")
        Some("atomic2")
      }

      println("actor1: Eval block is " + whichBlock)
      println("actor1: Lock is " + refLock.single())

      atomic { implicit txn =>
        if (whichBlock == "atomic1" && refLock()) {
          println("actor1: Release the lock.")
          refLock() = false
        }
      }

      react {case finish => reply(finish)}
    }

    val actor2 = ensureLockActor(refLock)
    waitFinish(actor1, actor2)
  }
}

実行結果

--<case1 start>--
actor1: Try take the lock.
actor2: Try take the lock.
actor2: Take the lock.
actor1: Try take the lock.
actor1: Retry take the lock.
actor2: Release the lock.
actor1: Try take the lock.
actor1: Maybe take the lock.
actor1: Take the lock.
actor1: Release the lock.
--<case1 end>--
--<case2 start>--
actor1: Try take the lock.
actor1: Maybe take the lock.
actor2: Try take the lock.
actor2: Take the lock.
actor1: Try take the lock.
actor1: Retry take the lock.
actor2: Release the lock.
actor1: Try take the lock.
actor1: Maybe take the lock.
actor1: Take the lock.
actor1: Release the lock.
--<case2 end>--
--<case3 start>--
actor1: Try take the lock1.
actor1: Try take the lock2.
actor2: Try take the lock1 and lock2.
actor2: Take the lock1 and lock2.
actor1: Try take the lock1.
actor2: Release the lock1 and lock2.
actor1: Try take the lock1.
actor1: Try take the lock2.
actor1: Take the lock1 and lock2.
actor1: Release the lock1 and lock2.
--<case3 end>--
--<case4 start>--
actor1: Try take the lock.
actor2: Try take the lock.
actor2: Take the lock.
actor1: Before update ref to true.
actor1: Try take the lock.
actor1: Retry take the lock.
actor1: Could not take the lock.
actor1: Eval block is atomic2
actor1: Lock is true
actor2: Release the lock.
--<case4 end>--

説明

ご覧の通り atomic に渡すブロックは、何度も評価されます。特別な理由が無い限り、atomic には副作用が存在しないブロックを渡すのが良いでしょう。
ScalaSTM ― Quick Start の「Be careful about rollback」の例のように StringBuilder などを使いたい場合、atomic に渡すブロックの中で new しておくと、再評価の都度 new されるので安全です。
各テストケースの説明は、また後日、改めて追記します。
以下、各テストケースの説明となります。

case1
    val actor1 = actor {
      atomic { implicit txn =>
        println("actor1: Try take the lock.") // side effects
        val lock = refLock()
        if (lock) {       // (3)
          println("actor1: Retry take the lock.")
          retry
        }
        Thread.sleep(500) // (1)
        refLock() = true  // (2)
        println("actor1: Maybe take the lock.")
      }

      println("actor1: Take the lock.")

      atomic { implicit txn =>
        refLock() = false
      }

      println("actor1: Release the lock.")
      react {case finish => reply(finish)}
    }
--<case1 start>--
actor1: Try take the lock.
actor2: Try take the lock.
actor2: Take the lock.
actor1: Try take the lock.
actor1: Retry take the lock.
actor2: Release the lock.
actor1: Try take the lock.
actor1: Maybe take the lock.
actor1: Take the lock.
actor1: Release the lock.
--<case1 end>--

処理順は下記の通り。

  1. actor1 が (1) でスリープしている間に actor2 がロックを獲得する
  2. actor1 が (2) で代入に失敗し、atomic に渡したブロックが再評価される
  3. 既に actor2 にロックが獲得されているので (3) の条件が真となり retry が評価さえる
  4. actor2 がロックを解放する
  5. actor1 がロックを獲得する

ここで解る事は下記の通り。

  • atomic に渡したブロックは何度も再評価されるので、副作用は混ぜるな危険
  • ブロック評価の開始時と値が異なる場合、代入に失敗する。代入に失敗するとブロックを再評価
  • retry を評価するとブロックを強制的に再評価
case2
    val actor1 = actor {
      atomic { implicit txn =>
        println("actor1: Try take the lock.")
        val lock = refLock()
        if (lock) {
          println("actor1: Retry take the lock.")
          retry
        }
        refLock() = true
        println("actor1: Maybe take the lock.")
        Thread.sleep(500) // (1)
      }

      println("actor1: Take the lock.")

      atomic { implicit txn =>
        refLock() = false
      }

      println("actor1: Release the lock.")
      react {case finish => reply(finish)}
    }
--<case2 start>--
actor1: Try take the lock.
actor1: Maybe take the lock.
actor2: Try take the lock.
actor2: Take the lock.
actor1: Try take the lock.
actor1: Retry take the lock.
actor2: Release the lock.
actor1: Try take the lock.
actor1: Maybe take the lock.
actor1: Take the lock.
actor1: Release the lock.
--<case2 end>--

case1 との違いは、スリープの位置を代入後に移動した事のみ。ここから解る事は下記の通り。

  • ブロックの評価が終わるまで値の反映はされていない
  • ブロックの評価が終わった後、評価開始時と値が異なるのであれば再評価となる
case3
    val refLock2 = Ref(false)

    val actor1 = actor {
      atomic { implicit txn =>
        println("actor1: Try take the lock1.")
        val lock1 = refLock1()
        if (lock1) retry
        refLock1() = true

        atomic { implicit txn =>
          println("actor1: Try take the lock2.")
          val lock2 = refLock2()
          if (lock2) retry
          Thread.sleep(500) // (1)
          refLock2() = true // (2)
        }
      }

      println("actor1: Take the lock1 and lock2.")

      atomic { implicit txn =>
        refLock1() = false
        refLock2() = false
      }

      println("actor1: Release the lock1 and lock2.")
      react {case finish => reply(finish)}
    }

    val actor2 = actor {
      Thread.sleep(100)

      atomic { implicit txn =>
        println("actor2: Try take the lock1 and lock2.")
        val lock1 = refLock1()
        val lock2 = refLock2()
        if (lock1 || lock2) retry
        refLock1() = true
        refLock2() = true
      }

      println("actor2: Take the lock1 and lock2.")
      Thread.sleep(500)

      atomic { implicit txn =>
        refLock1() = false
        refLock2() = false
      }

      println("actor2: Release the lock1 and lock2.")
      react {case finish => reply(finish)}
    }
--<case3 start>--
actor1: Try take the lock1.
actor1: Try take the lock2.
actor2: Try take the lock1 and lock2.
actor2: Take the lock1 and lock2.
actor1: Try take the lock1.
actor2: Release the lock1 and lock2.
actor1: Try take the lock1.
actor1: Try take the lock2.
actor1: Take the lock1 and lock2.
actor1: Release the lock1 and lock2.
--<case3 end>--

Ref を二つに増やし、actor1 の atomic を入れ子にしています。
処理順は下記の通りです。

  1. actor1 がロックを一つ獲得する
  2. actor1 が (1) でスリープしてる間に actor2 がロックを二つ獲得する
  3. actor1 が (2) で代入に失敗し、一番外側のブロックの再評価を始める
  4. actor2 がロックを二つ解放する
  5. actor1 がロックを二つ獲得する

ここで解る事は下記の通り。

  • 複数の値を扱える
  • atomic を入れ子にできる
  • 内側のブロックでコミットに失敗した場合、外側のブロックから再評価
case4
    val actor1 = actor {
      val Some(whichBlock) = atomic { implicit txn =>
        println("actor1: Try take the lock.")
        val lock = refLock()
        if (lock) {
          println("actor1: Retry take the lock.")
          retry
        }
        Thread.sleep(500)
        println("actor1: Before update ref to true.")
        refLock() = true
        println("actor1: After update ref to true.")
        Some("atomic1")
      } orAtomic { implicit txn =>
        println("actor1: Could not take the lock.")
        Some("atomic2")
      }

      println("actor1: Eval block is " + whichBlock)
      println("actor1: Lock is " + refLock.single())

      atomic { implicit txn =>
        if (whichBlock == "atomic1" && refLock()) {
          println("actor1: Release the lock.")
          refLock() = false
        }
      }

      react {case finish => reply(finish)}
    }
--<case4 start>--
actor1: Try take the lock.
actor2: Try take the lock.
actor2: Take the lock.
actor1: Before update ref to true.
actor1: Try take the lock.
actor1: Retry take the lock.
actor1: Could not take the lock.
actor1: Eval block is atomic2
actor1: Lock is true
actor2: Release the lock.
--<case4 end>--

orAtomic を使い、ブロックを繋いでいます。 処理順は以下の通りです。

  1. actor1 が (1) でスリープしている間に actor2 がロックを獲得する
  2. actor1 が (2) で代入に失敗し、ブロックが再評価される
  3. 既に actor2 にロックが獲得されているので (3) の条件が真となり retry が評価される
  4. orAtomic で繋いだブロックが評価される

ここで解る事は下記の通り。

  • 代入失敗はブロックの再評価
  • retry を評価すると orAtomic で繋いだブロックが評価

今後について

  • atomic ブロック内での例外の扱いを確認
  • orAtomic 内での例外や retry の動作確認
  • Actor vs Actor + STM の速度比較