Kouhei Sutou
null+****@clear*****
Thu Nov 8 22:25:41 JST 2012
Kouhei Sutou 2012-11-08 22:25:41 +0900 (Thu, 08 Nov 2012) New Revision: d6873935d5faec78f85e14466bb172be3ddec90b https://github.com/groonga/fluent-plugin-groonga/commit/d6873935d5faec78f85e14466bb172be3ddec90b Log: in: extract common code Modified files: lib/fluent/plugin/in_groonga.rb Modified: lib/fluent/plugin/in_groonga.rb (+19 -26) =================================================================== --- lib/fluent/plugin/in_groonga.rb 2012-11-08 22:15:22 +0900 (b09c613) +++ lib/fluent/plugin/in_groonga.rb 2012-11-08 22:25:41 +0900 (372502f) @@ -29,24 +29,20 @@ module Fluent end config_param :protocol, :string, :default => "http" - config_param :bind, :string, :default => "0.0.0.0" - config_param :port, :integer, :default => 10041 - config_param :real_host, :string - config_param :real_port, :integer, :default => 10041 def configure(conf) super - repeater_factory = RepeaterFactory.new(@real_host, @real_port) case @protocol when "http" - @input = HTTPInput.new(@bind, @port, repeater_factory) + @input = HTTPInput.new when "gqtp" - @input = GQTPInput.new(@bind, @port, repeater_factory) + @input = GQTPInput.new else message = "unknown protocol: <#{@protocol.inspect}>" $log.error message raise ConfigError, message end + @input.configure(conf) end def start @@ -57,17 +53,6 @@ module Fluent @input.shutdown end - class RepeaterFactory - def initialize(host, port) - @host = host - @port = port - end - - def connect(client) - Repeater.connect(@host, @port, client) - end - end - class Repeater < Coolio::TCPSocket def initialize(socket, handler) super(socket) @@ -83,21 +68,22 @@ module Fluent end end - class HTTPInput + class BaseInput + include Configurable include DetachMultiProcessMixin - def initialize(bind, port, repeater_factory) - @bind = bind - @port = port - @repeater_factory = repeater_factory - end + config_param :bind, :string, :default => "0.0.0.0" + config_param :port, :integer, :default => 10041 + config_param :real_host, :string + config_param :real_port, :integer, :default => 10041 def start listen_socket = TCPServer.new(@bind, @port) detach_multi_process do @loop = Coolio::Loop.new - @socket = Coolio::TCPServer.new(listen_socket, nil, Handler, self) + @socket = Coolio::TCPServer.new(listen_socket, nil, + handler_class, self) @loop.attach(@socket) @shutdown_notifier = Coolio::AsyncWatcher.new @@ -117,7 +103,7 @@ module Fluent end def create_repeater(client) - repeater = @repeater_factory.connect(client) + repeater = Repeater.connect(@real_host, @real_port, client) repeater.attach(@loop) repeater end @@ -129,6 +115,13 @@ module Fluent $log.error "unexpected error", :error => $!.to_s $log.error_backtrace end + end + + class HTTPInput < BaseInput + private + def handler_class + Handler + end class Handler < Coolio::Socket def initialize(socket, input) -------------- next part -------------- HTML����������������������������...ダウンロード