Scala STM を試してみる その4 (Actor vs Actor + STM その2)

その1、その2、その3 に引き続き、今回は reply が不要なパターン*1 で試してみました。

import scala.concurrent.stm._
import scala.actors.{Actor, Exit}
import scala.actors.Actor.actor
import scala.actors.Actor.State.{New, Terminated}
import System.currentTimeMillis
import java.io.{File, PrintWriter}

/**
 * Mixin offering a single helper that brings an actor up regardless of
 * whether it has never run or has already finished.
 */
trait EnsureStartActor {
  /**
   * Starts `actor` when its state is `New`, restarts it when its state is
   * `Terminated`, and does nothing for any other state.
   *
   * @param actor the actor to bring up
   */
  def ensureStart(actor: Actor): Unit = {
    val state = actor.getState
    if (state == New) actor.start()
    else if (state == Terminated) actor.restart()
  }
}

/** Program entry point: all it does is start the `Tester` actor. */
object SampleForSTM4 extends EnsureStartActor {
  /**
   * Starts `Tester`.
   *
   * @param args command-line arguments; not read
   */
  def main(args: Array[String]): Unit = {
    Tester.start()
  }
}

/**
 * Benchmark driver. Runs the same logging workload once through the
 * actor-based logger and once through the STM-based logger, printing the
 * elapsed wall-clock time of each run.
 */
object Tester extends Actor with EnsureStartActor {
  /**
   * Runs both test cases and then closes both log writers. The writers are
   * closed in a `finally` so they are released even when a run fails.
   */
  def act(): Unit = {
    try {
      test(
        "actor only    " -> new Supervisor with UseActor,
        "actor with stm" -> new Supervisor with UseSTM
      )
    } finally {
      LoggerInActor.close()
      LoggerInSTM.close()
    }
  }

  /**
   * For every worker count from 100 to 400 in steps of 50, runs each test
   * case: starts the supervisor and its workers, registers the expected
   * number of log writes with `Counter`, asks every worker to write, waits
   * for `Finish`, prints the elapsed milliseconds and stops the supervisor.
   *
   * @param testCases pairs of (label printed with the timing, supervisor to exercise)
   */
  def test(testCases: (String, Supervisor)*): Unit = {
    val tryCount = 10
    val testData = "a" * 256 // 256-character payload written on every log call

    100 to 400 by 50 foreach { workersCount =>
      println("--<actor = %s, try count = %s>--".format(workersCount, tryCount))
      testCases foreach { case (label, supervisor) =>
        ensureStart(supervisor)

        val workers = supervisor.startChildren(workersCount)
        // Must be registered before any worker writes, otherwise increments
        // could be counted against the previous run's target.
        Counter.enroll(this, workersCount * tryCount)

        val startTime = currentTimeMillis

        workers foreach {_.write(tryCount, testData)}
        awaitFinish()

        println("%s [%s]ms".format(label, currentTimeMillis - startTime))
        supervisor.stop
      }
    }
  }

  /**
   * Blocks until `Finish` arrives. Any other message is reported and
   * skipped; returning on it would stop the clock early and leave the real
   * `Finish` in the mailbox to end the next measurement immediately.
   */
  private def awaitFinish(): Unit = {
    var finished = false
    while (!finished) {
      receive {
        case Finish  => finished = true
        case unknown =>
          println("Tester: unknown message [%s], ignoring" format unknown)
      }
    }
  }
}

/**
 * Supplies the worker implementation and the optional logger actor used by
 * a supervisor. The defaults are a plain `Worker` and no logger.
 */
trait WorkerFactory {
  /** Creates a new, not yet started worker. */
  def makeWorker(): Worker = {
    new Worker()
  }

  /** Returns the logger actor to start alongside the workers, if any. */
  def getLogger(): Option[Actor] = {
    None
  }
}

/** Factory flavour whose workers log through `CanLoggingWithSTM`. */
trait UseSTM extends WorkerFactory {
  /** Creates a worker with the STM logging mixin applied. */
  override def makeWorker(): Worker = {
    new Worker() with CanLoggingWithSTM
  }
}

/**
 * Factory flavour whose workers log through `CanLoggingWithActor` and which
 * exposes `LoggerInActor` as the logger to be started.
 */
trait UseActor extends WorkerFactory {
  /** Creates a worker with the actor logging mixin applied. */
  override def makeWorker(): Worker = {
    new Worker() with CanLoggingWithActor
  }

  /** Returns `LoggerInActor` wrapped in `Some`. */
  override def getLogger(): Option[Actor] = {
    Some(LoggerInActor)
  }
}

/**
 * Actor that creates workers, links itself to them, and restarts any linked
 * actor that exits for a reason other than `'normal`.
 */
class Supervisor() extends Actor with EnsureStartActor with WorkerFactory {
  /** Request to start the logger and `workersCount` workers; answered with the workers. */
  case class  StartChildren(workersCount: Int)
  /** Request to terminate this supervisor. */
  case object Stop

