From 8599213ff7ee7b371bdbaf1b81482df62b5d3358 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 3 Jan 2025 09:14:50 +0100 Subject: [PATCH] feat(temporal): Added posthog client to temporal to enable exceptions capture (#26583) --- posthog/temporal/common/posthog_client.py | 54 +++++++++++++++++++++++ posthog/temporal/common/worker.py | 6 ++- 2 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 posthog/temporal/common/posthog_client.py diff --git a/posthog/temporal/common/posthog_client.py b/posthog/temporal/common/posthog_client.py new file mode 100644 index 0000000000000..2680c3c4b9021 --- /dev/null +++ b/posthog/temporal/common/posthog_client.py @@ -0,0 +1,54 @@ +import asyncio +from typing import Any, Optional +from posthoganalytics.client import Client +from temporalio.worker import ( + ActivityInboundInterceptor, + ExecuteActivityInput, + ExecuteWorkflowInput, + Interceptor, + WorkflowInboundInterceptor, + WorkflowInterceptorClassInput, +) + + +class _PostHogClientActivityInboundInterceptor(ActivityInboundInterceptor): + async def execute_activity(self, input: ExecuteActivityInput) -> Any: + ph_client = Client(api_key="sTMFPsFhdP1Ssg", enable_exception_autocapture=True) + + try: + activity_result = await super().execute_activity(input) + except: + raise + finally: + await asyncio.to_thread(ph_client.flush) + + return activity_result + + +class _PostHogClientWorkflowInterceptor(WorkflowInboundInterceptor): + async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: + ph_client = Client(api_key="sTMFPsFhdP1Ssg", enable_exception_autocapture=True) + + try: + workflow_result = await super().execute_workflow(input) + except: + raise + finally: + await asyncio.to_thread(ph_client.flush) + + return workflow_result + + +class PostHogClientInterceptor(Interceptor): + """PostHog Interceptor class which will report workflow & activity exceptions to PostHog""" + + def intercept_activity(self, next: ActivityInboundInterceptor) -> ActivityInboundInterceptor: + """Implementation of + :py:meth:`temporalio.worker.Interceptor.intercept_activity`. + """ + return _PostHogClientActivityInboundInterceptor(super().intercept_activity(next)) + + def workflow_interceptor_class( + self, input: WorkflowInterceptorClassInput + ) -> Optional[type[WorkflowInboundInterceptor]]: + return _PostHogClientWorkflowInterceptor diff --git a/posthog/temporal/common/worker.py b/posthog/temporal/common/worker.py index f5db3d6b0417d..398fdfa09dc2e 100644 --- a/posthog/temporal/common/worker.py +++ b/posthog/temporal/common/worker.py @@ -8,6 +8,7 @@ from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2 from posthog.temporal.common.client import connect +from posthog.temporal.common.posthog_client import PostHogClientInterceptor from posthog.temporal.common.sentry import SentryInterceptor @@ -35,6 +36,7 @@ async def start_worker( client_key, runtime=runtime, ) + if task_queue == DATA_WAREHOUSE_TASK_QUEUE_V2: worker = Worker( client, @@ -43,7 +45,7 @@ async def start_worker( activities=activities, workflow_runner=UnsandboxedWorkflowRunner(), graceful_shutdown_timeout=timedelta(minutes=5), - interceptors=[SentryInterceptor()], + interceptors=[SentryInterceptor(), PostHogClientInterceptor()], activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50), # Only run one workflow at a time max_concurrent_activities=1, @@ -59,7 +61,7 @@ async def start_worker( activities=activities, workflow_runner=UnsandboxedWorkflowRunner(), graceful_shutdown_timeout=timedelta(minutes=5), - interceptors=[SentryInterceptor()], + interceptors=[SentryInterceptor(), PostHogClientInterceptor()], activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50), max_concurrent_activities=max_concurrent_activities or 50, max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,