Unite を始めてみた(ack と組み合わせて使う方法など…)

Vim の Plugin を更新がてら、Yokohama.vim #0 の頃から気になっていた Unite に入門してみました。

基本設定

一先ずインストール後に下記のようなキーマッピングにしてみました。

nmap ;; :Unite
nmap ;s :Unite source<CR>
nmap ;b :Unite buffer<CR>
nmap ;f :Unite file<CR>
nmap ;r :Unite ref/

';' を全て Unite に捧げてます。使用頻度の高い source, buffer, file 以外は、試行錯誤中です。

Unite file でカレントファイルと同じディレクトリのファイル一覧を表示してくれるので、':e' の使用頻度が減り、下記を削除できました。

autocmd BufNewFile,BufRead,BufEnter * exec ':lcd ' . expand("%:p:h") 

ちなみに、Unite + vim-ref の組み合わせは、最高に使い勝手が良いので、Vim をご使用の Erlanger や Perl Mongers は是非ご利用を…。

ack 用の設定

好みの問題ですが、grep より ack の方が好きなので、unite-qf と組み合わせて、下記のようにしてみました。

set grepprg=ack\ -a\ --nocolor
nmap ;q :Unite qf<CR>
nmap ;/ :grep  \| Unite qf<LEFT><LEFT><LEFT><LEFT><LEFT><LEFT><LEFT><LEFT><LEFT><LEFT><LEFT>

11連発が格好悪いのですが… ";/" で ":grep | Unite qf" まで入力されるので、後は検索ワード(正規表現可)を入れると、ack の結果を Unite で表示します。Unite qf の箇所は、好みで cw20 とかでも良いと思います。
注意点として、ack は、カレントディレクトリ配下を検索範囲とするので、前述の autocmd で lcd を実行する設定を入れないと、カレントファイルが存在するディレクトリ以下を検索範囲にしません。


毎度の如く突っ込み大歓迎。

neocomplcache + snipMate をやめて neocomplcache 単体に切り替える

久しぶりに neocomplcache を git pull したら snipMate の multiple snippets*1 に対応していたので、neocomplcache 単体に切り替えてみました。
これで、snipMate_complete.vim は、お払い箱です。

.vimrc に追加した内容

始めに、snipMate の TAB キーを模倣するため、マッピングを追加*2

imap <expr><TAB> neocomplcache#sources#snippets_complete#expandable() ? "\<Plug>(neocomplcache_snippets_expand)" : pumvisible() ? "\<C-n>" : "\<TAB>"
smap <expr><TAB> neocomplcache#sources#snippets_complete#expandable() ? "\<Plug>(neocomplcache_snippets_expand)" : pumvisible() ? "\<C-n>" : "\<TAB>"

次に、snippets の保存ディレクトリを snipMate に合わせる*3

let g:neocomplcache_snippets_dir = $HOME . '/.vim/snippets'

最後に、snippets 内で `Filename()` を使いたかったので snipMate から移植

fun! Filename(...)
  let filename = expand('%:t:r')
  if filename == '' | return a:0 == 2 ? a:2 : '' | endif
  return !a:0 || a:1 == '' ? filename : substitute(a:1, '$1', filename, 'g')
endf

*1:同じトリガーキーを複数登録できる機能

*2:smap を加えないと、デフォルト値で良い場合に TAB で移動できない

*3:neocomplcache_snippets_dir は、カンマ区切りで複数指定可能

他のプロセスから送られてきたメッセージを誤受信しないよう制限する

刺身☆ブーメランのはてなダイアリー 2011/01/12(Wed) への返信です。
元コードの wait_server:client/3 は、receive で From を束縛しているので、この状態だとメッセージの選択受信になりません。From を Server_Pid に変更すると、ただしく動作すると思います*1
蛇足ですが、client を spawn する件と make_ref/0 を使う件に対応した添削コードを下記に掲載しておきます*2

-module(wait_server).

-export([spawn_server/0, spawn_client/3]).
-export([server/0, client/3]).
-export([test/0]).