  // Deliver exits of linked actors as Exit messages instead of terminating.
  trapExit = true

  /** Message loop; every message that matches no known case is printed. */
  def act() = loop {
    react {
      case Exit(child: Actor, reason) =>
        // A 'normal exit needs no action; anything else triggers a restart.
        if (reason != 'normal) restartChild(child)
      case StartChildren(count) =>
        startLogger()
        reply(startWorkers(count))
      case Stop =>
        exit("stop")
      case other =>
        println("Supervisor: unknown message [%s], ignoring".format(other))
    }
  }

  /**
   * Links to `child` again and restarts it.
   *
   * @param child the actor whose exit was trapped
   */
  def restartChild(child: Actor): Unit = {
    link(child)
    child.restart()
  }

  /** Ensures the factory's logger, when there is one, is running and linked. */
  def startLogger(): Unit = {
    getLogger().foreach { logger =>
      ensureStart(logger)
      link(logger)
    }
  }

  /**
   * Creates, links and starts `workersCount` workers.
   *
   * @param workersCount number of workers to create
   * @return the started workers, in creation order
   */
  def startWorkers(workersCount: Int): Array[Worker] = {
    Array.fill(workersCount) {
      val created = makeWorker()
      link(created)
      created.start()
      created
    }
  }

  /**
   * Synchronously asks this supervisor to start its children.
   *
   * @param workersCount number of workers to start
   * @return the started workers, or an empty list when the reply is not an
   *         array of workers
   */
  def startChildren(workersCount: Int): List[Worker] = {
    (this !? StartChildren(workersCount)) match {
      case started: Array[Worker] => started.toList
      case _                      => Nil
    }
  }

  /** Asynchronously asks this supervisor to stop. */
  def stop: Unit = this ! Stop
}

/** Logging hook; this base version discards every line. */
trait CanLogging {
  /**
   * Does nothing. Sub-traits override this to forward the line to a logger.
   *
   * @param line the text to log
   */
  def writeLog(line: String): Unit = ()
}

/** Logging hook that forwards every line to `LoggerInSTM`. */
trait CanLoggingWithSTM extends CanLogging {
  /**
   * Passes `line` to `LoggerInSTM.write`.
   *
   * @param line the text to log
   */
  override def writeLog(line: String): Unit = {
    LoggerInSTM.write(line)
  }
}

/** Logging hook that forwards every line to `LoggerInActor`. */
trait CanLoggingWithActor extends CanLogging {
  /**
   * Passes `line` to `LoggerInActor.write`.
   *
   * @param line the text to log
   */
  override def writeLog(line: String): Unit = {
    LoggerInActor.write(line)
  }
}

/**
 * Actor that, on request, calls `writeLog` a given number of times. What
 * `writeLog` does is decided by the `CanLogging` variant mixed in.
 */
class Worker() extends Actor with CanLogging {
  /** Request to log `line` `tryCount` times. */
  case class WriteLog(tryCount: Int, line: String)

  /** Message loop; every message other than `WriteLog` is printed. */
  def act() = loop {
    react {
      case WriteLog(repeat, text) =>
        for (_ <- 1 to repeat) writeLog(text)
      case other =>
        println("Worker: unknown message [%s], ignoring".format(other))
    }
  }

  /**
   * Asynchronously asks this worker to log `line` `tryCount` times.
   *
   * @param tryCount how many times to log the line
   * @param line     the text to log
   */
  def write(tryCount: Int, line: String): Unit = {
    this ! WriteLog(tryCount, line)
  }
}

/**
 * Holds an optional `PrintWriter` onto a `./<name>.log` file and offers
 * open / close / line-write helpers around it.
 */
trait HasWriter {
  /** The currently open writer, or `None` when nothing is open. */
  var hasWriter: Option[PrintWriter] = None

  /**
   * Opens `./<name>.log` for writing. A writer that is already open is
   * closed first so its file handle is not leaked.
   *
   * @param name file name without the `.log` extension
   */
  def open(name: String): Unit = {
    close()
    hasWriter = Some(new PrintWriter(new File("./%s.log" format name)))
  }

  /**
   * Closes the current writer, if any, and forgets it so that later `puts`
   * calls do not target a closed writer. Calling it again is a no-op.
   */
  def close(): Unit = {
    hasWriter.foreach(_.close())
    hasWriter = None
  }

  /**
   * Writes `line` followed by a line break to the current writer; does
   * nothing when no writer is open.
   *
   * @param line the text to write
   */
  def puts(line: String): Unit = hasWriter.foreach(_.println(line))
}

/**
 * Logger writing to `./stm.log`. Writers are serialised through a boolean
 * `Ref` used as a lock flag: a caller waits until the flag is `false`, sets
 * it to `true`, writes, and clears it again.
 */
object LoggerInSTM extends HasWriter {
  open("stm")

