[Groonga-commit] groonga/fluent-plugin-groonga [master] in: always require real server

Back to archive index

Kouhei Sutou null+****@clear*****
Thu Nov 8 22:15:22 JST 2012


Kouhei Sutou	2012-11-08 22:15:22 +0900 (Thu, 08 Nov 2012)

  New Revision: 40bd35bd639cf9bde420d1b6256c6cdc47e29a5e
  https://github.com/groonga/fluent-plugin-groonga/commit/40bd35bd639cf9bde420d1b6256c6cdc47e29a5e

  Log:
    in: always require real server

  Modified files:
    lib/fluent/plugin/in_groonga.rb
    test/run-test.rb
    test/test_input.rb

  Modified: lib/fluent/plugin/in_groonga.rb (+37 -84)
===================================================================
--- lib/fluent/plugin/in_groonga.rb    2012-11-07 22:49:31 +0900 (a3f227f)
+++ lib/fluent/plugin/in_groonga.rb    2012-11-08 22:15:22 +0900 (b09c613)
@@ -16,10 +16,7 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 require "English"
-
-require "webrick/config"
-require "webrick/httprequest"
-require "webrick/httpresponse"
+require "webrick/httputils"
 
 require "http_parser"
 
@@ -34,18 +31,17 @@ module Fluent
     config_param :protocol, :string, :default => "http"
     config_param :bind, :string, :default => "0.0.0.0"
     config_param :port, :integer, :default => 10041
-    config_param :proxy_protocol, :string, :default => nil
-    config_param :proxy_host, :string, :default => nil
-    config_param :proxy_port, :integer, :default => 10041
+    config_param :real_host, :string
+    config_param :real_port, :integer, :default => 10041
 
     def configure(conf)
       super
-      @proxy_factory = ProxyFactory.new(@proxy_protocol, @proxy_host, @proxy_port)
+      repeater_factory = RepeaterFactory.new(@real_host, @real_port)
       case @protocol
       when "http"
-        @input = HTTPInput.new(@bind, @port, @proxy_factory)
+        @input = HTTPInput.new(@bind, @port, repeater_factory)
       when "gqtp"
-        @input = GQTPInput.new(@bind, @port, @proxy_factory)
+        @input = GQTPInput.new(@bind, @port, repeater_factory)
       else
         message = "unknown protocol: <#{@protocol.inspect}>"
         $log.error message
@@ -61,30 +57,39 @@ module Fluent
       @input.shutdown
     end
 
-    class ProxyFactory
-      def initialize(protocol, host, port)
-        @protocol = protocol
+    class RepeaterFactory
+      def initialize(host, port)
         @host = host
         @port = port
       end
 
       def connect(client)
-        case @protocol
-        when "http"
-          HTTPGroongaProxy.connect(@host, @port, client)
-        else
-          nil
-        end
+        Repeater.connect(@host, @port, client)
+      end
+    end
+
+    class Repeater < Coolio::TCPSocket
+      def initialize(socket, handler)
+        super(socket)
+        @handler = handler
+      end
+
+      def on_read(data)
+        @handler.write(data)
+      end
+
+      def on_close
+        @handler.close
       end
     end
 
     class HTTPInput
       include DetachMultiProcessMixin
 
-      def initialize(bind, port, proxy_factory)
+      def initialize(bind, port, repeater_factory)
         @bind = bind
         @port = port
-        @proxy_factory = proxy_factory
+        @repeater_factory = repeater_factory
       end
 
       def start
@@ -92,8 +97,7 @@ module Fluent
         detach_multi_process do
           @loop = Coolio::Loop.new
 
-          @socket = Coolio::TCPServer.new(listen_socket, nil,
-                                          Handler, @loop, @proxy_factory)
+          @socket = Coolio::TCPServer.new(listen_socket, nil, Handler, self)
           @loop.attach(@socket)
 
           @shutdown_notifier = Coolio::AsyncWatcher.new
@@ -112,6 +116,12 @@ module Fluent
         @thread.join
       end
 
+      def create_repeater(client)
+        repeater = @repeater_factory.connect(client)
+        repeater.attach(@loop)
+        repeater
+      end
+
       private
       def run
         @loop.run
@@ -121,45 +131,19 @@ module Fluent
       end
 
       class Handler < Coolio::Socket
-        class << self
-          @@response_config = nil
-          def response_config
-            @@response_config ||= WEBrick::Config::HTTP.dup.update(
-              :Logger => $log
-            )
-          end
-        end
-
-        def initialize(socket, loop, proxy_factory)
+        def initialize(socket, input)
           super(socket)
-          @loop = loop
-          @proxy_factory = proxy_factory
-          @completed = false
+          @input = input
         end
 
