-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathkafka.install
114 lines (106 loc) · 2.95 KB
/
kafka.install
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
<?php
/**
* @file
* Installer for Kafka module.
*/
use Drupal\Core\Link;
/**
* Implements hook_requirements().
*/
function kafka_requirements($phase) {
$ret = [
'title' => 'Kafka',
'value' => Link::createFromRoute(t('A-OK'), 'kafka.report'),
'severity' => REQUIREMENT_OK,
];
$ret = ['kafka' => $ret];
return $ret;
}
/**
* Implements hook_schema().
*/
function kafka_schema() {
$schema['kafka_queue'] = [
'description' => 'The DB queues decorating Kafka topics to support releasing items',
'fields' => [
'name' => [
'description' => 'The topic/queue machine name',
'type' => 'varchar_ascii',
// https://github.com/apache/kafka/commit/ad3dfc6a
'length' => 249,
'not null' => TRUE,
'default' => '',
],
// Kafka prevents short term re-creation of deleted topics.
'deleted' => [
'description' => 'Is the topic is deleted in Kafka ?',
'type' => 'int',
'size' => 'tiny',
'not null' => TRUE,
'default' => 0,
],
],
'primary key' => ['name'],
];
$schema['kafka_item'] = [
'description' => 'The items fetched from topics.',
'fields' => [
'queue' => $schema['kafka_queue']['fields']['name'],
// https://github.com/apache/kafka/blob/0.10.1/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
'partition_id' => [
'description' => 'The Kafka partition number',
'type' => 'int',
'size' => 'normal',
'not null' => TRUE,
'default' => 0,
],
// https://github.com/apache/kafka/blob/0.10.1/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
'offset' => [
'description' => 'The Kafka item offset within a partition',
'type' => 'int',
'size' => 'big',
'not null' => TRUE,
'default' => 0,
],
'item_id' => [
'description' => 'The UUID of the item',
'type' => 'varchar_ascii',
// See \Drupal\Component\Uuid\Uuid::VALID_PATTERN.
'length' => 36,
'not null' => TRUE,
],
'created' => [
'description' => 'The item creation timestamp',
'type' => 'int',
'size' => 'normal',
'unsigned' => TRUE,
'not null' => TRUE,
'default' => 0,
],
'expires' => [
'description' => 'The item expiration timestamp',
'type' => 'int',
'size' => 'normal',
'unsigned' => TRUE,
'not null' => TRUE,
'default' => 0,
],
'data' => [
'description' => 'The serialized item contents',
'type' => 'text',
'size' => 'normal',
],
],
'primary key' => ['queue', 'partition_id', 'offset'],
'unique keys' => [
'item_id' => ['item_id'],
],
'foreign keys' => [
'queue_name' => [
'table' => 'kafka_queue',
'columns' => ['queue_name' => 'name'],
],
],
];
return $schema;
}