  /** `true` while some caller is inside the write section. */
  val refLock = Ref(false)

  /**
   * Writes `line` to the log and bumps `Counter`, one caller at a time.
   *
   * @param line the text to log
   */
  def write(line: String): Unit = {
    // Acquire: block (via retry) while another caller holds the flag.
    atomic { implicit txn =>
      if (refLock()) retry
      refLock() = true
    }

    try {
      puts(line)
      Counter.increment()
    } finally {
      // Release even on failure; otherwise every later caller would
      // retry forever on a flag nobody clears.
      atomic { implicit txn =>
        refLock() = false
      }
    }
  }
}

/**
 * Logger writing to `./actor.log`. Lines arrive as messages and are written
 * by this actor's own message loop.
 */
object LoggerInActor extends Actor with HasWriter {
  /** Request to write one line to the log. */
  case class WriteLog(line: String)

  open("actor")

  /**
   * Message loop: writes each requested line and bumps `Counter`; every
   * other message is printed.
   */
  def act() = loop {
    react {
      case WriteLog(text) =>
        puts(text)
        Counter.increment()
      case other =>
        println("LoggerInActor: unknown message [%s], ignoring".format(other))
    }
  }

  /**
   * Asynchronously queues `line` for writing.
   *
   * @param line the text to log
   */
  def write(line: String): Unit = {
    this ! WriteLog(line)
  }
}

/** Message sent to the enrolled receiver once the expected count is reached. */
case object Finish

/**
 * Counts completed log writes and notifies an enrolled actor with `Finish`
 * when the count reaches the enrolled target.
 */
object Counter {
  // Written by enroll and read by increment/sendFinish, which may run on a
  // different thread; @volatile makes the latest values visible there.
  @volatile var hasReceiver: Option[Actor] = None
  @volatile var replyCount = 0
  val refCounter = Ref(0)

  /**
   * Adds one to the counter and sends `Finish` if this very increment
   * reached the target. The comparison uses the value produced inside the
   * transaction, so concurrent callers cannot both observe the threshold
   * and send `Finish` twice.
   */
  def increment(): Unit = {
    val reached = atomic { implicit txn =>
      val next = refCounter() + 1
      refCounter() = next
      next
    }
    val target = replyCount
    if (target != 0 && reached == target) notifyReceiver()
  }

  /**
   * Sends `Finish` to the enrolled receiver when a non-zero target is set
   * and the counter has reached or passed it.
   */
  def sendFinish(): Unit = {
    if (replyCount != 0 && replyCount <= refCounter.single()) {
      notifyReceiver()
    }
  }

  /**
   * Registers who is to be notified and after how many increments, and
   * resets the counter to zero.
   *
   * @param setReceiver   actor that will receive `Finish`
   * @param setReplyCount number of increments to wait for; 0 disables notification
   */
  def enroll(setReceiver: Actor, setReplyCount: Int): Unit = {
    // Reset first so the new target is never compared with the old total.
    atomic { implicit txn =>
      refCounter() = 0
    }
    hasReceiver = Some(setReceiver)
    replyCount  = setReplyCount
  }

  /** Sends `Finish` to the enrolled receiver, if there is one. */
  private def notifyReceiver(): Unit = hasReceiver.foreach(_ ! Finish)
}
--<actor = 100, try count = 10>--
actor only     [206]ms
actor with stm [146]ms
--<actor = 150, try count = 10>--
actor only     [184]ms
actor with stm [232]ms
--<actor = 200, try count = 10>--
actor only     [247]ms
actor with stm [54]ms
--<actor = 250, try count = 10>--
actor only     [166]ms
actor with stm [58]ms
--<actor = 300, try count = 10>--
actor only     [223]ms
actor with stm [92]ms
--<actor = 350, try count = 10>--
actor only     [112]ms
actor with stm [47]ms
--<actor = 400, try count = 10>--
actor only     [106]ms
actor with stm [62]ms

結果は毎回違うので何とも言えませんが、多分… STM を使った方が性能が良いです。
内部で終了を判定するカウンタを利用していますが、これを Actor で作ると変な挙動*2を示したので STM に切り替えたら正常に動作するようになりました。また、起動する Actor を 3000 とかにすると、やはり変な挙動になるので 400 に抑えてます。
Erlang と同じノリで多数の Actor を起動したり、多数のメッセージを送ると安定しないので、Scala では、なるべく STM で代用できるところは STM を使った方が良さそうという結論に達しました。
STM は順序を保証しないので、例えば、ログを残す順序が大切な場合は、Actor を使う事になります。*3

*1:ログをファイルに保存

*2:reply 使ってないのに途中でメッセージが途切れました

*3:もしくは、atomic に入る直前に記録する時間を取得しておいて、参照する際にログを並べ替えるとか…。面倒だ。