Scala STM を試してみる その3 (Actor vs Actor + STM)
前々回、前回に引き続き ScalaSTM を試してみました。
結論から言うと、私の環境では Worker の数が 300 を超えた辺りから、カウンタ用の Actor への問い合わせがタイムアウトしまくるので、Actor + STM の方が良好な結果が得られました。
今回テストに使用したコードは下記の通りです。
import scala.concurrent.stm._
import scala.actors.{Actor, Exit, Future}
import scala.actors.Actor.State.{New, Terminated}
import System.currentTimeMillis

/** Mix-in that can bring an actor into a running state regardless of its lifecycle stage. */
trait EnsureStartActor {

  /** Starts `actor` if it was never started, restarts it if it has terminated,
    * and does nothing for any other state.
    *
    * @param actor the actor to start or restart
    */
  def ensureStart(actor: Actor): Unit = {
    actor.getState match {
      case New        => actor.start()
      case Terminated => actor.restart()
      case _          => // any other state: leave the actor alone
    }
  }
}

/** Benchmark entry point comparing a counter kept in an actor with a counter kept in an STM `Ref`. */
object SampleForSTM3 extends EnsureStartActor {

  def main(args: Array[String]): Unit = {
    test(
      "actor only " -> new Supervisor with UseActor,
      "actor with stm" -> new Supervisor with UseSTM
    )
  }

  /** Runs every test case for worker counts 100, 150, ..., 400 and prints the
    * elapsed time and the values returned by the workers.
    *
    * @param testCases pairs of (label printed with the timing, supervisor under test)
    */
  def test(testCases: (String, Supervisor)*): Unit = {
    for (workersCount <- 100 to 400 by 50) {
      println("--<%s>--" format workersCount)
      testCases foreach { case (message, actor) =>
        // NOTE(review): `stop` below is an asynchronous send, so the supervisor may not
        // have reached Terminated yet when the next round calls ensureStart here —
        // confirm whether this race matters in practice.
        ensureStart(actor)
        val workers = actor.startChildren(workersCount)
        val startTime = currentTimeMillis
        // Two separate passes on purpose: first send Increment to every worker,
        // then wait for each future. Fusing them would serialize the requests.
        val pending = workers map { _.increment() }
        val results = pending map { future => future() }
        actor.stop
        println("%s [%s]ms".format(message, currentTimeMillis - startTime))
        println(results)
      }
    }
  }
}

/** Messages understood by [[Supervisor]]. */
sealed abstract class SupervisorMessage

/** Asks the supervisor to start `workersCount` workers and reply with them. */
case class StartChildren(workersCount: Int) extends SupervisorMessage

/** Asks the supervisor to exit. */
case object Stop extends SupervisorMessage

/** Decides which kind of [[Worker]] a supervisor creates and which counter actor (if any) it needs. */
trait WorkerFactory {

  /** Creates a new, not yet started worker. */
  def makeWorker(): Worker = new Worker()

  /** The counter actor that must be running for the workers, or `None` if there is none. */
  def getCounter(): Option[Actor] = None
}

/** Factory producing workers that increment [[CounterInSTM]]. */
trait UseSTM extends WorkerFactory {
  override def makeWorker(): Worker = new Worker() with CanIncreaseCountWithSTM
}

/** Factory producing workers that increment [[CounterInActor]]. */
trait UseActor extends WorkerFactory {
  override def makeWorker(): Worker = new Worker() with CanIncreaseCountWithActor
  override def getCounter(): Option[Actor] = Some(CounterInActor)
}

/** Actor that creates workers on request, links itself to them, and restarts
  * children whose exit reason is anything other than `'normal`.
  */
class Supervisor() extends Actor with EnsureStartActor with WorkerFactory {
  // Receive Exit messages from linked actors instead of exiting together with them.
  trapExit = true

