diff --git a/posthog/temporal/common/posthog_client.py b/posthog/temporal/common/posthog_client.py new file mode 100644 index 0000000000000..f0f5062a770b5 --- /dev/null +++ b/posthog/temporal/common/posthog_client.py @@ -0,0 +1,47 @@ +from typing import Any, Optional +from posthoganalytics.client import Client +from temporalio.worker import ( + ActivityInboundInterceptor, + ExecuteActivityInput, + ExecuteWorkflowInput, + Interceptor, + WorkflowInboundInterceptor, + WorkflowInterceptorClassInput, +) + + +class _PostHogClientActivityInboundInterceptor(ActivityInboundInterceptor): + async def execute_activity(self, input: ExecuteActivityInput) -> Any: + ph_client = Client(api_key="sTMFPsFhdP1Ssg", enable_exception_autocapture=True) + + activity_result = await super().execute_activity(input) + + ph_client.flush() + + return activity_result + + +class _PostHogClientWorkflowInterceptor(WorkflowInboundInterceptor): + async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: + ph_client = Client(api_key="sTMFPsFhdP1Ssg", enable_exception_autocapture=True) + + workflow_result = await super().execute_workflow(input) + + ph_client.flush() + + return workflow_result + + +class PostHogClientInterceptor(Interceptor): + """PostHog Interceptor class which will report workflow & activity exceptions to PostHog""" + + def intercept_activity(self, next: ActivityInboundInterceptor) -> ActivityInboundInterceptor: + """Implementation of + :py:meth:`temporalio.worker.Interceptor.intercept_activity`. + """ + return _PostHogClientActivityInboundInterceptor(super().intercept_activity(next)) + + def workflow_interceptor_class( + self, input: WorkflowInterceptorClassInput + ) -> Optional[type[WorkflowInboundInterceptor]]: + return _PostHogClientWorkflowInterceptor diff --git a/posthog/temporal/common/worker.py b/posthog/temporal/common/worker.py index 2e7118c7934ae..e938a493dcc5c 100644 --- a/posthog/temporal/common/worker.py +++ b/posthog/temporal/common/worker.py @@ -7,6 +7,7 @@ from temporalio.worker import UnsandboxedWorkflowRunner, Worker from posthog.temporal.common.client import connect +from posthog.temporal.common.posthog_client import PostHogClientInterceptor from posthog.temporal.common.sentry import SentryInterceptor @@ -41,7 +42,7 @@ async def start_worker( activities=activities, workflow_runner=UnsandboxedWorkflowRunner(), graceful_shutdown_timeout=timedelta(minutes=5), - interceptors=[SentryInterceptor()], + interceptors=[SentryInterceptor(), PostHogClientInterceptor()], activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50), max_concurrent_activities=max_concurrent_activities or 50, max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,