[Groonga-commit] droonga/fluent-plugin-droonga at 554e6fe [master] Support sub set of catalog version 2

アーカイブの一覧に戻る

Kouhei Sutou null+****@clear*****
Wed Feb 26 17:21:40 JST 2014


Kouhei Sutou	2014-02-26 17:21:40 +0900 (Wed, 26 Feb 2014)

  New Revision: 554e6fec59d5c905e3ea3ba51272033adf1f7cd3
  https://github.com/droonga/fluent-plugin-droonga/commit/554e6fec59d5c905e3ea3ba51272033adf1f7cd3

  Message:
    Support sub set of catalog version 2

  Modified files:
    lib/droonga/catalog/version2.rb
    lib/droonga/catalog_loader.rb
    test/command/config/default/catalog.json
    test/command/suite/search/error/unknown-source.expected
    test/unit/catalog/test_version2.rb
    test/unit/fixtures/catalog/version2.json

  Modified: lib/droonga/catalog/version2.rb (+125 -0)
===================================================================
--- lib/droonga/catalog/version2.rb    2014-02-26 16:22:00 +0900 (634696e)
+++ lib/droonga/catalog/version2.rb    2014-02-26 17:21:40 +0900 (26b41aa)
@@ -18,10 +18,135 @@ require "droonga/catalog/base"
 module Droonga
   module Catalog
     class Version2 < Base
+      def get_partitions(name)
+        slices(name)
+      end
+
+      def slices(name)
+        device = "."
+        pattern = Regexp.new("^#{name}\.")
+        results = {}
+        @data["datasets"].each do |dataset_name, dataset|
+          n_workers = dataset["nWorkers"]
+          plugins = dataset["plugins"]
+          dataset["replicas"].each do |replica|
+            replica["slices"].each do |slice|
+              volume_address = slice["volume"]["address"]
+              if pattern =~ volume_address
+                path = File.join([device, $POSTMATCH, "db"])
+                path = File.expand_path(path, base_path)
+                options = {
+                  :dataset => dataset_name,
+                  :database => path,
+                  :n_workers => n_workers,
+                  :plugins => plugins
+                }
+                results[volume_address] = options
+              end
+            end
+          end
+        end
+        results
+      end
+
+      def get_routes(name, args)
+        routes = []
+        dataset = dataset(name)
+        case args["type"]
+        when "broadcast"
+          replicas = select_replicas(dataset["replicas"], args["replica"])
+          replicas.each do |replica|
+            slices = select_slices(replica)
+            slices.each do |slice|
+              routes << slice["volume"]["address"]
+            end
+          end
+        when "scatter"
+          replicas = select_replicas(dataset["replicas"], args["replica"])
+          replicas.each do |replica|
+            slice = select_slice(replica, args["key"])
+            routes << slice["volume"]["address"]
+          end
+        end
+        routes
+      end
+
       private
       def validate
         # TODO: Implement me.
       end
+
+      def prepare_data
+        @data["datasets"].each do |name, dataset|
+          replicas = dataset["replicas"]
+          replicas.each do |replica|
+            total_weight = compute_total_weight(replica)
+            continuum = []
+            slices = replica["slices"]
+            n_partitions = slices.size
+            slices.each do |slice|
+              weight = slice["weight"] || default_weight
+              points = n_partitions * 160 * weight / total_weight
+              points.times do |point|
+                hash = Digest::SHA1.hexdigest("#{name}:#{point}")
+                continuum << [hash[0..7].to_i(16), slice]
+              end
+            end
+            replica["continuum"] = continuum.sort do |a, b|
+              a[0] - b[0]
+            end
+          end
+        end
+      end
+
+      def default_weight
+        1
+      end
+
+      def compute_total_weight(replica)
+        slices = replica["slices"]
+        slices.reduce(0) do |result, slice|
+          result + (slice["weight"] || default_weight)
+        end
+      end
+
+      def select_replicas(replicas, how)
+        case how
+        when "top"
+          [replicas.first]
+        when "random"
+          [replicas.sample]
+        when "all"
+          replicas
+        end
+      end
+
+      def select_slices(replica, range=0..-1)
+        sorted_slices = replica["slices"].sort_by do |slice|
+          slice["label"]
+        end
+        sorted_slices[range]
+      end
+
+      def select_slice(replica, key)
+        continuum = replica["continuum"]
+        return replica["slices"].first unless continuum
+
+        hash = Zlib.crc32(key)
+        min = 0
+        max = continuum.size - 1
+        while (min < max) do
+          index = (min + max) / 2
+          value, key = continuum[index]
+          return key if value == hash
+          if value > hash
+            max = index
+          else
+            min = index + 1
+          end
+        end
+        continuum[max][1]
+      end
     end
   end
 end

  Modified: lib/droonga/catalog_loader.rb (+1 -0)
