Skip to content

Commit

Permalink
feat(temporal): Added posthog client to temporal to enable exceptions…
Browse files Browse the repository at this point in the history
… capture (#26583)
  • Loading branch information
Gilbert09 authored Jan 3, 2025
1 parent 1018ca8 commit 8599213
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
54 changes: 54 additions & 0 deletions posthog/temporal/common/posthog_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import asyncio
from typing import Any, Optional
from posthoganalytics.client import Client
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
ExecuteWorkflowInput,
Interceptor,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)


class _PostHogClientActivityInboundInterceptor(ActivityInboundInterceptor):
async def execute_activity(self, input: ExecuteActivityInput) -> Any:
ph_client = Client(api_key="sTMFPsFhdP1Ssg", enable_exception_autocapture=True)

try:
activity_result = await super().execute_activity(input)
except:
raise
finally:
await asyncio.to_thread(ph_client.flush)

return activity_result


class _PostHogClientWorkflowInterceptor(WorkflowInboundInterceptor):
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
ph_client = Client(api_key="sTMFPsFhdP1Ssg", enable_exception_autocapture=True)

try:
workflow_result = await super().execute_workflow(input)
except:
raise
finally:
await asyncio.to_thread(ph_client.flush)

return workflow_result


class PostHogClientInterceptor(Interceptor):
"""PostHog Interceptor class which will report workflow & activity exceptions to PostHog"""

def intercept_activity(self, next: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
"""Implementation of
:py:meth:`temporalio.worker.Interceptor.intercept_activity`.
"""
return _PostHogClientActivityInboundInterceptor(super().intercept_activity(next))

def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[type[WorkflowInboundInterceptor]]:
return _PostHogClientWorkflowInterceptor
6 changes: 4 additions & 2 deletions posthog/temporal/common/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2
from posthog.temporal.common.client import connect
from posthog.temporal.common.posthog_client import PostHogClientInterceptor
from posthog.temporal.common.sentry import SentryInterceptor


Expand Down Expand Up @@ -35,6 +36,7 @@ async def start_worker(
client_key,
runtime=runtime,
)

if task_queue == DATA_WAREHOUSE_TASK_QUEUE_V2:
worker = Worker(
client,
Expand All @@ -43,7 +45,7 @@ async def start_worker(
activities=activities,
workflow_runner=UnsandboxedWorkflowRunner(),
graceful_shutdown_timeout=timedelta(minutes=5),
interceptors=[SentryInterceptor()],
interceptors=[SentryInterceptor(), PostHogClientInterceptor()],
activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50),
# Only run one workflow at a time
max_concurrent_activities=1,
Expand All @@ -59,7 +61,7 @@ async def start_worker(
activities=activities,
workflow_runner=UnsandboxedWorkflowRunner(),
graceful_shutdown_timeout=timedelta(minutes=5),
interceptors=[SentryInterceptor()],
interceptors=[SentryInterceptor(), PostHogClientInterceptor()],
activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50),
max_concurrent_activities=max_concurrent_activities or 50,
max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
Expand Down

0 comments on commit 8599213

Please sign in to comment.