Debezium and TimescaleDB
2024-01-11
debezium, features, connectors, postgres, timescaledb
jpechane
+In this article, we are going to present and demonstrate a new feature delivered in Debezium 2.4 - the integration with the TimescaleDB database.
+link:https://github.com/timescale/timescaledb[TimescaleDB] is an open-source database designed to make SQL scalable for time-series data.
+It is implemented as an extension for the PostgreSQL database.
+This fact leads us to re-use the standard link:https://debezium.io/documentation/reference/stable/connectors/postgresql.html[Debezium PostgreSQL connector] and implement TimescaleDB support as a link:https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect[single message transform (SMT)].
+TimescaleDB provides three basic building blocks/concepts:
+* Hypertables
+* Continuous aggregates
+* Compression
+Metadata (catalog) that describes the definitions of the instances and the raw data are typically stored in `_timescaledb_internal_schema`.
+link:https://debezium.io/documentation/reference/stable/transformations/timescaledb.html[TimescaleDb SMT] connects to the database and reads and processes the metadata.
+The raw messages read from the database are then enriched with the metadata stored in Kafka Connect headers, creating the relation between the physical data and the TimescaleDB logical constructs.
+== Demonstration
+Debezium link:https://github.com/debezium/debezium-examples/tree/main/tutorial[examples repository] contains a Docker Compose-based deployment that provides a full environment to demonstrate the TimescaleDB integration.
+The first step is to start the deployment
+[source, bash]
+$ docker-compose -f docker-compose-timescaledb.yaml up --build
+The command will bring up Debezium (Zookeeper, Kafka, Kafka Connect) and the source TimescaleDB database.
+The started database is primed with the following database objects:
+* Hypertable `conditions` representing temperature and humidity measurements as time-series data; created with DDL `CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL); SELECT create_hypertable('conditions', 'time')`
+* A single record of the measurement data (`INSERT INTO conditions VALUES(NOW(), 'Prague', 22.8,  53.3)`)
+* PostgreSQL publication used to publish time-series data into replication slot as the demo uses `pgoutput` decoding plugin (`CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')`)
+In the next step it is necessary to register the Debezium PostgreSQL connector to capture the changes in the database
+[source, bash]
+$ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-timescaledb.yaml
+The registration request file differs from the regular one with the addition of these lines
+[source, json]
+    "name": "inventory-connector",
+    "config": {
+        "schema.include.list": "_timescaledb_internal",
+        "transforms": "timescaledb",
+        "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
+        "transforms.timescaledb.database.hostname": "timescaledb",
+        "transforms.timescaledb.database.port": "5432",
+        "transforms.timescaledb.database.user": "postgres",
+        "transforms.timescaledb.database.password": "postgres",
+        "transforms.timescaledb.database.dbname": "postgres"
+    }
+=== Hypertables
+The connector will capture the internal TimescaleDB schema with the physical tables containing the raw data and the `TimescaleDb` SMT will be applied to enrich messages and route them to the correctly named topics based on the logical names.
+The SMT configuration options contain information needed to connect to the database.
+In this case, the `conditions` hypertable will be physically stored in `_timescaledb_internal._hyper_1_1_chunk` and when processed by the SMT, it will be re-routed to `timescaledb.public.conditions` topic that is named according to fixed configured prefix `timescaledb` and logical name `public.conditions` that conforms to the hypertable name.
+Let's add a few more measurements to the table
+[source, bash]
+$ docker-compose -f docker-compose-timescaledb.yaml exec timescaledb env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
+postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 30, 50);
+postgres=# INSERT INTO conditions VALUES (now(), 'Brno', 35, 55);
+postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 40, 60);
+and read the captured messages for the topic (printing of key and headers is enabled in the command)
+[source, bash]
+docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
+    --bootstrap-server kafka:9092 \
+    --from-beginning \
+    --property print.key=true \
+    --property print.headers=true \
+    --topic timescaledb.public.conditions
+The messages contain two headers `__debezium_timescaledb_chunk_table:_hyper_1_1_chunk,__debezium_timescaledb_chunk_schema:_timescaledb_internal` that describe the mapping between the logical hypertable name and the physical source table from which they were captured.
+=== Continuous aggregates
+Continuous aggregates provide automatic statistical calculations over data that is stored in hypertables.
+The aggregate is defined as a materialized view that is backed by its own hypertable which in turn is backed by a set of physical tables.
+After an aggregate is recalculated (either manually or automatically), the new values are stored in the hypertable, from which they can be captured and streamed.
+The connector captures the new values in the physical tables and the SMT again solves the routing by remapping the physical destination back into the aggregate logical name.
+Kafka Connect headers with original hypertable and physical table names are added too.
+Let's create a continuous aggregate named `conditions_summary` that calculates the average, minimum, and maximum temperature per location and time interval
+[source, bash]
+postgres=# CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
+    location,
+    time_bucket(INTERVAL '1 hour', time) AS bucket,
+    AVG(temperature),
+    MAX(temperature),
+    MIN(temperature)
+  FROM conditions
+  GROUP BY location, bucket;
+and read the captured messages for the topic
+[source, bash]
+docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
+    --bootstrap-server kafka:9092 \
+    --from-beginning \
+    --property print.key=true \
+    --property print.headers=true \
+    --topic timescaledb.public.conditions_summary
+The messages contain two headers `__debezium_timescaledb_hypertable_table:_materialized_hypertable_2,__debezium_timescaledb_hypertable_schema:_timescaledb_internal` that expose which backing hypertable was used to store the aggregates and two addtional headers `__debezium_timescaledb_chunk_table:_hyper_2_2_chunk,__debezium_timescaledb_chunk_schema:_timescaledb_internal` that exposes the physical table in which the aggregate was stored.
+ `__debezium_timescaledb_chunk_table:_hyper_1_1_chunk,__debezium_timescaledb_chunk_schema:_timescaledb_internal` that describes the mapping between the logical hypertable name and the physical source table from which they were captured.
+If a new measurement is added and aggregate recomputation is triggered then an updated aggregate is emitted to the topic
+[source, bash]
+postgres=# INSERT INTO conditions VALUES (now(), 'Ostrava', 10, 50);
+postgres=# CALL refresh_continuous_aggregate('conditions_summary', CURRENT_DATE, CURRENT_DATE + 1);
+that looks like
+[source, bash]
+   "schema":{
+   },
+   "payload":{
+      "before":null,
+      "after":{
+         "location":"Ostrava",
+         "bucket":"2024-01-09T13:00:00.000000Z",
+         "avg":10.0,
+         "max":10.0,
+         "min":10.0
+      },
+      "source":{
+         "version":"2.5.0.Final",
+         "connector":"postgresql",
+         "name":"dbserver1",
+         "ts_ms":1704806938840,
+         "snapshot":"false",
+         "db":"postgres",
+         "sequence":"[\"29727872\",\"29728440\"]",
+         "schema":"public",
+         "table":"conditions_summary",
+         "txId":764,
+         "lsn":29728440,
+         "xmin":null
+      },
+      "op":"c",
+      "ts_ms":1704806939163,
+      "transaction":null
+   }
+So the topic contains two or more messages calculated for two different locations.
+=== Compression
+The TimescaleDB SMT does not enhance compressed chunks of data (physical table records), only as a by-product of them being stored in a hypertable.
+The compressed data is captured and stored in the Kafka topic.
+Typically, messages with compressed chunks are dropped and are not processed by subsequent jobs in the pipeline.
+Let's enable compression for the hypertable and compress it
+[source, bash]
+postgres=# ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segment by = 'location');
+postgres=# SELECT show_chunks('conditions');
+              show_chunks               
+ _timescaledb_internal._hyper_1_1_chunk
+(1 row)
+postgres=# SELECT compress_chunk( '_timescaledb_internal._hyper_1_1_chunk');
+Messages are written to `timescaledb._timescaledb_internal._compressed_hypertable_3`.
+Tear down the environment
+[source, bash]
+docker-compose -f docker-compose-timescaledb.yaml down
+== Conclusion
+In this post, we have demonstrated the capturing of data from TimescaleDB time-series database and their processing by the TimescaleDb SMT.
+We have shown how messages are routed and enriched depending on hypertables and continuous aggregates acting as the source of data.