[Groonga-commit] groonga/fluent-plugin-groonga [master] in: use WEBrick::HTTPResponse for non proxy mode

アーカイブの一覧に戻る

Kouhei Sutou null+****@clear*****
Wed Nov 7 22:49:31 JST 2012


Kouhei Sutou	2012-11-07 22:49:31 +0900 (Wed, 07 Nov 2012)

  New Revision: 3f410260627136e96cc9ea8a7f77697e7d9a46f0
  https://github.com/groonga/fluent-plugin-groonga/commit/3f410260627136e96cc9ea8a7f77697e7d9a46f0

  Log:
    in: use WEBrick::HTTPResponse for non proxy mode

  Modified files:
    lib/fluent/plugin/in_groonga.rb

  Modified: lib/fluent/plugin/in_groonga.rb (+67 -20)
===================================================================
--- lib/fluent/plugin/in_groonga.rb    2012-11-07 18:58:59 +0900 (711e24b)
+++ lib/fluent/plugin/in_groonga.rb    2012-11-07 22:49:31 +0900 (a3f227f)
@@ -15,7 +15,12 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "webrick/httputils"
+require "English"
+
+require "webrick/config"
+require "webrick/httprequest"
+require "webrick/httpresponse"
+
 require "http_parser"
 
 module Fluent
@@ -29,8 +34,8 @@ module Fluent
     config_param :protocol, :string, :default => "http"
     config_param :bind, :string, :default => "0.0.0.0"
     config_param :port, :integer, :default => 10041
-    config_param :proxy_protocol, :string, :default => "http"
-    config_param :proxy_host, :string
+    config_param :proxy_protocol, :string, :default => nil
+    config_param :proxy_host, :string, :default => nil
     config_param :proxy_port, :integer, :default => 10041
 
     def configure(conf)
@@ -74,6 +79,8 @@ module Fluent
     end
 
     class HTTPInput
+      include DetachMultiProcessMixin
+
       def initialize(bind, port, proxy_factory)
         @bind = bind
         @port = port
@@ -81,21 +88,27 @@ module Fluent
       end
 
       def start
-        @loop = Coolio::Loop.new
+        listen_socket = TCPServer.new(@bind, @port)
+        detach_multi_process do
+          @loop = Coolio::Loop.new
 
-        @socket = Coolio::TCPServer.new(@host, @port,
-                                        Handler, @loop, @proxy_factory)
-        @loop.attach(@socket)
+          @socket = Coolio::TCPServer.new(listen_socket, nil,
+                                          Handler, @loop, @proxy_factory)
+          @loop.attach(@socket)
 
-        @thread = Thread.new do
-          run
+          @shutdown_notifier = Coolio::AsyncWatcher.new
+          @loop.attach(@shutdown_notifier)
+
+          @thread = Thread.new do
+            run
+          end
         end
       end
 
       def shutdown
-        @loop.watchers.each(&:detach)
         @loop.stop
         @socket.close
+        @shutdown_notifier.signal
         @thread.join
       end
 
@@ -108,17 +121,33 @@ module Fluent
       end
 
       class Handler < Coolio::Socket
+        class << self
+          @@response_config = nil
+          def response_config
+            @@response_config ||= WEBrick::Config::HTTP.dup.update(
+              :Logger => $log
+            )
+          end
+        end
+
         def initialize(socket, loop, proxy_factory)
           super(socket)
-          @socket = socket
           @loop = loop
           @proxy_factory = proxy_factory
+          @completed = false
         end
 
+        alias_method :<<, :write
+
         def on_connect
           @parser = HTTP::Parser.new(self)
-          @proxy = @proxy_factory.connect(@socket)
-          @proxy.attach(@loop) if @proxy
+          @proxy = @proxy_factory.connect(self)
+          if @proxy
+            @proxy.attach(@loop)
+            @response = nil
+          else
+            @response = WEBrick::HTTPResponse.new(self.class.response_config)
+          end
         end
 
         def on_read(data)
@@ -126,6 +155,13 @@ module Fluent
           @proxy.write(data) if @proxy
         end
 
+        def on_write_complete
+          return unless @completed
+          if @response
+            close
+          end
+        end
+
         def on_message_begin
           @body = ""
         end
@@ -147,13 +183,24 @@ module Fluent
         def on_message_complete
           params = WEBrick::HTTPUtils.parse_query(@parser.query_string)
           path_info =****@parse*****_path
-          command = path_info.sub(/\A\/d\//, "")
-          process(command, params, @body)
+          case path_info
+          when /\A\/d\//
+            command = $POSTMATCH
+            process(command, params, @body)
+          else
+            if @response
+              @response.status = "404"
+            end
+          end
+          if @response
+            @response["connection"] = "close"
+            @response.send_response(self)
+          end
+          @completed = true
         end
 
         private
         def process(command, params, body)
-          p command
           case command
           when "load"
             params["data"] = body
@@ -166,17 +213,17 @@ module Fluent
     end
 
     class HTTPGroongaProxy < Coolio::TCPSocket
-      def initialize(socket, client)
+      def initialize(socket, handler)
         super(socket)
-        @client = client
+        @handler = handler
       end
 
       def on_read(data)
-        @client.write(data)
+        @handler.write(data)
       end
 
       def on_close
-        @client.close
+        @handler.close
       end
     end
   end
-------------- next part --------------
HTML����������������������������...
ダウンロード 



More information about the Groonga-commit mailing list
アーカイブの一覧に戻る