Daijiro MORI
null+****@clear*****
Mon Sep 2 19:26:19 JST 2013
Daijiro MORI 2013-09-02 19:26:19 +0900 (Mon, 02 Sep 2013) New Revision: bcc71e62734c59f41f01526f8311a081a97ffc57 https://github.com/droonga/fluent-plugin-droonga/commit/bcc71e62734c59f41f01526f8311a081a97ffc57 Message: Implement the engine farm function. Modified files: lib/droonga/catalog.rb lib/droonga/engine.rb lib/droonga/executor.rb lib/droonga/proxy.rb lib/droonga/server.rb lib/fluent/plugin/out_droonga.rb Modified: lib/droonga/catalog.rb (+20 -0) =================================================================== --- lib/droonga/catalog.rb 2013-08-21 13:36:32 +0900 (265b4fb) +++ lib/droonga/catalog.rb 2013-09-02 19:26:19 +0900 (5ef15de) @@ -50,6 +50,26 @@ module Droonga end end + def get_engines(name) + device = @catalog["farms"][name]["device"] + pattern = Regexp.new("^#{name}\.") + results = {} + @catalog["datasets"].each do |key, dataset| + dataset["ring"].each do |key, part| + part["partitions"].each do |range, partitions| + partitions.each do |partition| + if partition =~ pattern + path = File.join([device, $POSTMATCH, 'db']) + options = { :database => path } + results[partition] = options + end + end + end + end + end + return results + end + def get_routes(name, args) routes = [] dataset = dataset(name) Modified: lib/droonga/engine.rb (+9 -4) =================================================================== --- lib/droonga/engine.rb 2013-08-21 13:36:32 +0900 (42a3982) +++ lib/droonga/engine.rb 2013-09-02 19:26:19 +0900 (c3e6d30) @@ -26,7 +26,6 @@ require "droonga/executor" module Droonga class Engine DEFAULT_OPTIONS = { - :database => "droonga/db", :queue_name => "DroongaQueue", :handlers => ["proxy"], :n_workers => 1, @@ -38,6 +37,11 @@ module Droonga end def start + if !@options[:database] || @options[:database].empty? + name = @options[:name] + database = File.join([File.basename(name), 'db']) + @options[:database] = database + end if @options[:n_workers] > 0 || @options[:with_server] @message_input, @message_output = IO.pipe @message_input.sync = true @@ -48,6 +52,7 @@ module Droonga start_emitter else @executor = Executor.new(@options) + @executor.add_handler("proxy_message") if @options[:name] end end @@ -63,12 +68,12 @@ module Droonga $log.trace("engine: shutdown: done") end - def emit(tag, time, record) + def emit(tag, time, record, synchronous=nil) $log.trace("tag: <#{tag}>") if @executor - @executor.dispatch(tag, time, record) + @executor.dispatch(tag, time, record, synchronous) else - @emitter.write(MessagePack.pack([tag, time, record])) + @emitter.write(MessagePack.pack([tag, time, record, synchronous])) @loop_breaker.signal end end Modified: lib/droonga/executor.rb (+7 -4) =================================================================== --- lib/droonga/executor.rb 2013-08-21 13:36:32 +0900 (acedc98) +++ lib/droonga/executor.rb 2013-09-02 19:26:19 +0900 (66935a3) @@ -72,9 +72,13 @@ module Droonga envelope["via"].push(route) end - def dispatch(*message) - body, type, arguments = parse_message(message) - post_or_push(message, body, "type" => type, "arguments" => arguments) + def dispatch(tag, time, record, synchronous=nil) + message = [tag, time, record] + body, type, arguments = parse_message([tag, time, record]) + post_or_push(message, body, + "type" => type, + "arguments" => arguments, + "synchronous" => synchronous) end def execute_one @@ -212,7 +216,6 @@ module Droonga @handler_names.each do |handler_name| add_handler(handler_name) end - add_handler("proxy_message") end def find_handler(command) Modified: lib/droonga/proxy.rb (+26 -3) =================================================================== --- lib/droonga/proxy.rb 2013-08-21 13:36:32 +0900 (8138eee) +++ lib/droonga/proxy.rb 2013-09-02 19:26:19 +0900 (51f9a2e) @@ -22,6 +22,12 @@ module Droonga class Proxy attr_reader :collectors def initialize(worker, name) + @engines = {} + Droonga::catalog.get_engines(name).each do |name, options| + engine = Droonga::Engine.new(options.merge({:with_server => false})) + engine.start + @engines[name] = engine + end @worker = worker @name = name @collectors = {} @@ -29,6 +35,12 @@ module Droonga @local = Regexp.new("^#{@name}") end + def shutdown + @engines.each do |name, engine| + engine.shutdown + end + end + def handle(message, arguments) case message when Array @@ -72,6 +84,15 @@ module Droonga end end + def deliver(id, route, message, type, synchronous) + if id == route + post(message, "type" => type, "synchronous"=> synchronous) + else + envelope =****@worke*****("body" => message, "type" => type) + @engines[route].emit('', Time.now.to_f, envelope, synchronous) + end + end + def post(message, destination) @worker.post(message, destination) end @@ -263,9 +284,7 @@ module Droonga message["descendants"] = descendants message["id"] = @id end - @proxy.post(message, - "type" => command, - "synchronous"=> synchronous) + @proxy.deliver(@id, task["route"], message, command, synchronous) end return if task["n_of_inputs"] < n_of_expects #the task is done @@ -300,6 +319,10 @@ module Droonga @proxy = Droonga::Proxy.new(@worker, @worker.name) end + def shutdown + @proxy.shutdown + end + command :proxy def proxy(request, *arguments) @proxy.handle(request, arguments) Modified: lib/droonga/server.rb (+1 -0) =================================================================== --- lib/droonga/server.rb 2013-08-21 13:36:32 +0900 (78b1596) +++ lib/droonga/server.rb 2013-09-02 19:26:19 +0900 (849cd3a) @@ -66,6 +66,7 @@ module Droonga super @message_input = config[:message_input] @executor = Executor.new(config) + @executor.add_handler("proxy_message") if config[:name] end def before_run Modified: lib/fluent/plugin/out_droonga.rb (+1 -1) =================================================================== --- lib/fluent/plugin/out_droonga.rb 2013-08-21 13:36:32 +0900 (f89205e) +++ lib/fluent/plugin/out_droonga.rb 2013-09-02 19:26:19 +0900 (84f08ea) @@ -23,7 +23,7 @@ module Fluent config_param :name, :string, :default => "" config_param :n_workers, :integer, :default => 1 - config_param :database, :string, :default => "droonga/db" + config_param :database, :string, :default => "" config_param :queue_name, :string, :default => "DroongaQueue" config_param :handlers, :default => [] do |value| value.split(/\s*,\s*/) -------------- next part -------------- HTML����������������������������... 下载