diff --git a/README.md b/README.md index d952058..bf6042c 100644 --- a/README.md +++ b/README.md @@ -12,3 +12,5 @@ [Задание 3](TASK3.md) +[Задание 4](TASK4.md) + diff --git a/TASK4.md b/TASK4.md new file mode 100644 index 0000000..2c54641 --- /dev/null +++ b/TASK4.md @@ -0,0 +1,76 @@ +## Задание 4 +Необходимо реализовать `KeyValueApi` с функцией распределения хранимых пар ключ-значение по кластеру. +Подобное разделение называется партиционированием: все множество ключей делится на части - партиции, для хранилища в +каждый момент известно на каком из составляющих его Key-value Persistent Storage Unit находится часть. Составляющие +партицированный кластер ноды принято называть шардами, партиции иногда называют [виртуальными нодами](https://docs.datastax.com/en/cassandra/3.0/cassandra/architecture/archDataDistributeDistribute.html). Последнее +происходит если число партиций сильно больше числа шардов, тогда один шард обрабатывает множество партиций. В данном +задании, для упрощения, предполагается что шард обслуживает одну партицию. + +Для облегчения задачи поставляется интерфейс `Partitioner`, описывающий преобразование ключа в партицию (для данного +задания это имя шарда), и три его реализации: +- `FirstLetterPartitioner`, на основе интервалов начальных символов ключа +- `ModNPartitioner`, на основе хеш-кода ключа и деления по модулю +- `ConsistentHashMd5Partitioner`, на основе [консистентного хеширования](https://en.wikipedia.org/wiki/Consistent_hashing) + +Партицированный `KeyValueApi`, который требуется реализовать, имеет статическую конфигурацию: +- список шардов `List[KeyValueApi]`, составляющих кластер +- `timeout` - максимальное время выполнения операции с шардом +- `partitioner` - алгоритм определения партиции-шарда из ключа (один из указанных выше) + +Логика определения шарда, с которого нужно производить операции для конкретного ключа, содержится в +надстройке-координаторе. Координатор находится на каждом из шардов, клиент, использующий партицированное + хранилище, может обращаться к любому из координаторов с одинаковым результатом. Координаторы инициализируются + одинаковой конфигурацией (описана выше), она не изменяется в процессе работы. Отказ шардов моделируется через + создание кластера с новой конфигурацией, например: +- cluster1: `PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ModNPartitioner)` +- cluster2: `PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = ModNPartitioner)` + +Кластер с номером 2 моделирует отказ шарда node1 в кластере с номером 1, при этом работоспособность node1 не нарушается +. В cluster2 ключи, ранее принадлежавшие node1, должны быть перераспределены между node0 и node2. Важной составляющей +практического задания является проверка в тестах количества ключей, перемещенных в результате +"удаления" нод в кластере (в идеальном случае перемещается не более K/N ключей, где K - +общее их число, N - число нод). Для практического задания значения в парах key-value не играют существенной роли, +рекомендуется вариант когда в значении содержатся байтовое представление ключа. + +### Требуется: +- синхронизировать {your-awesome-team-fork-repo} c upstream для получения обновленных файлов https://help.github.com/articles/syncing-a-fork/, устранить конфликты в результате merge +- сделать бранч `csc-bdse-task4`, где будет находиться сдаваемый материал для четвертого задания +- создать реализацию партицированного `KeyValueApi`, включающую: + - получения и использование конфигурации: `List[KeyValueApi]`, `timeout`, `partitioner` + - координатор в виде надстройки для каждой из нод с функцией маршрутизации запросов в шарды. Операции + записи, чтения и удаления по ключу производятся с шардом соотнесенным с ключом. Операция получения списка + ключей производит обращение ко всем шардам и объединяет результат. Операции получения информации и выполнения команд + выполняют, соответственно, опрос нод кластера и отправку команды на соотнесенную с командой ноду. + - HTTP API партицированного координатора +- создать расширение `KeyValueApiHttpClient` для работы со списком нод-координаторов. Клиент работает со списком +равнозначных нод, если происходит ошибка связи с координатором 1, то операция исполняется на +координаторе 2 и далее. Если аналогичный клиент уже реализован в задании 3, повтороной реализации не требуется. +- создать интеграционные тесты (наследуются от `AbstractPartitionedKeyValueApiHttpClientTest`) партицированных +кластеров. В тестах получаются число потерянных в результате ребалансировки ключей и число неудаленных +ключей с отказавшего шарда, эти числа сравниваются в соотношении с общим числом ключей, подсказки о границах +соотношений можно найти в тестах `Partitioner`'ов. Предполагается, что кластера для тестов поднимаются в +контейнерах, необходимо рассмотреть ситуации: + - `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner), + cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = FirstLetterPartitioner)`, + моделирует отключение одной ноды при партиционировании по начальному символу ключа + - `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2, node3, node4), timeout = 3s, partitioner = + ModNPartitioner), cluster2: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = + ModNPartitioner)`, моделирует отключение двух нод при партиционировании по modN(хеш-код ключа) + - `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner), + cluster2: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ModNPartitioner)` + моделирует смену схемы партиционирования + - `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = + ConsistentHashMd5Partitioner), + cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = ConsistentHashMd5Partitioner)`, + моделирует отключение одной ноды при консистентном хешировании +- описать в `INSTALL_TASK4.md` специфику сборки приложений и запуска интеграционных тестов +- описать в `README_TASK4.md` что было реализовано, описание проблем, не решенных в коде и требующих дальнейшего +рассмотрения, неявных моментов. Обязательно добавить название и список участников команды. +- прислать PR {your-awesome-team-fork-repo}/csc-bdse-task4 => {your-awesome-team-fork-repo}/master (добавить alesavin, +dormidon, semkagtn, 747mmHg в ревьюеры) +- добавить ссылку на PR в топик задания 4 курса на https://compscicenter.ru + +### Дополнительно: +- реализация партицированного контроллера, работающего над реплицированными контроллерами из задания 3. +Предполагается репликация самих партиций. + diff --git a/bdse-integration-tests/src/test/java/ru/csc/bdse/kv/AbstractPartitionedKeyValueApiHttpClientTest.java b/bdse-integration-tests/src/test/java/ru/csc/bdse/kv/AbstractPartitionedKeyValueApiHttpClientTest.java new file mode 100644 index 0000000..a7345e1 --- /dev/null +++ b/bdse-integration-tests/src/test/java/ru/csc/bdse/kv/AbstractPartitionedKeyValueApiHttpClientTest.java @@ -0,0 +1,47 @@ +package ru.csc.bdse.kv; + +import org.junit.Test; + +/** + * Test have to be implemented + * + * @author alesavin + */ +public abstract class AbstractPartitionedKeyValueApiHttpClientTest { + +/* + protected abstract KeyValueApi newCluster1(); + protected abstract KeyValueApi newCluster2(); + // Stream.generate(() -> RandomStringUtils.randomAlphanumeric(10)).limit(1000).collect(Collectors.toSet()); + protected abstract Set keys(); + protected abstract float expectedKeysLossProportion(); + protected abstract float expectedUndeletedKeysProportion(); + + private KeyValueApi cluster1 = newCluster1(); + private KeyValueApi cluster2 = newCluster2(); + + private Set keys = keys(); +*/ + + @Test + public void put1000KeysAndReadItCorrectlyOnCluster1() { + // TODO put 1000 keys to storage ant read it + } + + @Test + public void readKeysFromCluster2AndCheckLossProportion() { + // TODO read all keys from cluster2 and made some statistics (related to expectedKeysLossProportion) + } + + @Test + public void deleteAllKeysFromCluster2() { + // TODO try to delete all keys on cluster2 + } + + @Test + public void readKeysFromCluster1AfterDeletionAtCluster2() { + // TODO read all keys from cluster1, made some statistics (related to expectedUndeletedKeysProportion) + } +} + + diff --git a/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ConsistentHash.java b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ConsistentHash.java new file mode 100644 index 0000000..abf98e8 --- /dev/null +++ b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ConsistentHash.java @@ -0,0 +1,62 @@ +package ru.csc.bdse.partitioning; + + +import java.util.Collection; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Function; + +/** + * Represents consistent hashing circle + * See https://web.archive.org/web/20120605030524/http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html + * + * @author alesavin + */ +public class ConsistentHash { + + private final Function hashFunction; + private final int factor; + private final SortedMap circle = new TreeMap<>(); + + public ConsistentHash(final Function hashFunction, + final Set nodes) { + this(hashFunction, nodes, 1); + } + + public ConsistentHash(final Function hashFunction, + final Set nodes, + int factor) { + this.hashFunction = hashFunction; + this.factor = factor; + + for (String node : nodes) { + add(node); + } + } + + private void add(String node) { + for (int i = 0; i < factor; i++) { + circle.put(hashFunction.apply(node + i), node); + } + } + + public void remove(String node) { + for (int i = 0; i < factor; i++) { + circle.remove(hashFunction.apply(node + i)); + } + } + + public String get(String key) { + if (circle.isEmpty()) { + throw new IllegalStateException("ConsistentHash circle is empty"); + } + int hash = hashFunction.apply(key); + if (!circle.containsKey(hash)) { + SortedMap tailMap = circle.tailMap(hash); + hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); + } + return circle.get(hash); + } + +} \ No newline at end of file diff --git a/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ConsistentHashMd5Partitioner.java b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ConsistentHashMd5Partitioner.java new file mode 100644 index 0000000..08406c6 --- /dev/null +++ b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ConsistentHashMd5Partitioner.java @@ -0,0 +1,22 @@ +package ru.csc.bdse.partitioning; + +import java.util.Set; + +/** + * Selects partition by consistent hashing circle + * + * @author alesavin + */ +public class ConsistentHashMd5Partitioner implements Partitioner { + + private final ConsistentHash ch; + + public ConsistentHashMd5Partitioner(Set partitions) { + this.ch = new ConsistentHash(HashingFunctions.md5Function, partitions); + } + + @Override + public String getPartition(String key) { + return ch.get(key); + } +} diff --git a/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/FirstLetterPartitioner.java b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/FirstLetterPartitioner.java new file mode 100644 index 0000000..a912175 --- /dev/null +++ b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/FirstLetterPartitioner.java @@ -0,0 +1,28 @@ +package ru.csc.bdse.partitioning; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * Selects partition by first letter of a key + * + * @author alesavin + */ +public class FirstLetterPartitioner implements Partitioner { + + private final List partitions; + + public FirstLetterPartitioner(Set partitions) { + List list = new ArrayList<>(partitions); + Collections.sort(list); + this.partitions = list; + } + + @Override + public String getPartition(String key) { + int index = key.charAt(0) * partitions.size() / ((int)Character.MAX_VALUE + 1); + return partitions.get(index); + } +} diff --git a/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/HashingFunctions.java b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/HashingFunctions.java new file mode 100644 index 0000000..071b4da --- /dev/null +++ b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/HashingFunctions.java @@ -0,0 +1,96 @@ +package ru.csc.bdse.partitioning; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.function.Function; + +/** + * Provide some hashing for strings + * + * @author alesavin + */ +public final class HashingFunctions { + + public final static Function hashCodeFunction = + String::hashCode; + + public final static Function md5Function = (String s) -> { + try { + MessageDigest md = MessageDigest.getInstance("MD5"); + md.update(s.getBytes()); + byte[] bytes = md.digest(); + return (bytes[0] & 0xFF) + | ((bytes[1] & 0xFF) << 8) + | ((bytes[2] & 0xFF) << 16) + | ((bytes[3] & 0xFF) << 24); + } catch (NoSuchAlgorithmException e) { + return 0; + } + }; + + private static final int DEFAULT_SEED = 104729; + // Constants for 32 bit variant + private static final int C1_32 = 0xcc9e2d51; + private static final int C2_32 = 0x1b873593; + private static final int R1_32 = 15; + private static final int R2_32 = 13; + private static final int M_32 = 5; + private static final int N_32 = 0xe6546b64; + + /** + * Murmur3 is successor to Murmur2 fast non-crytographic hash algorithms. + * + * 32-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#94 + */ + public final static Function murmur3Function = (String s) -> { + byte[] data = s.getBytes(); + int length = data.length; + + int hash = DEFAULT_SEED; + final int nblocks = length >> 2; + + // body + for (int i = 0; i < nblocks; i++) { + int i_4 = i << 2; + int k = (data[i_4] & 0xff) + | ((data[i_4 + 1] & 0xff) << 8) + | ((data[i_4 + 2] & 0xff) << 16) + | ((data[i_4 + 3] & 0xff) << 24); + + // mix functions + k *= C1_32; + k = Integer.rotateLeft(k, R1_32); + k *= C2_32; + hash ^= k; + hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32; + } + + // tail + int idx = nblocks << 2; + int k1 = 0; + switch (length - idx) { + case 3: + k1 ^= data[idx + 2] << 16; + case 2: + k1 ^= data[idx + 1] << 8; + case 1: + k1 ^= data[idx]; + + // mix functions + k1 *= C1_32; + k1 = Integer.rotateLeft(k1, R1_32); + k1 *= C2_32; + hash ^= k1; + } + + // finalization + hash ^= length; + hash ^= (hash >>> 16); + hash *= 0x85ebca6b; + hash ^= (hash >>> 13); + hash *= 0xc2b2ae35; + hash ^= (hash >>> 16); + + return hash; + }; +} \ No newline at end of file diff --git a/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ModNPartitioner.java b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ModNPartitioner.java new file mode 100644 index 0000000..07aad17 --- /dev/null +++ b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/ModNPartitioner.java @@ -0,0 +1,28 @@ +package ru.csc.bdse.partitioning; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * Selects partition by mod N + * + * @author alesavin + */ +public class ModNPartitioner implements Partitioner { + + private final List partitions; + + public ModNPartitioner(Set partitions) { + List list = new ArrayList<>(partitions); + Collections.sort(list); + this.partitions = list; + } + + @Override + public String getPartition(String key) { + int index = Math.abs(key.hashCode() % partitions.size()); + return partitions.get(index); + } +} diff --git a/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/Partitioner.java b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/Partitioner.java new file mode 100644 index 0000000..e5f69cb --- /dev/null +++ b/bdse-kvnode/src/main/java/ru/csc/bdse/partitioning/Partitioner.java @@ -0,0 +1,14 @@ +package ru.csc.bdse.partitioning; + +/** + * Maps key to partition + * + * @author alesavin + */ +public interface Partitioner { + + /** + * Selects right partition for the key + */ + String getPartition(String key); +} diff --git a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashMd5PartitionerTest.java b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashMd5PartitionerTest.java new file mode 100644 index 0000000..96c2b29 --- /dev/null +++ b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashMd5PartitionerTest.java @@ -0,0 +1,42 @@ +package ru.csc.bdse.partitioning; + +import org.apache.commons.lang.RandomStringUtils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Java6Assertions.assertThat; + +/** + * Tests for [[ConsistentHashMd5Partitioner]] + * + * @author alesavin + */ +public class ConsistentHashMd5PartitionerTest { + + @Test + public void moveLessThanHalfOfKeysThenRebalance() { + + Set keys = + Stream.generate(() -> RandomStringUtils.randomAlphanumeric(10)).limit(1000).collect(Collectors.toSet()); + + final Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); + final Map map = PartitionerUtils.getAll( + new ConsistentHashMd5Partitioner(partitions), + keys); + + final Set partitions2 = new HashSet<>(Arrays.asList("0", "2")); + final Map map2 = PartitionerUtils.getAll( + new ConsistentHashMd5Partitioner(partitions2), + keys); + + // There less than half of the keys to be moved + assertThat(PartitionerUtils.moves(map, map2)).as("moves") + .isLessThan(keys.size() / 2); + } +} \ No newline at end of file diff --git a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashTest.java b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashTest.java new file mode 100644 index 0000000..976880d --- /dev/null +++ b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ConsistentHashTest.java @@ -0,0 +1,138 @@ +package ru.csc.bdse.partitioning; + +import org.apache.commons.lang.RandomStringUtils; +import org.junit.Test; + +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Java6Assertions.assertThat; + +/** + * Tests for [[ConsistentHash]] + * + * @author alesavin + */ +public class ConsistentHashTest { + + private static final boolean DEBUG_OUTPUT = false; + private int keysCount = 100; + + @Test + public void workForOneNodeAndHashCode() { + Set nodes = Collections.singleton("0"); + final ConsistentHash ch = + new ConsistentHash(HashingFunctions.hashCodeFunction, nodes); + assertThat(ch.get("00")).as("00").isEqualTo("0"); + assertThat(ch.get("0")).as("0").isEqualTo("0"); + assertThat(ch.get("k0")).as("k0").isEqualTo("0"); + assertThat(ch.get("asdjasjkdjka")).as("asdjasjkdjka").isEqualTo("0"); + assertThat(ch.get("-123")).as("-123").isEqualTo("0"); + final ConsistentHash ch2 = + new ConsistentHash(HashingFunctions.hashCodeFunction, nodes); + assertThat(ch2.get("0")).as("0").isEqualTo("0"); + assertThat(ch2.get("k0")).as("k0").isEqualTo("0"); + assertThat(ch2.get("asdjasjkdjka")).as("asdjasjkdjka").isEqualTo("0"); + assertThat(ch2.get("-123")).as("-123").isEqualTo("0"); + } + + @Test + public void nonEvenlyPlaceKeysForThreeNodesAndHashCode() { + Set nodes = new HashSet<>(Arrays.asList("0", "1", "2")); + final ConsistentHash ch = + new ConsistentHash(HashingFunctions.hashCodeFunction, nodes); + Set keys = + Stream.generate(() -> RandomStringUtils.random(10)).limit(keysCount).collect(Collectors.toSet()); + Map statistics = getStatistics(nodes, keys, ch); + assertThat(statistics.get("0")).as("0 counts").isEqualTo(keysCount); + } + + @Test + public void evenlyPlaceKeysForThreeNodesAndMd5() { + Set nodes = new HashSet<>(Arrays.asList("0", "1", "2")); + final ConsistentHash ch = + new ConsistentHash(HashingFunctions.md5Function, nodes); + Set keys = + Stream.generate(() -> RandomStringUtils.random(10)).limit(keysCount).collect(Collectors.toSet()); + Map statistics = getStatistics(nodes, keys, ch); + for (String node: nodes) { + assertThat(statistics.get(node)).as(node + " counts").isLessThan(keysCount * 2 / nodes.size() ); + } + } + + @Test + public void rebalanceKeysAfterRemoveForFiveNodesAndMd5() { + Set nodes = new HashSet<>(Arrays.asList("0", "1", "2", "3", "4")); + final ConsistentHash ch = + new ConsistentHash(HashingFunctions.md5Function, nodes); + Set keys = + Stream.generate(() -> RandomStringUtils.random(10)).limit(keysCount).collect(Collectors.toSet()); + String nodeToRemove = + new ArrayList<>(nodes).get(new Random().nextInt(nodes.size())); + Map statisticsBeforeRemove = getStatistics(nodes, keys, ch); + ch.remove(nodeToRemove); + Map statisticsAfterRemove = getStatistics(nodes, keys, ch); + assertThat(statisticsAfterRemove.get(nodeToRemove)).as(nodeToRemove + " counts").isEqualTo(0); + + int movedKeys = 0; + for (String node: nodes) { + if (!nodeToRemove.equals(node)) { + int diff = statisticsAfterRemove.get(node) - statisticsBeforeRemove.get(node); + assertThat(diff).as(node + " diff").isGreaterThanOrEqualTo(0); + movedKeys += diff; + } + } + assertThat(movedKeys).as("moved keys").isEqualTo(statisticsBeforeRemove.get(nodeToRemove)); + } + + @Test + public void rebalanceKeysForTwoConfigsForFiveNodesAndMd5() { + Set nodes = new HashSet<>(Arrays.asList("0", "1", "2", "3", "4")); + final ConsistentHash ch = + new ConsistentHash( HashingFunctions.md5Function, nodes); + Set keys = + Stream.generate(() -> RandomStringUtils.random(10)).limit(keysCount).collect(Collectors.toSet()); + + Map statisticsBefore = getStatistics(nodes, keys, ch); + + String nodeToRemove = + new ArrayList<>(nodes).get(new Random().nextInt(nodes.size())); + Set nodes2 = + nodes.stream().filter(n -> !nodeToRemove.equals(n)).collect(Collectors.toSet()); + final ConsistentHash ch2 = + new ConsistentHash( HashingFunctions.md5Function, nodes2); + + Map statisticsAfter = getStatistics(nodes2, keys, ch2); + + int movedKeys = 0; + for (String node: nodes) { + if (!nodeToRemove.equals(node)) { + int diff = statisticsAfter.get(node) - statisticsBefore.get(node); + assertThat(diff).as(node + " diff").isGreaterThanOrEqualTo(0); + movedKeys += diff; + } + } + assertThat(movedKeys).as("moved keys").isEqualTo(statisticsBefore.get(nodeToRemove)); + } + + private Map getStatistics(Collection nodes, + Collection keys, + ConsistentHash ch) { + + Map statistics = new HashMap<>(); + for (String node: nodes) statistics.put(node, 0); + + for (String key: keys) { + String node = ch.get(key); + statistics.put(node, statistics.get(node) + 1); + } + + if (DEBUG_OUTPUT) { + for (String node : nodes) { + System.out.println(node + " = " + statistics.get(node)); + } + } + return statistics; + } +} \ No newline at end of file diff --git a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/FirstLetterPartitionerTest.java b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/FirstLetterPartitionerTest.java new file mode 100644 index 0000000..e397dc8 --- /dev/null +++ b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/FirstLetterPartitionerTest.java @@ -0,0 +1,83 @@ +package ru.csc.bdse.partitioning; + +import org.apache.commons.lang.RandomStringUtils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Java6Assertions.assertThat; + +/** + * Tests for [[FirstLetterPartitioner]] + * + * @author alesavin + */ +public class FirstLetterPartitionerTest { + + @Test + public void mapsByFirstLetter() { + Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); + final Partitioner partitioner = new FirstLetterPartitioner(partitions); + assertThat(partitioner.getPartition("\u0000")).isEqualTo("0"); + assertThat(partitioner.getPartition("\uFFFF")).isEqualTo("2"); + assertThat(partitioner.getPartition("\uFFFA")).isEqualTo("2"); + + assertThat(partitioner.getPartition("Aaaaa")).isEqualTo("0"); + assertThat(partitioner.getPartition("A")).isEqualTo("0"); + assertThat(partitioner.getPartition("affff")).isEqualTo("0"); + assertThat(partitioner.getPartition("a123")).isEqualTo("0"); + assertThat(partitioner.getPartition("b")).isEqualTo("0"); + assertThat(partitioner.getPartition("c")).isEqualTo("0"); + assertThat(partitioner.getPartition("D")).isEqualTo("0"); + assertThat(partitioner.getPartition("z")).isEqualTo("0"); + assertThat(partitioner.getPartition("Zde")).isEqualTo("0"); + + char c1 = (char)(Character.MAX_VALUE / partitions.size()); + assertThat(partitioner.getPartition(new String(new char[]{c1}))).isEqualTo("0"); + assertThat(partitioner.getPartition(new String(new char[]{(char)(c1 + 1)}))).isEqualTo("1"); + assertThat(partitioner.getPartition(new String(new char[]{(char)(c1 + 5)}))).isEqualTo("1"); + assertThat(partitioner.getPartition(new String(new char[]{(char)(c1 * 2 + 1)}))).isEqualTo("2"); + } + + @Test + public void mapsByFirstLetterInRanges() { + Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); + final Partitioner partitioner = new FirstLetterPartitioner(partitions); + char c1 = (char)(Character.MAX_VALUE / partitions.size()); + assertThat(partitioner.getPartition(new String(new char[]{c1}))).isEqualTo("0"); + assertThat(partitioner.getPartition(new String(new char[]{(char)(c1 + 1)}))).isEqualTo("1"); + assertThat(partitioner.getPartition(new String(new char[]{(char)(c1 + 5)}))).isEqualTo("1"); + assertThat(partitioner.getPartition(new String(new char[]{(char)(c1 * 2)}))).isEqualTo("1"); + assertThat(partitioner.getPartition(new String(new char[]{(char)(c1 * 3)}))).isEqualTo("2"); + assertThat(partitioner.getPartition(new String(new char[]{(char)(c1 * 4)}))).isEqualTo("0"); + assertThat(partitioner.getPartition(new String(new char[]{(char)(c1 * 5)}))).isEqualTo("1"); + assertThat(partitioner.getPartition(new String(new char[]{(char)(c1 * 6)}))).isEqualTo("2"); + assertThat(partitioner.getPartition(new String(new char[]{(char)(c1 * 7)}))).isEqualTo("0"); + } + + @Test + public void moveAllOfKeysThenRebalance() { + + Set keys = + Stream.generate(() -> RandomStringUtils.randomNumeric(10)).limit(1000).collect(Collectors.toSet()); + + final Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); + final Map map = PartitionerUtils.getAll( + new FirstLetterPartitioner(partitions), + keys); + + final Set partitions2 = new HashSet<>(Arrays.asList("1", "2")); + final Map map2 = PartitionerUtils.getAll( + new FirstLetterPartitioner(partitions2), + keys); + + // There move all of the keys + assertThat(PartitionerUtils.moves(map, map2)).as("moves") + .isGreaterThanOrEqualTo(keys.size()); + } +} \ No newline at end of file diff --git a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ModNPartitionerTest.java b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ModNPartitionerTest.java new file mode 100644 index 0000000..da5a6d1 --- /dev/null +++ b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/ModNPartitionerTest.java @@ -0,0 +1,52 @@ +package ru.csc.bdse.partitioning; + +import org.apache.commons.lang.RandomStringUtils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Java6Assertions.assertThat; + +/** + * Tests for [[ModNPartitioner]] + * + * @author alesavin + */ +public class ModNPartitionerTest { + + @Test + public void mapsByModN() { + Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); + final Partitioner partitioner = new ModNPartitioner(partitions); + assertThat(partitioner.getPartition("123")).isEqualTo("0"); + assertThat(partitioner.getPartition("1234")).isEqualTo("1"); + assertThat(partitioner.getPartition("12345")).isEqualTo("0"); + assertThat(partitioner.getPartition("Aaaa")).isEqualTo("2"); + } + + @Test + public void moveHalfOfKeysThenRebalance() { + + Set keys = + Stream.generate(() -> RandomStringUtils.randomAlphanumeric(10)).limit(1000).collect(Collectors.toSet()); + + final Set partitions = new HashSet<>(Arrays.asList("0", "1", "2")); + final Map map = PartitionerUtils.getAll( + new ModNPartitioner(partitions), + keys); + + final Set partitions2 = new HashSet<>(Arrays.asList("0", "2")); + final Map map2 = PartitionerUtils.getAll( + new ModNPartitioner(partitions2), + keys); + + // There more than half of the keys to be moved + assertThat(PartitionerUtils.moves(map, map2)).as("moves") + .isGreaterThan(keys.size() / 2); + } +} \ No newline at end of file diff --git a/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/PartitionerUtils.java b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/PartitionerUtils.java new file mode 100644 index 0000000..9c6f99b --- /dev/null +++ b/bdse-kvnode/src/test/java/ru/csc/bdse/partitioning/PartitionerUtils.java @@ -0,0 +1,36 @@ +package ru.csc.bdse.partitioning; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Some convenient methods for tests + * + * @author alesavin + */ +public class PartitionerUtils { + + public static Map getAll(final Partitioner partitioner, + final Set keys) { + return keys.stream().collect(Collectors.toMap(key -> key, partitioner::getPartition)); + } + + public static Map statistics(Map all) { + return all.values().stream().collect(Collectors.groupingBy( + v -> v, + Collectors.reducing(0, e -> 1, Integer::sum))); + } + + public static int moves(Map left, + Map right) { + int moves = 0; + for (String key: left.keySet()) { + String lp = left.get(key); + String rp = right.get(key); + if (!lp.equals(rp)) moves++; + } + return moves; + } + +}