Skip to content

Commit

Permalink
complete partitioners tests; improve TASK4.md
Browse files Browse the repository at this point in the history
  • Loading branch information
alesavin committed Apr 4, 2018
1 parent c888f66 commit 01be879
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 111 deletions.
67 changes: 35 additions & 32 deletions TASK4.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
## Задание 4
Необходимо реализовать `KeyValueApi` с функцией разделения пространства хранимых пар ключ-значение по кластеру.
Необходимо реализовать `KeyValueApi` с функцией распределения хранимых пар ключ-значение по кластеру.
Подобное разделение называется партиционированием: все множество ключей делится на части - партиции, для хранилища в
каждый момент известно на каком из составляющих его Key-value Persistent Storage Unit находится часть. Составляющие
партицированный кластер ноды принято называть шардами, партиции иногда называют [виртуальными нодами](https://docs.datastax.com/en/cassandra/3.0/cassandra/architecture/archDataDistributeDistribute.html). Последнее
происходит если число партиций сильно больше числа шардов, тогда один шард обрабатывает ______.
происходит если число партиций сильно больше числа шардов, тогда один шард обрабатывает множество партиций. В данном
задании, для упрощения, предполагается что шард обслуживает одну партицию.

Для облегчения задачи поставляется интерфейс `Partitioner`, описывающий преобразование ключа в партицию, и три его реализации:
Для облегчения задачи поставляется интерфейс `Partitioner`, описывающий преобразование ключа в партицию (для данного
задания это имя шарда), и три его реализации:
- `FirstLetterPartitioner`, на основе интервалов начальных символов ключа
- `ModNPartitioner`, на основе хеш-кода ключа и деления по модулю
- `ConsistentHashMd5Partitioner`, на основе [консистентного хеширования](https://en.wikipedia.org/wiki/Consistent_hashing)

Партицированный `KeyValueApi`, который требуется реализовать, имеет статическую конфигурацию:
- список шардов `List[KeyValueApi]`, составляющих кластер. Каждый шард обслуживает одну или более партиции,
использование имени шарда в качестве имени партиции допустимо
- список шардов `List[KeyValueApi]`, составляющих кластер
- `timeout` - максимальное время выполнения операции с шардом
- `partitioner` - алгоритм определения партиции из ключа (один из указанных выше)
- `partitioner` - алгоритм определения партиции-шарда из ключа (один из указанных выше)

Логика определения шарда, с которой нужно производить операции для конкретного ключа, содержится в
надстройке-координаторе. Координатор находится на каждом из шардов, клиент, использующий партицированное
Expand All @@ -24,42 +25,44 @@
- cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ModNPartitioner)
- cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = ModNPartitioner)

