[Groonga-commit] droonga/fluent-plugin-droonga at bcc71e6 [master] Implement the engine farm function.

Back to archive index

Daijiro MORI null+****@clear*****
Mon Sep 2 19:26:19 JST 2013


Daijiro MORI	2013-09-02 19:26:19 +0900 (Mon, 02 Sep 2013)

  New Revision: bcc71e62734c59f41f01526f8311a081a97ffc57
  https://github.com/droonga/fluent-plugin-droonga/commit/bcc71e62734c59f41f01526f8311a081a97ffc57

  Message:
    Implement the engine farm function.

  Modified files:
    lib/droonga/catalog.rb
    lib/droonga/engine.rb
    lib/droonga/executor.rb
    lib/droonga/proxy.rb
    lib/droonga/server.rb
    lib/fluent/plugin/out_droonga.rb

  Modified: lib/droonga/catalog.rb (+20 -0)
===================================================================
--- lib/droonga/catalog.rb    2013-08-21 13:36:32 +0900 (265b4fb)
+++ lib/droonga/catalog.rb    2013-09-02 19:26:19 +0900 (5ef15de)
@@ -50,6 +50,26 @@ module Droonga
       end
     end
 
+    def get_engines(name)
+      device = @catalog["farms"][name]["device"]
+      pattern = Regexp.new("^#{name}\.")
+      results = {}
+      @catalog["datasets"].each do |key, dataset|
+        dataset["ring"].each do |key, part|
+          part["partitions"].each do |range, partitions|
+            partitions.each do |partition|
+              if partition =~ pattern
+                path = File.join([device, $POSTMATCH, 'db'])
+                options = { :database => path }
+                results[partition] = options
+              end
+            end
+          end
+        end
+      end
+      return results
+    end
+
     def get_routes(name, args)
       routes = []
       dataset = dataset(name)

  Modified: lib/droonga/engine.rb (+9 -4)
