From f1e760149769b67d55083be77a1db42a27281072 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Wed, 11 Sep 2024 10:58:06 +0200 Subject: [PATCH] feat(cdp): Google PubSub (#24874) --- frontend/src/lib/api.ts | 2 +- .../src/lib/integrations/integrationsLogic.ts | 37 +++- .../integrations/IntegrationChoice.tsx | 58 +++-- frontend/src/types.ts | 2 +- latest_migrations.manifest | 2 +- posthog/api/integration.py | 16 +- posthog/api/plugin.py | 2 + posthog/cdp/templates/__init__.py | 3 + .../google_pubsub/template_google_pubsub.py | 126 +++++++++++ .../test_template_google_pubsub.py | 204 ++++++++++++++++++ .../0468_integration_google_pubsub.py | 25 +++ posthog/models/integration.py | 86 +++++++- posthog/models/test/test_integration_model.py | 87 +++++++- posthog/tasks/integrations.py | 12 ++ 14 files changed, 638 insertions(+), 24 deletions(-) create mode 100644 posthog/cdp/templates/google_pubsub/template_google_pubsub.py create mode 100644 posthog/cdp/templates/google_pubsub/test_template_google_pubsub.py create mode 100644 posthog/migrations/0468_integration_google_pubsub.py diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 54ccdd6a2189b..8d68e16f4bb54 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -2224,7 +2224,7 @@ const api = { async get(id: IntegrationType['id']): Promise { return await new ApiRequest().integration(id).get() }, - async create(data: Partial): Promise { + async create(data: Partial | FormData): Promise { return await new ApiRequest().integrations().create({ data }) }, async delete(integrationId: IntegrationType['id']): Promise { diff --git a/frontend/src/lib/integrations/integrationsLogic.ts b/frontend/src/lib/integrations/integrationsLogic.ts index 63e826bbff90b..78628e3e82716 100644 --- a/frontend/src/lib/integrations/integrationsLogic.ts +++ b/frontend/src/lib/integrations/integrationsLogic.ts @@ -4,6 +4,7 @@ import { loaders } from 'kea-loaders' import { router, urlToAction } from 'kea-router' import api from 'lib/api' import { fromParamsGivenUrl } from 'lib/utils' +import IconGoogleCloud from 'public/services/google-cloud.png' import IconHubspot from 'public/services/hubspot.png' import IconSalesforce from 'public/services/salesforce.png' import IconSlack from 'public/services/slack.png' @@ -18,6 +19,7 @@ const ICONS: Record = { slack: IconSlack, salesforce: IconSalesforce, hubspot: IconHubspot, + 'google-pubsub': IconGoogleCloud, } export const integrationsLogic = kea([ @@ -28,10 +30,15 @@ export const integrationsLogic = kea([ actions({ handleOauthCallback: (kind: IntegrationKind, searchParams: any) => ({ kind, searchParams }), + newGoogleCloudKey: (kind: string, key: File, callback?: (integration: IntegrationType) => void) => ({ + kind, + key, + callback, + }), deleteIntegration: (id: number) => ({ id }), }), - loaders(() => ({ + loaders(({ values }) => ({ integrations: [ null as IntegrationType[] | null, { @@ -48,6 +55,34 @@ export const integrationsLogic = kea([ } }) }, + newGoogleCloudKey: async ({ kind, key, callback }) => { + try { + const formData = new FormData() + formData.append('kind', kind) + formData.append('key', key) + const response = await api.integrations.create(formData) + const responseWithIcon = { ...response, icon_url: ICONS[kind] ?? ICONS['google-pubsub'] } + + // run onChange after updating the integrations loader + window.setTimeout(() => callback?.(responseWithIcon), 0) + + if ( + values.integrations?.find( + (x) => x.kind === kind && x.display_name === response.display_name + ) + ) { + lemonToast.success('Google Cloud key updated.') + return values.integrations.map((x) => + x.kind === kind && x.display_name === response.display_name ? responseWithIcon : x + ) + } + lemonToast.success('Google Cloud key created.') + return [...(values.integrations ?? []), responseWithIcon] + } catch (e) { + lemonToast.error('Failed to upload Google Cloud key.') + throw e + } + }, }, ], })), diff --git a/frontend/src/scenes/pipeline/hogfunctions/integrations/IntegrationChoice.tsx b/frontend/src/scenes/pipeline/hogfunctions/integrations/IntegrationChoice.tsx index af8d9c66bb128..1376f1897ebc9 100644 --- a/frontend/src/scenes/pipeline/hogfunctions/integrations/IntegrationChoice.tsx +++ b/frontend/src/scenes/pipeline/hogfunctions/integrations/IntegrationChoice.tsx @@ -1,6 +1,6 @@ import { IconExternal, IconX } from '@posthog/icons' import { LemonButton, LemonMenu, LemonSkeleton } from '@posthog/lemon-ui' -import { useValues } from 'kea' +import { useActions, useValues } from 'kea' import api from 'lib/api' import { integrationsLogic } from 'lib/integrations/integrationsLogic' import { IntegrationView } from 'lib/integrations/IntegrationView' @@ -21,6 +21,7 @@ export function IntegrationChoice({ redirectUrl, }: IntegrationConfigureProps): JSX.Element | null { const { integrationsLoading, integrations } = useValues(integrationsLogic) + const { newGoogleCloudKey } = useActions(integrationsLogic) const kind = integration const integrationsOfKind = integrations?.filter((x) => x.kind === kind) const integrationKind = integrationsOfKind?.find((integration) => integration.id === value) @@ -33,6 +34,22 @@ export function IntegrationChoice({ return } + const kindName = kind == 'google-pubsub' ? 'Google Cloud Pub/Sub' : capitalizeFirstLetter(kind) + + function uploadKey(kind: string): void { + const input = document.createElement('input') + input.type = 'file' + input.accept = '.json' + input.onchange = async (e) => { + const file = (e.target as HTMLInputElement).files?.[0] + if (!file) { + return + } + newGoogleCloudKey(kind, file, (integration) => onChange?.(integration.id)) + } + input.click() + } + const button = ( uploadKey(kind), + label: 'Upload Google Cloud .json key file', + }, + ], + } + : { + items: [ + { + to: api.integrations.authorizeUrl({ + kind, + next: redirectUrl, + }), + disableClientSideRouting: true, + label: integrationsOfKind?.length + ? `Connect to a different ${kind} integration` + : `Connect to ${kind}`, + }, + ], + }, { items: [ { @@ -83,7 +109,7 @@ export function IntegrationChoice({ {integrationKind ? ( Change ) : ( - Choose {capitalizeFirstLetter(kind)} connection + Choose {kindName} connection )} ) diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 94cedbd1c1566..f5c24f30f0508 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -3564,7 +3564,7 @@ export enum EventDefinitionType { EventPostHog = 'event_posthog', } -export type IntegrationKind = 'slack' | 'salesforce' | 'hubspot' +export type IntegrationKind = 'slack' | 'salesforce' | 'hubspot' | 'google-pubsub' export interface IntegrationType { id: number diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 1b9ca11ec4e13..1f49620172660 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0016_rolemembership_organization_member otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0467_add_web_vitals_allowed_metrics +posthog: 0468_integration_google_pubsub sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/api/integration.py b/posthog/api/integration.py index 6c343633ff7da..8b269de3b8280 100644 --- a/posthog/api/integration.py +++ b/posthog/api/integration.py @@ -1,3 +1,5 @@ +import json + from typing import Any from django.http import HttpResponse @@ -10,7 +12,7 @@ from posthog.api.routing import TeamAndOrgViewSetMixin from posthog.api.shared import UserBasicSerializer -from posthog.models.integration import Integration, OauthIntegration, SlackIntegration +from posthog.models.integration import Integration, OauthIntegration, SlackIntegration, GoogleCloudIntegration class IntegrationSerializer(serializers.ModelSerializer): @@ -27,7 +29,17 @@ def create(self, validated_data: Any) -> Any: request = self.context["request"] team_id = self.context["team_id"] - if validated_data["kind"] in OauthIntegration.supported_kinds: + if validated_data["kind"] in GoogleCloudIntegration.supported_kinds: + key_file = request.FILES.get("key") + if not key_file: + raise ValidationError("Key file not provided") + key_info = json.loads(key_file.read().decode("utf-8")) + instance = GoogleCloudIntegration.integration_from_key( + validated_data["kind"], key_info, team_id, request.user + ) + return instance + + elif validated_data["kind"] in OauthIntegration.supported_kinds: try: instance = OauthIntegration.integration_from_oauth_response( validated_data["kind"], team_id, request.user, validated_data["config"] diff --git a/posthog/api/plugin.py b/posthog/api/plugin.py index bb19663d88589..930fc1a6c67fe 100644 --- a/posthog/api/plugin.py +++ b/posthog/api/plugin.py @@ -914,6 +914,8 @@ def migrate(self, request: request.Request, **kwargs): raise ValidationError("No migration available for this plugin") hog_function_data = migrater.migrate(obj) + hog_function_data["template_id"] = hog_function_data["id"] + del hog_function_data["id"] if obj.enabled: hog_function_data["enabled"] = True diff --git a/posthog/cdp/templates/__init__.py b/posthog/cdp/templates/__init__.py index 3df8e99677965..aa26040e42da6 100644 --- a/posthog/cdp/templates/__init__.py +++ b/posthog/cdp/templates/__init__.py @@ -18,6 +18,7 @@ from .loops.template_loops import template as loops from .rudderstack.template_rudderstack import template as rudderstack from .gleap.template_gleap import template as gleap +from .google_pubsub.template_google_pubsub import template as google_pubsub, TemplateGooglePubSubMigrator HOG_FUNCTION_TEMPLATES = [ @@ -40,6 +41,7 @@ rudderstack, avo, gleap, + google_pubsub, ] @@ -49,6 +51,7 @@ TemplateCustomerioMigrator.plugin_url: TemplateCustomerioMigrator, TemplateIntercomMigrator.plugin_url: TemplateIntercomMigrator, TemplateSendGridMigrator.plugin_url: TemplateSendGridMigrator, + TemplateGooglePubSubMigrator.plugin_url: TemplateGooglePubSubMigrator, } __all__ = ["HOG_FUNCTION_TEMPLATES", "HOG_FUNCTION_TEMPLATES_BY_ID"] diff --git a/posthog/cdp/templates/google_pubsub/template_google_pubsub.py b/posthog/cdp/templates/google_pubsub/template_google_pubsub.py new file mode 100644 index 0000000000000..ceb853c752c5a --- /dev/null +++ b/posthog/cdp/templates/google_pubsub/template_google_pubsub.py @@ -0,0 +1,126 @@ +import dataclasses +import json +from copy import deepcopy + +from posthog.cdp.templates.hog_function_template import HogFunctionTemplate, HogFunctionTemplateMigrator +from posthog.models.integration import GoogleCloudIntegration + +template: HogFunctionTemplate = HogFunctionTemplate( + status="beta", + id="template-google-pubsub", + name="Google Pub/Sub", + description="Send data to a Google Pub/Sub topic", + icon_url="/static/services/google-cloud.png", + hog=""" +let headers := () -> { + 'Authorization': f'Bearer {inputs.auth.access_token}', + 'Content-Type': 'application/json' +} +let message := () -> { + 'messageId': event.uuid, + 'data': base64Encode(jsonStringify(inputs.payload)), + 'attributes': inputs.attributes +} +let res := fetch(f'https://pubsub.googleapis.com/v1/{inputs.topicId}:publish', { + 'method': 'POST', + 'headers': headers(), + 'body': jsonStringify({ 'messages': [message()] }) +}) + +if (res.status >= 200 and res.status < 300) { + print('Event sent successfully!') +} else { + throw Error('Error sending event', res) +} +""".strip(), + inputs_schema=[ + { + "key": "auth", + "type": "integration", + "integration": "google-pubsub", + "label": "Google Cloud service account", + "secret": False, + "required": True, + }, + { + "key": "topicId", + "type": "string", + "label": "Topic name", + "secret": False, + "required": True, + }, + { + "key": "payload", + "type": "json", + "label": "Message Payload", + "default": {"event": "{event}", "person": "{person}"}, + "secret": False, + "required": False, + }, + { + "key": "attributes", + "type": "json", + "label": "Attributes", + "default": {}, + "secret": False, + "required": False, + }, + ], +) + + +class TemplateGooglePubSubMigrator(HogFunctionTemplateMigrator): + plugin_url = "https://github.com/PostHog/pubsub-plugin" + + @classmethod + def migrate(cls, obj): + hf = deepcopy(dataclasses.asdict(template)) + + exportEventsToIgnore = obj.config.get("exportEventsToIgnore", "") + topicId = obj.config.get("topicId", "") + + from posthog.models.plugin import PluginAttachment + + attachment: PluginAttachment | None = PluginAttachment.objects.filter( + plugin_config=obj, key="googleCloudKeyJson" + ).first() + if not attachment: + raise Exception("Google Cloud Key JSON not found") + + keyFile = json.loads(attachment.contents.decode("UTF-8")) # type: ignore + integration = GoogleCloudIntegration.integration_from_key("google-pubsub", keyFile, obj.team.pk) + + hf["filters"] = {} + if exportEventsToIgnore: + events = exportEventsToIgnore.split(",") + if len(events) > 0: + event_names = ", ".join(["'{}'".format(event.strip()) for event in events]) + query = f"event not in ({event_names})" + hf["filters"]["events"] = [ + { + "id": None, + "name": "All events", + "type": "events", + "order": 0, + "properties": [{"key": query, "type": "hogql"}], + } + ] + + hf["inputs"] = { + "topicId": {"value": topicId}, + "payload": { + "value": { + "event": "{event.name}", + "distinct_id": "{event.distinct_id}", + "timestamp": "{event.timestamp}", + "uuid": "{event.uuid}", + "properties": "{event.properties}", + "person_id": "{person.id}", + "person_properties": "{person.properties}", + } + }, + "auth": {"value": integration.id}, + "attributes": {"value": {}}, + } + + return hf diff --git a/posthog/cdp/templates/google_pubsub/test_template_google_pubsub.py b/posthog/cdp/templates/google_pubsub/test_template_google_pubsub.py new file mode 100644 index 0000000000000..077cdeae5d348 --- /dev/null +++ b/posthog/cdp/templates/google_pubsub/test_template_google_pubsub.py @@ -0,0 +1,204 @@ +from datetime import datetime +from unittest.mock import patch + +from inline_snapshot import snapshot + +from posthog.cdp.templates.google_pubsub.template_google_pubsub import TemplateGooglePubSubMigrator +from posthog.cdp.templates.helpers import BaseHogFunctionTemplateTest +from posthog.cdp.templates.hubspot.template_hubspot import template as template_hubspot +from posthog.models import PluginConfig, PluginAttachment, Plugin, Integration +from posthog.test.base import BaseTest + + +class TestTemplateGooglePubSub(BaseHogFunctionTemplateTest): + template = template_hubspot + + def _inputs(self, **kwargs): + inputs = { + "oauth": {"access_token": "TOKEN"}, + "email": "example@posthog.com", + "properties": { + "company": "PostHog", + }, + } + inputs.update(kwargs) + return inputs + + def test_function_works(self): + self.mock_fetch_response = lambda *args: {"status": 200, "body": {"status": "success"}} # type: ignore + + res = self.run_function(inputs=self._inputs()) + + assert res.result is None + + assert self.get_mock_fetch_calls() == [ + ( + "https://api.hubapi.com/crm/v3/objects/contacts", + { + "method": "POST", + "headers": {"Authorization": "Bearer TOKEN", "Content-Type": "application/json"}, + "body": {"properties": {"company": "PostHog", "email": "example@posthog.com"}}, + }, + ) + ] + assert self.get_mock_print_calls() == [("Contact created successfully!",)] + + def test_exits_if_no_email(self): + for email in [None, ""]: + self.mock_print.reset_mock() + res = self.run_function(inputs=self._inputs(email=email)) + + assert res.result is None + assert self.get_mock_fetch_calls() == [] + assert self.get_mock_print_calls() == [("`email` input is empty. Not creating a contact.",)] + + def test_handles_updates(self): + call_count = 0 + + # First call respond with 409, second one 200 and increment call_count + def mock_fetch(*args): + nonlocal call_count + call_count += 1 + return ( + {"status": 409, "body": {"message": "Contact already exists. Existing ID: 12345"}} + if call_count == 1 + else {"status": 200, "body": {"status": "success"}} + ) + + self.mock_fetch_response = mock_fetch # type: ignore + + res = self.run_function(inputs=self._inputs()) + + assert res.result is None + + assert len(self.get_mock_fetch_calls()) == 2 + + assert self.get_mock_fetch_calls()[0] == ( + "https://api.hubapi.com/crm/v3/objects/contacts", + { + "method": "POST", + "headers": {"Authorization": "Bearer TOKEN", "Content-Type": "application/json"}, + "body": {"properties": {"company": "PostHog", "email": "example@posthog.com"}}, + }, + ) + + assert self.get_mock_fetch_calls()[1] == ( + "https://api.hubapi.com/crm/v3/objects/contacts/12345", + { + "method": "PATCH", + "headers": {"Authorization": "Bearer TOKEN", "Content-Type": "application/json"}, + "body": {"properties": {"company": "PostHog", "email": "example@posthog.com"}}, + }, + ) + + +class TestTemplateMigration(BaseTest): + def get_plugin_config(self, config: dict): + _config = { + "topicId": "TOPIC_ID", + "exportEventsToIgnore": "", + } + _config.update(config) + return PluginConfig(enabled=True, order=0, config=_config) + + @patch("google.oauth2.service_account.Credentials.from_service_account_info") + def test_integration(self, mock_credentials): + mock_credentials.return_value.project_id = "posthog-616" + mock_credentials.return_value.service_account_email = "posthog@" + mock_credentials.return_value.token = "ACCESS_TOKEN" + mock_credentials.return_value.expiry = datetime.fromtimestamp(1704110400 + 3600) + mock_credentials.return_value.refresh = lambda _: None + + plugin = Plugin() + plugin.save() + obj = self.get_plugin_config({}) + obj.plugin = plugin + obj.team = self.team + obj.save() + PluginAttachment.objects.create( + plugin_config=obj, contents=b'{"cloud": "key"}', key="googleCloudKeyJson", file_size=10 + ) + + template = TemplateGooglePubSubMigrator.migrate(obj) + template["inputs"]["auth"]["value"] = 1 # mock the ID + assert template["inputs"] == snapshot( + { + "auth": {"value": 1}, + "topicId": {"value": "TOPIC_ID"}, + "payload": { + "value": { + "event": "{event.name}", + "distinct_id": "{event.distinct_id}", + "timestamp": "{event.timestamp}", + "uuid": "{event.uuid}", + "properties": "{event.properties}", + "person_properties": "{person.properties}", + "person_id": "{person.id}", + } + }, + "attributes": {"value": {}}, + } + ) + assert template["filters"] == {} + + integration = Integration.objects.last() + assert integration is not None + assert integration.kind == "google-pubsub" + assert integration.sensitive_config == {"cloud": "key"} + assert integration.config.get("access_token") == "ACCESS_TOKEN" + + @patch("google.oauth2.service_account.Credentials.from_service_account_info") + def test_ignore_events(self, mock_credentials): + mock_credentials.return_value.project_id = "posthog-616" + mock_credentials.return_value.service_account_email = "posthog@" + mock_credentials.return_value.token = "ACCESS_TOKEN" + mock_credentials.return_value.expiry = datetime.fromtimestamp(1704110400 + 3600) + mock_credentials.return_value.refresh = lambda _: None + + plugin = Plugin() + plugin.save() + obj = self.get_plugin_config( + { + "exportEventsToIgnore": "event1, event2", + } + ) + obj.plugin = plugin + obj.team = self.team + obj.save() + PluginAttachment.objects.create( + plugin_config=obj, contents=b'{"cloud": "key"}', key="googleCloudKeyJson", file_size=10 + ) + + template = TemplateGooglePubSubMigrator.migrate(obj) + template["inputs"]["auth"]["value"] = 1 # mock the ID + assert template["inputs"] == snapshot( + { + "auth": {"value": 1}, + "topicId": {"value": "TOPIC_ID"}, + "payload": { + "value": { + "event": "{event.name}", + "distinct_id": "{event.distinct_id}", + "timestamp": "{event.timestamp}", + "uuid": "{event.uuid}", + "properties": "{event.properties}", + "person_id": "{person.id}", + "person_properties": "{person.properties}", + } + }, + "attributes": {"value": {}}, + } + ) + assert template["filters"] == snapshot( + { + "events": [ + { + "id": None, + "name": "All events", + "type": "events", + "order": 0, + "properties": [{"key": "event not in ('event1', 'event2')", "type": "hogql"}], + } + ] + } + ) diff --git a/posthog/migrations/0468_integration_google_pubsub.py b/posthog/migrations/0468_integration_google_pubsub.py new file mode 100644 index 0000000000000..1ace80ef4ab2d --- /dev/null +++ b/posthog/migrations/0468_integration_google_pubsub.py @@ -0,0 +1,25 @@ +# Generated by Django 4.2.15 on 2024-09-10 10:43 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0467_add_web_vitals_allowed_metrics"), + ] + + operations = [ + migrations.AlterField( + model_name="integration", + name="kind", + field=models.CharField( + choices=[ + ("slack", "Slack"), + ("salesforce", "Salesforce"), + ("hubspot", "Hubspot"), + ("google-pubsub", "Google Pubsub"), + ], + max_length=20, + ), + ), + ] diff --git a/posthog/models/integration.py b/posthog/models/integration.py index c613bd5aa70c8..2cbc24f30a430 100644 --- a/posthog/models/integration.py +++ b/posthog/models/integration.py @@ -8,8 +8,11 @@ from django.db import models import requests +from rest_framework.exceptions import ValidationError from rest_framework.request import Request from slack_sdk import WebClient +from google.oauth2 import service_account +from google.auth.transport.requests import Request as GoogleRequest from django.conf import settings from posthog.cache_utils import cache_for @@ -39,11 +42,12 @@ class IntegrationKind(models.TextChoices): SLACK = "slack" SALESFORCE = "salesforce" HUBSPOT = "hubspot" + GOOGLE_PUBSUB = "google-pubsub" team = models.ForeignKey("Team", on_delete=models.CASCADE) # The integration type identifier - kind = models.CharField(max_length=10, choices=IntegrationKind.choices) + kind = models.CharField(max_length=20, choices=IntegrationKind.choices) # The ID of the integration in the external system integration_id = models.TextField(null=True, blank=True) # Any config that COULD be passed to the frontend @@ -69,6 +73,8 @@ def display_name(self) -> str: if self.kind in OauthIntegration.supported_kinds: oauth_config = OauthIntegration.oauth_config_for_kind(self.kind) return dot_get(self.config, oauth_config.name_path, self.integration_id) + if self.kind in GoogleCloudIntegration.supported_kinds: + return self.integration_id or "unknown ID" return f"ID: {self.integration_id}" @@ -382,3 +388,81 @@ def slack_config(cls): ) return config + + +class GoogleCloudIntegration: + supported_kinds = ["google-pubsub"] + integration: Integration + + def __init__(self, integration: Integration) -> None: + self.integration = integration + + @classmethod + def integration_from_key( + cls, kind: str, key_info: dict, team_id: int, created_by: Optional[User] = None + ) -> Integration: + if kind == "google-pubsub": + scope = "https://www.googleapis.com/auth/pubsub" + else: + raise NotImplementedError(f"Google Cloud integration kind {kind} not implemented") + + try: + credentials = service_account.Credentials.from_service_account_info(key_info, scopes=[scope]) + credentials.refresh(GoogleRequest()) + except Exception: + raise ValidationError(f"Failed to authenticate with provided service account key") + + integration, created = Integration.objects.update_or_create( + team_id=team_id, + kind=kind, + integration_id=credentials.service_account_email, + defaults={ + "config": { + "expires_in": credentials.expiry.timestamp() - int(time.time()), + "refreshed_at": int(time.time()), + "access_token": credentials.token, + }, + "sensitive_config": key_info, + "created_by": created_by, + }, + ) + + if integration.errors: + integration.errors = "" + integration.save() + + return integration + + def access_token_expired(self, time_threshold: Optional[timedelta] = None) -> bool: + expires_in = self.integration.config.get("expires_in") + refreshed_at = self.integration.config.get("refreshed_at") + if not expires_in or not refreshed_at: + return False + + # To be really safe we refresh if its half way through the expiry + time_threshold = time_threshold or timedelta(seconds=expires_in / 2) + + return time.time() > refreshed_at + expires_in - time_threshold.total_seconds() + + def refresh_access_token(self): + """ + Refresh the access token for the integration if necessary + """ + credentials = service_account.Credentials.from_service_account_info( + self.integration.sensitive_config, scopes=["https://www.googleapis.com/auth/pubsub"] + ) + + try: + credentials.refresh(GoogleRequest()) + except Exception: + raise ValidationError(f"Failed to authenticate with provided service account key") + + self.integration.config = { + "expires_in": credentials.expiry.timestamp() - int(time.time()), + "refreshed_at": int(time.time()), + "access_token": credentials.token, + } + self.integration.save() + reload_integrations_on_workers(self.integration.team_id, [self.integration.id]) + + logger.info(f"Refreshed access token for {self}") diff --git a/posthog/models/test/test_integration_model.py b/posthog/models/test/test_integration_model.py index cad8b798df03e..d4af7badfea46 100644 --- a/posthog/models/test/test_integration_model.py +++ b/posthog/models/test/test_integration_model.py @@ -6,7 +6,7 @@ from freezegun import freeze_time import pytest from posthog.models.instance_setting import set_instance_setting -from posthog.models.integration import Integration, OauthIntegration, SlackIntegration +from posthog.models.integration import Integration, OauthIntegration, SlackIntegration, GoogleCloudIntegration from posthog.test.base import BaseTest @@ -231,3 +231,88 @@ def test_refresh_access_token_handles_errors(self, mock_post, mock_reload): assert integration.errors == "TOKEN_REFRESH_FAILED" mock_reload.assert_not_called() + + +class TestGoogleCloudIntegrationModel(BaseTest): + mock_keyfile = { + "type": "service_account", + "project_id": "posthog-616", + "private_key_id": "df3e129a722a865cc3539b4e69507bad", + "private_key": "-----BEGIN PRIVATE KEY-----\nTHISISTHEKEY==\n-----END PRIVATE KEY-----\n", + "client_email": "hog-pubsub-test@posthog-301601.iam.gserviceaccount.com", + "client_id": "11223344556677889900", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/not-a-topic%40posthog-616.iam.gserviceaccount.com", + "universe_domain": "googleapis.com", + } + + def create_integration( + self, kind: str, config: Optional[dict] = None, sensitive_config: Optional[dict] = None + ) -> Integration: + _config = {"refreshed_at": int(time.time()), "expires_in": 3600} + _sensitive_config = self.mock_keyfile + _config.update(config or {}) + _sensitive_config.update(sensitive_config or {}) + + return Integration.objects.create(team=self.team, kind=kind, config=_config, sensitive_config=_sensitive_config) + + @patch("google.oauth2.service_account.Credentials.from_service_account_info") + def test_integration_from_key(self, mock_credentials): + mock_credentials.return_value.project_id = "posthog-616" + mock_credentials.return_value.service_account_email = "posthog@" + mock_credentials.return_value.token = "ACCESS_TOKEN" + mock_credentials.return_value.expiry = datetime.fromtimestamp(1704110400 + 3600) + mock_credentials.return_value.refresh = lambda _: None + + with freeze_time("2024-01-01T12:00:00Z"): + integration = GoogleCloudIntegration.integration_from_key( + "google-pubsub", + self.mock_keyfile, + self.team.id, + self.user, + ) + + assert integration.team == self.team + assert integration.created_by == self.user + + assert integration.config == { + "access_token": "ACCESS_TOKEN", + "refreshed_at": 1704110400, + "expires_in": 3600, + } + assert integration.sensitive_config == self.mock_keyfile + + @patch("google.oauth2.service_account.Credentials.from_service_account_info") + def test_integration_refresh_token(self, mock_credentials): + mock_credentials.return_value.project_id = "posthog-616" + mock_credentials.return_value.service_account_email = "posthog@" + mock_credentials.return_value.token = "ACCESS_TOKEN" + mock_credentials.return_value.expiry = datetime.fromtimestamp(1704110400 + 3600) + mock_credentials.return_value.refresh = lambda _: None + + with freeze_time("2024-01-01T12:00:00Z"): + integration = GoogleCloudIntegration.integration_from_key( + "google-pubsub", + self.mock_keyfile, + self.team.id, + self.user, + ) + + with freeze_time("2024-01-01T12:00:00Z"): + assert GoogleCloudIntegration(integration).access_token_expired() is False + + with freeze_time("2024-01-01T14:00:00Z"): + assert GoogleCloudIntegration(integration).access_token_expired() is True + + mock_credentials.return_value.expiry = datetime.fromtimestamp(1704110400 + 3600 * 3) + + GoogleCloudIntegration(integration).refresh_access_token() + assert GoogleCloudIntegration(integration).access_token_expired() is False + + assert integration.config == { + "access_token": "ACCESS_TOKEN", + "refreshed_at": 1704110400 + 3600 * 2, + "expires_in": 3600, + } diff --git a/posthog/tasks/integrations.py b/posthog/tasks/integrations.py index e0f543874bfe8..1ebef062cf0cd 100644 --- a/posthog/tasks/integrations.py +++ b/posthog/tasks/integrations.py @@ -1,5 +1,6 @@ from celery import shared_task +from posthog.models.integration import GoogleCloudIntegration from posthog.tasks.utils import CeleryQueue @@ -15,6 +16,14 @@ def refresh_integrations() -> int: if oauth_integration.access_token_expired(): refresh_integration.delay(integration.id) + gcloud_integrations = Integration.objects.filter(kind__in=GoogleCloudIntegration.supported_kinds).all() + + for integration in gcloud_integrations: + gcloud_integration = GoogleCloudIntegration(integration) + + if gcloud_integration.access_token_expired(): + refresh_integration.delay(integration.id) + return 0 @@ -27,5 +36,8 @@ def refresh_integration(id: int) -> int: if integration.kind in OauthIntegration.supported_kinds: oauth_integration = OauthIntegration(integration) oauth_integration.refresh_access_token() + elif integration.kind in GoogleCloudIntegration.supported_kinds: + gcloud_integration = GoogleCloudIntegration(integration) + gcloud_integration.refresh_access_token() return 0