Scala STM CommitBarrier

この記事は、Scala Advent Calendar JP 2011 の13日目です。

今回は、Scala STM 0.4 から導入された CommitBarrier の解説を行います。

CommitBarrier の基本

まずは、下記のコードをご覧ください。

import org.scalatest.fixture.FixtureFunSuite

import scala.concurrent.stm._
import scala.concurrent.ops._
import java.util.concurrent.CountDownLatch

class CommitBarrierSuite extends FixtureFunSuite {
  case class F(cb: CommitBarrier, ms: Seq[CommitBarrier.Member], rs: Seq[Ref[String]], gate: CountDownLatch)
  type FixtureParam = F

  def withFixture(test: OneArgTest): Unit = {
    val cb = CommitBarrier(300)
    test(F(
      cb,
      Seq.tabulate(2) {n => cb.addMember()},
      Seq.tabulate(3) {n => Ref("start")},
      new CountDownLatch(1)
    ))
  }

  test("競合なし") { f =>
    val fut1 = future {
      f.gate.await()
      val result1 = f.ms(0).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f1"
        Thread.sleep(100)
        'finish
      }
      assert(result1 === Right('finish))
      'finish
    }

    val fut2 = future {
      f.gate.await()
      val startTime = System.currentTimeMillis
      val result2 = f.ms(1).atomic { implicit txn =>
        f.rs(1)() = f.rs(1)() + "-f2"
        'finish
      } // fut1 の f.ms(0) の atomic が終わるまで待つ
      val elapseTime = System.currentTimeMillis - startTime;
      assert(result2 === Right('finish))
      assert(100 < elapseTime)
      'finish
    }
    
    f.gate.countDown()

    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }
    
    assert(f.rs(0).single() === "start-f1")
    assert(f.rs(1).single() === "start-f2")

    val thrown = intercept[IllegalStateException] {
      f.cb.addMember()
    }
    assert(thrown.getMessage === "commit barrier has already committed")
  }
}

scalatest の詳細な説明は省きます。今後、上記の例にテストケースを追加していく形で CommitBarrier の説明を行っていきますが、withFixture 内で作られた case class F のインスタンスが各テストケースの引数として渡されるとだけ覚えておいてください。

という事で、始めのテストケースですが、これは、下記の事を確認しています。

  • 同一 CommitBarrier に属する Member の atomic 同士は、お互いの終了*1を待ち合わせる
  • CommitBarrier を用いた atomic の値は Either[CancelCause, "atomic に渡した関数の戻り値の型"] 型となる
  • 一度終了した CommitBarrier には、新たに Member を追加できない

f.ms(0).atomic には、終了するまで 100ms 必要とする関数を、f.ms(1).atomic には、すぐに終了する関数を渡していますが、f.ms(1) は、同一 CommitBarrier の他の全ての Member …即ち f.ms(0) の終了を待つので、引きずられて終了までに 100ms 以上の時間が掛かっています。

ちなみに、Member が一つでもブロックしてしまうと CommitBarrier 全体がブロックしてしまうので、下記のように CommitBarrier のインスタンスを得る際にタイムアウトの時間を指定できます。

    val cb = CommitBarrier(300)

上記のテストケースは、それぞれの atomic ブロックが異なる Ref を更新しているので、次は同じ Ref を更新してみます。下記のテストケースを追加してください。

  test("競合あり") { f =>
    val fut1 = future {
      f.gate.await()
      val result1 = f.ms(0).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f1"
      }
      assert(result1.left.get.isInstanceOf[CommitBarrier.MemberCycle])
      'finish
    }

    val fut2 = future {
      f.gate.await()
      val result2 = f.ms(1).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f2"
      }
      assert(result2.left.get.isInstanceOf[CommitBarrier.MemberCycle])
      'finish
    }
    
    f.gate.countDown()

    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }

    assert(f.rs(0).single() === "start")
    
    val thrown = intercept[IllegalStateException] {
      f.cb.addMember()
    }
    assert(thrown.getMessage === "commit barrier has already rolled back")
  }