  def act(): Unit = loop {
    react {
      case Exit(_: Actor, 'normal) => // normal termination: nothing to do
      case Exit(child: Actor, _) => restartChild(child)
      case StartChildren(workersCount) =>
        startCounter()
        reply(startWorkers(workersCount))
      case Stop => exit("stop")
      case unknown => println("Supervisor: unknown message [%s], ignoring" format unknown)
    }
  }

  /** Links to `child` again and restarts it. */
  def restartChild(child: Actor): Unit = {
    link(child)
    child.restart()
  }

  /** Ensures the counter actor supplied by [[getCounter]] (if any) is running and links to it. */
  def startCounter(): Unit = {
    getCounter() foreach { counter =>
      ensureStart(counter)
      link(counter)
    }
  }

  /** Creates, links and starts `workersCount` workers.
    *
    * @param workersCount number of workers to create; a non-positive value yields an empty array
    * @return the started workers
    */
  def startWorkers(workersCount: Int): Array[Worker] =
    Array.fill(workersCount) {
      val worker = makeWorker()
      link(worker)
      worker.start()
      worker
    }

  /** Synchronously asks this supervisor to start `workersCount` workers.
    *
    * @param workersCount number of workers to start
    * @return the started workers, or an empty list if the reply was not an `Array[Worker]`
    */
  def startChildren(workersCount: Int): List[Worker] =
    (this !? StartChildren(workersCount)) match {
      case workers: Array[Worker] => workers.toList
      case _                      => Nil
    }

  /** Sends [[Stop]] to this supervisor without waiting for it to be processed. */
  def stop: Unit = this ! Stop
}

/** Strategy for incrementing a counter; the default performs no counting and returns 1. */
trait CanIncreaseCount {
  def increase(): Int = 1
}

/** Increments the STM-backed counter. */
trait CanIncreaseCountWithSTM extends CanIncreaseCount {
  override def increase(): Int = CounterInSTM.increment()
}

/** Increments the actor-backed counter. */
trait CanIncreaseCountWithActor extends CanIncreaseCount {
  override def increase(): Int = CounterInActor.increment()
}

/** Message asking a [[Worker]] or [[CounterInActor]] to increment and reply with the new value. */
case object Increment

/** Actor that answers [[Increment]] with the result of its `increase()` strategy. */
class Worker() extends Actor with CanIncreaseCount {
  def act(): Unit = loop {
    react {
      case Increment => reply(increase())
      case unknown =>
        // println("Worker: unknown message [%s], ignoring" format unknown)
    }
  }

  /** Sends [[Increment]] to this worker.
    *
    * @return a future yielding the replied `Int`, or 0 if the reply is not an `Int`
    */
  def increment(): Future[Int] = this !! (Increment, {
    case n: Int => n
    case _      => 0
  })
}

/** Counter stored in a ScalaSTM `Ref`. */
object CounterInSTM {
  val refCounter = Ref(0)

  /** Adds one to the counter inside a single transaction and returns the new value. */
  def increment(): Int = atomic { implicit txn =>
    refCounter() = refCounter() + 1
    refCounter()
  }
}

/** Counter stored in the mutable state of a single actor. */
object CounterInActor extends Actor {
  /** Milliseconds `increment()` waits for the reply before giving up. */
  private val ReplyTimeoutMillis = 500L

  var counter = 0

  def act(): Unit = loop {
    react {
      case Increment =>
        counter += 1
        reply(counter)
      case unknown =>
        // println("CounterInActor: unknown message [%s], ignoring" format unknown)
    }
  }

  /** Synchronously asks this actor to increment the counter.
    *
    * The variant without a timeout (`this !? Increment`) was tried and left disabled
    * in the original code.
    *
    * @return the new counter value, or 0 if no `Int` reply arrived within the timeout
    */
  def increment(): Int =
    (this !? (ReplyTimeoutMillis, Increment)) match {
      case Some(n: Int) => n
      case _ =>
        // println("timeout")
        0
    }
}
結果に関しては、また別の機会に。