refactor: Redshift batch export uses spmc consumer (#26897)
tomasfarias authored Jan 2, 2025
1 parent c916c96 commit c53f931
Showing 8 changed files with 365 additions and 278 deletions.
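
Note on the commit title: "spmc" refers to the batch export helpers imported further below (Consumer, Producer, RecordBatchQueue, run_consumer), presumably a single-producer, multiple-consumer queue: one producer task streams record batches into a bounded queue while consumer tasks drain it and write to the destination. The following is a generic asyncio illustration of that pattern only; it is not PostHog's implementation, and every name in it is a placeholder.

import asyncio

N_CONSUMERS = 2

async def produce(queue: asyncio.Queue) -> None:
    # Placeholder producer: a real batch export would stream record batches
    # from the source and enqueue them.
    for i in range(10):
        await queue.put(f"record batch {i}")
    for _ in range(N_CONSUMERS):
        await queue.put(None)  # sentinel: tell each consumer to stop

async def consume(queue: asyncio.Queue) -> int:
    # Placeholder consumer: a real consumer would serialize batches and
    # upload them to the destination (BigQuery, Redshift, ...).
    consumed = 0
    while await queue.get() is not None:
        consumed += 1
    return consumed

async def main() -> None:
    # Bounded queue; the real RecordBatchQueue is bounded by bytes via the
    # *_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES settings rather than by item count.
    queue: asyncio.Queue = asyncio.Queue(maxsize=4)
    _, *counts = await asyncio.gather(produce(queue), *(consume(queue) for _ in range(N_CONSUMERS)))
    print(f"record batches consumed per consumer: {counts}")

asyncio.run(main())
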
4 changes: 4 additions & 0 deletions posthog/settings/temporal.py
@@ -25,6 +25,10 @@
 BATCH_EXPORT_BIGQUERY_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES: int = get_from_env(
     "BATCH_EXPORT_BIGQUERY_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES", 0, type_cast=int
 )
+BATCH_EXPORT_REDSHIFT_UPLOAD_CHUNK_SIZE_BYTES: int = 1024 * 1024 * 8  # 8MB
+BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES: int = get_from_env(
+    "BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES", 1024 * 1024 * 300, type_cast=int
+)
 BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES: int = 1024 * 1024 * 50  # 50MB
 BATCH_EXPORT_HTTP_BATCH_SIZE: int = 5000
 BATCH_EXPORT_BUFFER_QUEUE_MAX_SIZE_BYTES: int = 1024 * 1024 * 300  # 300MB
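
The new Redshift queue size mirrors the BigQuery setting above: an integer read from the environment with a 300 MB default. The get_from_env helper itself is not part of this diff; the sketch below is only an assumption about how such a helper plausibly behaves (read the variable, apply type_cast, fall back to the default), not PostHog's actual implementation.

import os
from typing import Any, Callable

def get_from_env(key: str, default: Any = None, *, type_cast: Callable[[Any], Any] | None = None) -> Any:
    # Assumed behaviour only: PostHog's real helper may handle required
    # keys, booleans, empty values, etc. differently.
    value = os.getenv(key)
    if value is None:
        return default
    return type_cast(value) if type_cast is not None else value

# Usage mirroring the diff above:
BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES: int = get_from_env(
    "BATCH_EXPORT_REDSHIFT_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES", 1024 * 1024 * 300, type_cast=int
)
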
32 changes: 20 additions & 12 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -40,7 +40,7 @@
     Consumer,
     Producer,
     RecordBatchQueue,
-    run_consumer_loop,
+    run_consumer,
     wait_for_schema_or_producer,
 )
 from posthog.temporal.batch_exports.temporary_file import (
@@ -519,12 +519,19 @@ def __init__(
         heartbeater: Heartbeater,
         heartbeat_details: BigQueryHeartbeatDetails,
         data_interval_start: dt.datetime | str | None,
+        data_interval_end: dt.datetime | str,
         writer_format: WriterFormat,
         bigquery_client: BigQueryClient,
         bigquery_table: bigquery.Table,
-        table_schema: list[BatchExportField],
+        table_schema: list[bigquery.SchemaField],
     ):
-        super().__init__(heartbeater, heartbeat_details, data_interval_start, writer_format)
+        super().__init__(
+            heartbeater=heartbeater,
+            heartbeat_details=heartbeat_details,
+            data_interval_start=data_interval_start,
+            data_interval_end=data_interval_end,
+            writer_format=writer_format,
+        )
         self.bigquery_client = bigquery_client
         self.bigquery_table = bigquery_table
         self.table_schema = table_schema
@@ -629,11 +636,10 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
             include_events=inputs.include_events,
             extra_query_parameters=extra_query_parameters,
         )
-        records_completed = 0

         record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
         if record_batch_schema is None:
-            return records_completed
+            return 0

         record_batch_schema = pa.schema(
             # NOTE: For some reason, some batches set non-nullable fields as non-nullable, whereas other
@@ -700,21 +706,23 @@
                 create=can_perform_merge,
                 delete=can_perform_merge,
             ) as bigquery_stage_table:
-                records_completed = await run_consumer_loop(
-                    queue=queue,
-                    consumer_cls=BigQueryConsumer,
-                    producer_task=producer_task,
+                consumer = BigQueryConsumer(
                     heartbeater=heartbeater,
                     heartbeat_details=details,
                     data_interval_end=data_interval_end,
                     data_interval_start=data_interval_start,
-                    schema=record_batch_schema,
                     writer_format=WriterFormat.PARQUET if can_perform_merge else WriterFormat.JSONL,
-                    max_bytes=settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES,
-                    json_columns=() if can_perform_merge else json_columns,
                     bigquery_client=bq_client,
                     bigquery_table=bigquery_stage_table if can_perform_merge else bigquery_table,
                     table_schema=stage_schema if can_perform_merge else schema,
+                )
+                records_completed = await run_consumer(
+                    consumer=consumer,
+                    queue=queue,
+                    producer_task=producer_task,
+                    schema=record_batch_schema,
+                    max_bytes=settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES,
+                    json_columns=() if can_perform_merge else json_columns,
                     writer_file_kwargs={"compression": "zstd"} if can_perform_merge else {},
                     multiple_files=True,
                 )
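
The BigQuery call site above shows the shape of the refactor that, per the commit title, the files not loaded here apply to the Redshift export as well: the activity now constructs the destination-specific Consumer itself and passes it to a generic run_consumer, instead of handing run_consumer_loop a consumer_cls plus all of its constructor arguments. The sketch below is only a rough, hypothetical rendering of that division of labour, with invented names (drain_record_batches, RecordBatchConsumer) and none of the real schema, max_bytes, heartbeat, or writer-file handling; it is not the actual run_consumer.

import asyncio
from typing import Protocol

class RecordBatchConsumer(Protocol):
    # Stand-in for the spmc Consumer interface; the real class also deals
    # with writer formats, heartbeats, and flush thresholds.
    async def consume(self, batch: object) -> int: ...

async def drain_record_batches(
    consumer: RecordBatchConsumer,
    queue: asyncio.Queue,
    producer_task: asyncio.Task,
) -> int:
    # Hypothetical generic loop: pull batches until the queue is empty and
    # the producer has finished, delegating each batch to the consumer.
    records_completed = 0
    while not (queue.empty() and producer_task.done()):
        try:
            batch = await asyncio.wait_for(queue.get(), timeout=0.1)
        except asyncio.TimeoutError:
            continue
        records_completed += await consumer.consume(batch)
    return records_completed
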
[Diffs for the remaining 6 changed files were not loaded in this view.]