spawn_server() ->
  spawn(wait_server, server, []).

spawn_client(Server, Time, Req) ->
  spawn(wait_server, client, [Server, Time, Req]).

server() ->
  receive
    {Client, Ref, Time, Req} ->
      timer:sleep(Time),
      Client ! {self(), Ref, Req};
    stop ->
      exit(normal);
    Unknown ->
      io:format("Server: received unknown message: ~p~n", [Unknown])
    after 60000 ->
      io:format("Server: timeout~n"),
      exit(normal)
  end,
  server().

client(Server, Time, Req) ->
  Ref = make_ref(),
  Server ! {self(), Ref, Time, Req},
  receive
    {Server, Ref, Res} ->
      io:format("Client: received ~p from ~p~n", [Res, Server]),
      {ok, Res}
    after Time + 100 ->
      io:format("Client: timeout~n"),
      {ng, timeout}
  end.

test() ->
  Data = {foo, bar, baz},
  Server = spawn_server(),
  {ok, Data} = client(Server, 100, Data),
  Client = spawn_client(Server, 100, Data),
  lists:foreach(
    fun (_) -> Client ! ignoring end,
    lists:seq(1, 10)
  ),
  timer:sleep(100),
  Server ! stop,
  ok.
1> wait_server:test().
Client: received {foo,bar,baz} from <0.33.0>
Client: received {foo,bar,baz} from <0.33.0>
ok
2> q().

*1:既に Server_Pid には値が束縛済みであるため、メッセージの選択受信となります

*2:多少、手抜きしてますが…

Scala STM を試してみる その4 (Actor vs Actor + STM その2)

その1その2その3 に引き続き、今回は reply が不要なパターン*1で試してみました。

import scala.concurrent.stm._
import scala.actors.{Actor, Exit}
import scala.actors.Actor.actor
import scala.actors.Actor.State.{New, Terminated}
import System.currentTimeMillis
import java.io.{File, PrintWriter}

trait EnsureStartActor {
  def ensureStart(actor: Actor) {
    actor.getState match {
      case New        => actor.start
      case Terminated => actor.restart
      case _          =>
    }
  }
}

object SampleForSTM4 extends EnsureStartActor {
  def main (args: Array[String]) {
    Tester.start
  }
}

object Tester extends Actor with EnsureStartActor {
  def act() {
    test(
      "actor only    " -> new Supervisor with UseActor,
      "actor with stm" -> new Supervisor with UseSTM
    )
    LoggerInActor.close()
    LoggerInSTM.close()
  }

  def test (testCases: (String, Supervisor)*) {
    val tryCount = 10
    val testData = 1 to 256 map {n => "a"} mkString

    100 to 400 by 50 foreach { workersCount =>
      println("--<actor = %s, try count = %s>--".format(workersCount, tryCount))
      testCases foreach { testCase =>
        val (message, actor) = testCase
        ensureStart(actor)

        val workers = actor.startChildren(workersCount) 
        Counter.enroll(this, workersCount * tryCount)

        val startTime = currentTimeMillis

        workers foreach {_.write(tryCount, testData)}
        receive {
          case Finish  =>
          case unknown =>
            println("Tester: unknown message [%s], ignoring" format unknown)
        }

        println("%s [%s]ms".format(message, currentTimeMillis - startTime))
        actor.stop
      }
    }
  }
}

trait WorkerFactory {
  def makeWorker(): Worker = new Worker()
  def getLogger(): Option[Actor] = None
}

trait UseSTM extends WorkerFactory {
  override def makeWorker(): Worker = new Worker() with CanLoggingWithSTM
}

trait UseActor extends WorkerFactory {
  override def makeWorker(): Worker = new Worker() with CanLoggingWithActor
  override def getLogger(): Option[Actor] = Some(LoggerInActor)
}

class Supervisor() extends Actor with EnsureStartActor with WorkerFactory {
  case class  StartChildren(workersCount: Int)
  case object Stop

  trapExit = true

