---
layout: post
title: "Debezium and TimescaleDB"
date: 2024-01-11 11:11:11 +0100
tags: [ debezium, features, connectors, postgres, timescaledb ]
featured: true
author: jpechane
---
In this article, we present and demonstrate a new feature delivered in Debezium 2.4: the integration with the TimescaleDB database.

link:https://github.com/timescale/timescaledb[TimescaleDB] is an open-source database designed to make SQL scalable for time-series data.
It is implemented as an extension for the PostgreSQL database.
This fact allows us to re-use the standard link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html[Debezium PostgreSQL connector] and to implement TimescaleDB support as a link:https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect[single message transform (SMT)].

+++<!-- more -->+++

TimescaleDB provides three basic building blocks/concepts:

* Hypertables
* Continuous aggregates
* Compression

The metadata (catalog) that describes the definitions of these constructs, as well as the raw data, is typically stored in the `_timescaledb_internal` schema.
The link:https://debezium.io/documentation/reference/stable/transformations/timescaledb.html[TimescaleDb SMT] connects to the database and reads and processes this metadata.
The raw messages read from the database are then enriched with the metadata, stored as Kafka Connect headers, which establishes the relationship between the physical data and the TimescaleDB logical constructs.

== Demonstration

The Debezium link:https://github.com/debezium/debezium-examples/tree/main/tutorial[examples repository] contains a Docker Compose-based deployment that provides a full environment for demonstrating the TimescaleDB integration.

The first step is to start the deployment:

[source, bash]
----
$ docker-compose -f docker-compose-timescaledb.yaml up --build
----

The command will bring up Debezium (Zookeeper, Kafka, Kafka Connect) and the source TimescaleDB database.
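
Before moving on, you can optionally check that all the services started (a generic Docker Compose check, not part of the original walkthrough):

[source, bash]
----
$ docker-compose -f docker-compose-timescaledb.yaml ps
----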

The started database is primed with the following database objects (the full SQL script is shown below):

* Hypertable `conditions` representing temperature and humidity measurements as time-series data
* A single record of measurement data
* A PostgreSQL publication used to publish the time-series data into a replication slot, as the demo uses the `pgoutput` decoding plugin
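
For reference, this is the priming SQL, collected into one script:

[source, sql]
----
-- Hypertable for the measurements
CREATE TABLE conditions (
    time TIMESTAMPTZ NOT NULL,
    location TEXT NOT NULL,
    temperature DOUBLE PRECISION NULL,
    humidity DOUBLE PRECISION NULL
);
SELECT create_hypertable('conditions', 'time');

-- A single measurement record
INSERT INTO conditions VALUES (NOW(), 'Prague', 22.8, 53.3);

-- Publication for the pgoutput decoding plugin
CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update');
----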

The next step is to register the Debezium PostgreSQL connector to capture the changes in the database:

[source, bash]
----
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-timescaledb.json
----

The registration request file differs from the regular one by the addition of these lines:

[source, json]
----
{
    "name": "inventory-connector",
    "config": {
        ...
        "schema.include.list": "_timescaledb_internal",
        "transforms": "timescaledb",
        "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
        "transforms.timescaledb.database.hostname": "timescaledb",
        "transforms.timescaledb.database.port": "5432",
        "transforms.timescaledb.database.user": "postgres",
        "transforms.timescaledb.database.password": "postgres",
        "transforms.timescaledb.database.dbname": "postgres"
    }
}
----
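
Once the registration request is accepted, you can verify that the connector is running via the standard Kafka Connect REST API (the connector name `inventory-connector` comes from the registration file above):

[source, bash]
----
$ curl -s http://localhost:8083/connectors/inventory-connector/status
----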

=== Hypertables

The connector will capture the internal TimescaleDB schema with the physical tables containing the raw data, and the `TimescaleDb` SMT will be applied to enrich the messages and route them to topics named after the logical constructs.
The SMT configuration options contain the information needed to connect to the database.
In this case, the `conditions` hypertable will be physically stored in `_timescaledb_internal._hyper_1_1_chunk`; when processed by the SMT, its messages will be re-routed to the `timescaledb.public.conditions` topic, whose name is composed of the fixed configured prefix `timescaledb` and the logical name `public.conditions`, corresponding to the hypertable name.

Let's add a few more measurements to the table:

[source, bash]
----
$ docker-compose -f docker-compose-timescaledb.yaml exec timescaledb env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 30, 50);
postgres=# INSERT INTO conditions VALUES (now(), 'Brno', 35, 55);
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 40, 60);
----

and read the captured messages from the topic (printing of the key and headers is enabled in the command):
[source, bash]
----
docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --property print.headers=true \
    --topic timescaledb.public.conditions
----

The messages contain two headers, `__debezium_timescaledb_chunk_table:_hyper_1_1_chunk,__debezium_timescaledb_chunk_schema:_timescaledb_internal`, that describe the mapping between the logical hypertable name and the physical source table from which the messages were captured.
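
Downstream consumers can use these headers to trace every message back to its physical chunk. As a minimal sketch (not part of the demo; the group id, deserializers, and printing logic are illustrative assumptions), a plain Java consumer could read the headers like this:

[source, java]
----
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class ConditionsConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");
        props.put("group.id", "timescaledb-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("timescaledb.public.conditions"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // The TimescaleDb SMT stores the physical origin of the record in these headers
                    Header chunkTable = record.headers().lastHeader("__debezium_timescaledb_chunk_table");
                    Header chunkSchema = record.headers().lastHeader("__debezium_timescaledb_chunk_schema");
                    System.out.printf("chunk=%s.%s value=%s%n",
                            chunkSchema == null ? "?" : new String(chunkSchema.value(), StandardCharsets.UTF_8),
                            chunkTable == null ? "?" : new String(chunkTable.value(), StandardCharsets.UTF_8),
                            record.value());
                }
            }
        }
    }
}
----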

=== Continuous aggregates

Continuous aggregates provide automatic statistical calculations over data that is stored in hypertables.
The aggregate is defined as a materialized view that is backed by its own hypertable which in turn is backed by a set of physical tables.
After an aggregate is recalculated (either manually or automatically), the new values are stored in the hypertable, from which they can be captured and streamed.
The connector captures the new values from the physical tables, and the SMT again handles the routing by remapping the physical destination back to the aggregate's logical name.
Kafka Connect headers with the original hypertable and physical table names are added as well.

Let's create a continuous aggregate named `conditions_summary` that calculates the average, minimum, and maximum temperature per location and time interval:

[source, sql]
----
postgres=# CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
SELECT
    location,
    time_bucket(INTERVAL '1 hour', time) AS bucket,
    AVG(temperature),
    MAX(temperature),
    MIN(temperature)
FROM conditions
GROUP BY location, bucket;
----
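
Before checking the topic, you can sanity-check the aggregate contents directly in psql (an optional step, not part of the original demo script):

[source, sql]
----
postgres=# SELECT * FROM conditions_summary ORDER BY bucket, location;
----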

Now let's read the captured messages from the topic:
[source, bash]
----
docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --property print.headers=true \
    --topic timescaledb.public.conditions_summary
----

The messages contain two headers, `__debezium_timescaledb_hypertable_table:_materialized_hypertable_2,__debezium_timescaledb_hypertable_schema:_timescaledb_internal`, that expose which backing hypertable was used to store the aggregates, and two additional headers, `__debezium_timescaledb_chunk_table:_hyper_2_2_chunk,__debezium_timescaledb_chunk_schema:_timescaledb_internal`, that expose the physical table in which the aggregate was stored.


If a new measurement is added and an aggregate recomputation is triggered, an updated aggregate is emitted to the topic:

[source, sql]
----
postgres=# INSERT INTO conditions VALUES (now(), 'Ostrava', 10, 50);
postgres=# CALL refresh_continuous_aggregate('conditions_summary', CURRENT_DATE, CURRENT_DATE + 1);
----

The emitted message looks like this:

[source, json]
----
{
    "schema": {
        ...
    },
    "payload": {
        "before": null,
        "after": {
            "location": "Ostrava",
            "bucket": "2024-01-09T13:00:00.000000Z",
            "avg": 10.0,
            "max": 10.0,
            "min": 10.0
        },
        "source": {
            "version": "2.5.0.Final",
            "connector": "postgresql",
            "name": "dbserver1",
            "ts_ms": 1704806938840,
            "snapshot": "false",
            "db": "postgres",
            "sequence": "[\"29727872\",\"29728440\"]",
            "schema": "public",
            "table": "conditions_summary",
            "txId": 764,
            "lsn": 29728440,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1704806939163,
        "transaction": null
    }
}
----

The topic thus contains two or more messages, with the aggregates calculated for the different locations.

=== Compression

The TimescaleDb SMT does not provide any special handling for compressed chunks of data (physical table records); they are captured only as a by-product of being stored in a hypertable.
The compressed data is captured and stored in the Kafka topic.
Typically, messages with compressed chunks are dropped and are not processed by subsequent jobs in the pipeline.
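
One possible way to drop them directly in Kafka Connect (a sketch using the standard `Filter` SMT with the `TopicNameMatches` predicate from Apache Kafka; the transform and predicate aliases are illustrative) is to extend the connector registration like this:

[source, json]
----
{
    ...
    "transforms": "timescaledb,filterCompressed",
    "transforms.filterCompressed.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.filterCompressed.predicate": "isCompressedChunk",
    "predicates": "isCompressedChunk",
    "predicates.isCompressedChunk.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isCompressedChunk.pattern": "timescaledb\\._timescaledb_internal\\._compressed_hypertable_.*"
}
----

Because the `timescaledb` transform runs first, the predicate already sees the re-routed topic name of the compressed hypertable.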

Let's enable compression for the hypertable and compress it:

[source, sql]
----
postgres=# ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segmentby = 'location');
postgres=# SELECT show_chunks('conditions');
               show_chunks
----------------------------------------
 _timescaledb_internal._hyper_1_1_chunk
(1 row)

postgres=# SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
----
The captured messages are written to the `timescaledb._timescaledb_internal._compressed_hypertable_3` topic.
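
They can be inspected the same way as before, pointing the console consumer at the compressed hypertable's topic:

[source, bash]
----
docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --property print.headers=true \
    --topic timescaledb._timescaledb_internal._compressed_hypertable_3
----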

Finally, tear down the environment:
[source, bash]
----
docker-compose -f docker-compose-timescaledb.yaml down
----

== Conclusion

In this post, we have demonstrated capturing data from the TimescaleDB time-series database and processing it with the TimescaleDb SMT.
We have shown how messages are routed and enriched depending on whether hypertables or continuous aggregates act as the source of the data.