Kouhei Sutou
kou****@clear*****
Tue Apr 21 18:50:16 JST 2015
> + class Tag インスタンス化する必要がない(ネームスペースだけ必要)なら classじゃなくてmoduleの方が適切ですよ。 In <52872d1048b5a85bd3a31c4ed7399029a5b56f6a �� jenkins.clear-code.com> "[Groonga-commit] droonga/droonga-engine �� 52872d1 [master] Define serf tags in a place" on Tue, 21 Apr 2015 17:19:31 +0900, YUKI Hiroshi <null+groonga �� clear-code.com> wrote: > YUKI Hiroshi 2015-04-21 17:19:31 +0900 (Tue, 21 Apr 2015) > > New Revision: 52872d1048b5a85bd3a31c4ed7399029a5b56f6a > https://github.com/droonga/droonga-engine/commit/52872d1048b5a85bd3a31c4ed7399029a5b56f6a > > Message: > Define serf tags in a place > > Added files: > lib/droonga/serf/tag.rb > Modified files: > lib/droonga/serf.rb > > Modified: lib/droonga/serf.rb (+21 -26) > =================================================================== > --- lib/droonga/serf.rb 2015-04-21 17:07:45 +0900 (7369213) > +++ lib/droonga/serf.rb 2015-04-21 17:19:31 +0900 (8a0c5c2) > @@ -20,6 +20,7 @@ require "droonga/loggable" > require "droonga/catalog/loader" > require "droonga/node_name" > require "droonga/node_role" > +require "droonga/serf/tag" > require "droonga/serf/downloader" > require "droonga/serf/agent" > require "droonga/serf/command" > @@ -61,9 +62,9 @@ module Droonga > end > > def initialize_tags > - set_tag("type", "engine") > - set_tag("cluster_id", cluster_id) > - set_tag("role", role) > + set_tag(Tag.node_type, "engine") > + set_tag(Tag.node_role, role) > + set_tag(Tag.cluster_id, cluster_id) > end > > def leave > @@ -122,20 +123,20 @@ module Droonga > nodes = {} > unprocessed_messages_existence = {} > current_members.each do |member| > - foreign = member["tags"]["cluster_id"] != current_cluster_id > + foreign = member["tags"][Tag.cluster_id] != current_cluster_id > next if foreign > > member["tags"].each do |key, value| > - next unless key.start_with?(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX) > - node_name = key.sub(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX, "") > + next unless Tag.have_unprocessed_messages_tag?(key) > + node_name = Tag.extract_node_name_from_have_unprocessed_messages_tag(key) > next if unprocessed_messages_existence[node_name] > unprocessed_messages_existence[node_name] = value == "true" > end > > nodes[member["name"]] = { > - "type" => member["tags"]["type"], > - "role" => member["tags"]["role"], > - "accept_messages_newer_than" => member["tags"]["accept-newer-than"], > + "type" => member["tags"][Tag.node_type], > + "role" => member["tags"][Tag.node_role], > + "accept_messages_newer_than" => member["tags"][Tag.accept_messages_newer_than], > "live" => member["status"] == "alive", > } > end > @@ -171,44 +172,44 @@ module Droonga > end > > def update_cluster_id > - set_tag("cluster_id", cluster_id) > + set_tag(Tag.cluster_id, cluster_id) > end > > def set_have_unprocessed_messages_for(node_name) > - tag = have_unprocessed_messages_tag_for(node_name) > + tag = Tag.have_unprocessed_messages_tag_for(node_name) > set_tag(tag, true) unless @tags_cache.key?(tag) > end > > def reset_have_unprocessed_messages_for(node_name) > - delete_tag(have_unprocessed_messages_tag_for(node_name)) > + delete_tag(Tag.have_unprocessed_messages_tag_for(node_name)) > end > > def role > - NodeRole.normalize(get_tag("role")) > + NodeRole.normalize(get_tag(Tag.node_role)) > end > > def role=(new_role) > role = NodeRole.normalize(new_role) > - set_tag("role", role) > + set_tag(Tag.node_role, role) > # after that you must run update_cluster_state to update the cluster information cache > role > end > > def last_processed_message_timestamp > - get_tag("last-timestamp") > + get_tag(Tag.last_processed_message_timestamp) > end > > def last_processed_message_timestamp=(timestamp) > - set_tag("last-timestamp", timestamp.to_s) > + set_tag(Tag.last_processed_message_timestamp, timestamp.to_s) > # after that you must run update_cluster_state to update the cluster information cache > end > > def accept_messages_newer_than_timestamp > - get_tag("accept-newer-than") > + get_tag(Tag.accept_messages_newer_than) > end > > def accept_messages_newer_than(timestamp) > - set_tag("accept-newer-than", timestamp.to_s) > + set_tag(Tag.accept_messages_newer_than, timestamp.to_s) > # after that you must run update_cluster_state to update the cluster information cache > end > > @@ -223,13 +224,13 @@ module Droonga > > def ensure_restarted(&block) > start_time = Time.now > - previous_internal_name = get_tag("internal-name") > + previous_internal_name = get_tag(Tag.internal_node_name) > restarted = false > > yield # the given operation must restart the service. > > while Time.now - start_time < CHECK_RESTARTED_TIMEOUT > - restarted = get_tag("internal-name") == previous_internal_name > + restarted = get_tag(Tag.internal_node_name) == previous_internal_name > break if restarted > sleep(CHECK_RESTARTED_INTERVAL) > end > @@ -302,12 +303,6 @@ module Droonga > end > end > > - HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX = "buffered-for-" > - > - def have_unprocessed_messages_tag_for(node_name) > - "#{HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX}#{node_name}" > - end > - > def log_tag > "serf" > end > > Added: lib/droonga/serf/tag.rb (+60 -0) 100644 > =================================================================== > --- /dev/null > +++ lib/droonga/serf/tag.rb 2015-04-21 17:19:31 +0900 (564ea55) > @@ -0,0 +1,60 @@ > +# Copyright (C) 2015 Droonga Project > +# > +# This library is free software; you can redistribute it and/or > +# modify it under the terms of the GNU Lesser General Public > +# License version 2.1 as published by the Free Software Foundation. > +# > +# This library is distributed in the hope that it will be useful, > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > +# Lesser General Public License for more details. > +# > +# You should have received a copy of the GNU Lesser General Public > +# License along with this library; if not, write to the Free Software > +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA > + > +module Droonga > + class Serf > + class Tag > + class << self > + def node_type > + "type" > + end > + > + def node_role > + "role" > + end > + > + def internal_node_name > + "internal-name" > + end > + > + def cluster_id > + "cluster_id" > + end > + > + def accept_messages_newer_than > + "accept-newer-than" > + end > + > + def last_processed_message_timestamp > + "last-timestamp" > + end > + > + HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX = "buffered-for-" > + > + def have_unprocessed_messages_tag_for(node_name) > + "#{HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX}#{node_name}" > + end > + > + def have_unprocessed_messages_tag?(tag) > + tag.start_with?(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX) > + end > + > + def extract_node_name_from_have_unprocessed_messages_tag(tag) > + tag.sub(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX, "") > + end > + end > + end > + end > +end