Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(data-warehouse): Add a billable field to jobs #27435

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions posthog/migrations/0541_externaldatajob_billable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Generated by Django 4.2.15 on 2025-01-10 12:04

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the nullable ``billable`` flag to ``externaldatajob`` and backfill it.

    Two operations run in order:

    1. ``AddField`` creates the ``billable`` boolean column with ``default=True``.
       Django applies the field default to rows that already exist when the
       column is added, so every pre-existing job starts out as billable.
    2. ``RunSQL`` flips ``billable`` to ``false`` for jobs whose
       ``pipeline_version`` is ``'v2-non-dlt'``.

    The backfill is irreversible data-wise; reversing the migration simply drops
    the column, so the reverse SQL is a no-op.
    """

    dependencies = [
        ("posthog", "0540_team_human_friendly_comparison_periods"),
    ]

    operations = [
        migrations.AddField(
            model_name="externaldatajob",
            name="billable",
            field=models.BooleanField(null=True, blank=True, default=True),
        ),
        # Only rows that must differ from the column default are touched.
        # Rewriting every row (e.g. with a CASE ... ELSE true) would produce the
        # same final state but update the whole table for no benefit. Rows with
        # a NULL or non-v2 pipeline_version keep the `true` set by AddField.
        migrations.RunSQL(
            """
            UPDATE posthog_externaldatajob
            SET billable = false
            WHERE pipeline_version = 'v2-non-dlt'""",
            reverse_sql=migrations.RunSQL.noop,
        ),
    ]
2 changes: 1 addition & 1 deletion posthog/migrations/max_migration.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0540_team_human_friendly_comparison_periods
0541_externaldatajob_billable
1 change: 1 addition & 0 deletions posthog/tasks/test/test_usage_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,7 @@ def test_external_data_rows_synced_response_with_v2_jobs(
rows_synced=10,
pipeline=source,
pipeline_version=ExternalDataJob.PipelineVersion.V2,
billable=False,
)

period = get_previous_day(at=now() + relativedelta(days=1))
Expand Down
3 changes: 1 addition & 2 deletions posthog/tasks/usage_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,7 @@ def get_teams_with_survey_responses_count_in_period(
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list:
return list(
ExternalDataJob.objects.filter(created_at__gte=begin, created_at__lte=end)
.exclude(pipeline_version=ExternalDataJob.PipelineVersion.V2)
ExternalDataJob.objects.filter(created_at__gte=begin, created_at__lte=end, billable=True)
.values("team_id")
.annotate(total=Sum("rows_synced"))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def create_external_data_job_model_activity(
delete_external_data_schedule(str(inputs.schema_id))
raise Exception("Source or schema no longer exists - deleted temporal schedule")

pipeline_version = get_pipeline_version()

job = ExternalDataJob.objects.create(
team_id=inputs.team_id,
pipeline_id=inputs.source_id,
Expand All @@ -54,7 +56,8 @@ def create_external_data_job_model_activity(
rows_synced=0,
workflow_id=activity.info().workflow_id,
workflow_run_id=activity.info().workflow_run_id,
pipeline_version=get_pipeline_version(),
pipeline_version=pipeline_version,
billable=pipeline_version != ExternalDataJob.PipelineVersion.V2,
)

schema = ExternalDataSchema.objects.get(team_id=inputs.team_id, id=inputs.schema_id)
Expand Down
6 changes: 1 addition & 5 deletions posthog/warehouse/api/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -1236,11 +1236,7 @@ def jobs(self, request: Request, *arg: Any, **kwargs: Any):
after = request.query_params.get("after", None)
before = request.query_params.get("before", None)

jobs = (
instance.jobs.exclude(pipeline_version=ExternalDataJob.PipelineVersion.V2)
.prefetch_related("schema")
.order_by("-created_at")
)
jobs = instance.jobs.filter(billable=True).prefetch_related("schema").order_by("-created_at")

if after:
after_date = parser.parse(after)
Expand Down
3 changes: 2 additions & 1 deletion posthog/warehouse/api/test/test_external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ def test_source_jobs(self):
assert data[0]["schema"]["id"] == str(schema.pk)
assert data[0]["workflow_run_id"] is not None

def test_source_jobs_v2_job(self):
def test_source_jobs_billable_job(self):
source = self._create_external_data_source()
schema = self._create_external_data_schema(source.pk)
ExternalDataJob.objects.create(
Expand All @@ -732,6 +732,7 @@ def test_source_jobs_v2_job(self):
rows_synced=100,
workflow_run_id="test_run_id",
pipeline_version=ExternalDataJob.PipelineVersion.V2,
billable=False,
)

response = self.client.get(
Expand Down
1 change: 1 addition & 0 deletions posthog/warehouse/models/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class PipelineVersion(models.TextChoices):
workflow_run_id = models.CharField(max_length=400, null=True, blank=True)

pipeline_version = models.CharField(max_length=400, choices=PipelineVersion.choices, null=True, blank=True)
billable = models.BooleanField(default=True, null=True, blank=True)

__repr__ = sane_repr("id")

Expand Down
Loading