repl.info

Fluent Bitにmruby Filter Pluginを追加し、フィルタ処理をmrubyで書けるようにする

Fluent Bitという、IoT・組み込み向けのCで書かれた軽量なFluentdがある。Cloud Native Meetup Tokyo #1 で存在を知ったのだが、知ったときに「mruby組み込めそうだな」と思ったのである。Cなので、プラグインを書いて組み込む場合は自分でビルドする必要があるが、mrubyでプラグインを書いて動的に読み込めるならさらに便利になるかもしれない(Goでプラグインを書くこともできるし、Lua Filter Pluginが既にあるが)。ということで興味を持っていたのだけどRubyKaigiなどでばたばたしており、仙台から戻った翌日くらいから少しずつFilter Pluginとして組み込んで昨日くらいにプロトタイプが動いた。Cを書くのが10年ぶりくらいで大変だったけど、楽しかった。

動作光景

なにはともあれどう動くのかを見てみる。まず、以下のようなtest.rbを用意する。fooメソッドの引数はこの3つで固定で、lua filter pluginと同じ。受け取ったレコードを加工して返す。

def foo(tag, timestamp, record)

  puts "record = #{record}"

  {'message' => record['message'].upcase}

end

次にconfigファイル。filter pluginとしてmrubyを指定して、読み込むスクリプトファイルと実行するメソッドを設定している。

[INPUT]

    Name dummy



[FILTER]

    Name mruby

    Match *

    script /path/to/test.rb

    call foo



[OUTPUT]

    Name stdout

    Match *

そして、これを実行すると、加工されたレコードがstdoutに出力されていることがわかる。

⟩ ./bin/fluent-bit --config=fluent-bit.conf

Fluent-Bit v0.14.0

Copyright (C) Treasure Data



[2018/06/12 09:49:22] [info] [engine] started (pid=45274)

record = {"message"=>"dummy"}

record = {"message"=>"dummy"}

record = {"message"=>"dummy"}

record = {"message"=>"dummy"}

[0] dummy.0: [1528764563.000000000, {"message"=>"DUMMY"}]

[1] dummy.0: [1528764564.000000000, {"message"=>"DUMMY"}]

[2] dummy.0: [1528764565.000000000, {"message"=>"DUMMY"}]

[3] dummy.0: [1528764566.000000000, {"message"=>"DUMMY"}]

record = {"message"=>"dummy"}

もちろん、このようなプラグインをCで書くこともできるが、mrubyで書けると便利だろう。

mruby Filter Pluginのメリット

FluentdはRubyでプラグインが書ける。また、DigdagやEmbulkでもRubyを使える(EmbulkはjRubyだっけ)。となるとFluent BitでもRubyが使えると、Rubyを書く人にとってうれしい。プラグイン部分を実行ファイルの外に置けるのも利点だろう。修正の度にビルドしなくてもよいというのはありがたいのではないか。mrbgemsの資産を使えるのも結構便利かもしれない(利用する際にビルドして組み込む必要があるが)。

mruby Filter Pluginのデメリット

mrubyなので、Rubyと違う部分でつまづくことはあるだろう。デメリットか、というとそこまででもないと思うけど。パフォーマンスはどうなんだろうか。測定してみないとわからないが、さすがにCで書いたプラグインに比べると遅くなるか?パフォーマンスが必要な場合はCやGoで書く、という手が使えるのでそこまでデメリットでもなさそう。Lua Filter Pluginと比べるとどうか?という話もある。

実装

Filter Pluginにはcb_init、cb_filter、cb_exitという3つのコールバックがある。fluent-bitの起動時にcb_initでmrb_stateを初期化、スクリプトの読み込みとロードを行う。cb_filterではスクリプト内のメソッドを呼び出して、Parser Pluginから渡されたレコードを処理する。cb_exitではmrb_stateを終了するなど、後片付けを行うという流れになるだろう。filter部分でmrb_stateの初期化やスクリプトの読み込みなどを行うことも可能だが、オーバーヘッドとなるのでinitで行うのがよいだろう。

コード全体はこのdiffを参照。

cb_mruby_init

initはそんなにややこしくない。コメントの通り。

static int cb_mruby_init(struct flb_filter_instance *f_ins,

                         struct flb_config *config,

                         void *data)

{

    struct mruby_filter *ctx;

    struct mf_t *mf;

    mrb_value obj;



    // Create mrb_state

    mf = flb_calloc(1, sizeof(struct mf_t));

    mf->mrb = mrb_open();

    mf->mrb->ud = mf;



    // Create context

    ctx = flb_calloc(1, sizeof(struct mruby_filter));

    ctx->mf = mf;

    ctx->call = flb_filter_get_property("call", f_ins);



    // Load mruby script

    FILE* fp = fopen(flb_filter_get_property("script", f_ins), "r");

    obj = mrb_load_file(mf->mrb, fp);

    ctx->mf->obj = obj;

    fclose(fp);



    // Set context

    flb_filter_set_context(f_ins, ctx);



    return 0;

}

cb_mruby_filter

