[Groonga-commit] groonga/fluent-plugin-kotoumi [master] Add out_kotoumi.rb

アーカイブの一覧に戻る

Daijiro MORI null+****@clear*****
Fri Jan 25 16:12:30 JST 2013


Daijiro MORI	2013-01-25 16:12:30 +0900 (Fri, 25 Jan 2013)

  New Revision: 128e091e48c35099ff0573241b062b9cf4b411a1
  https://github.com/groonga/fluent-plugin-kotoumi/commit/128e091e48c35099ff0573241b062b9cf4b411a1

  Log:
    Add out_kotoumi.rb

  Added files:
    lib/fluent/plugin/out_kotoumi.rb

  Added: lib/fluent/plugin/out_kotoumi.rb (+66 -0) 100644
===================================================================
--- /dev/null
+++ lib/fluent/plugin/out_kotoumi.rb    2013-01-25 16:12:30 +0900 (9581f18)
@@ -0,0 +1,66 @@
+# -*- mode: ruby; coding: utf-8 -*-
+
+require 'SocketIO'
+
+module Fluent
+  class KotoumiOutput < Output
+    Plugin.register_output('kotoumi', self)
+
+    require 'fluent/plugin/kotoumi'
+    include Kotoumi
+
+    config_param :database, :string, :default => "kotoumi.db"
+    config_param :queuename, :string, :default => "KotoumiQueue"
+
+    def start
+      super
+      # prefork @workers
+      @session = Session.new(@database, @queuename)
+      @outputs = {}
+    end
+
+    def shutdown
+      super
+      @outputs.each do |dest, socket|
+        socket.disconnect
+      end
+    end
+
+    def emit(tag, es, chain)
+      es.each do |time, record|
+        # Merge it if needed
+        dispatch(tag, time, record)
+      end
+      chain.next
+    end
+
+    def dispatch(tag, time, record)
+      # Post to peers or execute it as needed
+      exec(tag, time, record)
+    end
+
+    def exec(tag, time, record)
+      result =****@sessi*****_message(tag, time, record)
+      if record["replyTo"]
+        post(record["replyTo"], tag, {
+               inReplyTo: record["id"],
+               type: (record["type"] || "") + '.result',
+               body: result
+             })
+      end
+    end
+
+    def post(dest, tag, result)
+      post_socket_io(dest, tag, result)
+    end
+
+    def post_socket_io(dest, tag, result)
+      unless @outputs[dest]
+        uri = 'http://' + dest
+        socket = SocketIO.connect(uri, sync: true)
+        @outputs[dest] = socket
+      end
+      @outputs[dest].emit(tag, result)
+    end
+  end
+end
-------------- next part --------------
HTML����������������������������...
ダウンロード 



More information about the Groonga-commit mailing list
アーカイブの一覧に戻る