Piro / YUKI Hiroshi
null+****@clear*****
Thu Apr 23 04:12:17 JST 2015
Piro / YUKI Hiroshi 2015-04-23 04:12:17 +0900 (Thu, 23 Apr 2015) New Revision: 317cb6c7bd0c07f320c8b3eaebf92e0e6641dfae https://github.com/droonga/droonga-engine/commit/317cb6c7bd0c07f320c8b3eaebf92e0e6641dfae Message: Refresh connections to other nodes from workers, when cluster state is changed Modified files: lib/droonga/command/droonga_engine_worker.rb lib/droonga/dispatcher.rb lib/droonga/engine.rb lib/droonga/farm.rb lib/droonga/forwarder.rb lib/droonga/process_control_protocol.rb lib/droonga/process_supervisor.rb lib/droonga/slice.rb lib/droonga/worker_process_agent.rb Modified: lib/droonga/command/droonga_engine_worker.rb (+7 -0) =================================================================== --- lib/droonga/command/droonga_engine_worker.rb 2015-04-23 03:36:15 +0900 (91589de) +++ lib/droonga/command/droonga_engine_worker.rb 2015-04-23 04:12:17 +0900 (c2bd13a) @@ -164,6 +164,10 @@ module Droonga @loop.stop end + def refresh_node_reference + @forwarder.refresh_all_connections + end + def start_forwarder @forwarder = Forwarder.new(@loop, :auto_close_timeout => @@ -223,6 +227,9 @@ module Droonga @worker_process_agent.on_stop_immediately = lambda do stop_immediately end + @worker_process_agent.on_refresh_node_reference = lambda do + refresh_node_reference + end @worker_process_agent.start @worker_process_agent.ready end Modified: lib/droonga/dispatcher.rb (+4 -0) =================================================================== --- lib/droonga/dispatcher.rb 2015-04-23 03:36:15 +0900 (453ee2e) +++ lib/droonga/dispatcher.rb 2015-04-23 04:12:17 +0900 (0367d02) @@ -96,6 +96,10 @@ module Droonga logger.trace("stop_immediately: done") end + def refresh_node_reference + @farm.refresh_node_reference + end + def process_message(message) logger.trace("process_message: start", :message => message) @message = message Modified: lib/droonga/engine.rb (+3 -0) =================================================================== --- lib/droonga/engine.rb 2015-04-23 03:36:15 +0900 (1b8df34) +++ lib/droonga/engine.rb 2015-04-23 04:12:17 +0900 (1447df8) @@ -50,6 +50,9 @@ module Droonga options[:internal_connection_lifetime]) @dispatcher = create_dispatcher + @cluster.on_change = lambda do + @dispatcher.refresh_node_reference + end end def start Modified: lib/droonga/farm.rb (+6 -0) =================================================================== --- lib/droonga/farm.rb 2015-04-23 03:36:15 +0900 (1f5cf35) +++ lib/droonga/farm.rb 2015-04-23 04:12:17 +0900 (5fd26f1) @@ -88,6 +88,12 @@ module Droonga end end + def refresh_node_reference + @slices.each_value do |slice| + slice.refresh_node_reference + end + end + def process(slice_name, message) unles****@slice*****?(slice_name) raise NoSlice.new(slice_name, :message => message, :slices => @slices.keys) Modified: lib/droonga/forwarder.rb (+12 -0) =================================================================== --- lib/droonga/forwarder.rb 2015-04-23 03:36:15 +0900 (f06d349) +++ lib/droonga/forwarder.rb 2015-04-23 04:12:17 +0900 (bdf33bf) @@ -50,9 +50,11 @@ module Droonga @senders.each_value do |sender| sender.shutdown end + @senders = {} @auto_close_timers.each_value do |timer| timer.detach end + @auto_close_timers = {} logger.trace("shutdown: done") end @@ -72,6 +74,16 @@ module Droonga sender.shutdown @senders.delete(name) end + timer = @auto_close_timers[name] + if timer + timer.detach + @auto_close_timers.delete(name) + end + end + + def refresh_all_connections + shutdown + start end private Modified: lib/droonga/process_control_protocol.rb (+1 -0) =================================================================== --- lib/droonga/process_control_protocol.rb 2015-04-23 03:36:15 +0900 (210cdaa) +++ lib/droonga/process_control_protocol.rb 2015-04-23 04:12:17 +0900 (1507c03) @@ -19,6 +19,7 @@ module Droonga STOP_GRACEFUL = "stop-graceful\n" STOP_IMMEDIATELY = "stop-immediately\n" REFRESH_SELF_REFERENCE = "refresh-self-reference\n" + REFRESH_NODE_REFERENCE = "refresh-node-reference\n" READY = "ready\n" FINISH = "finish\n" Modified: lib/droonga/process_supervisor.rb (+6 -0) =================================================================== --- lib/droonga/process_supervisor.rb 2015-04-23 03:36:15 +0900 (bf64736) +++ lib/droonga/process_supervisor.rb 2015-04-23 04:12:17 +0900 (b3b0fd2) @@ -70,6 +70,12 @@ module Droonga logger.trace("refresh_self_reference: done") end + def refresh_node_reference + logger.trace("refresh_node_reference: start") + @output.write(Messages::REFRESH_NODE_REFERENCE) + logger.trace("refresh_node_reference: done") + end + private def create_input(raw_input) input = Coolio::IO.new(raw_input) Modified: lib/droonga/slice.rb (+6 -0) =================================================================== --- lib/droonga/slice.rb 2015-04-23 03:36:15 +0900 (dd1b9be) +++ lib/droonga/slice.rb 2015-04-23 04:12:17 +0900 (070ad9f) @@ -69,6 +69,12 @@ module Droonga logger.trace("stop_immediately: done") end + def refresh_node_reference + logger.trace("refresh_node_reference: start") + @supervisor.refresh_node_reference if @supervisor + logger.trace("refresh_node_reference: done") + end + def process(message) logger.trace("process: start") @processor.process(message) Modified: lib/droonga/worker_process_agent.rb (+10 -0) =================================================================== --- lib/droonga/worker_process_agent.rb 2015-04-23 03:36:15 +0900 (8f6c29b) +++ lib/droonga/worker_process_agent.rb 2015-04-23 04:12:17 +0900 (7adde99) @@ -77,6 +77,10 @@ module Droonga @on_refresh_self_reference = callback end + def on_refresh_node_reference=(callback) + @on_refresh_node_reference = callback + end + private def create_input(raw_input) @input = Coolio::IO.new(raw_input) @@ -91,6 +95,8 @@ module Droonga on_stop_immediately when Messages::REFRESH_SELF_REFERENCE on_refresh_self_reference + when Messages::REFRESH_NODE_REFERENCE + on_refresh_node_reference end end end @@ -133,6 +139,10 @@ module Droonga @on_refresh_self_reference.call if @on_refresh_self_reference end + def on_refresh_node_reference + @on_refresh_node_reference.call if @on_refresh_node_reference + end + def log_tag "worker_process_agent" end -------------- next part -------------- HTML����������������������������... ダウンロード