===================================================================
--- lib/droonga/catalog_loader.rb    2014-02-26 16:22:00 +0900 (ecbfe1b)
+++ lib/droonga/catalog_loader.rb    2014-02-26 17:21:40 +0900 (f8c9416)
@@ -16,6 +16,7 @@
 require "json"
 
 require "droonga/catalog/version1"
+require "droonga/catalog/version2"
 
 module Droonga
   class CatalogLoader

  Modified: test/command/config/default/catalog.json (+72 -46)
===================================================================
--- test/command/config/default/catalog.json    2014-02-26 16:22:00 +0900 (441018a)
+++ test/command/config/default/catalog.json    2014-02-26 17:21:40 +0900 (daa7edd)
@@ -1,59 +1,85 @@
 {
-  "version": 1,
-  "effective_date": "2013-09-01T00:00:00Z",
-  "zones": ["localhost:23003/droonga"],
-  "farms": {
-    "localhost:23003/droonga": {
-      "device": ".",
-      "capacity": 10
-    }
-  },
+  "version": 2,
+  "effectiveDate": "2014-02-28T00:00:00Z",
   "datasets": {
     "Droonga": {
-      "workers": 2,
+      "nWorkers": 3,
       "plugins": ["groonga", "crud", "search"],
-      "number_of_replicas": 2,
-      "number_of_partitions": 2,
-      "partition_key": "_key",
-      "date_range": "infinity",
-      "ring": {
-        "localhost:23041": {
-          "weight": 50,
-          "partitions": {
-            "2013-09-01": [
-              "localhost:23003/droonga.000",
-              "localhost:23003/droonga.001"
-            ]
-          }
+      "replicas": [
+        {
+          "dimension": "_key",
+          "slicer": "hash",
+          "slices": [
+            {
+              "label": "slice000",
+              "weight": 50,
+              "volume": {
+                "address": "localhost:23003/droonga.000"
+              }
+            },
+            {
+              "label": "slice001",
+              "weight": 50,
+              "volume": {
+                "address": "localhost:23003/droonga.001"
+              }
+            },
+            {
+              "label": "slice002",
+              "weight": 50,
+              "volume": {
+                "address": "localhost:23003/droonga.002"
+              }
+            }
+          ]
         },
-        "localhost:23042": {
-          "weight": 50,
-          "partitions": {
-            "2013-09-01": [
-              "localhost:23003/droonga.002",
-              "localhost:23003/droonga.003"
-            ]
-          }
+        {
+          "dimension": "_key",
+          "slicer": "hash",
+          "slices": [
+            {
+              "label": "slice010",
+              "weight": 50,
+              "volume": {
+                "address": "localhost:23003/droonga.010"
+              }
+            },
+            {
+              "label": "slice011",
+              "weight": 50,
+              "volume": {
+                "address": "localhost:23003/droonga.011"
+              }
+            },
+            {
+              "label": "slice012",
+              "weight": 50,
+              "volume": {
+                "address": "localhost:23003/droonga.012"
+              }
+            }
+          ]
         }
-      }
+      ]
     },
     "Watch": {
-      "workers": 2,
+      "nWorkers": 3,
       "plugins": ["groonga", "watch", "search", "crud"],
-      "number_of_replicas": 1,
-      "number_of_partitions": 1,
-      "partition_key": "_key",
-      "date_range": "infinity",
-      "ring": {
-        "localhost:23041": {
-          "weight": 50,
-          "partitions": {
-            "2013-09-01": [
-              "localhost:23003/droonga.watch"
-            ]
-          }
+      "replicas": [
+        {
+          "dimension": "_key",
+          "slicer": "hash",
+          "slices": [
+            {
+              "label": "slice100",
+              "weight": 50,
+              "volume": {
+                "address": "localhost:23003/droonga.watch"
+              }
+            }
+          ]
         }
-      }
+      ]
     }
   }
 }

  Modified: test/command/suite/search/error/unknown-source.expected (+12 -0)
===================================================================
--- test/command/suite/search/error/unknown-source.expected    2014-02-26 16:22:00 +0900 (49d95df)
+++ test/command/suite/search/error/unknown-source.expected    2014-02-26 17:21:40 +0900 (1cff434)
@@ -38,6 +38,18 @@
             }
           }
         }
