diff --git a/TASK4.md b/TASK4.md index 9b7e6f4..53aec1f 100644 --- a/TASK4.md +++ b/TASK4.md @@ -1,20 +1,21 @@ ## Задание 4 -Необходимо реализовать `KeyValueApi` с функцией разделения пространства хранимых пар ключ-значение по кластеру. +Необходимо реализовать `KeyValueApi` с функцией распределения хранимых пар ключ-значение по кластеру. Подобное разделение называется партиционированием: все множество ключей делится на части - партиции, для хранилища в каждый момент известно на каком из составляющих его Key-value Persistent Storage Unit находится часть. Составляющие партицированный кластер ноды принято называть шардами, партиции иногда называют [виртуальными нодами](https://docs.datastax.com/en/cassandra/3.0/cassandra/architecture/archDataDistributeDistribute.html). Последнее -происходит если число партиций сильно больше числа шардов, тогда один шард обрабатывает ______. +происходит если число партиций сильно больше числа шардов, тогда один шард обрабатывает множество партиций. В данном +задании, для упрощения, предполагается что шард обслуживает одну партицию. -Для облегчения задачи поставляется интерфейс `Partitioner`, описывающий преобразование ключа в партицию, и три его реализации: +Для облегчения задачи поставляется интерфейс `Partitioner`, описывающий преобразование ключа в партицию (для данного +задания это имя шарда), и три его реализации: - `FirstLetterPartitioner`, на основе интервалов начальных символов ключа - `ModNPartitioner`, на основе хеш-кода ключа и деления по модулю - `ConsistentHashMd5Partitioner`, на основе [консистентного хеширования](https://en.wikipedia.org/wiki/Consistent_hashing) Партицированный `KeyValueApi`, который требуется реализовать, имеет статическую конфигурацию: -- список шардов `List[KeyValueApi]`, составляющих кластер. Каждый шард обслуживает одну или более партиции, -использование имени шарда в качестве имени партиции допустимо +- список шардов `List[KeyValueApi]`, составляющих кластер - `timeout` - максимальное время выполнения операции с шардом -- `partitioner` - алгоритм определения партиции из ключа (один из указанных выше) +- `partitioner` - алгоритм определения партиции-шарда из ключа (один из указанных выше) Логика определения шарда, с которой нужно производить операции для конкретного ключа, содержится в надстройке-координаторе. Координатор находится на каждом из шардов, клиент, использующий партицированное @@ -24,42 +25,44 @@ - cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ModNPartitioner) - cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = ModNPartitioner) -Кластер версии 2 моделирует отказ шарда node1 в кластере версии 1, при этом работоспособность node1 не нарушается. В -cluster2 ключи, ранее принадлежавшие node1, должны быть перераспределены между node0 и node2. Важной составляющей +Кластер с номером 2 моделирует отказ шарда node1 в кластере с номером 1, при этом работоспособность node1 не нарушается +. В cluster2 ключи, ранее принадлежавшие node1, должны быть перераспределены между node0 и node2. Важной составляющей практического задания является проверка в тестах количества ключей, перемещенных в результате -"удаления"/"добавления" нод в кластере (в идеальном случае перемещается не более K/N ключей, где K - +"удаления" нод в кластере (в идеальном случае перемещается не более K/N ключей, где K - общее их число, N - число нод). Для практического задания значения в парах key-value не играют существенной роли, -рекомендуется вариант когда в значении содержится байты адреса шарда/партиции. +рекомендуется вариант когда в значении содержатся байтовое представление ключа. ### Требуется: -- синхронизировать {your-awesome-team-fork-repo} c upstream для получения обновленных файлов https://help.github -.com/articles/syncing-a-fork/, устранить конфликты в результате merge +- синхронизировать {your-awesome-team-fork-repo} c upstream для получения обновленных файлов https://help.github.com/articles/syncing-a-fork/, устранить конфликты в результате merge - сделать бранч `csc-bdse-task4`, где будет находиться сдаваемый материал для четвертого задания - создать реализацию партицированного `KeyValueApi`, включающую: - - получения и использование конфигурации - списка адресов нод Persistent Storage Unit (`List[KeyValueApi]`), timeout, - partitioner - - координатор в виде надстройки для каждой из нод с функцией маршрутизации запросов в нижележайшие ноды. Операции - записи, чтения и удаления по ключу производятся с нодой соотнесенной с партицией ключа. Операция получения списка - ключей производит обращение ко всем нодам и объединяет результат. Операции получения информации и выполнения команд + - получения и использование конфигурации: `List[KeyValueApi]`, `timeout`, `partitioner` + - координатор в виде надстройки для каждой из нод с функцией маршрутизации запросов в шарды. Операции + записи, чтения и удаления по ключу производятся с шардом соотнесенным с ключом. Операция получения списка + ключей производит обращение ко всем шардам и объединяет результат. Операции получения информации и выполнения команд выполняют, соотвественно, опрос нод кластера и отправку команды на соотнесенную с командой ноду. - HTTP API партицированного координатора - создать расширение `KeyValueApiHttpClient` для работы со списком нод-координаторов. Клиент работает со списком -равнозначных нод-координаторов, если происходит ошибка связи с координатором 1, то перация исполняется на -координаторе 2 и далее. Если аналогичный клиент уже реализован в задании 3, повтороной реализации не потребуется. -- создать интеграционные тесты (`PartitionedKeyValueApiHttpClientTest`) записи, чтения и удаления на кластере с -партиционированием (предполагается, что он и составляющие поднимаются в контейнерах) в конфигурациях: - - cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner), - cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = FirstLetterPartitioner). - Моделирует отключение одной ноды при партиционировании по начальному символу ключа - - cluster1: PartitionedKeyValueApi(Set(node0, node1, node2, node3, node4), timeout = 3s, partitioner = +равнозначных нод, если происходит ошибка связи с координатором 1, то операция исполняется на +координаторе 2 и далее. Если аналогичный клиент уже реализован в задании 3, повтороной реализации не требуется. +- создать интеграционные тесты (наследуются от `AbstractPartitionedKeyValueApiHttpClientTest`) партицированных +кластеров. В тестах получаются число потерянных в результате ребалансировки ключей и число неудаленных +ключей с отказавшего шарда, эти числа сравниваются в соотношении с общим числом ключей, подсказки о границах +соотношений можно найти в тестах `Partitioner`'ов. Предполагается, что кластера для тестов поднимаются в +контейнерах, необходимо рассмотреть ситуации: + - `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner), + cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = FirstLetterPartitioner)`, + моделирует отключение одной ноды при партиционировании по начальному символу ключа + - `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2, node3, node4), timeout = 3s, partitioner = ModNPartitioner), cluster2: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = - ModNPartitioner). Моделирует отключение двух нод при партиционировании по modN(хеш-код ключа) - - cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner), - cluster2: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ModNPartitioner). - Моделирует смену схемы партиционирования - - cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ConsistentHashMd5Partitioner), - cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = ConsistentHashMd5Partitioner). - Моделирует отключение одной ноды при консистентном хешировании + ModNPartitioner)`, моделирует отключение двух нод при партиционировании по modN(хеш-код ключа) + - `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner), + cluster2: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ModNPartitioner)` + моделирует смену схемы партиционирования + - `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = + ConsistentHashMd5Partitioner), + cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = ConsistentHashMd5Partitioner)`, + моделирует отключение одной ноды при консистентном хешировании - описать в `INSTALL_TASK4.md` специфику сборки приложений и запуска интеграционных тестов - описать в `README_TASK4.md` что было реализовано, описание проблем, не решенных в коде и требующих дальнейшего рассмотрения, неявных моментов. Обязательно добавить название и список участников команды. diff --git a/bdse-integration-tests/src/test/java/ru/csc/bdse/kv/AbstractPartitionedKeyValueApiHttpClientTest.java b/bdse-integration-tests/src/test/java/ru/csc/bdse/kv/AbstractPartitionedKeyValueApiHttpClientTest.java new file mode 100644 index 0000000..a7345e1 --- /dev/null +++ b/bdse-integration-tests/src/test/java/ru/csc/bdse/kv/AbstractPartitionedKeyValueApiHttpClientTest.java @@ -0,0 +1,47 @@ +package ru.csc.bdse.kv; + +import org.junit.Test; + +/** + * Test have to be implemented + * + * @author alesavin + */ +public abstract class AbstractPartitionedKeyValueApiHttpClientTest { + +/* + protected abstract KeyValueApi newCluster1(); + protected abstract KeyValueApi newCluster2(); + // Stream.generate(() -> RandomStringUtils.randomAlphanumeric(10)).limit(1000).collect(Collectors.toSet()); + protected abstract Set keys(); + protected abstract float expectedKeysLossProportion(); + protected abstract float expectedUndeletedKeysProportion(); + + private KeyValueApi cluster1 = newCluster1(); + private KeyValueApi cluster2 = newCluster2(); + + private Set keys = keys(); +*/ + + @Test + public void put1000KeysAndReadItCorrectlyOnCluster1() { + // TODO put 1000 keys to storage ant read it + } + + @Test + public void readKeysFromCluster2AndCheckLossProportion() { + // TODO read all keys from cluster2 and made some statistics (related to expectedKeysLossProportion) + } + + @Test + public void deleteAllKeysFromCluster2() { + // TODO try to delete all keys on cluster2 + } + + @Test + public void readKeysFromCluster1AfterDeletionAtCluster2() { + // TODO read all keys from cluster1, made some statistics (related to expectedUndeletedKeysProportion) + } +} + + diff --git a/bdse-integration-tests/src/test/java/ru/csc/bdse/kv/PartitionedKeyValueApiHttpClientTest.java b/bdse-integration-tests/src/test/java/ru/csc/bdse/kv/PartitionedKeyValueApiHttpClientTest.java deleted file mode 100644 index a48e315..0000000 --- a/bdse-integration-tests/src/test/java/ru/csc/bdse/kv/PartitionedKeyValueApiHttpClientTest.java +++ /dev/null @@ -1,43 +0,0 @@ -package ru.csc.bdse.kv; - -import org.apache.commons.lang.RandomStringUtils; -import org.junit.Test; - -import java.util.Collection; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * Test have to be implemented - * - * @author alesavin - */ -public abstract class PartitionedKeyValueApiHttpClientTest { - - protected abstract KeyValueApi newCluster1(); - protected abstract KeyValueApi newCluster2(); - protected abstract float expectedKeysLossProportion(); - - private KeyValueApi cluster1 = newCluster1(); - private KeyValueApi cluster2 = newCluster2(); - private int keysCount = 1000; - -/* - private Collection keys = - Stream.generate(() -> RandomStringUtils.random(10)).limit(keysCount).collect(Collectors.toList()); -*/ - - @Test - public void put1000KeysAndReadItCorrectlyOnCluster1() { -// keys.forEach(key -> cluster1.put(key, key.getBytes())); - - // TODO put 1000 keys to storage ant read it: getKeys(), get() - } - - @Test - public void readKeysFromCluster2AndCheckLossProportion() { - // TODO read all keys from cluster2 - } -} - - diff --git a/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ConsistentHash.java b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ConsistentHash.java index abf12ff..857c98e 100644 --- a/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ConsistentHash.java +++ b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ConsistentHash.java @@ -2,6 +2,7 @@ import java.util.Collection; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.function.Function; @@ -19,12 +20,12 @@ public class ConsistentHash { private final SortedMap circle = new TreeMap<>(); public ConsistentHash(final Function hashFunction, - final Collection nodes) { - this(hashFunction, nodes, 3); + final Set nodes) { + this(hashFunction, nodes, 1); } public ConsistentHash(final Function hashFunction, - final Collection nodes, + final Set nodes, int factor) { this.hashFunction = hashFunction; this.factor = factor; diff --git a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashMd5PartitionerTest.java b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashMd5PartitionerTest.java index d099b3e..96c2b29 100644 --- a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashMd5PartitionerTest.java +++ b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashMd5PartitionerTest.java @@ -1,5 +1,6 @@ package ru.csc.bdse.partitioning; +import org.apache.commons.lang.RandomStringUtils; import org.junit.Test; import java.util.Arrays; @@ -19,10 +20,10 @@ public class ConsistentHashMd5PartitionerTest { @Test - public void mapsToNodeByConsistentHashingForSerialKeys() { + public void moveLessThanHalfOfKeysThenRebalance() { Set keys = - Stream.iterate(1000, n -> n + 3).limit(100).map(String::valueOf).collect(Collectors.toSet()); + Stream.generate(() -> RandomStringUtils.randomAlphanumeric(10)).limit(1000).collect(Collectors.toSet()); final Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); final Map map = PartitionerUtils.getAll( @@ -35,7 +36,7 @@ public void mapsToNodeByConsistentHashingForSerialKeys() { keys); // There less than half of the keys to be moved - assertThat(PartitionerUtils.diffStatistics(partitions, map, map2).values().stream().mapToLong(Math::abs).sum()).as("no diffs") + assertThat(PartitionerUtils.moves(map, map2)).as("moves") .isLessThan(keys.size() / 2); } } \ No newline at end of file diff --git a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashTest.java b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashTest.java index 3a70851..976880d 100644 --- a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashTest.java +++ b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashTest.java @@ -4,7 +4,6 @@ import org.junit.Test; import java.util.*; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; diff --git a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/FirstLetterPartitionerTest.java b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/FirstLetterPartitionerTest.java index 4fa0e9e..e397dc8 100644 --- a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/FirstLetterPartitionerTest.java +++ b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/FirstLetterPartitionerTest.java @@ -1,5 +1,6 @@ package ru.csc.bdse.partitioning; +import org.apache.commons.lang.RandomStringUtils; import org.junit.Test; import java.util.Arrays; @@ -19,7 +20,7 @@ public class FirstLetterPartitionerTest { @Test - public void mapsToNodeByFirstLetter() { + public void mapsByFirstLetter() { Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); final Partitioner partitioner = new FirstLetterPartitioner(partitions); assertThat(partitioner.getPartition("\u0000")).isEqualTo("0"); @@ -44,7 +45,7 @@ public void mapsToNodeByFirstLetter() { } @Test - public void mapsToRepeatedNodeByFirstLetter() { + public void mapsByFirstLetterInRanges() { Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); final Partitioner partitioner = new FirstLetterPartitioner(partitions); char c1 = (char)(Character.MAX_VALUE / partitions.size()); @@ -60,25 +61,23 @@ public void mapsToRepeatedNodeByFirstLetter() { } @Test - public void mapsToNodeByFirstLetterForSerialKeys() { + public void moveAllOfKeysThenRebalance() { Set keys = - Stream.iterate(1000, n -> n + 3).limit(100).map(String::valueOf).collect(Collectors.toSet()); + Stream.generate(() -> RandomStringUtils.randomNumeric(10)).limit(1000).collect(Collectors.toSet()); final Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); final Map map = PartitionerUtils.getAll( new FirstLetterPartitioner(partitions), keys); - assertThat(PartitionerUtils.statistics(map).size()).as("count partitions").isEqualTo(1); final Set partitions2 = new HashSet<>(Arrays.asList("1", "2")); final Map map2 = PartitionerUtils.getAll( new FirstLetterPartitioner(partitions2), keys); - assertThat(PartitionerUtils.statistics(map2).size()).as("count partitions").isEqualTo(1); - // There more than half of the keys to be moved - assertThat(PartitionerUtils.diffStatistics(partitions, map, map2).values().stream().mapToLong(Math::abs).sum()).as("no diffs") - .isGreaterThan(keys.size() / 2); + // There move all of the keys + assertThat(PartitionerUtils.moves(map, map2)).as("moves") + .isGreaterThanOrEqualTo(keys.size()); } } \ No newline at end of file diff --git a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ModNPartitionerTest.java b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ModNPartitionerTest.java index 4e41a39..da5a6d1 100644 --- a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ModNPartitionerTest.java +++ b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ModNPartitionerTest.java @@ -1,5 +1,6 @@ package ru.csc.bdse.partitioning; +import org.apache.commons.lang.RandomStringUtils; import org.junit.Test; import java.util.Arrays; @@ -19,7 +20,7 @@ public class ModNPartitionerTest { @Test - public void mapsToNodeByModN() { + public void mapsByModN() { Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); final Partitioner partitioner = new ModNPartitioner(partitions); assertThat(partitioner.getPartition("123")).isEqualTo("0"); @@ -29,10 +30,10 @@ public void mapsToNodeByModN() { } @Test - public void mapsToNodeByModNForSerialKeys() { + public void moveHalfOfKeysThenRebalance() { Set keys = - Stream.iterate(1000, n -> n + 3).limit(100).map(String::valueOf).collect(Collectors.toSet()); + Stream.generate(() -> RandomStringUtils.randomAlphanumeric(10)).limit(1000).collect(Collectors.toSet()); final Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); final Map map = PartitionerUtils.getAll( @@ -45,7 +46,7 @@ public void mapsToNodeByModNForSerialKeys() { keys); // There more than half of the keys to be moved - assertThat(PartitionerUtils.diffStatistics(partitions, map, map2).values().stream().mapToLong(Math::abs).sum()).as("no diffs") + assertThat(PartitionerUtils.moves(map, map2)).as("moves") .isGreaterThan(keys.size() / 2); } } \ No newline at end of file diff --git a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/PartitionerUtils.java b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/PartitionerUtils.java index 30bdd8c..9c6f99b 100644 --- a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/PartitionerUtils.java +++ b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/PartitionerUtils.java @@ -1,6 +1,5 @@ package ru.csc.bdse.partitioning; -import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -13,28 +12,25 @@ public class PartitionerUtils { public static Map getAll(final Partitioner partitioner, - Set keys) { + final Set keys) { return keys.stream().collect(Collectors.toMap(key -> key, partitioner::getPartition)); } - public static Map statistics(Map all) { + public static Map statistics(Map all) { return all.values().stream().collect(Collectors.groupingBy( v -> v, - Collectors.counting())); + Collectors.reducing(0, e -> 1, Integer::sum))); } - public static Map diffStatistics(Set partitions, - Map left, - Map right) { - Map leftStatistics = statistics(left); - Map rightStatistics = statistics(right); - Map diff = new HashMap<>(); - - for (String partition: partitions) { - Long l = leftStatistics.getOrDefault(partition, 0L); - Long r = rightStatistics.getOrDefault(partition, 0L); - diff.put(partition, l - r); + public static int moves(Map left, + Map right) { + int moves = 0; + for (String key: left.keySet()) { + String lp = left.get(key); + String rp = right.get(key); + if (!lp.equals(rp)) moves++; } - return diff; + return moves; } + }