From f85dcbca2f7a14fedb36b507ae5c4f2f2cb51be4 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 10 Jan 2025 13:42:38 +0100 Subject: [PATCH 1/2] Add a billable field to jobs --- .../0541_externaldatajob_billable.py | 26 +++++++++++++++++++ posthog/migrations/max_migration.txt | 2 +- posthog/tasks/test/test_usage_report.py | 1 + posthog/tasks/usage_report.py | 3 +-- .../workflow_activities/create_job_model.py | 5 +++- posthog/warehouse/api/external_data_source.py | 6 +---- .../api/test/test_external_data_source.py | 3 ++- posthog/warehouse/models/external_data_job.py | 1 + 8 files changed, 37 insertions(+), 10 deletions(-) create mode 100644 posthog/migrations/0541_externaldatajob_billable.py diff --git a/posthog/migrations/0541_externaldatajob_billable.py b/posthog/migrations/0541_externaldatajob_billable.py new file mode 100644 index 0000000000000..cbc0a93390572 --- /dev/null +++ b/posthog/migrations/0541_externaldatajob_billable.py @@ -0,0 +1,26 @@ +# Generated by Django 4.2.15 on 2025-01-10 12:04 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0540_team_human_friendly_comparison_periods"), + ] + + operations = [ + migrations.AddField( + model_name="externaldatajob", + name="billable", + field=models.BooleanField(blank=True, default=True), + ), + migrations.RunSQL( + """ + UPDATE posthog_externaldatajob + SET billable = CASE + WHEN pipeline_version = 'v2-non-dlt' THEN false + ELSE true + END""", + reverse_sql=migrations.RunSQL.noop, + ), + ] diff --git a/posthog/migrations/max_migration.txt b/posthog/migrations/max_migration.txt index 602ce56966064..f5abcb8ffb04b 100644 --- a/posthog/migrations/max_migration.txt +++ b/posthog/migrations/max_migration.txt @@ -1 +1 @@ -0540_team_human_friendly_comparison_periods +0541_externaldatajob_billable diff --git a/posthog/tasks/test/test_usage_report.py b/posthog/tasks/test/test_usage_report.py index d5b2a8b1777a5..dd4c9c0c5b38e 100644 --- a/posthog/tasks/test/test_usage_report.py +++ b/posthog/tasks/test/test_usage_report.py @@ -1388,6 +1388,7 @@ def test_external_data_rows_synced_response_with_v2_jobs( rows_synced=10, pipeline=source, pipeline_version=ExternalDataJob.PipelineVersion.V2, + billable=False, ) period = get_previous_day(at=now() + relativedelta(days=1)) diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index 3ec1917f17826..a844bdf969764 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -605,8 +605,7 @@ def get_teams_with_survey_responses_count_in_period( @retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list: return list( - ExternalDataJob.objects.filter(created_at__gte=begin, created_at__lte=end) - .exclude(pipeline_version=ExternalDataJob.PipelineVersion.V2) + ExternalDataJob.objects.filter(created_at__gte=begin, created_at__lte=end, billable=True) .values("team_id") .annotate(total=Sum("rows_synced")) ) diff --git a/posthog/temporal/data_imports/workflow_activities/create_job_model.py b/posthog/temporal/data_imports/workflow_activities/create_job_model.py index 78e85dfdd182f..729ff9960ef44 100644 --- a/posthog/temporal/data_imports/workflow_activities/create_job_model.py +++ b/posthog/temporal/data_imports/workflow_activities/create_job_model.py @@ -46,6 +46,8 @@ def create_external_data_job_model_activity( delete_external_data_schedule(str(inputs.schema_id)) raise Exception("Source or schema no longer exists - deleted temporal schedule") + pipeline_version = get_pipeline_version() + job = ExternalDataJob.objects.create( team_id=inputs.team_id, pipeline_id=inputs.source_id, @@ -54,7 +56,8 @@ def create_external_data_job_model_activity( rows_synced=0, workflow_id=activity.info().workflow_id, workflow_run_id=activity.info().workflow_run_id, - pipeline_version=get_pipeline_version(), + pipeline_version=pipeline_version, + billable=pipeline_version != ExternalDataJob.PipelineVersion.V2, ) schema = ExternalDataSchema.objects.get(team_id=inputs.team_id, id=inputs.schema_id) diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index 47cf23d80b38c..b04a4571cfb9e 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -1236,11 +1236,7 @@ def jobs(self, request: Request, *arg: Any, **kwargs: Any): after = request.query_params.get("after", None) before = request.query_params.get("before", None) - jobs = ( - instance.jobs.exclude(pipeline_version=ExternalDataJob.PipelineVersion.V2) - .prefetch_related("schema") - .order_by("-created_at") - ) + jobs = instance.jobs.filter(billable=True).prefetch_related("schema").order_by("-created_at") if after: after_date = parser.parse(after) diff --git a/posthog/warehouse/api/test/test_external_data_source.py b/posthog/warehouse/api/test/test_external_data_source.py index bc60e36459f7a..12dc1263633ed 100644 --- a/posthog/warehouse/api/test/test_external_data_source.py +++ b/posthog/warehouse/api/test/test_external_data_source.py @@ -721,7 +721,7 @@ def test_source_jobs(self): assert data[0]["schema"]["id"] == str(schema.pk) assert data[0]["workflow_run_id"] is not None - def test_source_jobs_v2_job(self): + def test_source_jobs_billable_job(self): source = self._create_external_data_source() schema = self._create_external_data_schema(source.pk) ExternalDataJob.objects.create( @@ -732,6 +732,7 @@ def test_source_jobs_v2_job(self): rows_synced=100, workflow_run_id="test_run_id", pipeline_version=ExternalDataJob.PipelineVersion.V2, + billable=False, ) response = self.client.get( diff --git a/posthog/warehouse/models/external_data_job.py b/posthog/warehouse/models/external_data_job.py index d9949e00d4423..3d9a5ea3e607a 100644 --- a/posthog/warehouse/models/external_data_job.py +++ b/posthog/warehouse/models/external_data_job.py @@ -30,6 +30,7 @@ class PipelineVersion(models.TextChoices): workflow_run_id = models.CharField(max_length=400, null=True, blank=True) pipeline_version = models.CharField(max_length=400, choices=PipelineVersion.choices, null=True, blank=True) + billable = models.BooleanField(default=True, blank=True) __repr__ = sane_repr("id") From 0d315ae49d22a28b3abd477dfd2d5e11700b04e2 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 10 Jan 2025 14:02:27 +0100 Subject: [PATCH 2/2] Added null=true to the new field --- posthog/migrations/0541_externaldatajob_billable.py | 2 +- posthog/warehouse/models/external_data_job.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/posthog/migrations/0541_externaldatajob_billable.py b/posthog/migrations/0541_externaldatajob_billable.py index cbc0a93390572..895c37ce0fd9e 100644 --- a/posthog/migrations/0541_externaldatajob_billable.py +++ b/posthog/migrations/0541_externaldatajob_billable.py @@ -12,7 +12,7 @@ class Migration(migrations.Migration): migrations.AddField( model_name="externaldatajob", name="billable", - field=models.BooleanField(blank=True, default=True), + field=models.BooleanField(null=True, blank=True, default=True), ), migrations.RunSQL( """ diff --git a/posthog/warehouse/models/external_data_job.py b/posthog/warehouse/models/external_data_job.py index 3d9a5ea3e607a..b7bd910b54097 100644 --- a/posthog/warehouse/models/external_data_job.py +++ b/posthog/warehouse/models/external_data_job.py @@ -30,7 +30,7 @@ class PipelineVersion(models.TextChoices): workflow_run_id = models.CharField(max_length=400, null=True, blank=True) pipeline_version = models.CharField(max_length=400, choices=PipelineVersion.choices, null=True, blank=True) - billable = models.BooleanField(default=True, blank=True) + billable = models.BooleanField(default=True, null=True, blank=True) __repr__ = sane_repr("id")