Skip to content

Commit

Permalink
feat(data-warehouse): Allow the pipeline to take a billable input (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Jan 24, 2025
1 parent a489e2a commit 9eb35c4
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 3 deletions.
1 change: 1 addition & 0 deletions posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs):
team_id=inputs.team_id,
schema_id=inputs.external_data_schema_id,
source_id=inputs.external_data_source_id,
billable=inputs.billable,
)

job_id, incremental, source_type = await workflow.execute_activity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class CreateExternalDataJobModelActivityInputs:
team_id: int
schema_id: uuid.UUID
source_id: uuid.UUID
billable: bool


def get_pipeline_version() -> str:
Expand Down Expand Up @@ -48,6 +49,12 @@ def create_external_data_job_model_activity(

pipeline_version = get_pipeline_version()

# Temp until V2 is rolled out
if inputs.billable is False:
billable = False
else:
billable = pipeline_version != ExternalDataJob.PipelineVersion.V2

job = ExternalDataJob.objects.create(
team_id=inputs.team_id,
pipeline_id=inputs.source_id,
Expand All @@ -57,7 +64,7 @@ def create_external_data_job_model_activity(
workflow_id=activity.info().workflow_id,
workflow_run_id=activity.info().workflow_run_id,
pipeline_version=pipeline_version,
billable=pipeline_version != ExternalDataJob.PipelineVersion.V2,
billable=billable,
)

schema = ExternalDataSchema.objects.get(team_id=inputs.team_id, id=inputs.schema_id)
Expand Down
19 changes: 19 additions & 0 deletions posthog/temporal/tests/data_imports/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async def _run(
mock_data_response: Any,
sync_type: Optional[ExternalDataSchema.SyncType] = None,
sync_type_config: Optional[dict] = None,
billable: Optional[bool] = None,
):
source = await sync_to_async(ExternalDataSource.objects.create)(
source_id=uuid.uuid4(),
Expand All @@ -147,6 +148,7 @@ async def _run(
team_id=team.id,
external_data_source_id=source.pk,
external_data_schema_id=schema.id,
billable=billable if billable is not None else True,
)

await _execute_run(workflow_id, inputs, mock_data_response)
Expand Down Expand Up @@ -1271,3 +1273,20 @@ async def test_delete_table_on_reset(team, stripe_balance_transaction):

assert schema.sync_type_config is not None and isinstance(schema.sync_type_config, dict)
assert "reset_pipeline" not in schema.sync_type_config.keys()


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_billable_job(team, stripe_balance_transaction):
workflow_id, inputs = await _run(
team=team,
schema_name="BalanceTransaction",
table_name="stripe_balancetransaction",
source_type="Stripe",
job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
mock_data_response=stripe_balance_transaction["data"],
billable=False,
)

run: ExternalDataJob = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=inputs.external_data_source_id)
assert run.billable is False
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def test_create_external_job_activity(activity_environment, team, **kwargs):
test_1_schema = _create_schema("test-1", new_source, team)

inputs = CreateExternalDataJobModelActivityInputs(
team_id=team.id, source_id=new_source.pk, schema_id=test_1_schema.id
team_id=team.id, source_id=new_source.pk, schema_id=test_1_schema.id, billable=True
)

run_id, _, __ = activity_environment.run(create_external_data_job_model_activity, inputs)
Expand All @@ -199,7 +199,9 @@ def test_create_external_job_activity_schemas_exist(activity_environment, team,
source_id=new_source.pk,
)

inputs = CreateExternalDataJobModelActivityInputs(team_id=team.id, source_id=new_source.pk, schema_id=schema.id)
inputs = CreateExternalDataJobModelActivityInputs(
team_id=team.id, source_id=new_source.pk, schema_id=schema.id, billable=True
)

run_id, _, __ = activity_environment.run(create_external_data_job_model_activity, inputs)

Expand Down
1 change: 1 addition & 0 deletions posthog/temporal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ class ExternalDataWorkflowInputs:
team_id: int
external_data_source_id: uuid.UUID
external_data_schema_id: uuid.UUID | None = None
billable: bool = True

0 comments on commit 9eb35c4

Please sign in to comment.