Kouhei Sutou
null+****@clear*****
Thu Sep 26 18:00:53 JST 2013
Kouhei Sutou 2013-09-26 18:00:53 +0900 (Thu, 26 Sep 2013) New Revision: 7c50c20c63fee4e2262de9f5fb23a58cfc247005 https://github.com/droonga/fluent-plugin-droonga/commit/7c50c20c63fee4e2262de9f5fb23a58cfc247005 Message: Support n_workers > 0 configuration Modified files: lib/droonga/engine.rb lib/droonga/executor.rb lib/droonga/server.rb lib/droonga/worker.rb Modified: lib/droonga/engine.rb (+5 -55) =================================================================== --- lib/droonga/engine.rb 2013-09-26 17:41:18 +0900 (cd35395) +++ lib/droonga/engine.rb 2013-09-26 18:00:53 +0900 (23d28e6) @@ -28,11 +28,7 @@ module Droonga DEFAULT_OPTIONS = { :queue_name => "DroongaQueue", :n_workers => 0, - :with_server => false } - # TODO: It doesn't work fine when n_workers > 0 && number of databases > 1 - # since more than one ServerEngine instance can't be in a process. - # It causes dump_uncaught_error in the SignalThread. def initialize(options={}) @options = DEFAULT_OPTIONS.merge(options) @@ -43,74 +39,28 @@ module Droonga Droonga::JobQueue.ensure_schema(@options[:database], @options[:queue_name]) end - if @options[:n_workers] > 0 || @options[:with_server] - @message_input, @message_output = IO.pipe - @message_input.sync = true - @message_output.sync = true - start_supervisor - end - if @options[:with_server] - start_emitter - else - @executor = Executor.new(@options) - end + start_supervisor if @options[:n_workers] > 0 + @executor = Executor.new(@options) end def shutdown $log.trace("engine: shutdown: start") - shutdown_emitter if @emitter @executor.shutdown if @executor - if @supervisor - shutdown_supervisor - @message_input.close unless @message_input.closed? - @message_output.close unless @message_output.closed? - end + shutdown_supervisor if @supervisor $log.trace("engine: shutdown: done") end def emit(tag, time, record, synchronous=nil) $log.trace("tag: <#{tag}>") - if @executor - @executor.dispatch(tag, time, record, synchronous) - else - @emitter.write(MessagePack.pack([tag, time, record, synchronous])) - @loop_breaker.signal - end + @executor.dispatch(tag, time, record, synchronous) end private - def start_emitter - @loop = Coolio::Loop.new - @emitter = Coolio::IO.new(@message_output) - @emitter.on_write_complete do - $log.trace("emitter: written") - end - @emitter.attach(@loop) - @loop_breaker = Coolio::AsyncWatcher.new - @loop_breaker.attach(@loop) - @emitter_thread = Thread.new do - @loop.run - end - end - - def shutdown_emitter - $log.trace("emitter: shutdown: start") - @emitter.close - $log.trace("emitter: shutdown: emitter: closed") - @loop.stop - @loop_breaker.signal - $log.trace("emitter: shutdown: loop: stopped") - @emitter_thread.join - $log.trace("emitter: shutdown: done") - end - def start_supervisor - server = @options[:with_server] ? Server : nil - @supervisor = ServerEngine::Supervisor.new(server, Worker) do + @supervisor = ServerEngine::Supervisor.new(Server, Worker) do force_options = { :worker_type => "process", :workers => @options[:n_workers], - :message_input => @message_input, :log_level => $log.level, } @options.merge(force_options) Modified: lib/droonga/executor.rb (+0 -8) =================================================================== --- lib/droonga/executor.rb 2013-09-26 17:41:18 +0900 (bb649b4) +++ lib/droonga/executor.rb 2013-09-26 18:00:53 +0900 (7b274b9) @@ -57,14 +57,6 @@ module Droonga end end - def unblock_queue - 3.times do |i| - super - @queue.unblock - sleep(i ** 2 * 0.1) - end - end - def add_handler(name) plugin = HandlerPlugin.new(name) @handlers << plugin.instantiate(self) Modified: lib/droonga/server.rb (+16 -71) =================================================================== --- lib/droonga/server.rb 2013-09-26 17:41:18 +0900 (78b1596) +++ lib/droonga/server.rb 2013-09-26 18:00:53 +0900 (28cc399) @@ -15,93 +15,38 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "msgpack" -require "cool.io" require "groonga" -require "droonga/executor" - module Droonga module Server - class Receiver - def initialize(input) - @input = input - end - - def run - $log.trace - @io = Coolio::IO.new(@input) - unpacker = MessagePack::Unpacker.new - @io.on_read do |data| - $log.trace("receiver: received: <#{data.bytesize}>") - unpacker.feed_each(data) do |message| - yield message - end - end - @loop = Coolio::Loop.new - @loop.attach(@io) - @loop_breaker = Coolio::AsyncWatcher.new - @loop.attach(@loop_breaker) - @running = true - @loop.run - end - - def stop - unless @running - $log.trace("receiver: stop: not needed") - return - end - - $log.trace("receiver: stop: start") - @io.close - $log.trace("receiver: stop: closed") - @loop.stop - @running = false - @loop_breaker.signal - $log.trace("receiver: stop: done") - end - end - - def initialize - super - @message_input = config[:message_input] - @executor = Executor.new(config) - end - def before_run - @receiver = Receiver.new(@message_input) - @receiver_thread = Thread.new do - @receiver.run do |message| - $log.trace("received: start") - @executor.dispatch(*message) - $log.trace("received: done") - end - end + $log.trace("server: before_run: start") + # TODO: Use JobQueue object + @context = Groonga::Context.new + @database =****@conte*****_database(config[:database]) + @queue = @context[config[:queue_name]] + $log.trace("server: before_run: done") end def after_run $log.trace("server: after_run: start") - - $log.trace("server: after_run: receiver: start") - @receiver_thread.join - $log.trace("server: after_run: receiver: done") - - $log.trace("server: after_run: groonga: start") - @executor.shutdown - $log.trace("server: after_run: groonga: done") - + @queue.close + @database.close + @context.close $log.trace("server: after_run: done") end def stop(stop_graceful) $log.trace("server: stop: start") - $log.trace("server: stop: receiver: stop: start") - @receiver.stop - $log.trace("server: stop: receiver: stop: done") - $log.trace("server: stop: queue: unblock: start") - @executor.unblock_queue + 3.times do |i| + $log.trace("server: stop: queue: unblock: #{i}: start") + super(stop_graceful) + @queue.unblock + sleep(i ** 2 * 0.1) + $log.trace("server: stop: queue: unblock: #{i}: done") + end $log.trace("server: stop: queue: unblock: done") $log.trace("server: stop: done") Modified: lib/droonga/worker.rb (+0 -19) =================================================================== --- lib/droonga/worker.rb 2013-09-26 17:41:18 +0900 (ba5aa95) +++ lib/droonga/worker.rb 2013-09-26 18:00:53 +0900 (54b198f) @@ -42,24 +42,5 @@ module Droonga @running = false $log.trace("worker: stop: done") end - - private - def shutdown_workers - @pool.each do |pid| - Process.kill(:TERM, pid) - end - queue = @context[@queue_name] - 3.times do |i| - break if****@pool*****? - queue.unblock - @pool.reject! do |pid| - not Process.waitpid(pid, Process::WNOHANG).nil? - end - sleep(i ** 2 * 0.1) - end - @pool.each do |pid| - Process.kill(:KILL, pid) - end - end end end -------------- next part -------------- HTML����������������������������... 下载