diff --git a/_posts/2024-01-16-Debezium-and-TimescaleDB.adoc b/_posts/2024-01-16-Debezium-and-TimescaleDB.adoc
new file mode 100644
index 00000000000..780c442906f
--- /dev/null
+++ b/_posts/2024-01-16-Debezium-and-TimescaleDB.adoc
@@ -0,0 +1,220 @@
---
layout: post
title: "Debezium and TimescaleDB"
date: 2024-01-16 11:11:11 +0100
tags: [ debezium, features, connectors, postgres, timescaledb ]
featured: true
author: jpechane
---
In this article, we present and demonstrate a new feature delivered in Debezium 2.4 - the integration with the TimescaleDB database.

link:https://github.com/timescale/timescaledb[TimescaleDB] is an open-source database designed to make SQL scalable for time-series data.
It is implemented as an extension for the PostgreSQL database.
This allows us to re-use the standard link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html[Debezium PostgreSQL connector] and implement TimescaleDB support as a link:https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect[single message transform (SMT)].

TimescaleDB provides three basic building blocks/concepts:

* Hypertables
* Continuous aggregates
* Compression

The metadata (catalog) that describes the definitions of these constructs, as well as the raw data, is typically stored in the `_timescaledb_internal` schema.
The link:https://debezium.io/documentation/reference/stable/transformations/timescaledb.html[TimescaleDb SMT] connects to the database and reads and processes this metadata.
Based on it, the raw messages read from the database are enriched with Kafka Connect headers that establish the relation between the physical data and the TimescaleDB logical constructs.

== Demonstration

The Debezium link:https://github.com/debezium/debezium-examples/tree/main/tutorial[examples repository] contains a Docker Compose-based deployment that provides a full environment to demonstrate the TimescaleDB integration.

The first step is to start the deployment:

[source, bash]
----
$ docker-compose -f docker-compose-timescaledb.yaml up --build
----

The command brings up Debezium (ZooKeeper, Kafka, Kafka Connect) and the source TimescaleDB database.

The started database is pre-populated with the following database objects:

* A hypertable `conditions` representing temperature and humidity measurements as time-series data; created with the DDL `CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL); SELECT create_hypertable('conditions', 'time')`
* A single record of measurement data (`INSERT INTO conditions VALUES(NOW(), 'Prague', 22.8, 53.3)`)
* A PostgreSQL publication used to publish the time-series data into the replication slot, as the demo uses the `pgoutput` decoding plugin (`CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')`)

In the next step, it is necessary to register the Debezium PostgreSQL connector to capture the changes in the database:

[source, bash]
----
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-timescaledb.json
----

The registration request file differs from the regular one by the addition of these lines:

[source, json]
----
{
    "name": "inventory-connector",
    "config": {
...
        "schema.include.list": "_timescaledb_internal",
        "transforms": "timescaledb",
        "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
        "transforms.timescaledb.database.hostname": "timescaledb",
        "transforms.timescaledb.database.port": "5432",
        "transforms.timescaledb.database.user": "postgres",
        "transforms.timescaledb.database.password": "postgres",
        "transforms.timescaledb.database.dbname": "postgres"
    }
}
----
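
Under the covers, the SMT derives the mapping between logical hypertables and their physical chunk tables from the TimescaleDB catalog. If you want to peek at the same metadata yourself, roughly equivalent information is exposed through the `timescaledb_information` views; the queries below are only an illustrative sanity check, assuming a TimescaleDB 2.x instance as used in this demo:

[source, sql]
----
-- Logical hypertables defined in the database
SELECT hypertable_schema, hypertable_name
FROM timescaledb_information.hypertables;

-- Physical chunk tables that back each hypertable; this is the
-- relation the SMT relies on to re-route and enrich the raw messages
SELECT hypertable_schema, hypertable_name, chunk_schema, chunk_name
FROM timescaledb_information.chunks;
----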
=== Hypertables

The connector captures the internal TimescaleDB schema with the physical tables containing the raw data, and the `TimescaleDb` SMT is applied to enrich the messages and route them to correctly named topics based on the logical names.
The SMT configuration options contain the information needed to connect to the database.
In this case, the `conditions` hypertable is physically stored in `_timescaledb_internal._hyper_1_1_chunk`; when processed by the SMT, its messages are re-routed to the `timescaledb.public.conditions` topic, whose name is composed of the fixed configured prefix `timescaledb` and the logical name `public.conditions`, which corresponds to the hypertable name.

Let's add a few more measurements to the table:

[source, bash]
----
$ docker-compose -f docker-compose-timescaledb.yaml exec timescaledb env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 30, 50);
postgres=# INSERT INTO conditions VALUES (now(), 'Brno', 35, 55);
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 40, 60);
----

and read the captured messages from the topic (printing of the key and headers is enabled in the command):

[source, bash]
----
docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --property print.headers=true \
    --topic timescaledb.public.conditions
----

The messages contain two headers, `__debezium_timescaledb_chunk_table:_hyper_1_1_chunk` and `__debezium_timescaledb_chunk_schema:_timescaledb_internal`, that describe the mapping between the logical hypertable name and the physical source table from which the messages were captured.

=== Continuous aggregates

Continuous aggregates provide automatic statistical calculations over data stored in hypertables.
An aggregate is defined as a materialized view that is backed by its own hypertable, which is in turn backed by a set of physical tables.
After an aggregate is recalculated (either manually or automatically), the new values are stored in the hypertable, from which they can be captured and streamed.
The connector captures the new values in the physical tables, and the SMT again handles the routing by remapping the physical destination back to the aggregate's logical name.
Kafka Connect headers with the original hypertable and physical table names are added as well.
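
This chain of view, materialization hypertable, and chunks can also be observed directly in the `timescaledb_information.continuous_aggregates` view, which is exactly the relation the SMT resolves for routing. The query below is an illustrative sketch (again assuming TimescaleDB 2.x); run it once the `conditions_summary` aggregate has been created in the next step:

[source, sql]
----
-- Which materialization hypertable backs each continuous aggregate?
-- The SMT uses this relation to route events captured from the
-- backing chunks to the aggregate's logical topic name.
SELECT view_schema, view_name,
       materialization_hypertable_schema,
       materialization_hypertable_name
FROM timescaledb_information.continuous_aggregates;
----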

Let's create a continuous aggregate named `conditions_summary` that calculates the average, minimum, and maximum temperature per location and time interval:

[source, bash]
----
postgres=# CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
  SELECT
      location,
      time_bucket(INTERVAL '1 hour', time) AS bucket,
      AVG(temperature),
      MAX(temperature),
      MIN(temperature)
  FROM conditions
  GROUP BY location, bucket;
----

and read the captured messages from the topic:

[source, bash]
----
docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --property print.headers=true \
    --topic timescaledb.public.conditions_summary
----

The messages contain two headers, `__debezium_timescaledb_hypertable_table:_materialized_hypertable_2` and `__debezium_timescaledb_hypertable_schema:_timescaledb_internal`, that expose which backing hypertable is used to store the aggregates, and two additional headers, `__debezium_timescaledb_chunk_table:_hyper_2_2_chunk` and `__debezium_timescaledb_chunk_schema:_timescaledb_internal`, that expose the physical table in which the aggregate is stored.

If a new measurement is added and an aggregate recomputation is triggered, an updated aggregate is emitted to the topic:

[source, bash]
----
postgres=# INSERT INTO conditions VALUES (now(), 'Ostrava', 10, 50);
postgres=# CALL refresh_continuous_aggregate('conditions_summary', CURRENT_DATE, CURRENT_DATE + 1);
----

The emitted message looks like this:

[source, json]
----
{
    "schema":{
...
    },
    "payload":{
        "before":null,
        "after":{
            "location":"Ostrava",
            "bucket":"2024-01-09T13:00:00.000000Z",
            "avg":10.0,
            "max":10.0,
            "min":10.0
        },
        "source":{
            "version":"2.5.0.Final",
            "connector":"postgresql",
            "name":"dbserver1",
            "ts_ms":1704806938840,
            "snapshot":"false",
            "db":"postgres",
            "sequence":"[\"29727872\",\"29728440\"]",
            "schema":"public",
            "table":"conditions_summary",
            "txId":764,
            "lsn":29728440,
            "xmin":null
        },
        "op":"c",
        "ts_ms":1704806939163,
        "transaction":null
    }
}
----

The topic thus contains two or more messages, with the aggregates calculated separately for the different locations.

=== Compression

The TimescaleDb SMT does not enrich messages for compressed chunks of data (physical table records); they are captured only as a by-product of being stored in a hypertable.
The compressed data is captured and stored in the Kafka topic as-is.
Typically, messages with compressed chunks are dropped and are not processed by subsequent jobs in the pipeline.

Let's enable compression for the hypertable and compress it:

[source, bash]
----
postgres=# ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segmentby = 'location');
postgres=# SELECT show_chunks('conditions');
                 show_chunks
----------------------------------------
 _timescaledb_internal._hyper_1_1_chunk
(1 row)

postgres=# SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
----

The messages are written to the `timescaledb._timescaledb_internal._compressed_hypertable_3` topic.
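
Whether a particular chunk is compressed (and hence whether its change events carry compressed payloads that a pipeline will likely want to skip) can be checked from the catalog as well; here is a small illustrative query, again assuming the TimescaleDB 2.x informational views:

[source, sql]
----
-- List the chunks of the hypertable together with their compression
-- state; events captured from compressed chunks are the ones that
-- downstream jobs typically drop.
SELECT chunk_schema, chunk_name, is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = 'conditions';
----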

Tear down the environment:

[source, bash]
----
docker-compose -f docker-compose-timescaledb.yaml down
----

== Conclusion
In this post, we have demonstrated capturing data from the TimescaleDB time-series database and processing it with the TimescaleDb SMT.
We have shown how messages are routed and enriched depending on whether hypertables or continuous aggregates act as the source of the data.