+      },
+      "sources2": {
+        "statusCode": 404,
+        "body": {
+          "name": "UnknownSource",
+          "message": "Source not found: <unknown> It must be a name of an existing table or another query.",
+          "detail": {
+            "unknown-source": {
+              "source": "unknown"
+            }
+          }
+        }
       }
     }
   }

  Modified: test/unit/catalog/test_version2.rb (+40 -72)
===================================================================
--- test/unit/catalog/test_version2.rb    2014-02-26 16:22:00 +0900 (bbbed24)
+++ test/unit/catalog/test_version2.rb    2014-02-26 17:21:40 +0900 (74dd549)
@@ -19,10 +19,9 @@ class CatalogTestVersion2 < Test::Unit::TestCase
   class << self
     def minimum_data
       {
-        "effectiveDate" => "2013-09-01T00:00:00Z",
-        "zones" => [],
-        "farms" => {},
-        "datasets" => {},
+        "effectiveDate" => "2014-02-28T00:00:00Z",
+        "datasets" => {
+        },
       }
     end
   end
@@ -36,58 +35,35 @@ class CatalogTestVersion2 < Test::Unit::TestCase
     Droonga::Catalog::Version2.new(data, path)
   end
 
-  class OptionTest < self
-    def create_catalog(options)
-      super(minimum_data.merge("options" => options), "path")
-    end
-
-    def test_nonexistent
-      catalog = create_catalog({})
-      assert_nil(catalog.option("nonexistent"))
-    end
-
-    def test_existent
-      catalog = create_catalog("plugins" => ["crud", "groonga"])
-      assert_equal(["crud", "groonga"],
-                   catalog.option("plugins"))
-    end
-  end
-
-  class PartitionTest < self
+  class SliceTest < self
     def setup
       data = JSON.parse(File.read(catalog_path))
       @catalog = create_catalog(data, catalog_path)
     end
 
-    def test_get_partitions
-      partitions =****@catal*****_partitions("localhost:23003/test")
+    def test_slices
+      slices =****@catal*****("localhost:23003/test")
       assert_equal({
                      "localhost:23003/test.000" => {
                        :database  => "#{base_path}/000/db",
                        :dataset   => "Test",
-                       :plugins   => ["for_dataset"],
-                       :n_workers => 0
+                       :plugins   => ["plugin1", "plugin2", "plugin3"],
+                       :n_workers => 4,
                      },
                      "localhost:23003/test.001" => {
                        :database  => "#{base_path}/001/db",
                        :dataset   => "Test",
-                       :plugins   => ["for_dataset"],
-                       :n_workers => 0
+                       :plugins   => ["plugin1", "plugin2", "plugin3"],
+                       :n_workers => 4,
                      },
                      "localhost:23003/test.002" => {
                        :database  => "#{base_path}/002/db",
                        :dataset   => "Test",
-                       :plugins   => ["for_dataset"],
-                       :n_workers => 0
-                     },
-                     "localhost:23003/test.003" => {
-                       :database  => "#{base_path}/003/db",
-                       :dataset   => "Test",
-                       :plugins   => ["for_dataset"],
-                       :n_workers => 0
+                       :plugins   => ["plugin1", "plugin2", "plugin3"],
+                       :n_workers => 4,
                      },
                    },
-                   partitions)
+                   slices)
     end
 
     def fixture_path(base_path)
@@ -104,34 +80,26 @@ class CatalogTestVersion2 < Test::Unit::TestCase
 
     class PluginsTest < self
       def setup
