Scala STM を試してみる その4 (Actor vs Actor + STM その2)
その1、その2、その3 に引き続き、今回は reply が不要なパターン*1で試してみました。
import scala.concurrent.stm._
import scala.actors.{Actor, Exit}
import scala.actors.Actor.actor
import scala.actors.Actor.State.{New, Terminated}
import System.currentTimeMillis
import java.io.{File, PrintWriter}

/** Mixin that brings an actor into a running state whatever its lifecycle stage. */
trait EnsureStartActor {
  /**
   * Starts `actor` if it has never been started, restarts it if it has
   * terminated, and leaves it untouched in any other state.
   */
  def ensureStart(actor: Actor): Unit = {
    actor.getState match {
      case New        => actor.start()
      case Terminated => actor.restart()
      case _          => // any other state: nothing to do
    }
  }
}

/** Program entry point: starts the [[Tester]] actor, which drives the benchmark. */
object SampleForSTM4 extends EnsureStartActor {
  def main(args: Array[String]): Unit = {
    Tester.start()
  }
}

/**
 * Benchmark driver. Runs every test case for worker counts 100 to 400
 * (step 50) and prints the elapsed time until [[Counter]] reports that all
 * log lines have been written.
 */
object Tester extends Actor with EnsureStartActor {

  def act(): Unit = {
    try {
      test(
        "actor only " -> new Supervisor with UseActor,
        "actor with stm" -> new Supervisor with UseSTM
      )
    } finally {
      // Close both log files even when a test run fails, so the writers are
      // not leaked.
      LoggerInActor.close()
      LoggerInSTM.close()
    }
  }

  /**
   * Runs each `(label, supervisor)` pair once per worker count.
   *
   * For each run the supervisor starts `workersCount` workers, every worker is
   * asked to log the test line `tryCount` times, and this actor blocks until a
   * [[Finish]] message arrives.
   */
  def test(testCases: (String, Supervisor)*): Unit = {
    val tryCount = 10
    val testData = "a" * 256 // 256-character log line

    for (workersCount <- 100 to 400 by 50) {
      println("--<actor = %s, try count = %s>--".format(workersCount, tryCount))

      testCases foreach { case (message, supervisor) =>
        ensureStart(supervisor)
        val workers = supervisor.startChildren(workersCount)

        // Register this actor as the receiver of Finish before any work is sent.
        Counter.enroll(this, workersCount * tryCount)

        val startTime = currentTimeMillis
        workers foreach { _.write(tryCount, testData) }

        receive {
          case Finish  =>
          case unknown => println("Tester: unknown message [%s], ignoring" format unknown)
        }

        println("%s [%s]ms".format(message, currentTimeMillis - startTime))
        supervisor.stop
      }
    }
  }
}

/** Factory for workers and, optionally, the logger actor they depend on. */
trait WorkerFactory {
  def makeWorker(): Worker = new Worker()
  def getLogger(): Option[Actor] = None
}

/** Produces workers that log through [[LoggerInSTM]]; no logger actor is needed. */
trait UseSTM extends WorkerFactory {
  override def makeWorker(): Worker = new Worker() with CanLoggingWithSTM
}

/** Produces workers that log through the [[LoggerInActor]] actor. */
trait UseActor extends WorkerFactory {
  override def makeWorker(): Worker = new Worker() with CanLoggingWithActor
  override def getLogger(): Option[Actor] = Some(LoggerInActor)
}

/**
 * Supervising actor: starts workers on request, links to them, and restarts
 * any linked child that exits with a reason other than `'normal`.
 */
class Supervisor() extends Actor with EnsureStartActor with WorkerFactory {
  case class StartChildren(workersCount: Int)
  case object Stop

  // Receive Exit messages from linked actors instead of exiting with them.
  trapExit = true

  def act(): Unit = loop {
    react {
      case Exit(child: Actor, 'normal) => // normal termination: nothing to do
      case Exit(child: Actor, reason)  => restartChild(child)
      case StartChildren(workersCount) =>
        startLogger()
        reply(startWorkers(workersCount))
      case Stop    => exit("stop")
      case unknown => println("Supervisor: unknown message [%s], ignoring" format unknown)
    }
  }

  /** Re-links to `child` and restarts it. */
  def restartChild(child: Actor): Unit = {
    link(child)
    child.restart()
  }

  /** Starts (or restarts) and links the logger actor, if the factory provides one. */
  def startLogger(): Unit = {
    getLogger() foreach { logger =>
      ensureStart(logger)
      link(logger)
    }
  }

  /** Creates, links and starts `workersCount` workers. */
  def startWorkers(workersCount: Int): Array[Worker] = {
    (1 to workersCount).map { _ =>
      val worker = makeWorker()
      link(worker)
      worker.start()
      worker
    }.toArray
  }

  /**
   * Synchronously asks this supervisor to start `workersCount` workers.
   *
   * @return the started workers, or an empty list if the reply is not an
   *         `Array[Worker]`
   */
  def startChildren(workersCount: Int): List[Worker] = {
    (this !? StartChildren(workersCount)) match {
      case workers: Array[Worker] => workers.toList
      case _                      => Nil
    }
  }

  /** Asynchronously asks this supervisor to exit with reason `"stop"`. */
  def stop: Unit = this ! Stop
}

/** Logging capability; the default implementation discards the line. */
trait CanLogging {
  def writeLog(line: String): Unit = {}
}

/** Logging implemented by calling [[LoggerInSTM]] directly from the caller. */
trait CanLoggingWithSTM extends CanLogging {
  override def writeLog(line: String): Unit = { LoggerInSTM.write(line) }
}

/** Logging implemented by sending a message to [[LoggerInActor]]. */
trait CanLoggingWithActor extends CanLogging {
  override def writeLog(line: String): Unit = { LoggerInActor.write(line) }
}

/** Actor that writes a given line a given number of times via `writeLog`. */
class Worker() extends Actor with CanLogging {
  case class WriteLog(tryCount: Int, line: String)

