Twitterにおけるつぶやきの即時話題推定技術「どたばたかいぎ」の裏側(実装途中)

OverView

最初はPerlで書いてたんですけど、今は主にErlangで書いてあります。TwitterのUser Streamsからつぶやきを取得し、MeCab形態素解析にかけて名詞のみ抽出しています。MeCabは既に強化済です*1
このエントリのお話すること

  1. ErlangからOauthライブラリ使ってUserStreamsにつなぐところ
  2. どたばたかいぎのCore部分
  3. ErlangからMeCabを触るNIFインターフェイスのこと

Erlang内でつかっているもの

べ、べつに http://naoyat.hatenablog.jp/entry/2012/01/04/220639 のエントリの存在に書き終えたあとで気づいたりとかしてないし...車輪の再発明じゃないし...(´;ω;`)

1.ErlangからOauthライブラリ使ってUserStreamsにつなぐところ

何度見たかわからないid:takkkunhttp://d.hatena.ne.jp/takkkun/20091016/1255722692を読みながらhttp://d.hatena.ne.jp/polistes/20111130/1322663545とうまいことつなげる。
つぶやきが流れてきたらjiffyちゃんでdecodeしたのちにコールバック関数CallBack(Bin, Args)でゴニョゴニョできるような設計に。なにか画面出力するときはNotifierに流しこむようにしておきました。じゃないと標準出力が非常に残念なことになる(´・ω・`) 流れてきた欠片を拾うだけのstreamer/2と、それを受け取ってからプロセス生成したりNotifierに伝えるlistenerがあります。要erlang-oauth, jiffy。get_access_token()という関数があるんだけどコピペなので省略。

-module(userstream).
-author("Eric Sartre <siritori@gmail.com>").
-export([start/5, stop/1]).
-define(CONSUMER_KEY, "ConsumerKey").
-define(CONSUMER_SECRET, "ConsmerSecret").

start(AccessToken, AccessTokenSecret, Notifier, CallBack, Args) ->
   Listener = spawn(?MODULE, listener, [Notifier, CallBack, Args]),
   Streamer  = spawn(fun() ->
      Consumer = {?CONSUMER_KEY, ?CONSUMER_SECRET, hmac_sha1},
      Options = [{sync, false}, {stream, self}],
      URL = "https://userstream.twitter.com/2/user.json",
      case oauth:post(URL, [], Consumer, AccessToken, AccessTokenSecret, Options) of
         {ok, RequestId} -> streamer(RequestId, Listener);
         {error, Reason} -> send(Listener, {error, Reason})
      end
   end),
   {ok, Streamer}.

stop(Streamer) ->
   send(Streamer, stop),
   receive {Streamer, stopped} -> stopped end.

streamer(RequestId, Listener) ->
   receive
      {http, {RequestId, stream_start, _Headers}} ->
         send(Listener, started),
         streamer(RequestId, Listener);
      {http, {RequestId, stream, Part}} ->
         case Part of
            <<"\r\n">> -> noop;
            _          -> send(Listener, {stream, Part})
         end,
         streamer(RequestId, Listener);
      {http, {RequestId, {error, Reason}}} ->
         send(Listener, {error, Reason}),
         erlang:exit(stream_disconnected);
      {From, stop} ->
         httpc:cancel_request(RequestId),
         send(Listener, stop),
         send(From, stopped)
   end.

listener(Notifier, CallBack, Args) ->
   receive
      {_Streamer, start} ->
         send(Notifier, start),
         listener(Notifier, CallBack, Args);
      {_Streamer, {stream, Part}} ->
         spawn(fun() ->
            {Data} = jiffy:decode(Part),
            case lists:keyfind(<<"text">>, 1, Data) of
               false -> ok;
               {<<"text">>, Text}  ->
                  CallBack(unicode:characters_to_binary(Text), Args)
            end
         end),
         listener(Notifier, CallBack, Args);
      {_Streamer, {error, Reason}} ->
         send(Notifier, {error, Reason});
      {_Streamer, stop} ->
         send(Notifier, stop)
   end.

send(To, Msg) -> To ! {self(), Msg}.

たとえばこんなふうにつかう。

-module(hoge).
-compile(export_all).
callback(Bin, _Args) -> printer ! {tweet, Bin}.
test() ->
   erlang:register(printer, spawn(?MODULE, printer, [])),
   userstream:start("なんたら", "かんたら", printer, fun ?MODULE:callback/2, []).
printer() ->
   receive 
      {_Listener, start} ->  
         io:format("Info: stream started~n"), 
         printer(); 
      {tweet, Bin} ->  
         io:format("tweet!:~ts~n", [Bin]), 
         printer(); 
      {_Listener, {error, Reason}} ->  
         io:format("Error: ~p~n", [Reason]); 
      {_Listener, stop} ->  
         io:format("Info: stream stopped~n") 
   end.

たぶん一行づつUserStreamが流れるとおもう。

2.どたばたかいぎのCore部分

どたばたかいぎではさっきのuserstreamモジュールをこんなふうに使っています。category.etsには単語のカテゴリのリスト、redirect.etsには単語の言い換えのリスト(ほぼ要素数は1)が格納されてます。

-module(dotabata).
-export([kaigi/0, stop/1, callback/2, printer/0]).

kaigi() ->
   {ok, _} = mecab:init_context(),
   {ok, CategoryTab} = ets:file2tab("category.ets"),
   {ok, RedirectTab} = ets:file2tab("redirect.ets"),
   Tab = {CategoryTab, RedirectTab},
   {ok, AccessToken, AccessTokenSecret} = userstream:get_access_token(),
   register(printer, spawn(fun ?MODULE:printer/0)),
   Streamer = userstream:start(AccessToken, AccessTokenSecret, printer, fun ?MODULE:callback/2, Tab),
   {Streamer, CategoryTab, RedirectTab}.

stop({Streamer, CategoryTab, RedirectTab}) ->
   userstream:stop(Streamer),
   ets:delete(CategoryTab),
   ets:delete(RedirectTab).

printer() ->
   receive
      {_Listener, start} ->
         io:format("Info: stream started~n"),
         printer();
      {tweet, Bin, Tokens} ->
         io:format("tweet!:~ts~n", [Bin]),
         lists:foreach(fun(I) -> io:format("~ts~n", [I]) end, Tokens),
         io:format("~n"),
         printer();
      {_Listener, {error, Reason}} ->
         io:format("Error: ~p~n", [Reason]);
      {_Listener, stop} ->
         io:format("Info: stream stopped~n")
   end.

callback(Bin, {CategoryTab, RedirectTab}) ->
   {ok, Tokens0} = mecab:parse(Bin),
   Tokens = lists:foldl(fun(I, Acc) ->
      case ets:lookup(RedirectTab, I) of
         [] -> [I|Acc];
         [{I, Token}] -> Token ++ Acc
      end
   end, [], Tokens0),
   Categories = lists:foldl(fun(I, Acc) ->
      case ets:lookup(CategoryTab, I) of
         [] -> [I|Acc];
         [{I, Category}] -> [I|Category] ++ Acc
      end
   end, [], Tokens),
   printer ! {tweet, Bin, sets:to_list(sets:from_list(Categories))}.

キモはcallback/2。mecabモジュール(後述)によってつぶやきから名詞のリストを取り出し、redirect.etsによって表記ゆれを吸収しています。その後category.etsによってその上位概念を取得し、すべてリストに追加してuniqueにしたあとprinterに投げて表示させています。

3.ErlangからMeCabを触るNIFインターフェイス

Erlang形態素解析をするためにMeCabさんを使います。NIFをつかってErlangVMにドゴォォォンと載せちゃう。まずは.erlのスケルトン。

-module(mecab).
-export([load/0, init_context/0, parse/1, destroy_context/0]).

load() -> erlang:load_nif("./mecab", 0).
init_context() -> {error, library_not_loaded}.
parse(Bin) when is_binary(Bin) -> {error, library_not_loaded}.
destroy_context() -> {error, library_not_loaded}.

んでもってCをゴリゴリ書く。大枠はこんなかんじ。こう書くとErlangからはfunc0/0とfunc1/3が見えるようになる。

#include "erl_nif.h"
static ERL_NIF_TERM func0_(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
   ・
   ・
}
static ERL_NIF_TERM func1_(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
   ・
   ・   
}

static ErlNifFunc nif_funcs[] = {
   {"func0", 0, func0_},
   {"func1", 3, func1_},
};
ERL_NIF_INIT(mecab, nif_funcs, NULL, NULL, NULL, NULL)

引数でbinaryを指定して受け取って、それを文字列にするあたりのコード。
enif_inspect_binaryでERL_NIF_TERMなbinaryをErlNifBinaryに変換するんですって。ErlNifBinaryは使った後に後片付けが必要。

static ERL_NIF_TERM parse(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
   ErlNifBinary bin;
   enif_inspect_binary(env, argv[0], &bin);
   char *input = (char*)malloc(bin.size+1);
   memcpy(input, bin.data, bin.size);
   input[bin.size] = '\0';
   enif_release_binary(&bin);

逆に文字列('\0'で終わってないけど)からbinaryに変換するコード。ERL_NIF_TERMになったのでもうこいつはVMの管理下にあるのでreleaseはしない。

   ErlNifBinary bin;
   enif_alloc_binary(node->length, &bin);
   memcpy(bin.data, node->surface, node->length);
   ERL_NIF_TERM bin = enif_make_binary(env, &bin);

というわけで全コード。

#include "erl_nif.h"
#include <stdio.h>
#include <stdlib.h>
#include <mecab.h>
#include <string.h>

static mecab_t *mecab = NULL;

static ERL_NIF_TERM tuple2(ErlNifEnv *env, const char *s1, const char *s2) {
   ERL_NIF_TERM t1 = enif_make_atom(env, s1);
   ERL_NIF_TERM t2 = enif_make_atom(env, s2);
   return enif_make_tuple2(env, t1, t2);
}

static ERL_NIF_TERM init_context(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
   if(mecab != NULL) return tuple2(env, "error", "already_initialized");
   if((mecab = mecab_new(0, NULL)) == NULL) {
      return tuple2(env, "error", "failed_to_initialize");
   }
   return tuple2(env, "ok", "initialized");
}

static ERL_NIF_TERM destroy_context(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
   if(mecab != NULL) {
      mecab_destroy(mecab);
      mecab = NULL;
   }
   return tuple2(env, "ok", "destroyed");
}

static ERL_NIF_TERM trace_node_(ErlNifEnv *env, mecab_node_t *node, ERL_NIF_TERM rest) {
   if(node == NULL) return rest;
   if( strstr(node->feature, "代名詞")) return trace_node_(env, node->next, rest);
   if(!strstr(node->feature, "名詞,"))  return trace_node_(env, node->next, rest);
   if( strstr(node->feature, "接尾"))   return trace_node_(env, node->next, rest);
   if( strstr(node->feature, "非自立")) return trace_node_(env, node->next, rest);
   ErlNifBinary bin;
   enif_alloc_binary(node->length, &bin);
   memcpy(bin.data, node->surface, node->length);
   ERL_NIF_TERM tail = enif_make_list_cell(env, enif_make_binary(env, &bin), rest);
   return trace_node_(env, node->next, tail);
}

static ERL_NIF_TERM trace_node(ErlNifEnv *env, mecab_node_t *node) {
   return trace_node_(env, node, enif_make_list(env, 0));
}

static ERL_NIF_TERM parse(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
   if(mecab == NULL) {
      return tuple2(env, "error", "context_not_initialized");
   }
   /* map ErlNifBinary to char string */
   ErlNifBinary bin;
   enif_inspect_binary(env, argv[0], &bin);
   char *input = (char*)malloc(bin.size+1);
   memcpy(input, bin.data, bin.size);
   input[bin.size] = '\0';
   enif_release_binary(&bin);
   /* parse to node*/
   const mecab_node_t *node = mecab_sparse_tonode(mecab, input);
   if(node == NULL) return tuple2(env, "error", "failed_to_parse");
   ERL_NIF_TERM ok = enif_make_atom(env, "ok");
   return enif_make_tuple2(env, ok, trace_node(env, node));
}

static ErlNifFunc nif_funcs[] = {
   {"init_context",    0, init_context},
   {"destroy_context", 0, destroy_context},
   {"parse", 1, parse}
};

ERL_NIF_INIT(mecab, nif_funcs, NULL, NULL, NULL, NULL)

trace_node_で末尾再帰しながらMeCabのnodeリストをErlangのバイナリのリストに変換していってます。ついでに名詞の振り分けとかもここでやっちゃってる。
コンパイルで大コケする。

[eric@TSUBAKI]% gcc -I$ERL_ROOT/usr/include `mecab-config --cflags` `mecab-config --libs` --undefined suppress -flat_namespace -fPIC -shared -o mecab.so mecab.c 

$ERL_ROOTはえりっくさんの環境では/opt/local/lib/erlang/でした。ちなみに便宜上環境変数で表現しただけでこういう環境変数がなにかのお陰でセットされたりすることはありません。
こうやって使う↓

1> l(mecab).
{module,mecab}
2> mecab:load().
ok
3> mecab:init_context().
{ok,initialized}
4> String = unicode:characters_to_binary("Erlang触って睡眠不足いえーい。").
<<69,114,108,97,110,103,232,167,166,227,129,163,227,129,
  166,231,157,161,231,156,160,228,184,141,232,182,179,227,
  129,...>>
5> {ok, L} = mecab:parse(String).
{ok,[<<231,157,161,231,156,160,228,184,141,232,182,179>>,
     <<"Erlang">>]}
6> lists:foreach(fun(I)->io:format("~ts~n", [I]) end, L).
睡眠不足
Erlang
ok
7> 

ツッコミ歓迎

「お前ここ遅くなるぞ」とかいろいろあったら教えて下さい。