YUKI Hiroshi
null+****@clear*****
Thu Dec 18 18:14:28 JST 2014
YUKI Hiroshi 2014-12-18 18:14:28 +0900 (Thu, 18 Dec 2014) New Revision: b3078ff2defb8f83637f7b68a346a43265e613bb https://github.com/droonga/droonga-engine/commit/b3078ff2defb8f83637f7b68a346a43265e613bb Message: Implement ForwardBuffer Added files: lib/droonga/forward_buffer.rb Modified files: lib/droonga/engine_state.rb lib/droonga/forwarder.rb Modified: lib/droonga/engine_state.rb (+17 -3) =================================================================== --- lib/droonga/engine_state.rb 2014-12-18 17:20:47 +0900 (f11ccdb) +++ lib/droonga/engine_state.rb 2014-12-18 18:14:28 +0900 (d7c2ee8) @@ -41,7 +41,9 @@ module Droonga @internal_name = internal_name @sessions = {} @current_id = 0 - @forwarder = Forwarder.new(@loop, :buffering => true) + @forwarder = Forwarder.new(@loop, + :buffering => true, + :engine_state => self) @replier = Replier.new(@forwarder) @on_ready = nil @on_finish = nil @@ -65,6 +67,18 @@ module Droonga route.start_with?(@name) or route.start_with?(@internal_name) end + def unwritable_node?(node_name) + case node_status.role + when NodeStatus::Role::SERVICE_PROVIDER + absorb_source_nodes.include?(node_name) or + absorb_destination_nodes.include?(node_name) + when NodeStatus::Role::ABSORB_SOURCE + absorb_destination_nodes.include?(node_name) + else + false + end + end + def farm_path(route) if /\A[^:]+:\d+\/[^.]+/ =~ route name = $MATCH @@ -129,7 +143,7 @@ module Droonga if @live_nodes_list @live_nodes_list.absorb_source_nodes else - all_nodes + [] end end @@ -137,7 +151,7 @@ module Droonga if @live_nodes_list @live_nodes_list.absorb_destination_nodes else - all_nodes + [] end end Added: lib/droonga/forward_buffer.rb (+87 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/forward_buffer.rb 2014-12-18 18:14:28 +0900 (d88a24f) @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +require "fileutils" +require "json" +require "pathname" + +require "droonga/loggable" +require "droonga/path" +require "droonga/safe_file_writer" + +module Droonga + class ForwardBuffer + include Loggable + + SUFFIX = ".json" + + def initialize(node_name, params) + @node_name = node_name + @forwarder = params[:forwarder] + + @data_directory = Path.intentional_buffer + "#{@node_name}" + FileUtils.mkdir_p(@data_directory.to_s) + end + + def add(receiver, message, command, arguments, options) + logger.trace("add: start") + buffered_message = { + "receiver" => receiver, + "message" => message, + "command" => command, + "arguments" => arguments, + "options" => options, + } + SafeFileWriter.write(file_path) do |output, file| + output.puts(JSON.generate(buffered_message)) + end + logger.trace("add: done") + end + + def resume + logger.trace("resume: start") + Pathname.glob("#{@data_directory}/*#{SUFFIX}").collect do |buffered_message_path| + output(buffered_message_path) + end + logger.trace("resume: done") + end + + def empty? + @data_directory.children.empty? + end + + private + def output(buffered_message_path) + file_contents = Pathname(buffered_message_path).read + buffered_message = JSON.parse(file_contents) + @forwarder.output(buffered_message["receiver"], + buffered_message["message"], + buffered_message["command"], + buffered_message["arguments"], + buffered_message["options"]) + FileUtils.rm_f(buffered_message_path.to_s) + end + + def file_path(timestamp=Time.now) + @data_directory + "#{time_stamp.iso8601(6)}.#{SUFFIX}" + end + + def log_tag + "[#{Process.ppid}] forward-buffer" + end + end +end Modified: lib/droonga/forwarder.rb (+35 -2) =================================================================== --- lib/droonga/forwarder.rb 2014-12-18 17:20:47 +0900 (52e938b) +++ lib/droonga/forwarder.rb 2014-12-18 18:14:28 +0900 (a3a95eb) @@ -19,6 +19,7 @@ require "droonga/loggable" require "droonga/path" require "droonga/event_loop" require "droonga/buffered_tcp_socket" +require "droonga/forward_buffer" require "droonga/fluent_message_sender" module Droonga @@ -28,6 +29,8 @@ module Droonga def initialize(loop, options={}) @loop = loop @buffering = options[:buffering] + @engine_state = options[:engine_state] + @buffers = {} @senders = {} end @@ -50,11 +53,15 @@ module Droonga command = destination["type"] receiver = destination["to"] arguments = destination["arguments"] - output(receiver, message, command, arguments) + buffered_output(receiver, message, command, arguments) logger.trace("forward: done") end def resume + resume_from_accidents + end + + def resume_from_accidents return unless Path.accidental_buffer.exist? Pathname.glob("#{Path.accidental_buffer}/*") do |path| next unless path.directory? @@ -83,7 +90,6 @@ module Droonga end end - private def output(receiver, message, command, arguments, options={}) logger.trace("output: start") if not receiver.is_a?(String) or not command.is_a?(String) @@ -95,6 +101,7 @@ module Droonga unless receiver =~ /\A(.*):(\d+)\/(.*?)(\?.+)?\z/ raise "format: hostname:port/tag(?params)" end + host = $1 port = $2 tag = $3 @@ -120,6 +127,27 @@ module Droonga logger.trace("output: done") end + private + def buffered_output(receiver, message, command, arguments, options={}) + receiver_is_node = (receiver =~ /\A([^:]+:\d+\/[^\.]+)/) + node_name = $1 + unless receiver_is_node + output(receiver, message, command, arguments, options) + return + end + + buffer = buffer_for(node_name) + if @engine_state and + @engine_state.unwritable_node?(node_name) + buffer.add(receiver, message, command, arguments, options) + elsif buffer.empty? + output(receiver, message, command, arguments, options) + else + buffer.add(receiver, message, command, arguments, options) + buffer.resume + end + end + def find_sender(host, port, params) connection_id = extract_connection_id(params) destination = "#{host}:#{port}" @@ -147,6 +175,11 @@ module Droonga sender end + def buffer_for(node_name) + @buffers[node_name] ||= ForwardBuffer.new(node_name, + :forwarder => self) + end + def log_tag "[#{Process.ppid}] forwarder" end -------------- next part -------------- HTML����������������������������... 下载