  def act() = loop {
    react {
      case Exit(child: Actor, 'normal) =>
      case Exit(child: Actor, reason) =>
        restartChild(child)
      case StartChildren(workersCount) =>
        startLogger()
        reply(startWorkers(workersCount))
      case Stop =>
        exit("stop")
      case unknown =>
        println("Supervisor: unknown message [%s], ignoring" format unknown)
    }
  }

  def restartChild(child: Actor) {
    link(child)
    child.restart
  }

  def startLogger() {
    getLogger() match {
      case Some(logger) =>
        ensureStart(logger)
        link(logger)
      case None =>
    }
  }

  def startWorkers(workersCount: Int): Array[Worker] = {
    1 to workersCount map {count =>
      val worker = makeWorker()
      link(worker)
      worker.start
      worker
    } toArray
  }

  def startChildren(workersCount: Int): List[Worker] = {
    val arrayWorker = this !? StartChildren(workersCount) match {
      case workers: Array[Worker] => workers
      case _                      => Array()
    }
    arrayWorker.toList
  }

  def stop = this ! Stop
}

trait CanLogging {
  def writeLog(line: String) {}
}

trait CanLoggingWithSTM extends CanLogging {
  override def writeLog(line: String) {LoggerInSTM.write(line)}
}

trait CanLoggingWithActor extends CanLogging {
  override def writeLog(line: String) {LoggerInActor.write(line)}
}

class Worker() extends Actor with CanLogging {
  case class WriteLog(tryCount: Int, line: String)

  def act() = loop {
    react {
      case WriteLog(tryCount, line) =>
        1 to tryCount foreach {n => writeLog(line)}
      case unknown =>
        println("Worker: unknown message [%s], ignoring" format unknown)
    }
  }

  def write(tryCount: Int, line: String) {
    this ! WriteLog(tryCount, line)
  }
}

trait HasWriter {
  var hasWriter: Option[PrintWriter] = None

  def open(name: String) {
    hasWriter = Some(new PrintWriter(new File("./%s.log" format name)))
  }

  def close(): Unit = hasWriter match {
    case Some(writer) => writer.close
    case _ =>
  }

  def puts(line: String): Unit = hasWriter match {
    case Some(writer) => writer.println(line)
    case _ =>
  }
}

object LoggerInSTM extends HasWriter {
  open("stm")

  val refLock = Ref(false)

  def write(line: String) {
    atomic { implicit txn =>
      val lock = refLock()
      if (lock) retry
      refLock() = true
    }

    puts(line)
    Counter.increment

    atomic { implicit txn =>
      refLock() = false
    }
  }
}

object LoggerInActor extends Actor with HasWriter {
  case class WriteLog(line: String)

  open("actor")

  def act() = loop {
    react {
      case WriteLog(line) =>
        puts(line)
        Counter.increment
      case unknown =>
        println("LoggerInActor: unknown message [%s], ignoring" format unknown)
    }
  }

  def write(line: String) {
    this ! WriteLog(line)
  }
}

case object Finish
object Counter {
  var hasReceiver: Option[Actor] = None
  var replyCount = 0
  val refCounter = Ref(0)

  def increment() {
    atomic { implicit txn =>
      refCounter() = refCounter() + 1
    }
    sendFinish()
  }

  def sendFinish() {
    if (replyCount != 0 && replyCount <= refCounter.single()) {
      hasReceiver match {
        case Some(receiver) => receiver ! Finish
        case _ =>
      }
    }
  }