大まかな流れは以下の3ステップ。

  1. messagepack形式のレコードが引数として渡されるので、これをmrb_value(mrubyのvalur)であるvalueやdouble型のタイムスタンプ、文字列型のタグに変換する。

  2. 次に、設定ファイルで指定したスクリプト内のメソッドを呼び出し、value、タイムスタンプ、タグを引数として渡す。

  3. mrb_valueであるメソッドの実行結果などを次のプラグインに渡すためにmessagepackに変換する。

    static int cb_mruby_filter(void *data, size_t bytes,

                            char *tag, int tag_len,
    
                            void **out_buf, size_t *out_bytes,
    
                            struct flb_filter_instance *f_ins,
    
                            void *filter_context,
    
                            struct flb_config *config)
    

    {

     struct mruby_filter *ctx = filter_context;
    
    
    
     size_t off = 0;
    
     double ts;
    
     struct flb_time t;
    
     msgpack_object *p;
    
     msgpack_sbuffer tmp_sbuf;
    
     msgpack_packer tmp_pck;
    
     msgpack_unpacked result;
    
    
    
     msgpack_sbuffer_init(&tmp_sbuf);
    
     msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
    
    
    
     msgpack_unpacked_init(&result);
    
     while (msgpack_unpack_next(&result, data, bytes, &off)) {
    
         msgpack_packer data_pck;
    
         msgpack_sbuffer data_sbuf;
    
         mrb_state *mrb_state;
    
         mrb_value value;
    
    
    
         mrb_state = ctx->mf->mrb_state;
    
    
    
         msgpack_sbuffer_init(&data_sbuf);
    
         msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write);
    
    
    
         flb_time_pop_from_msgpack(&t, &result, &p);
    
         ts = flb_time_to_double(&t);
    
    
    
         ctx->mf->ts = ts;
    
         ctx->mf->tag = tag;
    
         ctx->mf->record = p;
    
    
    
         value = mrb_funcall(mrb_state, ctx->mf->obj, ctx->call, 3, mrb_str_new_cstr(mrb_state, tag), mrb_float_value(mrb_state, ts), msgpack_obj_to_mrb_value(mrb_state, p));
    
    
    
         msgpack_pack_array(&tmp_pck, 2);
    
         flb_time_from_double(&t, ts);
    
         flb_time_append_to_msgpack(&t, &tmp_pck, 0);
    
         mrb_tommsgpack(mrb_state, value, &tmp_pck);
    
    
    
         msgpack_sbuffer_destroy(&data_sbuf);
    
     }
    
     msgpack_unpacked_destroy(&result);
    
    
    
     *out_buf = tmp_sbuf.data;
    
     *out_bytes = tmp_sbuf.size;
    
    
    
     return FLB_FILTER_MODIFIED;
    

    }

messagepackをmrb_valueに変換するのがmsgpack_obj_to_mrb_value。書きかけで、文字列とマップ以外も実装する必要があるが、雰囲気は伝わるか。

mrb_value msgpack_obj_to_mrb_value(mrb_state *mrb, msgpack_object *record)

{

    int size, i;

    char *s;

    mrb_value mrb_v;



    switch(record->type) {

        case MSGPACK_OBJECT_STR:

            s = flb_malloc(record->via.str.size);

            strncpy(s, record->via.str.ptr, record->via.str.size);

            s[record->via.str.size] = '\0';

            mrb_v = mrb_str_new_cstr(mrb, s);

            break;

        case MSGPACK_OBJECT_MAP:

            size = record->via.map.size;

            if (size != 0) {

                msgpack_object_kv *p = record->via.map.ptr;

                for (i = 0; i < size; i++) {

                    msgpack_object *key = &(p+i)->key;

                    msgpack_object *val = &(p+i)->val;

                    mrb_v = mrb_hash_new(mrb);

                    mrb_hash_set(mrb, mrb_v, msgpack_obj_to_mrb_value(mrb, key), msgpack_obj_to_mrb_value(mrb, val));

                }

            }

            break;

        default:

            break;

    }

    return mrb_v;

}

スクリプトが返すmrb_valueをmessagepackに変換する。

void mrb_tommsgpack(mrb_state *state, mrb_value value, msgpack_packer *pck)

{

    enum mrb_vtype type = mrb_type(value);



    if (mrb_undef_p(value) || mrb_nil_p(value)) {

        printf("undef or nil");

    }



    switch (type) {

        case MRB_TT_STRING: {

            char *c = RSTRING_PTR(value);

            msgpack_pack_str(pck, strlen(c));

            msgpack_pack_str_body(pck, c, strlen(c));

            break;

        }

        case MRB_TT_HASH: {

            mrb_value keys = mrb_hash_keys(state, value);

            int len = RARRAY_LEN(keys);

            msgpack_pack_map(pck, len);

            for (int i = 0; i < len; i++) {

                mrb_value key = mrb_ary_ref(state, keys, i);

                mrb_tommsgpack(state, key, pck);

                mrb_tommsgpack(state, mrb_hash_get(state, value, key), pck);

            }

            break;

        }

    }

}

cb_mruby_exit

closeしたりfreeしたりする。

static int cb_mruby_exit(void *data, struct flb_config *config)

{

    struct mruby_filter *ctx;



    ctx = data;

    mrb_close(ctx->mf->mrb_state);

    free(ctx->mf);

    return 0;

}

個人の感想です

mrubyの組み込み自体はすんなりできた。Fluent Bitのビルドまわりやプラグインの仕組みが綺麗で、開発開始時点でほぼつまづかなかった(cmakeまわりで少し悩んだ)し、何もしない新しいプラグインを追加するだけならすぐにできた。ミドルウェアにmrubyを組み込む方法も非常に参考になった。苦戦したのはmessagepackからmrubyへの変換、その逆の所。おそらく一番苦戦するだろう、と思っていたが予想通りだった。とはいえ、Cに慣れていて、いきなり組み込み始めるのではなくある程度mrubyとmsgpackまわりを事前に調べていればそこまで苦戦はしないのではないかな。

今後について

思いつきで実装を始めたので、プロトイタイプができて結構満足してしまった… Fluent Bit自体をちゃんと使ってみて、必要そうなら機能提案する、というところか。どのくらい需要があるのだろうか?