Кластер версии 2 моделирует отказ шарда node1 в кластере версии 1, при этом работоспособность node1 не нарушается. В
cluster2 ключи, ранее принадлежавшие node1, должны быть перераспределены между node0 и node2. Важной составляющей
Кластер с номером 2 моделирует отказ шарда node1 в кластере с номером 1, при этом работоспособность node1 не нарушается
. В cluster2 ключи, ранее принадлежавшие node1, должны быть перераспределены между node0 и node2. Важной составляющей
практического задания является проверка в тестах количества ключей, перемещенных в результате
"удаления"/"добавления" нод в кластере (в идеальном случае перемещается не более K/N ключей, где K -
"удаления" нод в кластере (в идеальном случае перемещается не более K/N ключей, где K -
общее их число, N - число нод). Для практического задания значения в парах key-value не играют существенной роли,
рекомендуется вариант когда в значении содержится байты адреса шарда/партиции.
рекомендуется вариант когда в значении содержатся байтовое представление ключа.

### Требуется:
- синхронизировать {your-awesome-team-fork-repo} c upstream для получения обновленных файлов https://help.github
.com/articles/syncing-a-fork/, устранить конфликты в результате merge
- синхронизировать {your-awesome-team-fork-repo} c upstream для получения обновленных файлов https://help.github.com/articles/syncing-a-fork/, устранить конфликты в результате merge
- сделать бранч `csc-bdse-task4`, где будет находиться сдаваемый материал для четвертого задания
- создать реализацию партицированного `KeyValueApi`, включающую:
- получения и использование конфигурации - списка адресов нод Persistent Storage Unit (`List[KeyValueApi]`), timeout,
partitioner
- координатор в виде надстройки для каждой из нод с функцией маршрутизации запросов в нижележайшие ноды. Операции
записи, чтения и удаления по ключу производятся с нодой соотнесенной с партицией ключа. Операция получения списка
ключей производит обращение ко всем нодам и объединяет результат. Операции получения информации и выполнения команд
- получения и использование конфигурации: `List[KeyValueApi]`, `timeout`, `partitioner`
- координатор в виде надстройки для каждой из нод с функцией маршрутизации запросов в шарды. Операции
записи, чтения и удаления по ключу производятся с шардом соотнесенным с ключом. Операция получения списка
ключей производит обращение ко всем шардам и объединяет результат. Операции получения информации и выполнения команд
выполняют, соотвественно, опрос нод кластера и отправку команды на соотнесенную с командой ноду.
- HTTP API партицированного координатора
- создать расширение `KeyValueApiHttpClient` для работы со списком нод-координаторов. Клиент работает со списком
равнозначных нод-координаторов, если происходит ошибка связи с координатором 1, то перация исполняется на
координаторе 2 и далее. Если аналогичный клиент уже реализован в задании 3, повтороной реализации не потребуется.
- создать интеграционные тесты (`PartitionedKeyValueApiHttpClientTest`) записи, чтения и удаления на кластере с
партиционированием (предполагается, что он и составляющие поднимаются в контейнерах) в конфигурациях:
- cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner),
cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = FirstLetterPartitioner).
Моделирует отключение одной ноды при партиционировании по начальному символу ключа
- cluster1: PartitionedKeyValueApi(Set(node0, node1, node2, node3, node4), timeout = 3s, partitioner =
равнозначных нод, если происходит ошибка связи с координатором 1, то операция исполняется на
координаторе 2 и далее. Если аналогичный клиент уже реализован в задании 3, повтороной реализации не требуется.
- создать интеграционные тесты (наследуются от `AbstractPartitionedKeyValueApiHttpClientTest`) партицированных
кластеров. В тестах получаются число потерянных в результате ребалансировки ключей и число неудаленных
ключей с отказавшего шарда, эти числа сравниваются в соотношении с общим числом ключей, подсказки о границах
соотношений можно найти в тестах `Partitioner`'ов. Предполагается, что кластера для тестов поднимаются в
контейнерах, необходимо рассмотреть ситуации:
- `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner),
cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = FirstLetterPartitioner)`,
моделирует отключение одной ноды при партиционировании по начальному символу ключа
- `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2, node3, node4), timeout = 3s, partitioner =
ModNPartitioner), cluster2: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner =
ModNPartitioner). Моделирует отключение двух нод при партиционировании по modN(хеш-код ключа)
- cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner),
cluster2: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ModNPartitioner).
Моделирует смену схемы партиционирования
- cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ConsistentHashMd5Partitioner),
cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = ConsistentHashMd5Partitioner).
Моделирует отключение одной ноды при консистентном хешировании
ModNPartitioner)`, моделирует отключение двух нод при партиционировании по modN(хеш-код ключа)
- `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner),
cluster2: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ModNPartitioner)`
моделирует смену схемы партиционирования
- `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner =
ConsistentHashMd5Partitioner),
cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = ConsistentHashMd5Partitioner)`,
моделирует отключение одной ноды при консистентном хешировании
- описать в `INSTALL_TASK4.md` специфику сборки приложений и запуска интеграционных тестов
- описать в `README_TASK4.md` что было реализовано, описание проблем, не решенных в коде и требующих дальнейшего
рассмотрения, неявных моментов. Обязательно добавить название и список участников команды.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package ru.csc.bdse.kv;

import org.junit.Test;

/**
* Test have to be implemented
*
* @author alesavin
*/
public abstract class AbstractPartitionedKeyValueApiHttpClientTest {

/*
protected abstract KeyValueApi newCluster1();
protected abstract KeyValueApi newCluster2();
// Stream.generate(() -> RandomStringUtils.randomAlphanumeric(10)).limit(1000).collect(Collectors.toSet());
protected abstract Set<String> keys();
protected abstract float expectedKeysLossProportion();
protected abstract float expectedUndeletedKeysProportion();
private KeyValueApi cluster1 = newCluster1();
private KeyValueApi cluster2 = newCluster2();
private Set<String> keys = keys();
*/

@Test
public void put1000KeysAndReadItCorrectlyOnCluster1() {
// TODO put 1000 keys to storage ant read it
}

@Test
public void readKeysFromCluster2AndCheckLossProportion() {
// TODO read all keys from cluster2 and made some statistics (related to expectedKeysLossProportion)
}

@Test
public void deleteAllKeysFromCluster2() {
// TODO try to delete all keys on cluster2
}

@Test
public void readKeysFromCluster1AfterDeletionAtCluster2() {
// TODO read all keys from cluster1, made some statistics (related to expectedUndeletedKeysProportion)
}
}


This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import java.util.Collection;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;
Expand All @@ -19,12 +20,12 @@ public class ConsistentHash {
private final SortedMap<Integer, String> circle = new TreeMap<>();

public ConsistentHash(final Function<String, Integer> hashFunction,
final Collection<String> nodes) {
this(hashFunction, nodes, 3);
final Set<String> nodes) {
this(hashFunction, nodes, 1);
}

public ConsistentHash(final Function<String, Integer> hashFunction,
final Collection<String> nodes,
final Set<String> nodes,
int factor) {
this.hashFunction = hashFunction;
this.factor = factor;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ru.csc.bdse.partitioning;

import org.apache.commons.lang.RandomStringUtils;
import org.junit.Test;

import java.util.Arrays;
Expand All @@ -19,10 +20,10 @@
public class ConsistentHashMd5PartitionerTest {

@Test
public void mapsToNodeByConsistentHashingForSerialKeys() {
public void moveLessThanHalfOfKeysThenRebalance() {

Set<String> keys =
Stream.iterate(1000, n -> n + 3).limit(100).map(String::valueOf).collect(Collectors.toSet());
Stream.generate(() -> RandomStringUtils.randomAlphanumeric(10)).limit(1000).collect(Collectors.toSet());

final Set<String> partitions = new HashSet<>(Arrays.asList("0", "1", "2"));
final Map<String, String> map = PartitionerUtils.getAll(
Expand All @@ -35,7 +36,7 @@ public void mapsToNodeByConsistentHashingForSerialKeys() {
keys);

// There less than half of the keys to be moved
assertThat(PartitionerUtils.diffStatistics(partitions, map, map2).values().stream().mapToLong(Math::abs).sum()).as("no diffs")
assertThat(PartitionerUtils.moves(map, map2)).as("moves")
.isLessThan(keys.size() / 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.junit.Test;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down
Loading

0 comments on commit 01be879

Please sign in to comment.