  def enroll(setReceiver: Actor, setReplyCount: Int) {
    hasReceiver = Some(setReceiver)
    replyCount  = setReplyCount
    atomic { implicit txn =>
      refCounter() = 0
    }
  }
}
--<actor = 100, try count = 10>--
actor only     [206]ms
actor with stm [146]ms
--<actor = 150, try count = 10>--
actor only     [184]ms
actor with stm [232]ms
--<actor = 200, try count = 10>--
actor only     [247]ms
actor with stm [54]ms
--<actor = 250, try count = 10>--
actor only     [166]ms
actor with stm [58]ms
--<actor = 300, try count = 10>--
actor only     [223]ms
actor with stm [92]ms
--<actor = 350, try count = 10>--
actor only     [112]ms
actor with stm [47]ms
--<actor = 400, try count = 10>--
actor only     [106]ms
actor with stm [62]ms

結果は毎回違うので何とも言えませんが、多分… STM を使った方が性能が良いです。
内部で終了を判定するカウンタを利用していますが、これを Actor で作ると変な挙動*2を示したので STM に切り替えたら正常に動作するようになりました。また、起動する Actor を 3000 とかにすると、やはり変な挙動になるので 400 に抑えてます。
Erlang と同じノリで多数の Actor を起動したり、多数のメッセージを送ると安定しないので、Scala では、なるべく STM で代用できるところは STM を使った方が良さそうという結論に達しました。
STM は順序を保証しないので、例えば、ログを残す順序が大切な場合は、Actor を使う事になります。*3

*1:ログをファイルに保存

*2:reply 使ってないのに途中でメッセージが途切れました

*3:もしくは、atomic に入る直前に記録する時間を取得しておいて、参照する際にログを並べ替えるとか…。面倒だ。

Actor + CPS でコールバック関数を隠蔽する

勉強会第20回 - Scala勉強会 in 渋谷で久しぶりに発表するので下準備など。
ちょっとイロイロと手抜きコードですが… runTest メソッドだけ注目して下さい。switch メソッドを評価する度に Actor が切り替わります。

object Trampoline {
  import scala.actors.Actor
  import scala.actors.Actor.actor
  import scala.util.continuations.{reset, shift, cpsParam}

  def main (args: Array[String]) {
    TrampolineActorA.start
    TrampolineActorB.start

    reset(runTest())

    Thread.sleep(100)

    TrampolineActorA.stop
    TrampolineActorB.stop
  }

  def switch(otherSide: Actor): Unit @cpsParam[Unit, Unit] = {
    shift { k: (Unit => Unit) =>
      otherSide ! Execute(k)
    }
  }

  def runTest(): Unit @cpsParam[Unit, Unit] = {
    var n = 0
    println(n)

    switch(TrampolineActorA)
    n = n + 1
    println(n)

    switch(TrampolineActorB)
    n = n + 1
    println(n)

    switch(TrampolineActorA)
    n = n + 1
    println(n)

    switch(TrampolineActorB)
    n = n + 1
    println(n)
  }

  case class  Execute(f: Unit => Unit)
  case object Stop

  object TrampolineActorA extends Actor {
    def act() {
      loop {
        react {
          case Execute(f) => println("A"); f()
          case Stop       => exit()
          case unknown    => println(unknown)
        }
      }
    }

    def stop = this ! Stop
  }

  object TrampolineActorB extends Actor {
    def act() {
      loop {
        react {
          case Execute(f) => println("B"); f()
          case Stop       => exit()
          case unknown    => println(unknown)
        }
      }
    }

    def stop = this ! Stop
  }
}

ちなみに Remote Actor で同様の事を行うと、途中で java.lang.IllegalAccessError が発生するので原因調査中です。このくらいであれば簡単に出来るのですけれど…。

Remote Actor + CPS

下記は、あまり嬉しくない。Remote Actor 間でトランポリンする方法を考え中。うーん、reset と shift 以外の制御構造が欲しいと思うのは私だけ?

object RemoteTest {
  import scala.actors.{Actor, AbstractActor}
  import scala.actors.Actor.actor
  import scala.actors.remote.RemoteActor.{alive, register, select}
  import scala.actors.remote.Node
  import scala.util.continuations.{reset, shift, cpsParam}

  def main (args: Array[String]) {
    if (args.length == 1 && args(0) == "test1") {
      runTest1()
    } else if (args.length == 1 && args(0) == "test2") {
      runTest2()
    } else {
      ServerActor.start
    }
  }

  case class  Update(f: Int => Int)
  case object Show

