Scala STM を試してみる その3 (Actor vs Actor + STM)

前々回前回に引き続き ScalaSTM を試してみました。
結論から言うと、私の環境では Worker の数が 300 を超えた辺りから、カウンタ用の Actor への問い合わせがタイムアウトしまくるので、Actor + STM の方が良好な結果が得られました。
今回テストに使用したコードは下記の通りです。

import scala.concurrent.stm._
import scala.actors.{Actor, Exit}
import scala.actors.Actor.State.{New, Terminated}
import System.currentTimeMillis

trait EnsureStartActor {
  def ensureStart(actor: Actor) {
    actor.getState match {
      case New        => actor.start
      case Terminated => actor.restart
      case _          =>
    }
  }
}

object SampleForSTM3 extends EnsureStartActor {
  def main (args: Array[String]) {
    test(
      "actor only    " -> new Supervisor with UseActor,
      "actor with stm" -> new Supervisor with UseSTM
    )
  }

  def test (testCases: (String, Supervisor)*) {
    100 to 400 by 50 foreach { workersCount =>
      println("--<%s>--" format workersCount)
      testCases foreach { testCase =>
        val (message, actor) = testCase
        ensureStart(actor)
        val workers = actor.startChildren(workersCount) 
        val startTime = currentTimeMillis

        val results = workers map {
          _.increment()
        } map {
          future => future()
        }

        actor.stop
        println("%s [%s]ms".format(message, currentTimeMillis - startTime))
        println(results)
      }
    }
  }
}

sealed abstract class SupervisorMessage
case class  StartChildren(workersCount: Int) extends SupervisorMessage
case object Stop                             extends SupervisorMessage

trait WorkerFactory {
  def makeWorker(): Worker = new Worker()
  def getCounter(): Option[Actor] = None
}

trait UseSTM extends WorkerFactory {
  override def makeWorker(): Worker = new Worker() with CanIncreaseCountWithSTM
}

trait UseActor extends WorkerFactory {
  override def makeWorker(): Worker = new Worker() with CanIncreaseCountWithActor
  override def getCounter(): Option[Actor] = Some(CounterInActor)
}

class Supervisor() extends Actor with EnsureStartActor with WorkerFactory {
  trapExit = true

  def act() = loop {
    react {
      case Exit(child: Actor, 'normal) =>
      case Exit(child: Actor, reason) =>
        restartChild(child)
      case StartChildren(workersCount) =>
        startCounter()
        reply(startWorkers(workersCount))
      case Stop =>
        exit("stop")
      case unknown =>
        println("Supervisor: unknown message [%s], ignoring" format unknown)
    }
  }

  def restartChild(child: Actor) {
    link(child)
    child.restart
  }

  def startCounter() {
    getCounter() match {
      case Some(counter) =>
        ensureStart(counter)
        link(counter)
      case None =>
    }
  }

  def startWorkers(workersCount: Int): Array[Worker] = {
    1 to workersCount map {count =>
      val worker = makeWorker()
      link(worker)
      worker.start
      worker
    } toArray
  }

  def startChildren(workersCount: Int): List[Worker] = {
    val arrayWorker = this !? StartChildren(workersCount) match {
      case workers: Array[Worker] => workers
      case _                      => Array()
    }
    arrayWorker.toList
  }

  def stop = this ! Stop
}

trait CanIncreaseCount {
  def increase(): Int = 1
}

trait CanIncreaseCountWithSTM extends CanIncreaseCount {
  override def increase(): Int = CounterInSTM.increment()
}

trait CanIncreaseCountWithActor extends CanIncreaseCount {
  override def increase(): Int = CounterInActor.increment()
}

case object Increment
class Worker() extends Actor with CanIncreaseCount {
  def act() = loop {
    react {
      case Increment =>
        reply(increase())
      case unknown =>
//        println("Worker: unknown message [%s], ignoring" format unknown)
    }
  }

  def increment(): Future[Int] = this !! (Increment, {
    case n: Int => n
    case _      => 0
  })
}

object CounterInSTM {
  val refCounter = Ref(0)

  def increment(): Int = atomic { implicit txn =>
    refCounter() = refCounter() + 1
    refCounter()
  }
}

object CounterInActor extends Actor {
  var counter = 0

  def act() = loop {
    react {
      case Increment =>
        counter += 1
        reply(counter)
      case unknown =>
//        println("CounterInActor: unknown message [%s], ignoring" format unknown)
    }
  }

  def increment(): Int = {
    this !? (500, Increment) match {
      case Some(n) if n.isInstanceOf[Int] =>
        n.asInstanceOf[Int]
      case _ =>
//        println("timeout")
        0
    }
/*
    this !? Increment match {
      case n: Int => n
      case _      => 0
    }
*/
  }
}

結果に関しては、また別の機会に。