Kouhei Sutou
null+****@clear*****
Wed Nov 7 22:49:31 JST 2012
Kouhei Sutou 2012-11-07 22:49:31 +0900 (Wed, 07 Nov 2012) New Revision: 3f410260627136e96cc9ea8a7f77697e7d9a46f0 https://github.com/groonga/fluent-plugin-groonga/commit/3f410260627136e96cc9ea8a7f77697e7d9a46f0 Log: in: use WEBrick::HTTPResponse for non proxy mode Modified files: lib/fluent/plugin/in_groonga.rb Modified: lib/fluent/plugin/in_groonga.rb (+67 -20) =================================================================== --- lib/fluent/plugin/in_groonga.rb 2012-11-07 18:58:59 +0900 (711e24b) +++ lib/fluent/plugin/in_groonga.rb 2012-11-07 22:49:31 +0900 (a3f227f) @@ -15,7 +15,12 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "webrick/httputils" +require "English" + +require "webrick/config" +require "webrick/httprequest" +require "webrick/httpresponse" + require "http_parser" module Fluent @@ -29,8 +34,8 @@ module Fluent config_param :protocol, :string, :default => "http" config_param :bind, :string, :default => "0.0.0.0" config_param :port, :integer, :default => 10041 - config_param :proxy_protocol, :string, :default => "http" - config_param :proxy_host, :string + config_param :proxy_protocol, :string, :default => nil + config_param :proxy_host, :string, :default => nil config_param :proxy_port, :integer, :default => 10041 def configure(conf) @@ -74,6 +79,8 @@ module Fluent end class HTTPInput + include DetachMultiProcessMixin + def initialize(bind, port, proxy_factory) @bind = bind @port = port @@ -81,21 +88,27 @@ module Fluent end def start - @loop = Coolio::Loop.new + listen_socket = TCPServer.new(@bind, @port) + detach_multi_process do + @loop = Coolio::Loop.new - @socket = Coolio::TCPServer.new(@host, @port, - Handler, @loop, @proxy_factory) - @loop.attach(@socket) + @socket = Coolio::TCPServer.new(listen_socket, nil, + Handler, @loop, @proxy_factory) + @loop.attach(@socket) - @thread = Thread.new do - run + @shutdown_notifier = Coolio::AsyncWatcher.new + @loop.attach(@shutdown_notifier) + + @thread = Thread.new do + run + end end end def shutdown - @loop.watchers.each(&:detach) @loop.stop @socket.close + @shutdown_notifier.signal @thread.join end @@ -108,17 +121,33 @@ module Fluent end class Handler < Coolio::Socket + class << self + @@response_config = nil + def response_config + @@response_config ||= WEBrick::Config::HTTP.dup.update( + :Logger => $log + ) + end + end + def initialize(socket, loop, proxy_factory) super(socket) - @socket = socket @loop = loop @proxy_factory = proxy_factory + @completed = false end + alias_method :<<, :write + def on_connect @parser = HTTP::Parser.new(self) - @proxy = @proxy_factory.connect(@socket) - @proxy.attach(@loop) if @proxy + @proxy = @proxy_factory.connect(self) + if @proxy + @proxy.attach(@loop) + @response = nil + else + @response = WEBrick::HTTPResponse.new(self.class.response_config) + end end def on_read(data) @@ -126,6 +155,13 @@ module Fluent @proxy.write(data) if @proxy end + def on_write_complete + return unless @completed + if @response + close + end + end + def on_message_begin @body = "" end @@ -147,13 +183,24 @@ module Fluent def on_message_complete params = WEBrick::HTTPUtils.parse_query(@parser.query_string) path_info =****@parse*****_path - command = path_info.sub(/\A\/d\//, "") - process(command, params, @body) + case path_info + when /\A\/d\// + command = $POSTMATCH + process(command, params, @body) + else + if @response + @response.status = "404" + end + end + if @response + @response["connection"] = "close" + @response.send_response(self) + end + @completed = true end private def process(command, params, body) - p command case command when "load" params["data"] = body @@ -166,17 +213,17 @@ module Fluent end class HTTPGroongaProxy < Coolio::TCPSocket - def initialize(socket, client) + def initialize(socket, handler) super(socket) - @client = client + @handler = handler end def on_read(data) - @client.write(data) + @handler.write(data) end def on_close - @client.close + @handler.close end end end -------------- next part -------------- HTML����������������������������... ダウンロード