Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(temporal): Added posthog client to temporal to enable exceptions capture #26583

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions posthog/temporal/common/posthog_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Any, Optional
from posthoganalytics.client import Client
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
ExecuteWorkflowInput,
Interceptor,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)


class _PostHogClientActivityInboundInterceptor(ActivityInboundInterceptor):
async def execute_activity(self, input: ExecuteActivityInput) -> Any:
ph_client = Client(api_key="sTMFPsFhdP1Ssg", enable_exception_autocapture=True)

activity_result = await super().execute_activity(input)

ph_client.flush()

return activity_result


class _PostHogClientWorkflowInterceptor(WorkflowInboundInterceptor):
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
ph_client = Client(api_key="sTMFPsFhdP1Ssg", enable_exception_autocapture=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, is this api key public?


workflow_result = await super().execute_workflow(input)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we wrap this call in a try, except? Otherwise we won't be calling ph_client.flush() if anything is raised.


ph_client.flush()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this call blocking? Should we send it to a thread with asyncio.to_thread?


return workflow_result


class PostHogClientInterceptor(Interceptor):
"""PostHog Interceptor class which will report workflow & activity exceptions to PostHog"""

def intercept_activity(self, next: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
"""Implementation of
:py:meth:`temporalio.worker.Interceptor.intercept_activity`.
"""
return _PostHogClientActivityInboundInterceptor(super().intercept_activity(next))

def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[type[WorkflowInboundInterceptor]]:
return _PostHogClientWorkflowInterceptor
3 changes: 2 additions & 1 deletion posthog/temporal/common/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

from posthog.temporal.common.client import connect
from posthog.temporal.common.posthog_client import PostHogClientInterceptor
from posthog.temporal.common.sentry import SentryInterceptor


Expand Down Expand Up @@ -41,7 +42,7 @@ async def start_worker(
activities=activities,
workflow_runner=UnsandboxedWorkflowRunner(),
graceful_shutdown_timeout=timedelta(minutes=5),
interceptors=[SentryInterceptor()],
interceptors=[SentryInterceptor(), PostHogClientInterceptor()],
activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50),
max_concurrent_activities=max_concurrent_activities or 50,
max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
Expand Down
Loading