-        @data = minimum_data.merge({
-          "zones" => [farm_name],
-          "farms" => {
-            farm_name => {
-              "device" => ".",
-            },
-          },
+        custom_data = {
           "datasets" => {
             "Droonga" => {
-              "workers" => 1,
-              "number_of_partitions" => 1,
-              "number_of_replicas" => 1,
-              "date_range" => "infinity",
-              "partition_key" => "_key",
-              "plugins" => [],
-              "ring" => {
-                "localhost:23041" => {
-                  "weight" =>  50,
-                  "partitions" => {
-                    "2014-02-09" => [
-                      "#{farm_name}.000",
-                    ],
-                  },
+              "nWorkers" => 1,
+              "replicas" => [
+                {
+                  "slices" => [
+                    {
+                      "volume" => {
+                        "address" => "#{farm_name}.000",
+                      },
+                    },
+                  ],
                 },
-              },
+              ],
             },
           },
-        })
+
+        }
+        @data = minimum_data.merge(custom_data)
       end
 
       def farm_name
@@ -140,7 +108,7 @@ class CatalogTestVersion2 < Test::Unit::TestCase
 
       def plugins(data)
         catalog = create_catalog(data, catalog_path)
-        catalog.get_partitions(farm_name).collect do |partition, options|
+        catalog.slices(farm_name).collect do |volum_address, options|
           options[:plugins]
         end
       end
@@ -155,31 +123,31 @@ class CatalogTestVersion2 < Test::Unit::TestCase
   end
 
   class DataSetTest < self
-    class RingTest < self
+    class ReplicaTest < self
       class TotalWeightTest < self
-        def test_three_zones
-          dataset = {
-            "ring" => {
-              "zone1" => {
+        def test_three_slices
+          replica = {
+            "slices" => [
+              {
                 "weight" => 10,
               },
-              "zone2" => {
+              {
                 "weight" => 20,
               },
-              "zone3" => {
+              {
                 "weight" => 30,
               },
-            }
+            ],
           }
           assert_equal(10 + 20 + 30,
-                       total_weight(dataset))
+                       total_weight(replica))
         end
 
         private
-        def total_weight(dataset)
+        def total_weight(replica)
           catalog = create_catalog(minimum_data,
                                    "base-path")
-          catalog.send(:compute_total_weight, dataset)
+          catalog.send(:compute_total_weight, replica)
         end
       end
     end

  Modified: test/unit/fixtures/catalog/version2.json (+53 -32)
===================================================================
--- test/unit/fixtures/catalog/version2.json    2014-02-26 16:22:00 +0900 (a2265e7)
+++ test/unit/fixtures/catalog/version2.json    2014-02-26 17:21:40 +0900 (1e4fe75)
@@ -1,41 +1,62 @@
 {
   "version": 2,
-  "effectiveDate": "2013-09-01T00:00:00Z",
-  "zones": ["localhost:23003/test"],
-  "farms": {
-    "localhost:23003/test": {
-      "device": ".",
-      "capacity": 10
-    }
-  },
+  "effectiveDate": "2014-02-28T00:00:00Z",
   "datasets": {
     "Test": {
-      "workers": 0,
-      "plugins": ["for_dataset"],
-      "number_of_replicas": 2,
-      "number_of_partitions": 2,
-      "partition_key": "_key",
-      "date_range": "infinity",
-      "ring": {
-        "localhost:23041": {
-          "weight": 50,
-          "partitions": {
-            "2013-09-01": [
-              "localhost:23003/test.000",
-              "localhost:23003/test.001"
-            ]
-          }
+      "nWorkers": 4,
+      "plugins": [
+        "plugin1",
+        "plugin2",
+        "plugin3"
+      ],
+      "replicas": [
+        {
+          "dimension": "_key",
+          "slicer": "hash",
+          "slices": [
+            {
+              "label": "slice00",
+              "volume": {
+                "address": "localhost:23003/test.000"
+              }
+            },
+            {
+              "label": "slice01",
+              "volume": {
+                "address": "localhost:23003/test.001"
+              }
+            },
+            {
+              "label": "slice02",
+              "volume": {
+                "address": "localhost:23003/test.002"
+              }
+            }
+	  ]
         },
-        "localhost:23042": {
-          "weight": 50,
-          "partitions": {
-            "2013-09-01": [
-              "localhost:23003/test.002",
-              "localhost:23003/test.003"
-            ]
-          }
+        {
+          "slices": [
+            {
+              "label": "slice10",
+              "volume": {
+                "address": "localhost:23004/test.010"
+              }
+            },
+            {
+              "label": "slice11",
+              "volume": {
+                "address": "localhost:23004/test.011"
+              }
+            },
+            {
+              "label": "slice12",
+              "volume": {
+                "address": "localhost:23004/test.012"
+              }
+            }
+	  ]
         }
-      }
+      ]
     }
   }
 }
-------------- next part --------------
HTML����������������������������...
ダウンロード 



More information about the Groonga-commit mailing list
アーカイブの一覧に戻る