YUKI Hiroshi
null+****@clear*****
Tue Apr 21 11:43:52 JST 2015
YUKI Hiroshi 2015-04-21 11:43:52 +0900 (Tue, 21 Apr 2015) New Revision: f1f606b947a22c4e90ae6b975d62ab5f9e0fc903 https://github.com/droonga/droonga-engine/commit/f1f606b947a22c4e90ae6b975d62ab5f9e0fc903 Message: Use features of serf module directly Modified files: bin/droonga-engine-absorb-data bin/droonga-engine-join Modified: bin/droonga-engine-absorb-data (+10 -10) =================================================================== --- bin/droonga-engine-absorb-data 2015-04-21 11:43:17 +0900 (84a327a) +++ bin/droonga-engine-absorb-data 2015-04-21 11:43:52 +0900 (f3d6153) @@ -182,14 +182,14 @@ module Droonga :tag => @options.tag) end - def run_remote_command(target, command, options) - serf = Serf.new(target, :verbose => @options.verbose) - serf.send_query(command, options) + def source_node_serf + @source_node_serf ||= Serf.new(source_node.to_s, + :verbose => @options.verbose) end - def tag_value(target, tag) - serf = Serf.new(target, :verbose => @options[:verbose]) - serf.get_tag(tag) + def destination_node_serf + @destination_node_serf ||= Serf.new(destination_node.to_s, + :verbose => @options.verbose) end def absorber @@ -239,7 +239,7 @@ module Droonga puts "" - timestamp = tag_value(source_node.to_s, "last-processed-message-timestamp") + timestamp = source_node_serf.last_processed_message_timestamp unless timestamp $stderr.puts("Couldn't get the time stamp of " + "the last processed message from the source node.") @@ -248,9 +248,9 @@ module Droonga if timestamp and not timestamp.empty? puts "The timestamp of the last processed message in the source node: #{timestamp}" puts "Setting effective message timestamp for the destination node..." - response = run_remote_command(destination_node.to_s, "accept_messages_newer_than", - "node" => destination_node.to_s, - "timestamp" => timestamp) + response = destination_node_serf.send_query("accept_messages_newer_than", + "node" => destination_node.to_s, + "timestamp" => timestamp) end true end Modified: bin/droonga-engine-join (+39 -39) =================================================================== --- bin/droonga-engine-join 2015-04-21 11:43:17 +0900 (175a3ab) +++ bin/droonga-engine-join 2015-04-21 11:43:52 +0900 (f8eee72) @@ -154,6 +154,16 @@ module Droonga :tag => @options[:tag]) end + def source_node_serf + @source_node_serf ||= Serf.new(source_node.to_s, + :verbose => @options.verbose) + end + + def joining_node_serf + @joining_node_serf ||= Serf.new(joining_node.to_s, + :verbose => @options.verbose) + end + def source_cluster_id source_catalog.cluster_id end @@ -179,16 +189,6 @@ module Droonga fetcher.fetch(:dataset => dataset) end - def run_remote_command(target, command, options) - serf = Serf.new(target, :verbose => @options[:verbose]) - serf.send_query(command, options) - end - - def tag_value(target, tag) - serf = Serf.new(target, :verbose => @options[:verbose]) - serf.get_tag(tag) - end - def absorber @absorber ||= prepare_absorber end @@ -221,9 +221,9 @@ module Droonga def set_source_node_role if absorber.source_node_suspendable? puts("Changing role of the source node...") - run_remote_command(source_node.to_s, "change_role", - "node" => source_node.to_s, - "role" => NodeMetadata::Role::ABSORB_SOURCE) + source_node_serf.send_query("change_role", + "node" => source_node.to_s, + "role" => NodeMetadata::Role::ABSORB_SOURCE) wait_until_restarted(source_node) end @source_node_role_changed = true @@ -231,9 +231,9 @@ module Droonga def set_joining_node_role puts("Changing role of the joining node...") - run_remote_command(joining_node.to_s, "change_role", - "node" => joining_node.to_s, - "role" => NodeMetadata::Role::ABSORB_DESTINATION) + joining_node_serf.send_query("change_role", + "node" => joining_node.to_s, + "role" => NodeMetadata::Role::ABSORB_DESTINATION) wait_until_restarted(joining_node) @joining_node_role_changed = true end @@ -241,9 +241,9 @@ module Droonga def reset_source_node_role if absorber.source_node_suspendable? puts("Restoring role of the source node...") - run_remote_command(source_node.to_s, "change_role", - "node" => source_node.to_s, - "role" => NodeMetadata::Role::SERVICE_PROVIDER) + source_node_serf.send_query("change_role", + "node" => source_node.to_s, + "role" => NodeMetadata::Role::SERVICE_PROVIDER) wait_until_restarted(source_node.to_s) end @source_node_role_changed = false @@ -251,20 +251,20 @@ module Droonga def reset_joining_node_role puts("Restoring role of the joining node...") - run_remote_command(joining_node.to_s, "change_role", - "node" => joining_node.to_s, - "role" => NodeMetadata::Role::SERVICE_PROVIDER) + joining_node_serf.send_query("change_role", + "node" => joining_node.to_s, + "role" => NodeMetadata::Role::SERVICE_PROVIDER) wait_until_restarted(joining_node.to_s) @joining_node_role_changed = false end def do_join puts("Joining new replica to the cluster...") - run_remote_command(joining_node.to_s, "join", - "node" => joining_node.to_s, - "type" => "replica", - "source" => source_node.to_s, - "dataset" => dataset) + joining_node_serf.send_query("join", + "node" => joining_node.to_s, + "type" => "replica", + "source" => source_node.to_s, + "dataset" => dataset) wait_until_restarted(joining_node) end @@ -291,7 +291,7 @@ module Droonga end def set_effective_message_timestamp - timestamp = tag_value(source_node.to_s, "last-processed-message-timestamp") + timestamp = source_node_serf.last_processed_message_timestamp unless timestamp $stderr.puts("Couldn't get the time stamp of " + "the last processed message from the source node.") @@ -300,28 +300,28 @@ module Droonga if timestamp and not timestamp.empty? puts "The timestamp of the last processed message in the source node: #{timestamp}" puts "Setting effective message timestamp for the destination node..." - response = run_remote_command(joining_node.to_s, "accept_messages_newer_than", - "node" => joining_node.to_s, - "timestamp" => timestamp) + response = joining_node_serf.send_query("accept_messages_newer_than", + "node" => joining_node.to_s, + "timestamp" => timestamp) end end def register_to_existing_nodes puts("Register new node to existing hosts in the cluster...") - run_remote_command(source_node.to_s, "add_replicas", - "cluster_id" => source_cluster_id, - "dataset" => dataset, - "hosts" => [joining_node.host]) + source_node_serf.send_query("add_replicas", + "cluster_id" => source_cluster_id, + "dataset" => dataset, + "hosts" => [joining_node.host]) wait_until_restarted(source_node) @node_registered = true end def unregister_from_existing_nodes puts("Unregister new node from existing hosts in the cluster...") - run_remote_command(source_node, "remove_replicas", - "cluster_id" => source_cluster_id, - "dataset" => dataset, - "hosts" => [joining_node.host]) + source_node_serf.send_query("remove_replicas", + "cluster_id" => source_cluster_id, + "dataset" => dataset, + "hosts" => [joining_node.host]) wait_until_restarted(source_node) @node_registered = false end -------------- next part -------------- HTML����������������������������... 下载