Skip to content

Commit

Permalink
chore(data-warehouse): Add a reset pipeline input to the external dat…
Browse files Browse the repository at this point in the history
…a job workflow (#27881)
  • Loading branch information
Gilbert09 authored Jan 24, 2025
1 parent 75a906d commit 2265e87
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
1 change: 1 addition & 0 deletions posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs):
run_id=job_id,
schema_id=inputs.external_data_schema_id,
source_id=inputs.external_data_source_id,
reset_pipeline=inputs.reset_pipeline,
)

timeout_params = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import uuid
from datetime import datetime
from dateutil import parser
from typing import Any
from typing import Any, Optional

from django.conf import settings
from django.db import close_old_connections
Expand Down Expand Up @@ -35,6 +35,7 @@ class ImportDataActivityInputs:
schema_id: uuid.UUID
source_id: uuid.UUID
run_id: str
reset_pipeline: Optional[bool] = None


def process_incremental_last_value(value: Any | None, field_type: IncrementalFieldType | None) -> Any | None:
Expand Down Expand Up @@ -92,7 +93,11 @@ def import_data_activity_sync(inputs: ImportDataActivityInputs):

schema: ExternalDataSchema | None = model.schema
assert schema is not None
reset_pipeline = schema.sync_type_config.get("reset_pipeline", False) is True

if inputs.reset_pipeline is not None:
reset_pipeline = inputs.reset_pipeline
else:
reset_pipeline = schema.sync_type_config.get("reset_pipeline", False) is True

logger.debug(f"schema.sync_type_config = {schema.sync_type_config}")
logger.debug(f"reset_pipeline = {reset_pipeline}")
Expand Down
2 changes: 2 additions & 0 deletions posthog/temporal/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import dataclasses
from typing import Optional
import uuid


Expand All @@ -9,3 +10,4 @@ class ExternalDataWorkflowInputs:
external_data_source_id: uuid.UUID
external_data_schema_id: uuid.UUID | None = None
billable: bool = True
reset_pipeline: Optional[bool] = None

0 comments on commit 2265e87

Please sign in to comment.