Skip to content

Commit

Permalink
feat(cdp): Google PubSub (#24874)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusandra authored Sep 11, 2024
1 parent d26be62 commit f1e7601
Show file tree
Hide file tree
Showing 14 changed files with 638 additions and 24 deletions.
2 changes: 1 addition & 1 deletion frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2224,7 +2224,7 @@ const api = {
async get(id: IntegrationType['id']): Promise<IntegrationType> {
return await new ApiRequest().integration(id).get()
},
async create(data: Partial<IntegrationType>): Promise<IntegrationType> {
async create(data: Partial<IntegrationType> | FormData): Promise<IntegrationType> {
return await new ApiRequest().integrations().create({ data })
},
async delete(integrationId: IntegrationType['id']): Promise<IntegrationType> {
Expand Down
37 changes: 36 additions & 1 deletion frontend/src/lib/integrations/integrationsLogic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { loaders } from 'kea-loaders'
import { router, urlToAction } from 'kea-router'
import api from 'lib/api'
import { fromParamsGivenUrl } from 'lib/utils'
import IconGoogleCloud from 'public/services/google-cloud.png'
import IconHubspot from 'public/services/hubspot.png'
import IconSalesforce from 'public/services/salesforce.png'
import IconSlack from 'public/services/slack.png'
Expand All @@ -18,6 +19,7 @@ const ICONS: Record<IntegrationKind, any> = {
slack: IconSlack,
salesforce: IconSalesforce,
hubspot: IconHubspot,
'google-pubsub': IconGoogleCloud,
}

export const integrationsLogic = kea<integrationsLogicType>([
Expand All @@ -28,10 +30,15 @@ export const integrationsLogic = kea<integrationsLogicType>([

actions({
handleOauthCallback: (kind: IntegrationKind, searchParams: any) => ({ kind, searchParams }),
newGoogleCloudKey: (kind: string, key: File, callback?: (integration: IntegrationType) => void) => ({
kind,
key,
callback,
}),
deleteIntegration: (id: number) => ({ id }),
}),

loaders(() => ({
loaders(({ values }) => ({
integrations: [
null as IntegrationType[] | null,
{
Expand All @@ -48,6 +55,34 @@ export const integrationsLogic = kea<integrationsLogicType>([
}
})
},
newGoogleCloudKey: async ({ kind, key, callback }) => {
try {
const formData = new FormData()
formData.append('kind', kind)
formData.append('key', key)
const response = await api.integrations.create(formData)
const responseWithIcon = { ...response, icon_url: ICONS[kind] ?? ICONS['google-pubsub'] }

// run onChange after updating the integrations loader
window.setTimeout(() => callback?.(responseWithIcon), 0)

if (
values.integrations?.find(
(x) => x.kind === kind && x.display_name === response.display_name
)
) {
lemonToast.success('Google Cloud key updated.')
return values.integrations.map((x) =>
x.kind === kind && x.display_name === response.display_name ? responseWithIcon : x
)
}
lemonToast.success('Google Cloud key created.')
return [...(values.integrations ?? []), responseWithIcon]
} catch (e) {
lemonToast.error('Failed to upload Google Cloud key.')
throw e
}
},
},
],
})),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IconExternal, IconX } from '@posthog/icons'
import { LemonButton, LemonMenu, LemonSkeleton } from '@posthog/lemon-ui'
import { useValues } from 'kea'
import { useActions, useValues } from 'kea'
import api from 'lib/api'
import { integrationsLogic } from 'lib/integrations/integrationsLogic'
import { IntegrationView } from 'lib/integrations/IntegrationView'
Expand All @@ -21,6 +21,7 @@ export function IntegrationChoice({
redirectUrl,
}: IntegrationConfigureProps): JSX.Element | null {
const { integrationsLoading, integrations } = useValues(integrationsLogic)
const { newGoogleCloudKey } = useActions(integrationsLogic)
const kind = integration
const integrationsOfKind = integrations?.filter((x) => x.kind === kind)
const integrationKind = integrationsOfKind?.find((integration) => integration.id === value)
Expand All @@ -33,6 +34,22 @@ export function IntegrationChoice({
return <LemonSkeleton className="h-10" />
}

const kindName = kind == 'google-pubsub' ? 'Google Cloud Pub/Sub' : capitalizeFirstLetter(kind)

function uploadKey(kind: string): void {
const input = document.createElement('input')
input.type = 'file'
input.accept = '.json'
input.onchange = async (e) => {
const file = (e.target as HTMLInputElement).files?.[0]
if (!file) {
return
}
newGoogleCloudKey(kind, file, (integration) => onChange?.(integration.id))
}
input.click()
}

const button = (
<LemonMenu
items={[
Expand All @@ -48,20 +65,29 @@ export function IntegrationChoice({
],
}
: null,
{
items: [
{
to: api.integrations.authorizeUrl({
kind,
next: redirectUrl,
}),
disableClientSideRouting: true,
label: integrationsOfKind?.length
? `Connect to a different ${kind} integration`
: `Connect to ${kind}`,
},
],
},
kind.startsWith('google-')
? {
items: [
{
onClick: () => uploadKey(kind),
label: 'Upload Google Cloud .json key file',
},
],
}
: {
items: [
{
to: api.integrations.authorizeUrl({
kind,
next: redirectUrl,
}),
disableClientSideRouting: true,
label: integrationsOfKind?.length
? `Connect to a different ${kind} integration`
: `Connect to ${kind}`,
},
],
},
{
items: [
{
Expand All @@ -83,7 +109,7 @@ export function IntegrationChoice({
{integrationKind ? (
<LemonButton type="secondary">Change</LemonButton>
) : (
<LemonButton type="secondary">Choose {capitalizeFirstLetter(kind)} connection</LemonButton>
<LemonButton type="secondary">Choose {kindName} connection</LemonButton>
)}
</LemonMenu>
)
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3564,7 +3564,7 @@ export enum EventDefinitionType {
EventPostHog = 'event_posthog',
}

export type IntegrationKind = 'slack' | 'salesforce' | 'hubspot'
export type IntegrationKind = 'slack' | 'salesforce' | 'hubspot' | 'google-pubsub'

export interface IntegrationType {
id: number
Expand Down
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0467_add_web_vitals_allowed_metrics
posthog: 0468_integration_google_pubsub
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
16 changes: 14 additions & 2 deletions posthog/api/integration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json

from typing import Any

from django.http import HttpResponse
Expand All @@ -10,7 +12,7 @@

from posthog.api.routing import TeamAndOrgViewSetMixin
from posthog.api.shared import UserBasicSerializer
from posthog.models.integration import Integration, OauthIntegration, SlackIntegration
from posthog.models.integration import Integration, OauthIntegration, SlackIntegration, GoogleCloudIntegration


class IntegrationSerializer(serializers.ModelSerializer):
Expand All @@ -27,7 +29,17 @@ def create(self, validated_data: Any) -> Any:
request = self.context["request"]
team_id = self.context["team_id"]

if validated_data["kind"] in OauthIntegration.supported_kinds:
if validated_data["kind"] in GoogleCloudIntegration.supported_kinds:
key_file = request.FILES.get("key")
if not key_file:
raise ValidationError("Key file not provided")
key_info = json.loads(key_file.read().decode("utf-8"))
instance = GoogleCloudIntegration.integration_from_key(
validated_data["kind"], key_info, team_id, request.user
)
return instance

elif validated_data["kind"] in OauthIntegration.supported_kinds:
try:
instance = OauthIntegration.integration_from_oauth_response(
validated_data["kind"], team_id, request.user, validated_data["config"]
Expand Down
2 changes: 2 additions & 0 deletions posthog/api/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,8 @@ def migrate(self, request: request.Request, **kwargs):
raise ValidationError("No migration available for this plugin")

hog_function_data = migrater.migrate(obj)
hog_function_data["template_id"] = hog_function_data["id"]
del hog_function_data["id"]

if obj.enabled:
hog_function_data["enabled"] = True
Expand Down
3 changes: 3 additions & 0 deletions posthog/cdp/templates/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .loops.template_loops import template as loops
from .rudderstack.template_rudderstack import template as rudderstack
from .gleap.template_gleap import template as gleap
from .google_pubsub.template_google_pubsub import template as google_pubsub, TemplateGooglePubSubMigrator


HOG_FUNCTION_TEMPLATES = [
Expand All @@ -40,6 +41,7 @@
rudderstack,
avo,
gleap,
google_pubsub,
]


Expand All @@ -49,6 +51,7 @@
TemplateCustomerioMigrator.plugin_url: TemplateCustomerioMigrator,
TemplateIntercomMigrator.plugin_url: TemplateIntercomMigrator,
TemplateSendGridMigrator.plugin_url: TemplateSendGridMigrator,
TemplateGooglePubSubMigrator.plugin_url: TemplateGooglePubSubMigrator,
}

__all__ = ["HOG_FUNCTION_TEMPLATES", "HOG_FUNCTION_TEMPLATES_BY_ID"]
126 changes: 126 additions & 0 deletions posthog/cdp/templates/google_pubsub/template_google_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import dataclasses
import json
from copy import deepcopy

from posthog.cdp.templates.hog_function_template import HogFunctionTemplate, HogFunctionTemplateMigrator
from posthog.models.integration import GoogleCloudIntegration

template: HogFunctionTemplate = HogFunctionTemplate(
status="beta",
id="template-google-pubsub",
name="Google Pub/Sub",
description="Send data to a Google Pub/Sub topic",
icon_url="/static/services/google-cloud.png",
hog="""
let headers := () -> {
'Authorization': f'Bearer {inputs.auth.access_token}',
'Content-Type': 'application/json'
}
let message := () -> {
'messageId': event.uuid,
'data': base64Encode(jsonStringify(inputs.payload)),
'attributes': inputs.attributes
}
let res := fetch(f'https://pubsub.googleapis.com/v1/{inputs.topicId}:publish', {
'method': 'POST',
'headers': headers(),
'body': jsonStringify({ 'messages': [message()] })
})
if (res.status >= 200 and res.status < 300) {
print('Event sent successfully!')
} else {
throw Error('Error sending event', res)
}
""".strip(),
inputs_schema=[
{
"key": "auth",
"type": "integration",
"integration": "google-pubsub",
"label": "Google Cloud service account",
"secret": False,
"required": True,
},
{
"key": "topicId",
"type": "string",
"label": "Topic name",
"secret": False,
"required": True,
},
{
"key": "payload",
"type": "json",
"label": "Message Payload",
"default": {"event": "{event}", "person": "{person}"},
"secret": False,
"required": False,
},
{
"key": "attributes",
"type": "json",
"label": "Attributes",
"default": {},
"secret": False,
"required": False,
},
],
)


class TemplateGooglePubSubMigrator(HogFunctionTemplateMigrator):
plugin_url = "https://github.com/PostHog/pubsub-plugin"

@classmethod
def migrate(cls, obj):
hf = deepcopy(dataclasses.asdict(template))

exportEventsToIgnore = obj.config.get("exportEventsToIgnore", "")
topicId = obj.config.get("topicId", "")

from posthog.models.plugin import PluginAttachment

attachment: PluginAttachment | None = PluginAttachment.objects.filter(
plugin_config=obj, key="googleCloudKeyJson"
).first()
if not attachment:
raise Exception("Google Cloud Key JSON not found")

keyFile = json.loads(attachment.contents.decode("UTF-8")) # type: ignore
integration = GoogleCloudIntegration.integration_from_key("google-pubsub", keyFile, obj.team.pk)

hf["filters"] = {}
if exportEventsToIgnore:
events = exportEventsToIgnore.split(",")
if len(events) > 0:
event_names = ", ".join(["'{}'".format(event.strip()) for event in events])
query = f"event not in ({event_names})"
hf["filters"]["events"] = [
{
"id": None,
"name": "All events",
"type": "events",
"order": 0,
"properties": [{"key": query, "type": "hogql"}],
}
]

hf["inputs"] = {
"topicId": {"value": topicId},
"payload": {
"value": {
"event": "{event.name}",
"distinct_id": "{event.distinct_id}",
"timestamp": "{event.timestamp}",
"uuid": "{event.uuid}",
"properties": "{event.properties}",
"person_id": "{person.id}",
"person_properties": "{person.properties}",
}
},
"auth": {"value": integration.id},
"attributes": {"value": {}},
}

return hf
Loading

0 comments on commit f1e7601

Please sign in to comment.