  def act(): Unit = loop {
    react {
      case WriteLog(tryCount, line) =>
        (1 to tryCount) foreach { _ => writeLog(line) }
      case unknown =>
        println("Worker: unknown message [%s], ignoring" format unknown)
    }
  }

  /** Asynchronously asks this worker to log `line` `tryCount` times. */
  def write(tryCount: Int, line: String): Unit = {
    this ! WriteLog(tryCount, line)
  }
}

/** Holds an optional `PrintWriter` for the file `./<name>.log`. */
trait HasWriter {
  var hasWriter: Option[PrintWriter] = None

  /** Opens `./<name>.log` for writing and remembers the writer. */
  def open(name: String): Unit = {
    hasWriter = Some(new PrintWriter(new File("./%s.log" format name)))
  }

  /** Closes the writer if one has been opened. */
  def close(): Unit = hasWriter foreach { _.close() }

  /** Writes `line` followed by a line break if a writer has been opened. */
  def puts(line: String): Unit = hasWriter foreach { _.println(line) }
}

/**
 * Logger whose writes are serialized by a boolean STM `Ref` used as a lock:
 * a caller waits (via `retry`) until the flag is false, sets it, writes, and
 * clears it again.
 */
object LoggerInSTM extends HasWriter {
  open("stm")

  val refLock = Ref(false)

  def write(line: String): Unit = {
    // Acquire: block in `retry` until no other caller holds the flag.
    atomic { implicit txn =>
      if (refLock()) retry
      refLock() = true
    }
    try {
      puts(line)
      Counter.increment()
    } finally {
      // Always release; otherwise a failure above would leave the flag set
      // and every other caller would wait in `retry` forever.
      atomic { implicit txn =>
        refLock() = false
      }
    }
  }
}

/** Logger actor: writes are serialized by processing one message at a time. */
object LoggerInActor extends Actor with HasWriter {
  case class WriteLog(line: String)

  open("actor")

  def act(): Unit = loop {
    react {
      case WriteLog(line) =>
        puts(line)
        Counter.increment()
      case unknown =>
        println("LoggerInActor: unknown message [%s], ignoring" format unknown)
    }
  }

  /** Asynchronously asks the logger to write `line`. */
  def write(line: String): Unit = {
    this ! WriteLog(line)
  }
}

/** Message sent to the enrolled receiver once the expected count is reached. */
case object Finish

/**
 * Completion counter backed by an STM `Ref`. After `enroll`, each `increment`
 * adds one; once the count reaches `replyCount`, `Finish` is sent to the
 * enrolled receiver.
 */
object Counter {
  var hasReceiver: Option[Actor] = None
  var replyCount = 0
  val refCounter = Ref(0)

  /** Adds one to the counter and notifies the receiver if the target is reached. */
  def increment(): Unit = {
    // Decide on the value produced by this very transaction. Re-reading the
    // counter afterwards would let two concurrent callers both observe the
    // final value and both send Finish.
    val current = atomic { implicit txn =>
      refCounter() = refCounter() + 1
      refCounter()
    }
    notifyIfReached(current)
  }

  /** Sends `Finish` if the current counter value has reached `replyCount`. */
  def sendFinish(): Unit = notifyIfReached(refCounter.single())

  /** Sends `Finish` to the receiver when `replyCount` is non-zero and `count` has reached it. */
  private def notifyIfReached(count: Int): Unit = {
    if (replyCount != 0 && replyCount <= count) {
      hasReceiver foreach { _ ! Finish }
    }
  }

  /** Registers the receiver and the target count, and resets the counter to zero. */
  def enroll(setReceiver: Actor, setReplyCount: Int): Unit = {
    hasReceiver = Some(setReceiver)
    replyCount = setReplyCount
    atomic { implicit txn =>
      refCounter() = 0
    }
  }
}
--<actor = 100, try count = 10>-- actor only [206]ms actor with stm [146]ms --<actor = 150, try count = 10>-- actor only [184]ms actor with stm [232]ms --<actor = 200, try count = 10>-- actor only [247]ms actor with stm [54]ms --<actor = 250, try count = 10>-- actor only [166]ms actor with stm [58]ms --<actor = 300, try count = 10>-- actor only [223]ms actor with stm [92]ms --<actor = 350, try count = 10>-- actor only [112]ms actor with stm [47]ms --<actor = 400, try count = 10>-- actor only [106]ms actor with stm [62]ms
実行のたびに結果がばらつくので断定はできませんが、上の結果では Actor 数 150 の場合を除いて STM を使った方が速く、多分 STM を使った方が性能が良いと言えそうです。
内部で終了を判定するカウンタを利用していますが、これを Actor で作ると変な挙動*2を示したので STM に切り替えたら正常に動作するようになりました。また、起動する Actor を 3000 とかにすると、やはり変な挙動になるので 400 に抑えてます。
Erlang と同じノリで多数の Actor を起動したり、多数のメッセージを送ると安定しないので、Scala では、なるべく STM で代用できるところは STM を使った方が良さそうという結論に達しました。
STM は順序を保証しないので、例えば、ログを残す順序が大切な場合は、Actor を使う事になります。*3