Unite を始めてみた(ack と組み合わせて使う方法など…)
Vim の Plugin を更新がてら、Yokohama.vim #0 の頃から気になっていた Unite に入門してみました。
基本設定
一先ずインストール後に下記のようなキーマッピングにしてみました。
nmap ;; :Unite nmap ;s :Unite source<CR> nmap ;b :Unite buffer<CR> nmap ;f :Unite file<CR> nmap ;r :Unite ref/
';' を全て Unite に捧げてます。使用頻度の高い source, buffer, file 以外は、試行錯誤中です。
Unite file でカレントファイルと同じディレクトリのファイル一覧を表示してくれるので、':e' の使用頻度が減り、下記を削除できました。
autocmd BufNewFile,BufRead,BufEnter * exec ':lcd ' . expand("%:p:h")
ちなみに、Unite + vim-ref の組み合わせは、最高に使い勝手が良いので、Vim をご使用の Erlanger や Perl Mongers は是非ご利用を…。
ack 用の設定
好みの問題ですが、grep より ack の方が好きなので、unite-qf と組み合わせて、下記のようにしてみました。
set grepprg=ack\ -a\ --nocolor nmap ;q :Unite qf<CR> nmap ;/ :grep \| Unite qf<LEFT><LEFT><LEFT><LEFT><LEFT><LEFT><LEFT><LEFT><LEFT><LEFT><LEFT>
注意点として、ack は、カレントディレクトリ配下を検索範囲とするので、前述の autocmd で lcd を実行する設定を入れないと、カレントファイルが存在するディレクトリ以下を検索範囲にしません。
毎度の如く突っ込み大歓迎。
neocomplcache + snipMate をやめて neocomplcache 単体に切り替える
久しぶりに neocomplcache を git pull したら snipMate の multiple snippets*1 に対応していたので、neocomplcache 単体に切り替えてみました。
これで、snipMate_complete.vim は、お払い箱です。
.vimrc に追加した内容
始めに、snipMate の TAB キーを模倣するため、マッピングを追加*2
imap <expr><TAB> neocomplcache#sources#snippets_complete#expandable() ? "\<Plug>(neocomplcache_snippets_expand)" : pumvisible() ? "\<C-n>" : "\<TAB>" smap <expr><TAB> neocomplcache#sources#snippets_complete#expandable() ? "\<Plug>(neocomplcache_snippets_expand)" : pumvisible() ? "\<C-n>" : "\<TAB>"
次に、snippets の保存ディレクトリを snipMate に合わせる*3
let g:neocomplcache_snippets_dir = $HOME . '/.vim/snippets'
最後に、snippets 内で `Filename()` を使いたかったので snipMate から移植
fun! Filename(...) let filename = expand('%:t:r') if filename == '' | return a:0 == 2 ? a:2 : '' | endif return !a:0 || a:1 == '' ? filename : substitute(a:1, '$1', filename, 'g') endf
*1:同じトリガーキーを複数登録できる機能
*2:smap を加えないと、デフォルト値で良い場合に TAB で移動できない
*3:neocomplcache_snippets_dir は、カンマ区切りで複数指定可能
他のプロセスから送られてきたメッセージを誤受信しないよう制限する
刺身☆ブーメランのはてなダイアリー 2011/01/12(Wed) への返信です。
元コードの wait_server:client/3 は、receive で From を束縛しているので、この状態だとメッセージの選択受信になりません。From を Server_Pid に変更すると、ただしく動作すると思います*1。
蛇足ですが、client を spawn する件と make_ref/0 を使う件に対応した添削コードを下記に掲載しておきます*2。
-module(wait_server). -export([spawn_server/0, spawn_client/3]). -export([server/0, client/3]). -export([test/0]). spawn_server() -> spawn(wait_server, server, []). spawn_client(Server, Time, Req) -> spawn(wait_server, client, [Server, Time, Req]). server() -> receive {Client, Ref, Time, Req} -> timer:sleep(Time), Client ! {self(), Ref, Req}; stop -> exit(normal); Unknown -> io:format("Server: received unknown message: ~p~n", [Unknown]) after 60000 -> io:format("Server: timeout~n"), exit(normal) end, server(). client(Server, Time, Req) -> Ref = make_ref(), Server ! {self(), Ref, Time, Req}, receive {Server, Ref, Res} -> io:format("Client: received ~p from ~p~n", [Res, Server]), {ok, Res} after Time + 100 -> io:format("Client: timeout~n"), {ng, timeout} end. test() -> Data = {foo, bar, baz}, Server = spawn_server(), {ok, Data} = client(Server, 100, Data), Client = spawn_client(Server, 100, Data), lists:foreach( fun (_) -> Client ! ignoring end, lists:seq(1, 10) ), timer:sleep(100), Server ! stop, ok.
1> wait_server:test(). Client: received {foo,bar,baz} from <0.33.0> Client: received {foo,bar,baz} from <0.33.0> ok 2> q().
Scala STM を試してみる その4 (Actor vs Actor + STM その2)
その1、その2、その3 に引き続き、今回は reply が不要なパターン*1で試してみました。
import scala.concurrent.stm._ import scala.actors.{Actor, Exit} import scala.actors.Actor.actor import scala.actors.Actor.State.{New, Terminated} import System.currentTimeMillis import java.io.{File, PrintWriter} trait EnsureStartActor { def ensureStart(actor: Actor) { actor.getState match { case New => actor.start case Terminated => actor.restart case _ => } } } object SampleForSTM4 extends EnsureStartActor { def main (args: Array[String]) { Tester.start } } object Tester extends Actor with EnsureStartActor { def act() { test( "actor only " -> new Supervisor with UseActor, "actor with stm" -> new Supervisor with UseSTM ) LoggerInActor.close() LoggerInSTM.close() } def test (testCases: (String, Supervisor)*) { val tryCount = 10 val testData = 1 to 256 map {n => "a"} mkString 100 to 400 by 50 foreach { workersCount => println("--<actor = %s, try count = %s>--".format(workersCount, tryCount)) testCases foreach { testCase => val (message, actor) = testCase ensureStart(actor) val workers = actor.startChildren(workersCount) Counter.enroll(this, workersCount * tryCount) val startTime = currentTimeMillis workers foreach {_.write(tryCount, testData)} receive { case Finish => case unknown => println("Tester: unknown message [%s], ignoring" format unknown) } println("%s [%s]ms".format(message, currentTimeMillis - startTime)) actor.stop } } } } trait WorkerFactory { def makeWorker(): Worker = new Worker() def getLogger(): Option[Actor] = None } trait UseSTM extends WorkerFactory { override def makeWorker(): Worker = new Worker() with CanLoggingWithSTM } trait UseActor extends WorkerFactory { override def makeWorker(): Worker = new Worker() with CanLoggingWithActor override def getLogger(): Option[Actor] = Some(LoggerInActor) } class Supervisor() extends Actor with EnsureStartActor with WorkerFactory { case class StartChildren(workersCount: Int) case object Stop trapExit = true def act() = loop { react { case Exit(child: Actor, 'normal) => case Exit(child: Actor, reason) => restartChild(child) case StartChildren(workersCount) => startLogger() reply(startWorkers(workersCount)) case Stop => exit("stop") case unknown => println("Supervisor: unknown message [%s], ignoring" format unknown) } } def restartChild(child: Actor) { link(child) child.restart } def startLogger() { getLogger() match { case Some(logger) => ensureStart(logger) link(logger) case None => } } def startWorkers(workersCount: Int): Array[Worker] = { 1 to workersCount map {count => val worker = makeWorker() link(worker) worker.start worker } toArray } def startChildren(workersCount: Int): List[Worker] = { val arrayWorker = this !? StartChildren(workersCount) match { case workers: Array[Worker] => workers case _ => Array() } arrayWorker.toList } def stop = this ! Stop } trait CanLogging { def writeLog(line: String) {} } trait CanLoggingWithSTM extends CanLogging { override def writeLog(line: String) {LoggerInSTM.write(line)} } trait CanLoggingWithActor extends CanLogging { override def writeLog(line: String) {LoggerInActor.write(line)} } class Worker() extends Actor with CanLogging { case class WriteLog(tryCount: Int, line: String) def act() = loop { react { case WriteLog(tryCount, line) => 1 to tryCount foreach {n => writeLog(line)} case unknown => println("Worker: unknown message [%s], ignoring" format unknown) } } def write(tryCount: Int, line: String) { this ! WriteLog(tryCount, line) } } trait HasWriter { var hasWriter: Option[PrintWriter] = None def open(name: String) { hasWriter = Some(new PrintWriter(new File("./%s.log" format name))) } def close(): Unit = hasWriter match { case Some(writer) => writer.close case _ => } def puts(line: String): Unit = hasWriter match { case Some(writer) => writer.println(line) case _ => } } object LoggerInSTM extends HasWriter { open("stm") val refLock = Ref(false) def write(line: String) { atomic { implicit txn => val lock = refLock() if (lock) retry refLock() = true } puts(line) Counter.increment atomic { implicit txn => refLock() = false } } } object LoggerInActor extends Actor with HasWriter { case class WriteLog(line: String) open("actor") def act() = loop { react { case WriteLog(line) => puts(line) Counter.increment case unknown => println("LoggerInActor: unknown message [%s], ignoring" format unknown) } } def write(line: String) { this ! WriteLog(line) } } case object Finish object Counter { var hasReceiver: Option[Actor] = None var replyCount = 0 val refCounter = Ref(0) def increment() { atomic { implicit txn => refCounter() = refCounter() + 1 } sendFinish() } def sendFinish() { if (replyCount != 0 && replyCount <= refCounter.single()) { hasReceiver match { case Some(receiver) => receiver ! Finish case _ => } } } def enroll(setReceiver: Actor, setReplyCount: Int) { hasReceiver = Some(setReceiver) replyCount = setReplyCount atomic { implicit txn => refCounter() = 0 } } }
--<actor = 100, try count = 10>-- actor only [206]ms actor with stm [146]ms --<actor = 150, try count = 10>-- actor only [184]ms actor with stm [232]ms --<actor = 200, try count = 10>-- actor only [247]ms actor with stm [54]ms --<actor = 250, try count = 10>-- actor only [166]ms actor with stm [58]ms --<actor = 300, try count = 10>-- actor only [223]ms actor with stm [92]ms --<actor = 350, try count = 10>-- actor only [112]ms actor with stm [47]ms --<actor = 400, try count = 10>-- actor only [106]ms actor with stm [62]ms
結果は毎回違うので何とも言えませんが、多分… STM を使った方が性能が良いです。
内部で終了を判定するカウンタを利用していますが、これを Actor で作ると変な挙動*2を示したので STM に切り替えたら正常に動作するようになりました。また、起動する Actor を 3000 とかにすると、やはり変な挙動になるので 400 に抑えてます。
Erlang と同じノリで多数の Actor を起動したり、多数のメッセージを送ると安定しないので、Scala では、なるべく STM で代用できるところは STM を使った方が良さそうという結論に達しました。
STM は順序を保証しないので、例えば、ログを残す順序が大切な場合は、Actor を使う事になります。*3
Actor + CPS でコールバック関数を隠蔽する
勉強会第20回 - Scala勉強会 in 渋谷で久しぶりに発表するので下準備など。
ちょっとイロイロと手抜きコードですが… runTest メソッドだけ注目して下さい。switch メソッドを評価する度に Actor が切り替わります。
object Trampoline { import scala.actors.Actor import scala.actors.Actor.actor import scala.util.continuations.{reset, shift, cpsParam} def main (args: Array[String]) { TrampolineActorA.start TrampolineActorB.start reset(runTest()) Thread.sleep(100) TrampolineActorA.stop TrampolineActorB.stop } def switch(otherSide: Actor): Unit @cpsParam[Unit, Unit] = { shift { k: (Unit => Unit) => otherSide ! Execute(k) } } def runTest(): Unit @cpsParam[Unit, Unit] = { var n = 0 println(n) switch(TrampolineActorA) n = n + 1 println(n) switch(TrampolineActorB) n = n + 1 println(n) switch(TrampolineActorA) n = n + 1 println(n) switch(TrampolineActorB) n = n + 1 println(n) } case class Execute(f: Unit => Unit) case object Stop object TrampolineActorA extends Actor { def act() { loop { react { case Execute(f) => println("A"); f() case Stop => exit() case unknown => println(unknown) } } } def stop = this ! Stop } object TrampolineActorB extends Actor { def act() { loop { react { case Execute(f) => println("B"); f() case Stop => exit() case unknown => println(unknown) } } } def stop = this ! Stop } }
ちなみに Remote Actor で同様の事を行うと、途中で java.lang.IllegalAccessError が発生するので原因調査中です。このくらいであれば簡単に出来るのですけれど…。
Remote Actor + CPS
下記は、あまり嬉しくない。Remote Actor 間でトランポリンする方法を考え中。うーん、reset と shift 以外の制御構造が欲しいと思うのは私だけ?
object RemoteTest { import scala.actors.{Actor, AbstractActor} import scala.actors.Actor.actor import scala.actors.remote.RemoteActor.{alive, register, select} import scala.actors.remote.Node import scala.util.continuations.{reset, shift, cpsParam} def main (args: Array[String]) { if (args.length == 1 && args(0) == "test1") { runTest1() } else if (args.length == 1 && args(0) == "test2") { runTest2() } else { ServerActor.start } } case class Update(f: Int => Int) case object Show def runTest1() { def update(remoteActor: AbstractActor)(f: Int => Int): Int = { remoteActor ! Update(f) remoteActor !? Show match { case n: Int => n case _ => 0 } } actor { val remoteActor = selectRemoteActor() val remoteUpdate = update(remoteActor) _ println(remoteUpdate(n => n + 1)) println(remoteUpdate(n => n + 2)) } } def runTest2() { def update(remoteActor: AbstractActor)(): Int @cpsParam[Int, Int] = shift { k: (Int => Int) => remoteActor ! Update(k) remoteActor !? Show match { case n: Int => n case _ => 0 } } actor { val remoteActor = selectRemoteActor() val remoteUpdate = update(remoteActor) _ println(reset(remoteUpdate() * 2)) println(reset(remoteUpdate() * 3)) } } def selectRemoteActor(): AbstractActor = select(new Node("127.0.0.1", 10000), 'server) object ServerActor extends Actor { var counter = 0 def act() { alive(10000) register('server, this) println("start server") loop { react { case Update(f) => counter = f(counter) case Show => reply(counter) case unknown => println(unknown) } } } } }
Scala Actor の reply が正しくできない問題は scala.actors.Channel が悪いのかも?
突っ込み大歓迎です。というか助けてください。
環境は、Mac OS 10.6.5, Scala 2.8.1.final, Java 1.6.0_22 となります。
検証用のコードは、Scala STM を試してみる その3 で使ったコードの修正版のこちらとなります。
STM と対比するために Actor のみを用いてカウンタのデモを書いたのですが、300 メッセージを超えた辺りから reply が失敗する状態となってしまいました。そこで reply 時に使われている Channel リストの操作に疑いを持ち、試しに返信用の Channel を作成し、それをメッセージで渡してみました。結果は、残念な事に 300 メッセージを超えた辺りから、Channel に送ったメッセージが取得できなくなりました。*1
最終的に、mailboxSize を確認すると良いのではないか?と考え、試しに下記を試してみたのですが…
object CounterInActor extends Actor { // ..snip.. def increment(): Int = { if (n < this.mailboxSize) { 1 } else { this !? Increment match { case n: Int => n case _ => 0 } } } // ..snip..
n の値を 0 にすると正しく動作しましたw;*2 300 近い Actor が、ほぼ同時にメッセージを送るので、確認した瞬間は 0 or 1 ですが、送り終わった後には 300 近辺の値になっていると思われます。そこで下記のように書き換えてみると…
object CounterInActor extends Actor { // ..snip.. def act() = loop { react { case Increment => println(mailboxSize) counter += 1 reply(counter) // ..snip.. } } // ..snip.. }
--<400>-- 0 310 309 308 307 306 305 304 303 302 301 300 299 298 385 384 383 382 381 380 379 378 377 376 375 374 373 372 371 370 369 368 367 366 365 364 363 362 361 360 359 358 357 356 // ..snip..
0 から 310 に跳ね上がってました。こうなって来ると、下記の workers を 200 個づつに分割して、間に Sleep を 50ms 入れる等の工夫が必要となります。
def test (testCases: (String, Supervisor)*) { // ..snip.. val results = workers map { _.increment() } map { future => future() } // ..snip.. }
大元の検証コードに誤りがあると嬉しいのですが…。今の所、reply が必要な場合は、Client となる Actor の並列度に制限を行うか STM の使用を検討する事しか、私には思いつきませんでした。
うーん…、Channel の処理を深追いするしか無いかな。