Skip to content

Commit

Permalink
Merge pull request #16 from alesavin/task4
Browse files Browse the repository at this point in the history
Task4
  • Loading branch information
alesavin authored Apr 5, 2018
2 parents 6fa3cdc + 2b890eb commit a889056
Show file tree
Hide file tree
Showing 14 changed files with 726 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@

[Задание 3](TASK3.md)

[Задание 4](TASK4.md)

76 changes: 76 additions & 0 deletions TASK4.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
## Задание 4
Необходимо реализовать `KeyValueApi` с функцией распределения хранимых пар ключ-значение по кластеру.
Подобное разделение называется партиционированием: все множество ключей делится на части - партиции, для хранилища в
каждый момент известно на каком из составляющих его Key-value Persistent Storage Unit находится часть. Составляющие
партицированный кластер ноды принято называть шардами, партиции иногда называют [виртуальными нодами](https://docs.datastax.com/en/cassandra/3.0/cassandra/architecture/archDataDistributeDistribute.html). Последнее
происходит если число партиций сильно больше числа шардов, тогда один шард обрабатывает множество партиций. В данном
задании, для упрощения, предполагается что шард обслуживает одну партицию.

Для облегчения задачи поставляется интерфейс `Partitioner`, описывающий преобразование ключа в партицию (для данного
задания это имя шарда), и три его реализации:
- `FirstLetterPartitioner`, на основе интервалов начальных символов ключа
- `ModNPartitioner`, на основе хеш-кода ключа и деления по модулю
- `ConsistentHashMd5Partitioner`, на основе [консистентного хеширования](https://en.wikipedia.org/wiki/Consistent_hashing)

Партицированный `KeyValueApi`, который требуется реализовать, имеет статическую конфигурацию:
- список шардов `List[KeyValueApi]`, составляющих кластер
- `timeout` - максимальное время выполнения операции с шардом
- `partitioner` - алгоритм определения партиции-шарда из ключа (один из указанных выше)

Логика определения шарда, с которого нужно производить операции для конкретного ключа, содержится в
надстройке-координаторе. Координатор находится на каждом из шардов, клиент, использующий партицированное
хранилище, может обращаться к любому из координаторов с одинаковым результатом. Координаторы инициализируются
одинаковой конфигурацией (описана выше), она не изменяется в процессе работы. Отказ шардов моделируется через
создание кластера с новой конфигурацией, например:
- cluster1: `PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ModNPartitioner)`
- cluster2: `PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = ModNPartitioner)`

Кластер с номером 2 моделирует отказ шарда node1 в кластере с номером 1, при этом работоспособность node1 не нарушается
. В cluster2 ключи, ранее принадлежавшие node1, должны быть перераспределены между node0 и node2. Важной составляющей
практического задания является проверка в тестах количества ключей, перемещенных в результате
"удаления" нод в кластере (в идеальном случае перемещается не более K/N ключей, где K -
общее их число, N - число нод). Для практического задания значения в парах key-value не играют существенной роли,
рекомендуется вариант когда в значении содержатся байтовое представление ключа.

### Требуется:
- синхронизировать {your-awesome-team-fork-repo} c upstream для получения обновленных файлов https://help.github.com/articles/syncing-a-fork/, устранить конфликты в результате merge
- сделать бранч `csc-bdse-task4`, где будет находиться сдаваемый материал для четвертого задания
- создать реализацию партицированного `KeyValueApi`, включающую:
- получения и использование конфигурации: `List[KeyValueApi]`, `timeout`, `partitioner`
- координатор в виде надстройки для каждой из нод с функцией маршрутизации запросов в шарды. Операции
записи, чтения и удаления по ключу производятся с шардом соотнесенным с ключом. Операция получения списка
ключей производит обращение ко всем шардам и объединяет результат. Операции получения информации и выполнения команд
выполняют, соответственно, опрос нод кластера и отправку команды на соотнесенную с командой ноду.
- HTTP API партицированного координатора
- создать расширение `KeyValueApiHttpClient` для работы со списком нод-координаторов. Клиент работает со списком
равнозначных нод, если происходит ошибка связи с координатором 1, то операция исполняется на
координаторе 2 и далее. Если аналогичный клиент уже реализован в задании 3, повтороной реализации не требуется.
- создать интеграционные тесты (наследуются от `AbstractPartitionedKeyValueApiHttpClientTest`) партицированных
кластеров. В тестах получаются число потерянных в результате ребалансировки ключей и число неудаленных
ключей с отказавшего шарда, эти числа сравниваются в соотношении с общим числом ключей, подсказки о границах
соотношений можно найти в тестах `Partitioner`'ов. Предполагается, что кластера для тестов поднимаются в
контейнерах, необходимо рассмотреть ситуации:
- `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner),
cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = FirstLetterPartitioner)`,
моделирует отключение одной ноды при партиционировании по начальному символу ключа
- `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2, node3, node4), timeout = 3s, partitioner =
ModNPartitioner), cluster2: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner =
ModNPartitioner)`, моделирует отключение двух нод при партиционировании по modN(хеш-код ключа)
- `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = FirstLetterPartitioner),
cluster2: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner = ModNPartitioner)`
моделирует смену схемы партиционирования
- `cluster1: PartitionedKeyValueApi(Set(node0, node1, node2), timeout = 3s, partitioner =
ConsistentHashMd5Partitioner),
cluster2: PartitionedKeyValueApi(Set(node0, node2), timeout = 3s, partitioner = ConsistentHashMd5Partitioner)`,
моделирует отключение одной ноды при консистентном хешировании
- описать в `INSTALL_TASK4.md` специфику сборки приложений и запуска интеграционных тестов
- описать в `README_TASK4.md` что было реализовано, описание проблем, не решенных в коде и требующих дальнейшего
рассмотрения, неявных моментов. Обязательно добавить название и список участников команды.
- прислать PR {your-awesome-team-fork-repo}/csc-bdse-task4 => {your-awesome-team-fork-repo}/master (добавить alesavin,
dormidon, semkagtn, 747mmHg в ревьюеры)
- добавить ссылку на PR в топик задания 4 курса на https://compscicenter.ru

### Дополнительно:
- реализация партицированного контроллера, работающего над реплицированными контроллерами из задания 3.
Предполагается репликация самих партиций.

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package ru.csc.bdse.kv;

import org.junit.Test;

/**
* Test have to be implemented
*
* @author alesavin
*/
public abstract class AbstractPartitionedKeyValueApiHttpClientTest {

/*
protected abstract KeyValueApi newCluster1();
protected abstract KeyValueApi newCluster2();
// Stream.generate(() -> RandomStringUtils.randomAlphanumeric(10)).limit(1000).collect(Collectors.toSet());
protected abstract Set<String> keys();
protected abstract float expectedKeysLossProportion();
protected abstract float expectedUndeletedKeysProportion();
private KeyValueApi cluster1 = newCluster1();
private KeyValueApi cluster2 = newCluster2();
private Set<String> keys = keys();
*/

@Test
public void put1000KeysAndReadItCorrectlyOnCluster1() {
// TODO put 1000 keys to storage ant read it
}

@Test
public void readKeysFromCluster2AndCheckLossProportion() {
// TODO read all keys from cluster2 and made some statistics (related to expectedKeysLossProportion)
}

@Test
public void deleteAllKeysFromCluster2() {
// TODO try to delete all keys on cluster2
}

@Test
public void readKeysFromCluster1AfterDeletionAtCluster2() {
// TODO read all keys from cluster1, made some statistics (related to expectedUndeletedKeysProportion)
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package ru.csc.bdse.partitioning;


import java.util.Collection;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;

/**
* Represents consistent hashing circle
* See https://web.archive.org/web/20120605030524/http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html
*
* @author alesavin
*/
public class ConsistentHash {

private final Function<String, Integer> hashFunction;
private final int factor;
private final SortedMap<Integer, String> circle = new TreeMap<>();

public ConsistentHash(final Function<String, Integer> hashFunction,
final Set<String> nodes) {
this(hashFunction, nodes, 1);
}

public ConsistentHash(final Function<String, Integer> hashFunction,
final Set<String> nodes,
int factor) {
this.hashFunction = hashFunction;
this.factor = factor;

for (String node : nodes) {
add(node);
}
}

private void add(String node) {
for (int i = 0; i < factor; i++) {
circle.put(hashFunction.apply(node + i), node);
}
}

public void remove(String node) {
for (int i = 0; i < factor; i++) {
circle.remove(hashFunction.apply(node + i));
}
}

public String get(String key) {
if (circle.isEmpty()) {
throw new IllegalStateException("ConsistentHash circle is empty");
}
int hash = hashFunction.apply(key);
if (!circle.containsKey(hash)) {
SortedMap<Integer, String> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
return circle.get(hash);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ru.csc.bdse.partitioning;

import java.util.Set;

/**
* Selects partition by consistent hashing circle
*
* @author alesavin
*/
public class ConsistentHashMd5Partitioner implements Partitioner {

private final ConsistentHash ch;

public ConsistentHashMd5Partitioner(Set<String> partitions) {
this.ch = new ConsistentHash(HashingFunctions.md5Function, partitions);
}

@Override
public String getPartition(String key) {
return ch.get(key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ru.csc.bdse.partitioning;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/**
* Selects partition by first letter of a key
*
* @author alesavin
*/
public class FirstLetterPartitioner implements Partitioner {

private final List<String> partitions;

public FirstLetterPartitioner(Set<String> partitions) {
List<String> list = new ArrayList<>(partitions);
Collections.sort(list);
this.partitions = list;
}

@Override
public String getPartition(String key) {
int index = key.charAt(0) * partitions.size() / ((int)Character.MAX_VALUE + 1);
return partitions.get(index);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package ru.csc.bdse.partitioning;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.function.Function;

/**
* Provide some hashing for strings
*
* @author alesavin
*/
public final class HashingFunctions {

public final static Function<String, Integer> hashCodeFunction =
String::hashCode;

public final static Function<String, Integer> md5Function = (String s) -> {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md.update(s.getBytes());
byte[] bytes = md.digest();
return (bytes[0] & 0xFF)
| ((bytes[1] & 0xFF) << 8)
| ((bytes[2] & 0xFF) << 16)
| ((bytes[3] & 0xFF) << 24);
} catch (NoSuchAlgorithmException e) {
return 0;
}
};

private static final int DEFAULT_SEED = 104729;
// Constants for 32 bit variant
private static final int C1_32 = 0xcc9e2d51;
private static final int C2_32 = 0x1b873593;
private static final int R1_32 = 15;
private static final int R2_32 = 13;
private static final int M_32 = 5;
private static final int N_32 = 0xe6546b64;

/**
* Murmur3 is successor to Murmur2 fast non-crytographic hash algorithms.
*
* 32-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#94
*/
public final static Function<String, Integer> murmur3Function = (String s) -> {
byte[] data = s.getBytes();
int length = data.length;

int hash = DEFAULT_SEED;
final int nblocks = length >> 2;

// body
for (int i = 0; i < nblocks; i++) {
int i_4 = i << 2;
int k = (data[i_4] & 0xff)
| ((data[i_4 + 1] & 0xff) << 8)
| ((data[i_4 + 2] & 0xff) << 16)
| ((data[i_4 + 3] & 0xff) << 24);

// mix functions
k *= C1_32;
k = Integer.rotateLeft(k, R1_32);
k *= C2_32;
hash ^= k;
hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
}

// tail
int idx = nblocks << 2;
int k1 = 0;
switch (length - idx) {
case 3:
k1 ^= data[idx + 2] << 16;
case 2:
k1 ^= data[idx + 1] << 8;
case 1:
k1 ^= data[idx];

// mix functions
k1 *= C1_32;
k1 = Integer.rotateLeft(k1, R1_32);
k1 *= C2_32;
hash ^= k1;
}

// finalization
hash ^= length;
hash ^= (hash >>> 16);
hash *= 0x85ebca6b;
hash ^= (hash >>> 13);
hash *= 0xc2b2ae35;
hash ^= (hash >>> 16);

return hash;
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ru.csc.bdse.partitioning;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/**
* Selects partition by mod N
*
* @author alesavin
*/
public class ModNPartitioner implements Partitioner {

private final List<String> partitions;

public ModNPartitioner(Set<String> partitions) {
List<String> list = new ArrayList<>(partitions);
Collections.sort(list);
this.partitions = list;
}

@Override
public String getPartition(String key) {
int index = Math.abs(key.hashCode() % partitions.size());
return partitions.get(index);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package ru.csc.bdse.partitioning;

/**
* Maps key to partition
*
* @author alesavin
*/
public interface Partitioner {

/**
* Selects right partition for the key
*/
String getPartition(String key);
}
Loading

0 comments on commit a889056

Please sign in to comment.