Commit

Merge branch 'optimize-after'
akurdyukov committed Jan 24, 2024
2 parents 5d36e9b + 696abd2 commit 7ddfbb5
Showing 4 changed files with 41 additions and 23 deletions.
45 changes: 23 additions & 22 deletions README.md
@@ -38,29 +38,30 @@ target-clickhouse --about --format=markdown
```
-->

| Setting | Required | Default | Description |
|:---------------------|:--------:|:-------:|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| sqlalchemy_url | False | None | The SQLAlchemy connection string for the ClickHouse database. Used if set, otherwise separate settings are used |
| driver | False | http | Driver type |
| username | False | default | Database user |
| password | False | None | Username password |
| host | False | localhost | Database host |
| port | False | 8123 | Database connection port |
| database | False | default | Database name |
| secure | False | 0 | Should the connection be secure |
| verify | False | 1 | Should secure connection need to verify SSL/TLS |
| engine_type | False | None | The engine type to use for the table. |
| table_name | False | None | The name of the table to write to. Defaults to stream name. |
| Setting | Required | Default | Description |
|:---------------------|:--------:|:-------:|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| sqlalchemy_url | False | None | The SQLAlchemy connection string for the ClickHouse database. Used if set, otherwise separate settings are used |
| driver | False | http | Driver type |
| username | False | default | Database user |
| password | False | None | Username password |
| host | False | localhost | Database host |
| port | False | 8123 | Database connection port |
| database | False | default | Database name |
| secure | False | 0 | Should the connection be secure |
| verify | False | 1 | Should secure connection need to verify SSL/TLS |
| engine_type | False | None | The engine type to use for the table. |
| table_name | False | None | The name of the table to write to. Defaults to stream name. |
| table_path | False | None | The table path for replicated tables. This is required when using any of the replication engines. Check out the [documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) for more information. Use `$table_name` to substitute the table name. |
| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. |
| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. |
| default_target_schema| False | None | The default target database schema name to use for all streams. |
| add_record_metadata | False | None | Add metadata to records. |
| load_method | False | TargetLoadMethods.APPEND_ONLY | The method to use when loading data into the destination. `append-only` will always write all input records whether that records already exists or not. `upsert` will update existing records and insert new records. `overwrite` will delete all existing records and insert all input records. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |
| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. |
| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. |
| default_target_schema| False | None | The default target database schema name to use for all streams. |
| optimize_after | False | 0 | Run 'OPTIMIZE TABLE' after data insert |
| optimize_after | False | 0 | Run 'OPTIMIZE TABLE' after data insert. Useful whentable engine removes duplicate rows. |
| load_method | False | TargetLoadMethods.APPEND_ONLY | The method to use when loading data into the destination. `append-only` will always write all input records whether that records already exists or not. `upsert` will update existing records and insert new records. `overwrite` will delete all existing records and insert all input records. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |

A full list of supported settings and capabilities is available by running: `target-clickhouse --about`
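
To illustrate how the documented defaults and the new `optimize_after` flag combine, here is a minimal sketch. `SETTING_DEFAULTS`, `resolve_config`, and the host name are hypothetical and not part of target-clickhouse (the real target resolves defaults through its own settings schema). The `0`/`1` defaults from the table are assumed to map to booleans.

```python
from typing import Any

# Defaults copied from the settings table in the README hunk.
# Hypothetical illustration only: this dict does not exist in the package.
SETTING_DEFAULTS: dict[str, Any] = {
    "driver": "http",
    "username": "default",
    "host": "localhost",
    "port": 8123,
    "database": "default",
    "secure": False,         # table shows 0; assumed to mean False
    "verify": True,          # table shows 1; assumed to mean True
    "optimize_after": False,  # table shows 0; assumed to mean False
}


def resolve_config(user_config: dict[str, Any]) -> dict[str, Any]:
    """Overlay user-supplied settings on the documented defaults."""
    return {**SETTING_DEFAULTS, **user_config}


# Only the settings that differ from the defaults need to be supplied.
config = resolve_config({"host": "clickhouse.internal", "optimize_after": True})
```

With this config, everything except `host` and `optimize_after` keeps its documented default.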

10 changes: 9 additions & 1 deletion target_clickhouse/sinks.py
@@ -86,7 +86,15 @@ def bulk_insert_records(
                 if isinstance(value, (dict, list)):
                     record[key] = json.dumps(value)

-        return super().bulk_insert_records(full_table_name, schema, records)
+        res = super().bulk_insert_records(full_table_name, schema, records)
+
+        if self.config.get("optimize_after", False):
+            with self.connector._connect() as conn, conn.begin():  # noqa: SLF001
+                self.logger.info("Optimizing table: %s", self.full_table_name)
+                conn.execute(sqlalchemy.text(
+                    f"OPTIMIZE TABLE {self.full_table_name}"))
+
+        return res

     def activate_version(self, new_version: int) -> None:
         """Bump the active version of the target table.
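
The control flow added to `bulk_insert_records` can be sketched in isolation as follows. This is a simplified stand-in, not the sink's actual code: `insert_then_optimize`, the callables, and the table name `default.events` are hypothetical, and a plain list replaces the SQLAlchemy connection.

```python
from typing import Any, Callable


def insert_then_optimize(
    config: dict[str, Any],
    table: str,
    insert: Callable[[], Any],
    execute: Callable[[str], None],
) -> Any:
    """Run the insert, then issue OPTIMIZE TABLE only if the flag is set."""
    result = insert()
    if config.get("optimize_after", False):
        execute(f"OPTIMIZE TABLE {table}")
    # The insert result is returned unchanged, as in the commit.
    return result


# Flag enabled: the statement is recorded after the insert.
executed: list[str] = []
rows = insert_then_optimize(
    {"optimize_after": True},
    "default.events",         # hypothetical table name
    insert=lambda: 3,         # stand-in for the bulk insert
    execute=executed.append,  # stand-in for conn.execute
)

# Flag absent: nothing is executed beyond the insert.
skipped: list[str] = []
insert_then_optimize({}, "default.events", lambda: 0, skipped.append)
```

`OPTIMIZE TABLE` asks ClickHouse to start an unscheduled merge of data parts, which is when deduplicating engines such as `ReplacingMergeTree` collapse duplicate rows. Whether a plain `OPTIMIZE TABLE` without `FINAL` merges everything depends on the engine and server state, so that is worth verifying for a given deployment.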
8 changes: 8 additions & 0 deletions target_clickhouse/target.py
@@ -130,6 +130,14 @@ class TargetClickhouse(SQLTarget):
             description="The default target database schema name to use for "
             "all streams.",
         ),
+        th.Property(
+            "optimize_after",
+            th.BooleanType,
+            required=False,
+            default=False,
+            description="Run 'OPTIMIZE TABLE' after data insert. Useful when "
+            "the table engine removes duplicate rows.",
+        ),
     ).to_dict()

     default_sink_class = ClickhouseSink
1 change: 1 addition & 0 deletions tests/test_core.py
@@ -21,6 +21,7 @@
     "database": "default",
     "secure": False,
     "verify": False,
+    "optimize_after": True,
 }

 TEST_CONFIG_NATIVE: dict[str, t.Any] = {