-        alias_method :<<, :write
-
         def on_connect
           @parser = HTTP::Parser.new(self)
-          @proxy = @proxy_factory.connect(self)
-          if @proxy
-            @proxy.attach(@loop)
-            @response = nil
-          else
-            @response = WEBrick::HTTPResponse.new(self.class.response_config)
-          end
+          @repeater =****@input*****_repeater(self)
         end
 
         def on_read(data)
           @parser << data
-          @proxy.write(data) if @proxy
-        end
-
-        def on_write_complete
-          return unless @completed
-          if @response
-            close
-          end
+          @repeater.write(data)
         end
 
         def on_message_begin
@@ -167,13 +151,6 @@ module Fluent
         end
 
         def on_headers_complete(headers)
-          expect = nil
-          headers.each do |name, value|
-            case name.downcase
-            when "content-type"
-              @content_type = value
-            end
-          end
         end
 
         def on_body(chunk)
@@ -187,16 +164,7 @@ module Fluent
           when /\A\/d\//
             command = $POSTMATCH
             process(command, params, @body)
-          else
-            if @response
-              @response.status = "404"
-            end
-          end
-          if @response
-            @response["connection"] = "close"
-            @response.send_response(self)
           end
-          @completed = true
         end
 
         private
@@ -211,20 +179,5 @@ module Fluent
         end
       end
     end
-
-    class HTTPGroongaProxy < Coolio::TCPSocket
-      def initialize(socket, handler)
-        super(socket)
-        @handler = handler
-      end
-
-      def on_read(data)
-        @handler.write(data)
-      end
-
-      def on_close
-        @handler.close
-      end
-    end
   end
 end

  Modified: test/run-test.rb (+2 -0)
===================================================================
--- test/run-test.rb    2012-11-07 22:49:31 +0900 (e76e879)
+++ test/run-test.rb    2012-11-08 22:15:22 +0900 (0183bbc)
@@ -17,6 +17,8 @@
 
 # $VERBOSE = true
 
+Thread.abort_on_exception = true
+
 base_dir = File.expand_path(File.join(File.dirname(__FILE__), ".."))
 lib_dir = File.join(base_dir, "lib")
 test_dir = File.join(base_dir, "test")

  Modified: test/test_input.rb (+42 -1)
===================================================================
--- test/test_input.rb    2012-11-07 22:49:31 +0900 (6dabf72)
+++ test/test_input.rb    2012-11-08 22:15:22 +0900 (e313155)
@@ -18,10 +18,14 @@
 require "time"
 require "cgi/util"
 require "net/http"
+require "webrick/config"
+require "webrick/httpresponse"
 
 require "fluent/test"
 require "fluent/plugin/in_groonga"
 
+require "http_parser"
+
 class GroongaInputTest < Test::Unit::TestCase
   setup
   def setup_fluent
@@ -43,6 +47,41 @@ EOC
   end
 
   class HTTPTest < self
+    setup :before => :append
+    def setup_real_server
+      @real_host = "127.0.0.1"
+      @real_port = 29292
+      @real_server = TCPServer.new(@real_host, @real_port)
+      @repeater = nil
+      response_config = WEBrick::Config::HTTP.dup.update(:Logger => $log)
+      @real_response = WEBrick::HTTPResponse.new(response_config)
+      Thread.new do
+        @repeater = @real_server.accept
+        @real_server.close
+        parser = HTTP::Parser.new
+        parser.on_message_complete = lambda do
+          @real_response.send_response(@repeater)
+          @repeater.close
+        end
+
+        loop do
+          break if****@repea*****?
+          data =****@repea*****(4096)
+          break if data.nil?
+          parser << data
+        end
+      end
+    end
+
+    teardown
+    def teardown_real_server
+      @real_server.close unless @real_server.closed?
+
+      if @repeater and not****@repea*****?
+        @repeater.close
+      end
+    end
+
     def setup
       @host = "127.0.0.1"
       @port = 2929
@@ -56,6 +95,8 @@ EOC
       protocol http
       bind #{@host}
       port #{@port}
+      real_host #{@real_host}
+      real_port #{@real_port}
 EOC
     end
 
@@ -63,7 +104,6 @@ EOC
       @driver.expect_emit("groonga.command.table_create",
                           @now,
                           {"name" => "Users"})
-
       @driver.run do
         get("/d/table_create", "name" => "Users")
         assert_equal("200", @last_response.code)
@@ -92,6 +132,7 @@ EOJ
 
     def test_not_command
       @driver.run do
+        @real_response.status = 404
         get("/index.html")
         assert_equal("404", @last_response.code)
       end
-------------- next part --------------
HTML����������������������������...
下载 



More information about the Groonga-commit mailing list
Back to archive index