===================================================================
--- lib/droonga/engine.rb    2013-08-21 13:36:32 +0900 (42a3982)
+++ lib/droonga/engine.rb    2013-09-02 19:26:19 +0900 (c3e6d30)
@@ -26,7 +26,6 @@ require "droonga/executor"
 module Droonga
   class Engine
     DEFAULT_OPTIONS = {
-      :database   => "droonga/db",
       :queue_name => "DroongaQueue",
       :handlers   => ["proxy"],
       :n_workers  => 1,
@@ -38,6 +37,11 @@ module Droonga
     end
 
     def start
+      if !@options[:database] || @options[:database].empty?
+        name = @options[:name]
+        database = File.join([File.basename(name), 'db'])
+        @options[:database] = database
+      end
       if @options[:n_workers] > 0 || @options[:with_server]
         @message_input, @message_output = IO.pipe
         @message_input.sync = true
@@ -48,6 +52,7 @@ module Droonga
         start_emitter
       else
         @executor = Executor.new(@options)
+        @executor.add_handler("proxy_message") if @options[:name]
       end
     end
 
@@ -63,12 +68,12 @@ module Droonga
       $log.trace("engine: shutdown: done")
     end
 
-    def emit(tag, time, record)
+    def emit(tag, time, record, synchronous=nil)
       $log.trace("tag: <#{tag}>")
       if @executor
-        @executor.dispatch(tag, time, record)
+        @executor.dispatch(tag, time, record, synchronous)
       else
-        @emitter.write(MessagePack.pack([tag, time, record]))
+        @emitter.write(MessagePack.pack([tag, time, record, synchronous]))
         @loop_breaker.signal
       end
     end

  Modified: lib/droonga/executor.rb (+7 -4)
===================================================================
--- lib/droonga/executor.rb    2013-08-21 13:36:32 +0900 (acedc98)
+++ lib/droonga/executor.rb    2013-09-02 19:26:19 +0900 (66935a3)
@@ -72,9 +72,13 @@ module Droonga
       envelope["via"].push(route)
     end
 
-    def dispatch(*message)
-      body, type, arguments = parse_message(message)
-      post_or_push(message, body, "type" => type, "arguments" => arguments)
+    def dispatch(tag, time, record, synchronous=nil)
+      message = [tag, time, record]
+      body, type, arguments = parse_message([tag, time, record])
+      post_or_push(message, body,
+                   "type" => type,
+                   "arguments" => arguments,
+                   "synchronous" => synchronous)
     end
 
     def execute_one
@@ -212,7 +216,6 @@ module Droonga
       @handler_names.each do |handler_name|
         add_handler(handler_name)
       end
-      add_handler("proxy_message")
     end
 
     def find_handler(command)

  Modified: lib/droonga/proxy.rb (+26 -3)
===================================================================
--- lib/droonga/proxy.rb    2013-08-21 13:36:32 +0900 (8138eee)
+++ lib/droonga/proxy.rb    2013-09-02 19:26:19 +0900 (51f9a2e)
@@ -22,6 +22,12 @@ module Droonga
   class Proxy
     attr_reader :collectors
     def initialize(worker, name)
+      @engines = {}
+      Droonga::catalog.get_engines(name).each do |name, options|
+        engine = Droonga::Engine.new(options.merge({:with_server => false}))
+        engine.start
+        @engines[name] = engine
+      end
       @worker = worker
       @name = name
       @collectors = {}
@@ -29,6 +35,12 @@ module Droonga
       @local = Regexp.new("^#{@name}")
     end
 
+    def shutdown
+      @engines.each do |name, engine|
+        engine.shutdown
+      end
+    end
+
     def handle(message, arguments)
       case message
       when Array
@@ -72,6 +84,15 @@ module Droonga
       end
     end
 
+    def deliver(id, route, message, type, synchronous)
+      if id == route
+        post(message, "type" => type, "synchronous"=> synchronous)
+      else
+        envelope =****@worke*****("body" => message, "type" => type)
+        @engines[route].emit('', Time.now.to_f, envelope, synchronous)
+      end
+    end
+
     def post(message, destination)
       @worker.post(message, destination)
     end
@@ -263,9 +284,7 @@ module Droonga
               message["descendants"] = descendants
               message["id"] = @id
             end
-            @proxy.post(message,
-                        "type" => command,
-                        "synchronous"=> synchronous)
+            @proxy.deliver(@id, task["route"], message, command, synchronous)
           end
           return if task["n_of_inputs"] < n_of_expects
           #the task is done
@@ -300,6 +319,10 @@ module Droonga
       @proxy = Droonga::Proxy.new(@worker, @worker.name)
     end
 
+    def shutdown
+      @proxy.shutdown
+    end
+
     command :proxy
     def proxy(request, *arguments)
       @proxy.handle(request, arguments)

  Modified: lib/droonga/server.rb (+1 -0)
===================================================================
--- lib/droonga/server.rb    2013-08-21 13:36:32 +0900 (78b1596)
+++ lib/droonga/server.rb    2013-09-02 19:26:19 +0900 (849cd3a)
@@ -66,6 +66,7 @@ module Droonga
       super
       @message_input = config[:message_input]
       @executor = Executor.new(config)
+      @executor.add_handler("proxy_message") if config[:name]
     end
 
     def before_run

  Modified: lib/fluent/plugin/out_droonga.rb (+1 -1)
===================================================================
--- lib/fluent/plugin/out_droonga.rb    2013-08-21 13:36:32 +0900 (f89205e)
+++ lib/fluent/plugin/out_droonga.rb    2013-09-02 19:26:19 +0900 (84f08ea)
@@ -23,7 +23,7 @@ module Fluent
 
     config_param :name, :string, :default => ""
     config_param :n_workers, :integer, :default => 1
-    config_param :database, :string, :default => "droonga/db"
+    config_param :database, :string, :default => ""
     config_param :queue_name, :string, :default => "DroongaQueue"
     config_param :handlers, :default => [] do |value|
       value.split(/\s*,\s*/)
-------------- next part --------------
HTML����������������������������...
下载 



More information about the Groonga-commit mailing list
Back to archive index