Kouhei Sutou
null+****@clear*****
Wed Feb 26 17:21:40 JST 2014
Kouhei Sutou 2014-02-26 17:21:40 +0900 (Wed, 26 Feb 2014) New Revision: 554e6fec59d5c905e3ea3ba51272033adf1f7cd3 https://github.com/droonga/fluent-plugin-droonga/commit/554e6fec59d5c905e3ea3ba51272033adf1f7cd3 Message: Support sub set of catalog version 2 Modified files: lib/droonga/catalog/version2.rb lib/droonga/catalog_loader.rb test/command/config/default/catalog.json test/command/suite/search/error/unknown-source.expected test/unit/catalog/test_version2.rb test/unit/fixtures/catalog/version2.json Modified: lib/droonga/catalog/version2.rb (+125 -0) =================================================================== --- lib/droonga/catalog/version2.rb 2014-02-26 16:22:00 +0900 (634696e) +++ lib/droonga/catalog/version2.rb 2014-02-26 17:21:40 +0900 (26b41aa) @@ -18,10 +18,135 @@ require "droonga/catalog/base" module Droonga module Catalog class Version2 < Base + def get_partitions(name) + slices(name) + end + + def slices(name) + device = "." + pattern = Regexp.new("^#{name}\.") + results = {} + @data["datasets"].each do |dataset_name, dataset| + n_workers = dataset["nWorkers"] + plugins = dataset["plugins"] + dataset["replicas"].each do |replica| + replica["slices"].each do |slice| + volume_address = slice["volume"]["address"] + if pattern =~ volume_address + path = File.join([device, $POSTMATCH, "db"]) + path = File.expand_path(path, base_path) + options = { + :dataset => dataset_name, + :database => path, + :n_workers => n_workers, + :plugins => plugins + } + results[volume_address] = options + end + end + end + end + results + end + + def get_routes(name, args) + routes = [] + dataset = dataset(name) + case args["type"] + when "broadcast" + replicas = select_replicas(dataset["replicas"], args["replica"]) + replicas.each do |replica| + slices = select_slices(replica) + slices.each do |slice| + routes << slice["volume"]["address"] + end + end + when "scatter" + replicas = select_replicas(dataset["replicas"], args["replica"]) + replicas.each do |replica| + slice = select_slice(replica, args["key"]) + routes << slice["volume"]["address"] + end + end + routes + end + private def validate # TODO: Implement me. end + + def prepare_data + @data["datasets"].each do |name, dataset| + replicas = dataset["replicas"] + replicas.each do |replica| + total_weight = compute_total_weight(replica) + continuum = [] + slices = replica["slices"] + n_partitions = slices.size + slices.each do |slice| + weight = slice["weight"] || default_weight + points = n_partitions * 160 * weight / total_weight + points.times do |point| + hash = Digest::SHA1.hexdigest("#{name}:#{point}") + continuum << [hash[0..7].to_i(16), slice] + end + end + replica["continuum"] = continuum.sort do |a, b| + a[0] - b[0] + end + end + end + end + + def default_weight + 1 + end + + def compute_total_weight(replica) + slices = replica["slices"] + slices.reduce(0) do |result, slice| + result + (slice["weight"] || default_weight) + end + end + + def select_replicas(replicas, how) + case how + when "top" + [replicas.first] + when "random" + [replicas.sample] + when "all" + replicas + end + end + + def select_slices(replica, range=0..-1) + sorted_slices = replica["slices"].sort_by do |slice| + slice["label"] + end + sorted_slices[range] + end + + def select_slice(replica, key) + continuum = replica["continuum"] + return replica["slices"].first unless continuum + + hash = Zlib.crc32(key) + min = 0 + max = continuum.size - 1 + while (min < max) do + index = (min + max) / 2 + value, key = continuum[index] + return key if value == hash + if value > hash + max = index + else + min = index + 1 + end + end + continuum[max][1] + end end end end Modified: lib/droonga/catalog_loader.rb (+1 -0) =================================================================== --- lib/droonga/catalog_loader.rb 2014-02-26 16:22:00 +0900 (ecbfe1b) +++ lib/droonga/catalog_loader.rb 2014-02-26 17:21:40 +0900 (f8c9416) @@ -16,6 +16,7 @@ require "json" require "droonga/catalog/version1" +require "droonga/catalog/version2" module Droonga class CatalogLoader Modified: test/command/config/default/catalog.json (+72 -46) =================================================================== --- test/command/config/default/catalog.json 2014-02-26 16:22:00 +0900 (441018a) +++ test/command/config/default/catalog.json 2014-02-26 17:21:40 +0900 (daa7edd) @@ -1,59 +1,85 @@ { - "version": 1, - "effective_date": "2013-09-01T00:00:00Z", - "zones": ["localhost:23003/droonga"], - "farms": { - "localhost:23003/droonga": { - "device": ".", - "capacity": 10 - } - }, + "version": 2, + "effectiveDate": "2014-02-28T00:00:00Z", "datasets": { "Droonga": { - "workers": 2, + "nWorkers": 3, "plugins": ["groonga", "crud", "search"], - "number_of_replicas": 2, - "number_of_partitions": 2, - "partition_key": "_key", - "date_range": "infinity", - "ring": { - "localhost:23041": { - "weight": 50, - "partitions": { - "2013-09-01": [ - "localhost:23003/droonga.000", - "localhost:23003/droonga.001" - ] - } + "replicas": [ + { + "dimension": "_key", + "slicer": "hash", + "slices": [ + { + "label": "slice000", + "weight": 50, + "volume": { + "address": "localhost:23003/droonga.000" + } + }, + { + "label": "slice001", + "weight": 50, + "volume": { + "address": "localhost:23003/droonga.001" + } + }, + { + "label": "slice002", + "weight": 50, + "volume": { + "address": "localhost:23003/droonga.002" + } + } + ] }, - "localhost:23042": { - "weight": 50, - "partitions": { - "2013-09-01": [ - "localhost:23003/droonga.002", - "localhost:23003/droonga.003" - ] - } + { + "dimension": "_key", + "slicer": "hash", + "slices": [ + { + "label": "slice010", + "weight": 50, + "volume": { + "address": "localhost:23003/droonga.010" + } + }, + { + "label": "slice011", + "weight": 50, + "volume": { + "address": "localhost:23003/droonga.011" + } + }, + { + "label": "slice012", + "weight": 50, + "volume": { + "address": "localhost:23003/droonga.012" + } + } + ] } - } + ] }, "Watch": { - "workers": 2, + "nWorkers": 3, "plugins": ["groonga", "watch", "search", "crud"], - "number_of_replicas": 1, - "number_of_partitions": 1, - "partition_key": "_key", - "date_range": "infinity", - "ring": { - "localhost:23041": { - "weight": 50, - "partitions": { - "2013-09-01": [ - "localhost:23003/droonga.watch" - ] - } + "replicas": [ + { + "dimension": "_key", + "slicer": "hash", + "slices": [ + { + "label": "slice100", + "weight": 50, + "volume": { + "address": "localhost:23003/droonga.watch" + } + } + ] } - } + ] } } } Modified: test/command/suite/search/error/unknown-source.expected (+12 -0) =================================================================== --- test/command/suite/search/error/unknown-source.expected 2014-02-26 16:22:00 +0900 (49d95df) +++ test/command/suite/search/error/unknown-source.expected 2014-02-26 17:21:40 +0900 (1cff434) @@ -38,6 +38,18 @@ } } } + }, + "sources2": { + "statusCode": 404, + "body": { + "name": "UnknownSource", + "message": "Source not found: <unknown> It must be a name of an existing table or another query.", + "detail": { + "unknown-source": { + "source": "unknown" + } + } + } } } } Modified: test/unit/catalog/test_version2.rb (+40 -72) =================================================================== --- test/unit/catalog/test_version2.rb 2014-02-26 16:22:00 +0900 (bbbed24) +++ test/unit/catalog/test_version2.rb 2014-02-26 17:21:40 +0900 (74dd549) @@ -19,10 +19,9 @@ class CatalogTestVersion2 < Test::Unit::TestCase class << self def minimum_data { - "effectiveDate" => "2013-09-01T00:00:00Z", - "zones" => [], - "farms" => {}, - "datasets" => {}, + "effectiveDate" => "2014-02-28T00:00:00Z", + "datasets" => { + }, } end end @@ -36,58 +35,35 @@ class CatalogTestVersion2 < Test::Unit::TestCase Droonga::Catalog::Version2.new(data, path) end - class OptionTest < self - def create_catalog(options) - super(minimum_data.merge("options" => options), "path") - end - - def test_nonexistent - catalog = create_catalog({}) - assert_nil(catalog.option("nonexistent")) - end - - def test_existent - catalog = create_catalog("plugins" => ["crud", "groonga"]) - assert_equal(["crud", "groonga"], - catalog.option("plugins")) - end - end - - class PartitionTest < self + class SliceTest < self def setup data = JSON.parse(File.read(catalog_path)) @catalog = create_catalog(data, catalog_path) end - def test_get_partitions - partitions =****@catal*****_partitions("localhost:23003/test") + def test_slices + slices =****@catal*****("localhost:23003/test") assert_equal({ "localhost:23003/test.000" => { :database => "#{base_path}/000/db", :dataset => "Test", - :plugins => ["for_dataset"], - :n_workers => 0 + :plugins => ["plugin1", "plugin2", "plugin3"], + :n_workers => 4, }, "localhost:23003/test.001" => { :database => "#{base_path}/001/db", :dataset => "Test", - :plugins => ["for_dataset"], - :n_workers => 0 + :plugins => ["plugin1", "plugin2", "plugin3"], + :n_workers => 4, }, "localhost:23003/test.002" => { :database => "#{base_path}/002/db", :dataset => "Test", - :plugins => ["for_dataset"], - :n_workers => 0 - }, - "localhost:23003/test.003" => { - :database => "#{base_path}/003/db", - :dataset => "Test", - :plugins => ["for_dataset"], - :n_workers => 0 + :plugins => ["plugin1", "plugin2", "plugin3"], + :n_workers => 4, }, }, - partitions) + slices) end def fixture_path(base_path) @@ -104,34 +80,26 @@ class CatalogTestVersion2 < Test::Unit::TestCase class PluginsTest < self def setup - @data = minimum_data.merge({ - "zones" => [farm_name], - "farms" => { - farm_name => { - "device" => ".", - }, - }, + custom_data = { "datasets" => { "Droonga" => { - "workers" => 1, - "number_of_partitions" => 1, - "number_of_replicas" => 1, - "date_range" => "infinity", - "partition_key" => "_key", - "plugins" => [], - "ring" => { - "localhost:23041" => { - "weight" => 50, - "partitions" => { - "2014-02-09" => [ - "#{farm_name}.000", - ], - }, + "nWorkers" => 1, + "replicas" => [ + { + "slices" => [ + { + "volume" => { + "address" => "#{farm_name}.000", + }, + }, + ], }, - }, + ], }, }, - }) + + } + @data = minimum_data.merge(custom_data) end def farm_name @@ -140,7 +108,7 @@ class CatalogTestVersion2 < Test::Unit::TestCase def plugins(data) catalog = create_catalog(data, catalog_path) - catalog.get_partitions(farm_name).collect do |partition, options| + catalog.slices(farm_name).collect do |volum_address, options| options[:plugins] end end @@ -155,31 +123,31 @@ class CatalogTestVersion2 < Test::Unit::TestCase end class DataSetTest < self - class RingTest < self + class ReplicaTest < self class TotalWeightTest < self - def test_three_zones - dataset = { - "ring" => { - "zone1" => { + def test_three_slices + replica = { + "slices" => [ + { "weight" => 10, }, - "zone2" => { + { "weight" => 20, }, - "zone3" => { + { "weight" => 30, }, - } + ], } assert_equal(10 + 20 + 30, - total_weight(dataset)) + total_weight(replica)) end private - def total_weight(dataset) + def total_weight(replica) catalog = create_catalog(minimum_data, "base-path") - catalog.send(:compute_total_weight, dataset) + catalog.send(:compute_total_weight, replica) end end end Modified: test/unit/fixtures/catalog/version2.json (+53 -32) =================================================================== --- test/unit/fixtures/catalog/version2.json 2014-02-26 16:22:00 +0900 (a2265e7) +++ test/unit/fixtures/catalog/version2.json 2014-02-26 17:21:40 +0900 (1e4fe75) @@ -1,41 +1,62 @@ { "version": 2, - "effectiveDate": "2013-09-01T00:00:00Z", - "zones": ["localhost:23003/test"], - "farms": { - "localhost:23003/test": { - "device": ".", - "capacity": 10 - } - }, + "effectiveDate": "2014-02-28T00:00:00Z", "datasets": { "Test": { - "workers": 0, - "plugins": ["for_dataset"], - "number_of_replicas": 2, - "number_of_partitions": 2, - "partition_key": "_key", - "date_range": "infinity", - "ring": { - "localhost:23041": { - "weight": 50, - "partitions": { - "2013-09-01": [ - "localhost:23003/test.000", - "localhost:23003/test.001" - ] - } + "nWorkers": 4, + "plugins": [ + "plugin1", + "plugin2", + "plugin3" + ], + "replicas": [ + { + "dimension": "_key", + "slicer": "hash", + "slices": [ + { + "label": "slice00", + "volume": { + "address": "localhost:23003/test.000" + } + }, + { + "label": "slice01", + "volume": { + "address": "localhost:23003/test.001" + } + }, + { + "label": "slice02", + "volume": { + "address": "localhost:23003/test.002" + } + } + ] }, - "localhost:23042": { - "weight": 50, - "partitions": { - "2013-09-01": [ - "localhost:23003/test.002", - "localhost:23003/test.003" - ] - } + { + "slices": [ + { + "label": "slice10", + "volume": { + "address": "localhost:23004/test.010" + } + }, + { + "label": "slice11", + "volume": { + "address": "localhost:23004/test.011" + } + }, + { + "label": "slice12", + "volume": { + "address": "localhost:23004/test.012" + } + } + ] } - } + ] } } } -------------- next part -------------- HTML����������������������������... ダウンロード