  def runTest1() {
    def update(remoteActor: AbstractActor)(f: Int => Int): Int = {
      remoteActor ! Update(f)
      remoteActor !? Show match {
        case n: Int => n
        case _      => 0
      }
    }

    actor {
      val remoteActor = selectRemoteActor()
      val remoteUpdate = update(remoteActor) _

      println(remoteUpdate(n => n + 1))
      println(remoteUpdate(n => n + 2))
    }
  }

  def runTest2() {
    def update(remoteActor: AbstractActor)(): Int @cpsParam[Int, Int] = shift { k: (Int => Int) =>
      remoteActor ! Update(k)
      remoteActor !? Show match {
        case n: Int => n
        case _      => 0
      }
    }

    actor {
      val remoteActor = selectRemoteActor()
      val remoteUpdate = update(remoteActor) _

      println(reset(remoteUpdate() * 2))
      println(reset(remoteUpdate() * 3))
    }
  }

  def selectRemoteActor(): AbstractActor = select(new Node("127.0.0.1", 10000), 'server)

  object ServerActor extends Actor {
    var counter = 0

    def act() {
      alive(10000)
      register('server, this)

      println("start server")
      loop {
        react {
          case Update(f) => counter = f(counter)
          case Show      => reply(counter)
          case unknown   => println(unknown)
        }
      }
    }
  }
}

Scala Actor の reply が正しくできない問題は scala.actors.Channel が悪いのかも?

突っ込み大歓迎です。というか助けてください。

環境は、Mac OS 10.6.5, Scala 2.8.1.final, Java 1.6.0_22 となります。
検証用のコードは、Scala STM を試してみる その3 で使ったコードの修正版のこちらとなります。

STM と対比するために Actor のみを用いてカウンタのデモを書いたのですが、300 メッセージを超えた辺りから reply が失敗する状態となってしまいました。そこで reply 時に使われている Channel リストの操作に疑いを持ち、試しに返信用の Channel を作成し、それをメッセージで渡してみました。結果は、残念な事に 300 メッセージを超えた辺りから、Channel に送ったメッセージが取得できなくなりました。*1

最終的に、mailboxSize を確認すると良いのではないか?と考え、試しに下記を試してみたのですが…

object CounterInActor extends Actor {
// ..snip..
  def increment(): Int = {
    if (n < this.mailboxSize) {
      1
    } else {
      this !? Increment match {
        case n: Int => n
        case _      => 0
      }
    }
  }
// ..snip..

n の値を 0 にすると正しく動作しましたw;*2 300 近い Actor が、ほぼ同時にメッセージを送るので、確認した瞬間は 0 or 1 ですが、送り終わった後には 300 近辺の値になっていると思われます。そこで下記のように書き換えてみると…

object CounterInActor extends Actor {
// ..snip..
  def act() = loop {
    react {
      case Increment =>
        println(mailboxSize)
        counter += 1
        reply(counter)
// ..snip..
    }
  }
// ..snip..
}
--<400>--
0
310
309
308
307
306
305
304
303
302
301
300
299
298
385
384
383
382
381
380
379
378
377
376
375
374
373
372
371
370
369
368
367
366
365
364
363
362
361
360
359
358
357
356
// ..snip..

0 から 310 に跳ね上がってました。こうなって来ると、下記の workers を 200 個づつに分割して、間に Sleep を 50ms 入れる等の工夫が必要となります。

  def test (testCases: (String, Supervisor)*) {
// ..snip..
        val results = workers map {
          _.increment()
        } map {
          future => future()
        }
// ..snip..
  }

大元の検証コードに誤りがあると嬉しいのですが…。今の所、reply が必要な場合は、Client となる Actor の並列度に制限を行うか STM の使用を検討する事しか、私には思いつきませんでした。
うーん…、Channel の処理を深追いするしか無いかな。

*1:Future も試しましたが駄目でした。Future も内部的には Channel を使ってるらしいので、当たり前の結果ではありますが…。

*2:n = 1 だと駄目でした。