Skip to content

Commit

Permalink
feat: make it possible to use the ClickHouse HTTP interface (#27273)
Browse files Browse the repository at this point in the history
  • Loading branch information
orian authored Jan 7, 2025
1 parent 90057e4 commit e823bf9
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 12 deletions.
93 changes: 92 additions & 1 deletion posthog/clickhouse/client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from enum import Enum
from functools import cache

from clickhouse_connect import get_client
from clickhouse_connect.driver import Client as HttpClient, httputil
from clickhouse_driver import Client as SyncClient
from clickhouse_pool import ChPool
from django.conf import settings
Expand All @@ -19,7 +21,96 @@ class Workload(Enum):
_default_workload = Workload.ONLINE


def get_pool(workload: Workload, team_id=None, readonly=False):
class ProxyClient:
    """Adapter exposing the ``clickhouse_driver.Client`` interface on top of a
    ``clickhouse_connect`` (HTTP) client, so call sites written against the
    native TCP driver keep working when the HTTP interface is enabled.
    """

    def __init__(self, client: "HttpClient"):
        self._client = client

    def execute(
        self,
        query,
        params=None,
        with_column_types=False,
        external_tables=None,  # not supported over HTTP; accepted for signature compatibility
        query_id=None,
        settings=None,
        types_check=False,  # not supported over HTTP; accepted for signature compatibility
        columnar=False,
    ):
        """Run *query* and return results in ``clickhouse_driver`` format.

        For write queries (detected via the response summary) the number of
        written rows is returned. Otherwise the result rows are returned,
        paired with ``(name, type)`` column info when ``with_column_types``
        is true.
        """
        if query_id:
            # Copy before adding the query id: ``settings`` may be None or a
            # dict owned by the caller (mutating it in place was a bug).
            settings = {**(settings or {}), "query_id": query_id}
        result = self._client.query(query=query, parameters=params, settings=settings, column_oriented=columnar)

        # Write queries report progress via the summary rather than a result set.
        written_rows = int(result.summary.get("written_rows", 0))
        if written_rows > 0:
            return written_rows
        if with_column_types:
            column_types_driver_format = list(zip(result.column_names, result.column_types))
            return result.result_set, column_types_driver_format
        return result.result_set

    # Context-manager support (PEP 343: https://peps.python.org/pep-0343/) so
    # ProxyClient can be used in all places a clickhouse_driver.Client is.
    # Connection lifetime is owned by the HTTP pool manager, so there is
    # nothing to release on exit.
    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass


# Shared urllib3 pool manager for all HTTP clients: connections are reused
# across get_http_client() calls instead of each client opening its own.
_clickhouse_http_pool_mgr = httputil.get_pool_manager(
    maxsize=settings.CLICKHOUSE_CONN_POOL_MAX,  # max number of open connections per pool
    block=True,  # makes the maxsize limit per pool, keeps connections
    num_pools=12,  # number of pools
)


def get_http_client(**overrides):
    """Build a ClickHouse HTTP client (wrapped in :class:`ProxyClient`).

    Defaults come from Django settings; any keyword argument in *overrides*
    takes precedence (e.g. ``host=...`` or readonly credentials).
    """
    kwargs = dict(
        host=settings.CLICKHOUSE_HOST,
        database=settings.CLICKHOUSE_DATABASE,
        secure=settings.CLICKHOUSE_SECURE,
        username=settings.CLICKHOUSE_USER,
        password=settings.CLICKHOUSE_PASSWORD,
        ca_cert=settings.CLICKHOUSE_CA,
        verify=settings.CLICKHOUSE_VERIFY,
        settings={"mutations_sync": "1"} if settings.TEST else {},
        # Without this, OPTIMIZE table and other queries will regularly run into timeouts
        send_receive_timeout=30 if settings.TEST else 999_999_999,
        # beware, this makes each query to run in a separate session - no temporary tables will work
        autogenerate_session_id=True,
        pool_mgr=_clickhouse_http_pool_mgr,
    )
    kwargs.update(overrides)
    return ProxyClient(get_client(**kwargs))


def get_client_from_pool(workload: Workload = Workload.DEFAULT, team_id=None, readonly=False):
    """
    Returns the client for a given workload.
    The connection pool for HTTP is managed by a library.
    """
    if not settings.CLICKHOUSE_USE_HTTP:
        # Native TCP protocol: hand out a client from our own pool.
        return get_pool(workload=workload, team_id=team_id, readonly=readonly).get_client()

    # Per-team overrides win over everything else.
    per_team = settings.CLICKHOUSE_PER_TEAM_SETTINGS
    if team_id is not None and str(team_id) in per_team:
        return get_http_client(**per_team[str(team_id)])

    # Note that `readonly` does nothing if the relevant vars are not set!
    if readonly and settings.READONLY_CLICKHOUSE_USER is not None and settings.READONLY_CLICKHOUSE_PASSWORD:
        return get_http_client(
            username=settings.READONLY_CLICKHOUSE_USER,
            password=settings.READONLY_CLICKHOUSE_PASSWORD,
        )

    # OFFLINE either explicitly, or via DEFAULT when the module-wide default
    # workload is OFFLINE (parentheses mirror Python's and/or precedence).
    wants_offline = workload == Workload.OFFLINE or (
        workload == Workload.DEFAULT and _default_workload == Workload.OFFLINE
    )
    if wants_offline and settings.CLICKHOUSE_OFFLINE_CLUSTER_HOST is not None:
        return get_http_client(host=settings.CLICKHOUSE_OFFLINE_CLUSTER_HOST, verify=False)

    return get_http_client()


def get_pool(workload: Workload = Workload.DEFAULT, team_id=None, readonly=False):
"""
Returns the right connection pool given a workload.
Expand Down
5 changes: 3 additions & 2 deletions posthog/clickhouse/client/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from clickhouse_driver import Client as SyncClient
from django.conf import settings as app_settings

from posthog.clickhouse.client.connection import Workload, get_pool
from posthog.clickhouse.client.connection import Workload, get_client_from_pool
from posthog.clickhouse.client.escape import substitute_params
from posthog.clickhouse.query_tagging import get_query_tag_value, get_query_tags
from posthog.errors import wrap_query_error
Expand Down Expand Up @@ -121,7 +121,7 @@ def sync_execute(
if get_query_tag_value("id") == "posthog.tasks.tasks.process_query_task":
workload = Workload.ONLINE

with sync_client or get_pool(workload, team_id, readonly).get_client() as client:
with sync_client or get_client_from_pool(workload, team_id, readonly) as client:
start_time = perf_counter()

prepared_sql, prepared_args, tags = _prepare_query(client=client, query=query, args=args, workload=workload)
Expand All @@ -137,6 +137,7 @@ def sync_execute(
settings = {
**core_settings,
"log_comment": json.dumps(tags, separators=(",", ":")),
"query_id": query_id,
}

try:
Expand Down
6 changes: 3 additions & 3 deletions posthog/clickhouse/client/test/test_execute_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,12 @@ def test_client_strips_comments_from_request(self):
# request routing information for debugging purposes
self.assertIn(f"/* user_id:{self.user_id} request:1 */", first_query)

@patch("posthog.clickhouse.client.execute.get_pool")
def test_offline_workload_if_personal_api_key(self, mock_get_pool):
@patch("posthog.clickhouse.client.execute.get_client_from_pool")
def test_offline_workload_if_personal_api_key(self, mock_get_client):
    # Queries tagged as coming via a personal API key must be routed to the
    # OFFLINE workload; assert on the first positional argument passed to
    # get_client_from_pool.
    from posthog.clickhouse.query_tagging import tag_queries

    with self.capture_select_queries():
        tag_queries(kind="request", id="1", access_method="personal_api_key")
        sync_execute("select 1")

    self.assertEqual(mock_get_client.call_args[0][0], Workload.OFFLINE)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from infi.clickhouse_orm import migrations

from posthog.clickhouse.client.connection import ch_pool
from posthog.clickhouse.client.connection import get_client_from_pool
from posthog.settings import CLICKHOUSE_CLUSTER


Expand All @@ -22,7 +22,7 @@


def add_columns_to_required_tables(_):
with ch_pool.get_client() as client:
with get_client_from_pool() as client:
client.execute(ADD_COLUMNS_SHARDED_EVENTS.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))

client.execute(ADD_COLUMNS_EVENTS.format(table="events", cluster=CLICKHOUSE_CLUSTER))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from infi.clickhouse_orm import migrations

from posthog.clickhouse.client.connection import ch_pool
from posthog.clickhouse.client.connection import get_client_from_pool
from posthog.settings import CLICKHOUSE_CLUSTER


Expand All @@ -16,7 +16,7 @@


def add_columns_to_required_tables(_):
with ch_pool.get_client() as client:
with get_client_from_pool() as client:
client.execute(DROP_COLUMNS_SHARDED_EVENTS.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))
client.execute(ADD_COLUMNS_SHARDED_EVENTS.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from infi.clickhouse_orm import migrations

from posthog.clickhouse.client.connection import ch_pool
from posthog.clickhouse.client.connection import get_client_from_pool
from posthog.settings import CLICKHOUSE_CLUSTER


Expand All @@ -21,7 +21,7 @@


def add_columns_to_required_tables(_):
with ch_pool.get_client() as client:
with get_client_from_pool() as client:
client.execute(DROP_COLUMNS_EVENTS.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))
client.execute(DROP_COLUMNS_EVENTS.format(table="events", cluster=CLICKHOUSE_CLUSTER))
client.execute(ADD_COLUMNS_EVENTS.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))
Expand Down
1 change: 1 addition & 0 deletions posthog/settings/data_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def postgres_config(host: str) -> dict:
CLICKHOUSE_OFFLINE_CLUSTER_HOST: str | None = os.getenv("CLICKHOUSE_OFFLINE_CLUSTER_HOST", None)
CLICKHOUSE_USER: str = os.getenv("CLICKHOUSE_USER", "default")
CLICKHOUSE_PASSWORD: str = os.getenv("CLICKHOUSE_PASSWORD", "")
CLICKHOUSE_USE_HTTP: str = get_from_env("CLICKHOUSE_USE_HTTP", False, type_cast=str_to_bool)
CLICKHOUSE_DATABASE: str = CLICKHOUSE_TEST_DB if TEST else os.getenv("CLICKHOUSE_DATABASE", "default")
CLICKHOUSE_CLUSTER: str = os.getenv("CLICKHOUSE_CLUSTER", "posthog")
CLICKHOUSE_CA: str | None = os.getenv("CLICKHOUSE_CA", None)
Expand Down
4 changes: 4 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ googleapis-common-protos==1.60.0
# via
# -c requirements.txt
# opentelemetry-exporter-otlp-proto-grpc
greenlet==3.1.1
# via
# -c requirements.txt
# sqlalchemy
grpcio==1.63.2
# via
# -c requirements.txt
Expand Down
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ boto3==1.28.16
brotli==1.1.0
celery==5.3.4
celery-redbeat==2.1.1
clickhouse-connect==0.8.11
clickhouse-driver==0.2.7
clickhouse-pool==0.5.3
conditional-cache==1.2
Expand Down
11 changes: 11 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ celery-redbeat==2.1.1
# via -r requirements.in
certifi==2019.11.28
# via
# clickhouse-connect
# httpcore
# httpx
# requests
Expand Down Expand Up @@ -110,6 +111,8 @@ click-plugins==1.1.1
# via celery
click-repl==0.3.0
# via celery
clickhouse-connect==0.8.11
# via -r requirements.in
clickhouse-driver==0.2.7
# via
# -r requirements.in
Expand Down Expand Up @@ -270,6 +273,8 @@ googleapis-common-protos==1.60.0
# via
# google-api-core
# grpcio-status
greenlet==3.1.1
# via sqlalchemy
grpcio==1.63.2
# via
# -r requirements.in
Expand Down Expand Up @@ -378,6 +383,8 @@ lxml==4.9.4
# toronado
# xmlsec
# zeep
lz4==4.3.3
# via clickhouse-connect
lzstring==1.0.4
# via -r requirements.in
makefun==1.15.2
Expand Down Expand Up @@ -562,6 +569,7 @@ python3-saml==1.12.0
pytz==2023.3
# via
# -r requirements.in
# clickhouse-connect
# clickhouse-driver
# dlt
# infi-clickhouse-orm
Expand Down Expand Up @@ -776,6 +784,7 @@ uritemplate==4.1.1
urllib3==1.26.18
# via
# botocore
# clickhouse-connect
# geoip2
# google-auth
# pdpyras
Expand Down Expand Up @@ -811,6 +820,8 @@ yarl==1.18.3
# via aiohttp
zeep==4.2.1
# via simple-salesforce
zstandard==0.23.0
# via clickhouse-connect
zstd==1.5.5.1
# via -r requirements.in
zxcvbn==4.4.28
Expand Down

0 comments on commit e823bf9

Please sign in to comment.