このテストケースは、同一 CommitBarrier に属する Member 間で競合が発生した際、Ref の更新は行われず、atomic の値は Left となる事を確認しています。

では、もう少し複雑にしてみましょう。

  test("競合あり2") { f =>
    val fut1 = future {
      f.gate.await()
      val result1 = f.ms(0).atomic { implicit txn =>
        Thread.sleep(100) // ここがポイント!!
        f.rs(0)() = f.rs(0)() + "-f1"
        'finish
      }
      assert(result1 === Right('finish))
      'finish
    }

    val fut2 = future {
      f.gate.await()
      val result2 = f.ms(1).atomic { implicit txn =>
        f.rs(1)() = f.rs(0)() + "-f2"
        'finish
      }
      assert(result2 === Right('finish))
      'finish
    }
    
    f.gate.countDown()

    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }

    assert(f.rs(0).single() === "start-f1")
    assert(f.rs(1).single() === "start-f2")
  }

このテストケースでは、f.rs(0) を元に、f.rs(0) 自身と f.rs(1) の更新を行っていますが、先に f.rs(1) を更新して f.ms(1) を Commited 状態にしてしまい、後から f.rs(0) を更新して f.ms(0) を Commited 状態にしています。

更に続けて、次のテストケースも追加してみましょう。

  test("競合あり3") { f =>
    val fut1 = future {
      f.gate.await()
      val result1 = f.ms(0).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f1"
        'finish
      }
      assert(result1.left.get.isInstanceOf[CommitBarrier.MemberCycle])
      'finish
    }

    val fut2 = future {
      f.gate.await()
      val result2 = f.ms(1).atomic { implicit txn =>
        Thread.sleep(100) // ここがポイント!!
        f.rs(1)() = f.rs(0)() + "-f2"
        'finish
      }
      assert(result2.left.get.isInstanceOf[CommitBarrier.MemberCycle])
      'finish
    }
    
    f.gate.countDown()

    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }

    assert(f.rs(0).single() === "start")
    assert(f.rs(1).single() === "start")
  }

このテストケースは、一つ前のテストケースと Thread.sleep の位置が異なり、先に f.rs(0) を更新してから、f.rs(0) を元に f.rs(1) の更新を行います。このようにすると、f.ms(1).atomic 開始時と Thread.sleep 後の f.rs(0) の値が異なるため、f.ms(1).atomic は失敗し、結果、f.ms(0).atomic も失敗となります。

ちなみに、概ね f.ms(0).atomic と f.ms(0).atomic の値は Left(CommitBarrier.MemberCycle) となりますが、何度かテストを繰り返していると Left(Timeout) になる事があります。これは、f.ms(1).atomic に渡された関数の再評価回数が異なり、場合によっては、始めに設定したタイムアウト 300ms を超えてしまうためなのですが、何故、再評価の回数が異なるのか?そもそも、何故、再評価されてしまうのか?*2という所は調査中です。

キャンセル

CommitBarrier の基本が明かになった所で、キャンセルの説明を行います。説明の簡易化のため、テストスイートを新たに作成します。

import org.scalatest.fixture.FixtureFunSuite

import scala.concurrent.stm._
import scala.concurrent.ops._

class CancelSuite extends FixtureFunSuite {
  case class F(cb: CommitBarrier, ref: Ref[String])
  type FixtureParam = F

  def withFixture(test: OneArgTest): Unit = test(F(
    CommitBarrier(60000),
    Ref("start")
  ))

  test("キャンセル") { f =>
    val startingGate = new java.util.concurrent.CountDownLatch(1)
    val ms = Seq.tabulate(2) {n => f.cb.addMember()}

    val fut1 = future {
      startingGate.await()
      val result1 = ms(0).atomic { implicit txn =>
        f.ref() = f.ref() + "-f1"
        Thread.sleep(100)
        ms(0).cancel(CommitBarrier.UserCancel("cancel"))
        'finish
      }
      assert(result1 === Left(CommitBarrier.UserCancel("cancel")))
      'finish
    }

    val r2 = Ref("start")
    val fut2 = future {
      startingGate.await()
      val result2 = ms(1).atomic { implicit txn =>
        r2() = r2() + "-f2"
        'finish
      } // ms(0) の待ち合わせ
      assert(result2 === Right('finish))
      'finish
    }
 
    startingGate.countDown()

    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }

    assert(f.ref.single() === "start")
    assert(r2.single() === "start-f2")
  }
}

簡略化と言いつつ、Fixture が減った影響で見難くなってますが…後のテストケースの為に我慢してください。このテストケースは、下記の事を確認しています。

  • CommitBarrier の Member は、atomic 内で処理をキャンセルできる
  • キャンセルされた Member は、他の Member の commit に影響を与えない

競合や例外が発生した場合、前述した通り、同じ CommitBarrier に属する他の Member に影響を及ぼしますが、キャンセルの場合は、影響を及ぼしません。

次に、下記のテストケースを追加してください。

  test("CommitBarrier の再利用") { f =>
    val m = f.cb.addMember()

    val result1 = m.atomic { implicit txn =>
      f.ref() = "atomic1"
      m.cancel(CommitBarrier.UserCancel("cancel"))
      'finish
    }
 
    assert(result1 === Left(CommitBarrier.UserCancel("cancel")))
    assert(f.ref.single() === "start")

    val m2 = f.cb.addMember()
    val result2 = m2.atomic { implicit txn =>
      f.ref() = "atomic2"
      'finish
    }

    assert(result2 === Right('finish))
    assert(f.ref.single() === "atomic2")
    
    val thrown = intercept[IllegalStateException] {
      val m3 = f.cb.addMember()
    }
    assert(thrown.getMessage === "commit barrier has already committed")
  }
 

このテストケースでは、CommitBarrier に属している Member が全てキャンセルされた場合、その CommitBarrier は、まだ処理が終わっていないと見なされるため addMember が可能である事を確認しています。

では、atomic ブロックがネストしている場合は、どうなるでしょうか?下記のテストケースを追加してください。

  test("atomic のネスト(m.atomic が内側)") { f =>
    val m = f.cb.addMember()
    
    atomic { implicit txn =>
      f.ref() = "outer"
      m.atomic { implicit txn =>
        f.ref() = f.ref() + "-inner"
        m.cancel(CommitBarrier.UserCancel("cancel"))
      }
    }

    assert(f.ref.single() === "outer")
  }

このテストケースは、通常の atomic の内で CommitBarrier の Member をキャンセルした場合、通常の外側の atomic で操作した Ref のみ commit される事を確認しています。

次に、ネストしている atomic を反対にしてみましょう。下記のテストケースを追加してください。

  test("atomic のネスト(m.atomic が外側)") { f =>
    val m = f.cb.addMember()
    
    m.atomic { implicit txn =>
      f.ref() = "outer"
      atomic { implicit txn =>
        f.ref() = f.ref() + "-inner"
        m.cancel(CommitBarrier.UserCancel("cancel"))
      }
    }

    assert(f.ref.single() === "start")
  }

このテストケースでは、内側にある通常の atomic の中から外側の Member をキャンセルした場合、Ref の操作が全て無効となる事を確認しています。

その他

  • CommitBarrier に対する操作…例えば addMember 等は、CommitBarrier 毎に synchronized されているので注意してください
  • Thread と Member は、一対一で利用される事を想定してるようです
  • 上記で試した以外に、例外が発生した場合にどうなるのか?等は、こちらでご確認ください

例の如く、突っ込み大歓迎です。

*1:正常終了とは限らない。Cancel や例外でも atomic は終了する。

*2:f.rs(0) を Read した時点で、commit できない事は解りきっているので、即失敗で良いんじゃないかと思っている