From 2c979a99e03af325bd392f29a53e4ca3aabf535c Mon Sep 17 00:00:00 2001 From: Josh Purtell Date: Mon, 16 Dec 2024 22:04:37 -0800 Subject: [PATCH 1/7] local main --- synth_sdk/tracing/upload.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/synth_sdk/tracing/upload.py b/synth_sdk/tracing/upload.py index 5ee112f..244ac53 100644 --- a/synth_sdk/tracing/upload.py +++ b/synth_sdk/tracing/upload.py @@ -87,11 +87,11 @@ def load_signed_url(signed_url: str, dataset: Dataset, traces: List[SystemTrace] else: print(f"Successfully loaded signed URL Status Code: {response.status_code} Response: {response.text}, Signed URL: {signed_url}") -def send_system_traces_s3(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str, verbose: bool = False): +def send_system_traces_s3(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str, system_id: str, verbose: bool = False): # Create async function that contains all async operations async def _async_operations(): - upload_id, signed_url = await get_upload_id(base_url, api_key, verbose) + upload_id, signed_url = await get_upload_id(base_url, api_key, system_id, verbose) load_signed_url(signed_url, dataset, traces) token_url = f"{base_url}/v1/auth/token" @@ -141,13 +141,13 @@ async def _async_operations(): loop = asyncio.get_event_loop() return loop.run_until_complete(_async_operations()) -async def get_upload_id(base_url: str, api_key: str, verbose: bool = False): +async def get_upload_id(base_url: str, api_key: str, system_id, verbose: bool = False): token_url = f"{base_url}/v1/auth/token" token_response = requests.get(token_url, headers={"customer_specific_api_key": api_key}) token_response.raise_for_status() access_token = token_response.json()["access_token"] - api_url = f"{base_url}/v1/uploads/get-upload-id-signed-url" + api_url = f"{base_url}/v1/uploads/get-upload-id-signed-url?system_id={system_id}" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {access_token}", @@ -390,6 +390,7 @@ def upload_helper( traces=traces, base_url="https://agent-learning.onrender.com", api_key=api_key, + special_system_id=special_system_id, verbose=verbose, ) From 9f160b97534db0b208ef921fa83cd50972ba0db1 Mon Sep 17 00:00:00 2001 From: Josh Purtell Date: Tue, 17 Dec 2024 01:14:37 -0800 Subject: [PATCH 2/7] finished --- openapi.json | 4 +- synth_sdk/tracing/abstractions.py | 60 +-- synth_sdk/tracing/decorators.py | 254 +++++++------ synth_sdk/tracing/events/manage.py | 68 ++-- synth_sdk/tracing/events/scope.py | 31 +- synth_sdk/tracing/events/store.py | 105 +++--- synth_sdk/tracing/local.py | 13 +- synth_sdk/tracing/upload.py | 164 ++++---- synth_sdk/tracing/utils.py | 11 + .../records/episode_classic_0.json | 24 ++ .../records/episode_classic_1.json | 24 ++ .../records/episode_classic_2.json | 24 ++ tutorials/AsyncAgentExample.py | 57 +-- tutorials/SimpleAgentExample.py | 60 +-- tutorials/SyncInAsyncExample.py | 62 +-- tutorials/reward_signals.json | 21 +- tutorials/reward_signals_async.json | 21 +- tutorials/traces.json | 241 +++++++++++- tutorials/traces_async.json | 352 +++++++++++++++++- 19 files changed, 1206 insertions(+), 390 deletions(-) create mode 100644 synth_sdk/tracing/utils.py create mode 100644 tests/iteration/craftax/generate_data/records/episode_classic_0.json create mode 100644 tests/iteration/craftax/generate_data/records/episode_classic_1.json create mode 100644 tests/iteration/craftax/generate_data/records/episode_classic_2.json diff --git a/openapi.json b/openapi.json index 
3cf3b6f..8a47144 100644 --- a/openapi.json +++ b/openapi.json @@ -204,7 +204,7 @@ "Classes" ], "summary": "abstractions.TrainingQuestion", - "description": "A training question is a question that an agent (system_id) is trying to answer.\nIt contains an intent and criteria that the agent is trying to meet.\n\n[View source on GitHub](https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py)", + "description": "A training question is a question that an agent (instance_system_id) is trying to answer.\nIt contains an intent and criteria that the agent is trying to meet.\n\n[View source on GitHub](https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py)", "externalDocs": { "description": "View source on GitHub", "url": "https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py" @@ -235,7 +235,7 @@ "Classes" ], "summary": "abstractions.RewardSignal", - "description": "A reward signal tells us how well an agent (system_id) is doing on a particular question (question_id).\n\n[View source on GitHub](https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py)", + "description": "A reward signal tells us how well an agent (instance_system_id) is doing on a particular question (question_id).\n\n[View source on GitHub](https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py)", "externalDocs": { "description": "View source on GitHub", "url": "https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py" diff --git a/synth_sdk/tracing/abstractions.py b/synth_sdk/tracing/abstractions.py index 55cd523..9918264 100644 --- a/synth_sdk/tracing/abstractions.py +++ b/synth_sdk/tracing/abstractions.py @@ -1,9 +1,10 @@ +import logging from dataclasses import dataclass -from typing import Any, List, Dict, Optional, Union from datetime import datetime +from typing import Any, Dict, List, Optional, Union + from pydantic import BaseModel -import logging -from synth_sdk.tracing.config import VALID_TYPES + logger = logging.getLogger(__name__) @@ -39,13 +40,15 @@ class ComputeStep: def to_dict(self): # Serialize compute_input serializable_input = [ - input_item.__dict__ for input_item in self.compute_input + input_item.__dict__ + for input_item in self.compute_input if isinstance(input_item, (MessageInputs, ArbitraryInputs)) ] # Serialize compute_output serializable_output = [ - output_item.__dict__ for output_item in self.compute_output + output_item.__dict__ + for output_item in self.compute_output if isinstance(output_item, (MessageOutputs, ArbitraryOutputs)) ] @@ -59,8 +62,12 @@ def to_dict(self): return { "event_order": self.event_order, - "compute_ended": self.compute_ended.isoformat() if isinstance(self.compute_ended, datetime) else self.compute_ended, - "compute_began": self.compute_began.isoformat() if isinstance(self.compute_began, datetime) else self.compute_began, + "compute_ended": self.compute_ended.isoformat() + if isinstance(self.compute_ended, datetime) + else self.compute_ended, + "compute_began": self.compute_began.isoformat() + if isinstance(self.compute_began, datetime) + else self.compute_began, "compute_input": serializable_input, "compute_output": serializable_output, } @@ -86,7 +93,7 @@ class EnvironmentComputeStep(ComputeStep): @dataclass class Event: - system_id: str + instance_system_id: str event_type: str opened: Any # timestamp closed: Any # timestamp @@ -97,8 +104,12 @@ class Event: def 
to_dict(self): return { "event_type": self.event_type, - "opened": self.opened.isoformat() if isinstance(self.opened, datetime) else self.opened, - "closed": self.closed.isoformat() if isinstance(self.closed, datetime) else self.closed, + "opened": self.opened.isoformat() + if isinstance(self.opened, datetime) + else self.opened, + "closed": self.closed.isoformat() + if isinstance(self.closed, datetime) + else self.closed, "partition_index": self.partition_index, "agent_compute_steps": [ step.to_dict() for step in self.agent_compute_steps @@ -124,6 +135,7 @@ def to_dict(self): @dataclass class SystemTrace: system_id: str + instance_system_id: str metadata: Optional[Dict[str, Any]] partition: List[EventPartitionElement] current_partition_index: int = 0 # Track current partition @@ -131,17 +143,19 @@ class SystemTrace: def to_dict(self): return { "system_id": self.system_id, + "instance_system_id": self.instance_system_id, "partition": [element.to_dict() for element in self.partition], "current_partition_index": self.current_partition_index, - "metadata": self.metadata if self.metadata else None + "metadata": self.metadata if self.metadata else None, } class TrainingQuestion(BaseModel): - ''' - A training question is a question that an agent (system_id) is trying to answer. + """ + A training question is a question that an agent (instance_system_id) is trying to answer. It contains an intent and criteria that the agent is trying to meet. - ''' + """ + id: str intent: str criteria: str @@ -155,28 +169,30 @@ def to_dict(self): class RewardSignal(BaseModel): - ''' - A reward signal tells us how well an agent (system_id) is doing on a particular question (question_id). - ''' + """ + A reward signal tells us how well an agent (instance_system_id) is doing on a particular question (question_id). + """ + question_id: str - system_id: str + instance_system_id: str reward: Union[float, int, bool] annotation: Optional[str] = None def to_dict(self): return { "question_id": self.question_id, - "system_id": self.system_id, + "instance_system_id": self.instance_system_id, "reward": self.reward, "annotation": self.annotation, } class Dataset(BaseModel): - ''' - A dataset is a collection of training questions and reward signals. + """ + A dataset is a collection of training questions and reward signals. This better represents the data that is used to train a model, and gives us more information about the data. 
- ''' + """ + questions: List[TrainingQuestion] reward_signals: List[RewardSignal] diff --git a/synth_sdk/tracing/decorators.py b/synth_sdk/tracing/decorators.py index 6358f3f..309a0c7 100644 --- a/synth_sdk/tracing/decorators.py +++ b/synth_sdk/tracing/decorators.py @@ -1,43 +1,39 @@ # synth_sdk/tracing/decorators.py -from typing import Callable, Optional, Set, Literal, Any, Dict, Tuple, Union, List -from functools import wraps -import time +import inspect import logging - -from synth_sdk.tracing.abstractions import ( - Event, - AgentComputeStep, - EnvironmentComputeStep, -) -from synth_sdk.tracing.events.store import event_store -from synth_sdk.tracing.local import _local, logger -from synth_sdk.tracing.trackers import synth_tracker_sync, synth_tracker_async, SynthTracker -from synth_sdk.tracing.events.manage import set_current_event - -from typing import Callable, Optional, Set, Literal, Any, Dict, Tuple, Union -from functools import wraps import time -import logging -from pydantic import BaseModel +from functools import wraps +from typing import Any, Callable, Dict, List, Literal + from synth_sdk.tracing.abstractions import ( + AgentComputeStep, ArbitraryInputs, ArbitraryOutputs, + EnvironmentComputeStep, + Event, MessageInputs, MessageOutputs, - Event, - AgentComputeStep, - EnvironmentComputeStep, ) -from synth_sdk.tracing.events.store import event_store -from synth_sdk.tracing.local import system_id_var, active_events_var -from synth_sdk.tracing.trackers import synth_tracker_async from synth_sdk.tracing.events.manage import set_current_event - -import inspect +from synth_sdk.tracing.events.store import event_store +from synth_sdk.tracing.local import ( + _local, + active_events_var, + instance_system_id_var, + logger, + system_name_var, + system_id_var, +) +from synth_sdk.tracing.utils import get_system_id +from synth_sdk.tracing.trackers import ( + synth_tracker_async, + synth_tracker_sync, +) logger = logging.getLogger(__name__) + # # This decorator is used to trace synchronous functions def trace_system_sync( origin: Literal["agent", "environment"], @@ -49,9 +45,10 @@ def trace_system_sync( finetune_step: bool = True, ) -> Callable: """Decorator for tracing synchronous functions. - + Purpose is to keep track of inputs and outputs for compute steps for sync functions. 
""" + def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(*args, **kwargs): @@ -65,11 +62,18 @@ def wrapper(*args, **kwargs): else: self_instance = func.__self__ - if not hasattr(self_instance, "system_id"): - raise ValueError("Instance missing required system_id attribute") + # Ensure required attributes are present + required_attrs = ["instance_system_id", "system_name"] + for attr in required_attrs: + if not hasattr(self_instance, attr): + raise ValueError(f"Instance missing required attribute '{attr}'") - _local.system_id = self_instance.system_id - #logger.debug(f"Set system_id in thread local: {_local.system_id}") + # Set thread-local variables + _local.instance_system_id = self_instance.instance_system_id + _local.system_name = self_instance.system_name + _local.system_id = get_system_id( + self_instance.system_name + )#self_instance.system_id # Initialize Trace synth_tracker_sync.initialize() @@ -77,15 +81,15 @@ def wrapper(*args, **kwargs): # Initialize active_events if not present if not hasattr(_local, "active_events"): _local.active_events = {} - #logger.debug("Initialized active_events in thread local storage") + # logger.debug("Initialized active_events in thread local storage") event = None compute_began = time.time() try: if manage_event == "create": - #logger.debug("Creating new event") + # logger.debug("Creating new event") event = Event( - system_id=_local.system_id, + instance_system_id=_local.instance_system_id, event_type=event_type, opened=compute_began, closed=None, @@ -95,7 +99,8 @@ def wrapper(*args, **kwargs): ) if increment_partition: event.partition_index = event_store.increment_partition( - _local.system_id + _local.instance_system_id, + _local.system_id, ) logger.debug( f"Incremented partition to: {event.partition_index}" @@ -110,9 +115,7 @@ def wrapper(*args, **kwargs): if param == "self": continue synth_tracker_sync.track_state( - variable_name=param, - variable_value=value, - origin=origin + variable_name=param, variable_value=value, origin=origin ) # Execute the function @@ -133,16 +136,18 @@ def wrapper(*args, **kwargs): # Organize traced data by origin for item in traced_inputs: - var_origin = item['origin'] - if 'variable_value' in item and 'variable_name' in item: + var_origin = item["origin"] + if "variable_value" in item and "variable_name" in item: # Standard variable input compute_steps_by_origin[var_origin]["inputs"].append( - ArbitraryInputs(inputs={item['variable_name']: item['variable_value']}) + ArbitraryInputs( + inputs={item["variable_name"]: item["variable_value"]} + ) ) - elif 'messages' in item: + elif "messages" in item: # Message input from track_lm compute_steps_by_origin[var_origin]["inputs"].append( - MessageInputs(messages=item['messages']) + MessageInputs(messages=item["messages"]) ) compute_steps_by_origin[var_origin]["inputs"].append( ArbitraryInputs(inputs={"model_name": item["model_name"]}) @@ -155,16 +160,18 @@ def wrapper(*args, **kwargs): logger.warning(f"Unhandled traced input item: {item}") for item in traced_outputs: - var_origin = item['origin'] - if 'variable_value' in item and 'variable_name' in item: + var_origin = item["origin"] + if "variable_value" in item and "variable_name" in item: # Standard variable output compute_steps_by_origin[var_origin]["outputs"].append( - ArbitraryOutputs(outputs={item['variable_name']: item['variable_value']}) + ArbitraryOutputs( + outputs={item["variable_name"]: item["variable_value"]} + ) ) - elif 'messages' in item: + elif "messages" in item: # Message output from 
track_lm compute_steps_by_origin[var_origin]["outputs"].append( - MessageOutputs(messages=item['messages']) + MessageOutputs(messages=item["messages"]) ) else: logger.warning(f"Unhandled traced output item: {item}") @@ -215,17 +222,14 @@ def wrapper(*args, **kwargs): logger.info(f"Function result: {result}") # Handle event management after function execution - if ( - manage_event == "end" - and event_type in _local.active_events - ): + if manage_event == "end" and event_type in _local.active_events: current_event = _local.active_events[event_type] current_event.closed = compute_ended # Store the event - if hasattr(_local, "system_id"): - event_store.add_event(_local.system_id, current_event) + if hasattr(_local, "instance_system_id"): + event_store.add_event(_local.instance_system_id, _local.system_id, current_event) # logger.debug( - # f"Stored and closed event {event_type} for system {_local.system_id}" + # f"Stored and closed event {event_type} for system {_local.instance_system_id}" # ) del _local.active_events[event_type] @@ -234,15 +238,16 @@ def wrapper(*args, **kwargs): logger.error(f"Exception in traced function '{func.__name__}': {e}") raise finally: - #synth_tracker_sync.finalize() - if hasattr(_local, "system_id"): - #logger.debug(f"Cleaning up system_id: {_local.system_id}") - delattr(_local, "system_id") + # synth_tracker_sync.finalize() + if hasattr(_local, "instance_system_id"): + # logger.debug(f"Cleaning up instance_system_id: {_local.instance_system_id}") + delattr(_local, "instance_system_id") return wrapper return decorator + def trace_system_async( origin: Literal["agent", "environment"], event_type: str, @@ -253,33 +258,13 @@ def trace_system_async( finetune_step: bool = True, ) -> Callable: """Decorator for tracing asynchronous functions. - + Purpose is to keep track of inputs and outputs for compute steps for async functions. 
""" def decorator(func: Callable) -> Callable: @wraps(func) async def async_wrapper(*args, **kwargs): - # logger.debug(f"Starting async_wrapper for {func.__name__}") - # logger.debug(f"Args: {args}") - # logger.debug(f"Kwargs: {kwargs}") - - # Automatically trace function inputs - bound_args = inspect.signature(func).bind(*args, **kwargs) - bound_args.apply_defaults() - #logger.debug(f"Bound args: {bound_args.arguments}") - - for param, value in bound_args.arguments.items(): - if param == "self": - continue - #logger.debug(f"Tracking input param: {param} = {value}") - synth_tracker_async.track_state( - variable_name=param, - variable_value=value, - origin=origin, - io_type="input" - ) - # Determine the instance (self) if it's a method if not hasattr(func, "__self__") or not func.__self__: if not args: @@ -290,12 +275,18 @@ async def async_wrapper(*args, **kwargs): else: self_instance = func.__self__ - if not hasattr(self_instance, "system_id"): - raise ValueError("Instance missing required system_id attribute") + # Ensure required attributes are present + required_attrs = ["instance_system_id", "system_name"] + for attr in required_attrs: + if not hasattr(self_instance, attr): + raise ValueError(f"Instance missing required attribute '{attr}'") - # Set system_id using context variable - system_id_token = system_id_var.set(self_instance.system_id) - #logger.debug(f"Set system_id in context vars: {self_instance.system_id}") + # Set context variables + instance_system_id_token = instance_system_id_var.set( + self_instance.instance_system_id + ) + system_name_token = system_name_var.set(self_instance.system_name) + system_id_token = system_id_var.set(get_system_id(self_instance.system_name)) # Initialize AsyncTrace synth_tracker_async.initialize() @@ -304,15 +295,15 @@ async def async_wrapper(*args, **kwargs): current_active_events = active_events_var.get() if not current_active_events: active_events_var.set({}) - #logger.debug("Initialized active_events in context vars") + # logger.debug("Initialized active_events in context vars") event = None compute_began = time.time() try: if manage_event == "create": - #logger.debug("Creating new event") + # logger.debug("Creating new event") event = Event( - system_id=self_instance.system_id, + instance_system_id=self_instance.instance_system_id, event_type=event_type, opened=compute_began, closed=None, @@ -322,9 +313,12 @@ async def async_wrapper(*args, **kwargs): ) if increment_partition: event.partition_index = event_store.increment_partition( + instance_system_id_var.get(), system_id_var.get() ) - logger.debug(f"Incremented partition to: {event.partition_index}") + logger.debug( + f"Incremented partition to: {event.partition_index}" + ) set_current_event(event, decorator_type="async") logger.debug(f"Created and set new event: {event_type}") @@ -339,7 +333,7 @@ async def async_wrapper(*args, **kwargs): variable_name=param, variable_value=value, origin=origin, - io_type="input" + io_type="input", ) # Execute the coroutine @@ -360,16 +354,18 @@ async def async_wrapper(*args, **kwargs): # Organize traced data by origin for item in traced_inputs: - var_origin = item['origin'] - if 'variable_value' in item and 'variable_name' in item: + var_origin = item["origin"] + if "variable_value" in item and "variable_name" in item: # Standard variable input compute_steps_by_origin[var_origin]["inputs"].append( - ArbitraryInputs(inputs={item['variable_name']: item['variable_value']}) + ArbitraryInputs( + inputs={item["variable_name"]: item["variable_value"]} + ) ) - 
elif 'messages' in item: + elif "messages" in item: # Message input from track_lm compute_steps_by_origin[var_origin]["inputs"].append( - MessageInputs(messages=item['messages']) + MessageInputs(messages=item["messages"]) ) compute_steps_by_origin[var_origin]["inputs"].append( ArbitraryInputs(inputs={"model_name": item["model_name"]}) @@ -382,16 +378,18 @@ async def async_wrapper(*args, **kwargs): logger.warning(f"Unhandled traced input item: {item}") for item in traced_outputs: - var_origin = item['origin'] - if 'variable_value' in item and 'variable_name' in item: + var_origin = item["origin"] + if "variable_value" in item and "variable_name" in item: # Standard variable output compute_steps_by_origin[var_origin]["outputs"].append( - ArbitraryOutputs(outputs={item['variable_name']: item['variable_value']}) + ArbitraryOutputs( + outputs={item["variable_name"]: item["variable_value"]} + ) ) - elif 'messages' in item: + elif "messages" in item: # Message output from track_lm compute_steps_by_origin[var_origin]["outputs"].append( - MessageOutputs(messages=item['messages']) + MessageOutputs(messages=item["messages"]) ) else: logger.warning(f"Unhandled traced output item: {item}") @@ -440,18 +438,16 @@ async def async_wrapper(*args, **kwargs): logger.info(f"Function result: {result}") # Handle event management after function execution - if ( - manage_event == "end" - and event_type in active_events_var.get() - ): + if manage_event == "end" and event_type in active_events_var.get(): current_event = active_events_var.get()[event_type] current_event.closed = compute_ended # Store the event - if system_id_var.get(): - event_store.add_event(system_id_var.get(), current_event) - # logger.debug( - # f"Stored and closed event {event_type} for system {system_id_var.get()}" - # ) + if instance_system_id_var.get(): + event_store.add_event( + instance_system_id_var.get(), + system_id_var.get(), + current_event + ) active_events = active_events_var.get() del active_events[event_type] active_events_var.set(active_events) @@ -461,13 +457,18 @@ async def async_wrapper(*args, **kwargs): logger.error(f"Exception in traced function '{func.__name__}': {e}") raise finally: - #synth_tracker_async.finalize() - # Reset context variable for system_id + # synth_tracker_async.finalize() + # Reset context variables + instance_system_id_var.reset(instance_system_id_token) + system_name_var.reset(system_name_token) system_id_var.reset(system_id_token) - #logger.debug("Cleaning up system_id from context vars") + # logger.debug("Cleaning up instance_system_id from context vars") + return async_wrapper + return decorator + def trace_system( origin: Literal["agent", "environment"], event_type: str, @@ -482,25 +483,37 @@ def trace_system( Purpose is to keep track of inputs and outputs for compute steps for both sync and async functions. 
""" + def decorator(func: Callable) -> Callable: # Check if the function is async or sync if inspect.iscoroutinefunction(func) or inspect.isasyncgenfunction(func): # Use async tracing - #logger.debug("Using async tracing") + # logger.debug("Using async tracing") async_decorator = trace_system_async( - origin, event_type, log_result, manage_event, increment_partition, verbose + origin, + event_type, + log_result, + manage_event, + increment_partition, + verbose, ) return async_decorator(func) else: # Use sync tracing - #logger.debug("Using sync tracing") + # logger.debug("Using sync tracing") sync_decorator = trace_system_sync( - origin, event_type, log_result, manage_event, increment_partition, verbose + origin, + event_type, + log_result, + manage_event, + increment_partition, + verbose, ) return sync_decorator(func) return decorator + def track_result(result, tracker, origin): # Helper function to track results, including tuple unpacking if isinstance(result, tuple): @@ -508,9 +521,7 @@ def track_result(result, tracker, origin): for i, item in enumerate(result): try: tracker.track_state( - variable_name=f"result_{i}", - variable_value=item, - origin=origin + variable_name=f"result_{i}", variable_value=item, origin=origin ) except Exception as e: logger.warning(f"Could not track tuple element {i}: {str(e)}") @@ -518,10 +529,7 @@ def track_result(result, tracker, origin): # Track single result as before try: tracker.track_state( - variable_name="result", - variable_value=result, - origin=origin + variable_name="result", variable_value=result, origin=origin ) except Exception as e: logger.warning(f"Could not track result: {str(e)}") - diff --git a/synth_sdk/tracing/events/manage.py b/synth_sdk/tracing/events/manage.py index 2dfc89f..e5026c0 100644 --- a/synth_sdk/tracing/events/manage.py +++ b/synth_sdk/tracing/events/manage.py @@ -1,14 +1,10 @@ -from typing import Callable, Optional, Set, Literal, Any, Dict, Tuple, Union -from functools import wraps -import threading import time -import logging -import inspect -import contextvars -from pydantic import BaseModel -from synth_sdk.tracing.local import _local, logger -from synth_sdk.tracing.events.store import event_store +from typing import Literal, Optional + + from synth_sdk.tracing.abstractions import Event +from synth_sdk.tracing.events.store import event_store +from synth_sdk.tracing.local import _local, logger def get_current_event(event_type: str) -> "Event": @@ -22,7 +18,9 @@ def get_current_event(event_type: str) -> "Event": return events[event_type] -def set_current_event(event: Optional["Event"], decorator_type: Literal["sync", "async"]=None): +def set_current_event( + event: Optional["Event"], decorator_type: Literal["sync", "async"] = None +): """ Set the current event, ending any existing events of the same type. If event is None, it clears the current event of that type. 
@@ -30,11 +28,12 @@ def set_current_event(event: Optional["Event"], decorator_type: Literal["sync", if event is None: raise ValueError("Event cannot be None when setting current event.") - #logger.debug(f"Setting current event of type {event.event_type}") + # logger.debug(f"Setting current event of type {event.event_type}") # Check if we're in an async context try: import asyncio + asyncio.get_running_loop() is_async = True except RuntimeError: @@ -48,7 +47,10 @@ def set_current_event(event: Optional["Event"], decorator_type: Literal["sync", # If there's an existing event of the same type, end it if event.event_type in _local.active_events: - if _local.active_events[event.event_type].system_id == event.system_id: + if ( + _local.active_events[event.event_type].instance_system_id + == event.instance_system_id + ): logger.debug(f"Found existing event of type {event.event_type}") existing_event = _local.active_events[event.event_type] existing_event.closed = time.time() @@ -56,11 +58,15 @@ def set_current_event(event: Optional["Event"], decorator_type: Literal["sync", f"Closed existing event of type {event.event_type} at {existing_event.closed}" ) - # Store the closed event if system_id is present - if hasattr(_local, "system_id"): - logger.debug(f"Storing closed event for system {_local.system_id}") + # Store the closed event if instance_system_id is present + if hasattr(_local, "instance_system_id"): + logger.debug( + f"Storing closed event for system {_local.instance_system_id}" + ) try: - event_store.add_event(_local.system_id, existing_event) + event_store.add_event( + _local.instance_system_id, _local.system_id, existing_event + ) logger.debug("Successfully stored closed event") except Exception as e: logger.error(f"Failed to store closed event: {str(e)}") @@ -68,31 +74,36 @@ def set_current_event(event: Optional["Event"], decorator_type: Literal["sync", # Set the new event _local.active_events[event.event_type] = event - #logger.debug("New event set as current in thread local") + # logger.debug("New event set as current in thread local") else: - from synth_sdk.tracing.local import active_events_var, system_id_var + from synth_sdk.tracing.local import active_events_var, instance_system_id_var, system_id_var + # Get current active events from context var active_events = active_events_var.get() - + # If there's an existing event of the same type, end it if event.event_type in active_events: existing_event = active_events[event.event_type] - # Check that the active event has the same system_id as the one we're settting - if existing_event.system_id == event.system_id: + # Check that the active event has the same instance_system_id as the one we're settting + if existing_event.instance_system_id == event.instance_system_id: logger.debug(f"Found existing event of type {event.event_type}") existing_event.closed = time.time() logger.debug( f"Closed existing event of type {event.event_type} at {existing_event.closed}" ) - # Store the closed event if system_id is present - system_id = system_id_var.get() - if system_id: - logger.debug(f"Storing closed event for system {system_id}") + # Store the closed event if instance_system_id is present + instance_system_id = instance_system_id_var.get() + if instance_system_id: + logger.debug( + f"Storing closed event for system {instance_system_id}" + ) try: - event_store.add_event(system_id, existing_event) + event_store.add_event(instance_system_id, + system_id_var.get(), + existing_event) logger.debug("Successfully stored closed event") except Exception 
as e: logger.error(f"Failed to store closed event: {str(e)}") @@ -103,6 +114,7 @@ def set_current_event(event: Optional["Event"], decorator_type: Literal["sync", active_events_var.set(active_events) logger.debug("New event set as current in context vars") + def clear_current_event(event_type: str): if hasattr(_local, "active_events"): _local.active_events.pop(event_type, None) @@ -115,7 +127,7 @@ def end_event(event_type: str) -> Optional[Event]: if current_event: current_event.closed = time.time() # Store the event - if hasattr(_local, "system_id"): - event_store.add_event(_local.system_id, current_event) + if hasattr(_local, "instance_system_id"): + event_store.add_event(_local.instance_system_id, _local.system_id, current_event) clear_current_event(event_type) return current_event diff --git a/synth_sdk/tracing/events/scope.py b/synth_sdk/tracing/events/scope.py index bdefb86..ba0a3c8 100644 --- a/synth_sdk/tracing/events/scope.py +++ b/synth_sdk/tracing/events/scope.py @@ -1,9 +1,10 @@ -from contextlib import contextmanager import time +from contextlib import contextmanager + from synth_sdk.tracing.abstractions import Event -from synth_sdk.tracing.decorators import set_current_event, clear_current_event, _local +from synth_sdk.tracing.decorators import _local, clear_current_event, set_current_event from synth_sdk.tracing.events.store import event_store -from synth_sdk.tracing.local import system_id_var +from synth_sdk.tracing.local import instance_system_id_var, system_id_var @contextmanager @@ -18,16 +19,26 @@ def event_scope(event_type: str): # Check if we're in an async context try: import asyncio + asyncio.get_running_loop() is_async = True except RuntimeError: is_async = False - # Get system_id from appropriate source - system_id = system_id_var.get() if is_async else getattr(_local, "system_id", None) - + # Get instance_system_id from appropriate source + instance_system_id = ( + instance_system_id_var.get() + if is_async + else getattr(_local, "instance_system_id", None) + ) + system_id = ( + system_id_var.get() + if is_async + else getattr(_local, "system_id", None) + ) + event = Event( - system_id=system_id, + instance_system_id=instance_system_id, event_type=event_type, opened=time.time(), closed=None, @@ -42,6 +53,6 @@ def event_scope(event_type: str): finally: event.closed = time.time() clear_current_event(event_type) - # Store the event if system_id is available - if system_id: - event_store.add_event(system_id, event) + # Store the event if instance_system_id is available + if instance_system_id: + event_store.add_event(instance_system_id, system_id, event) diff --git a/synth_sdk/tracing/events/store.py b/synth_sdk/tracing/events/store.py index 7972b33..7371b50 100644 --- a/synth_sdk/tracing/events/store.py +++ b/synth_sdk/tracing/events/store.py @@ -1,13 +1,17 @@ import json -import threading import logging import time -from typing import Dict, List, Optional -from synth_sdk.tracing.abstractions import SystemTrace, EventPartitionElement, Event -from synth_sdk.tracing.config import tracer # Update this import line from threading import RLock # Change this import -from synth_sdk.tracing.local import _local, system_id_var, active_events_var # Import context variables +from typing import Dict, List +from synth_sdk.tracing.abstractions import Event, EventPartitionElement, SystemTrace +from synth_sdk.tracing.local import ( # Import context variables + _local, + active_events_var, + instance_system_id_var, + system_id_var, + system_name_var, +) logger = 
logging.getLogger(__name__) @@ -19,48 +23,49 @@ def __init__(self): self.logger = logging.getLogger(__name__) def get_or_create_system_trace( - self, system_id: str, _already_locked: bool = False + self, instance_system_id: str, system_id: str, _already_locked: bool = False ) -> SystemTrace: - """Get or create a SystemTrace for the given system_id.""" + """Get or create a SystemTrace for the given instance_system_id.""" logger = logging.getLogger(__name__) - #logger.debug(f"Starting get_or_create_system_trace for {system_id}") + # logger.debug(f"Starting get_or_create_system_trace for {instance_system_id}") def _get_or_create(): - #logger.debug("Inside _get_or_create") - if system_id not in self._traces: - #logger.debug(f"Creating new system trace for {system_id}") - self._traces[system_id] = SystemTrace( + # logger.debug("Inside _get_or_create") + if instance_system_id not in self._traces: + # logger.debug(f"Creating new system trace for {instance_system_id}") + self._traces[instance_system_id] = SystemTrace( system_id=system_id, + instance_system_id=instance_system_id, metadata={}, partition=[EventPartitionElement(partition_index=0, events=[])], current_partition_index=0, ) - #logger.debug("Returning system trace") - return self._traces[system_id] + # logger.debug("Returning system trace") + return self._traces[instance_system_id] if _already_locked: return _get_or_create() else: with self._lock: - #logger.debug("Lock acquired in get_or_create_system_trace") + # logger.debug("Lock acquired in get_or_create_system_trace") return _get_or_create() - def increment_partition(self, system_id: str) -> int: + def increment_partition(self, instance_system_id: str, system_id: str) -> int: """Increment the partition index for a system and create new partition element.""" logger = logging.getLogger(__name__) - #logger.debug(f"Starting increment_partition for system {system_id}") + # logger.debug(f"Starting increment_partition for system {instance_system_id}") with self._lock: - #logger.debug("Lock acquired in increment_partition") + # logger.debug("Lock acquired in increment_partition") system_trace = self.get_or_create_system_trace( - system_id, _already_locked=True + instance_system_id, system_id, _already_locked=True ) - #logger.debug( + # logger.debug( # f"Got system trace, current index: {system_trace.current_partition_index}" # ) system_trace.current_partition_index += 1 - #logger.debug( + # logger.debug( # f"Incremented index to: {system_trace.current_partition_index}" # ) @@ -69,25 +74,27 @@ def increment_partition(self, system_id: str) -> int: partition_index=system_trace.current_partition_index, events=[] ) ) - #logger.debug("Added new partition element") + # logger.debug("Added new partition element") return system_trace.current_partition_index - def add_event(self, system_id: str, event: Event): + def add_event(self, instance_system_id: str, system_id: str, event: Event): """Add an event to the appropriate partition of the system trace.""" - #self.#logger.debug(f"Adding event type {event.event_type} to system {system_id}") + # self.#logger.debug(f"Adding event type {event.event_type} to system {instance_system_id}") # self.#logger.debug( # f"Event details: opened={event.opened}, closed={event.closed}, partition={event.partition_index}" # ) - #print("Adding event to partition") + # print("Adding event to partition") - #try: + # try: if not self._lock.acquire(timeout=5): self.logger.error("Failed to acquire lock within timeout period") return try: - system_trace = 
self.get_or_create_system_trace(system_id) + system_trace = self.get_or_create_system_trace( + instance_system_id, system_id + ) # self.#logger.debug( # f"Got system trace with {len(system_trace.partition)} partitions" # ) @@ -109,7 +116,6 @@ def add_event(self, system_id: str, event: Event): f"No partition found for index {event.partition_index}" ) - current_partition.events.append(event) # self.#logger.debug( # f"Added event to partition {event.partition_index}. Total events: {len(current_partition.events)}" @@ -124,39 +130,40 @@ def get_system_traces(self) -> List[SystemTrace]: """Get all system traces.""" with self._lock: self.end_all_active_events() - + return list(self._traces.values()) def end_all_active_events(self): """End all active events and store them.""" - #self.#logger.debug("Ending all active events") - + # self.#logger.debug("Ending all active events") + # For synchronous code if hasattr(_local, "active_events"): active_events = _local.active_events + instance_system_id = getattr(_local, "instance_system_id", None) system_id = getattr(_local, "system_id", None) - if active_events:# and system_id: + if active_events: # and instance_system_id: for event_type, event in list(active_events.items()): if event.closed is None: event.closed = time.time() - self.add_event(event.system_id, event) - #self.#logger.debug(f"Stored and closed event {event_type}") + self.add_event(event.instance_system_id, system_id, event) + # self.#logger.debug(f"Stored and closed event {event_type}") _local.active_events.clear() - # For asynchronous code - active_events_async = active_events_var.get() - # Use preserved system ID if available, otherwise try to get from context - # system_id_async = preserved_system_id or system_id_var.get(None) - # print("System ID async: ", system_id_async) - # raise ValueError("Test error") - - if active_events_async:# and system_id_async: - for event_type, event in list(active_events_async.items()): - if event.closed is None: - event.closed = time.time() - self.add_event(event.system_id, event) - #self.#logger.debug(f"Stored and closed event {event_type}") - active_events_var.set({}) + else: + # For asynchronous code + active_events_async = active_events_var.get() + + + + if active_events_async: # and instance_system_id_async: + for event_type, event in list(active_events_async.items()): + system_id = system_id_var.get() + if event.closed is None: + event.closed = time.time() + self.add_event(event.instance_system_id, system_id, event) + # self.#logger.debug(f"Stored and closed event {event_type}") + active_events_var.set({}) def get_system_traces_json(self) -> str: """Get all system traces as JSON.""" @@ -164,7 +171,7 @@ def get_system_traces_json(self) -> str: return json.dumps( [ { - "system_id": trace.system_id, + "instance_system_id": trace.instance_system_id, "current_partition_index": trace.current_partition_index, "partition": [ { diff --git a/synth_sdk/tracing/local.py b/synth_sdk/tracing/local.py index 9ae21e7..1780177 100644 --- a/synth_sdk/tracing/local.py +++ b/synth_sdk/tracing/local.py @@ -1,20 +1,15 @@ -from typing import Callable, Optional, Set, Literal, Any, Dict, Tuple, Union -from functools import wraps -import threading -import time import logging -import inspect -import contextvars +import threading from contextvars import ContextVar -from pydantic import BaseModel - logger = logging.getLogger(__name__) -# Thread-local storage for active events and system_id +# Thread-local storage for active events and instance_system_id # Used for synchronous 
tracing _local = threading.local() # Used for asynchronous tracing +system_name_var: ContextVar[str] = ContextVar("system_name") system_id_var: ContextVar[str] = ContextVar("system_id") +instance_system_id_var: ContextVar[str] = ContextVar("instance_system_id") active_events_var: ContextVar[dict] = ContextVar("active_events", default={}) diff --git a/synth_sdk/tracing/upload.py b/synth_sdk/tracing/upload.py index 244ac53..04c83fb 100644 --- a/synth_sdk/tracing/upload.py +++ b/synth_sdk/tracing/upload.py @@ -2,27 +2,24 @@ import json import logging import os +import ssl import time +import uuid from pprint import pprint -import asyncio -import sys -import boto3 -from datetime import datetime -from dotenv import load_dotenv from typing import Any, Dict, List import requests +from dotenv import load_dotenv from pydantic import BaseModel, validator from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry from urllib3.poolmanager import PoolManager -import ssl from synth_sdk.tracing.abstractions import Dataset, SystemTrace from synth_sdk.tracing.events.store import event_store load_dotenv() + # NOTE: This may cause memory issues in the future def validate_json(data: dict) -> None: # Validate that a dictionary contains only JSON-serializable values. @@ -53,28 +50,27 @@ class TLSAdapter(HTTPAdapter): def init_poolmanager(self, connections, maxsize, block=False): """Create and initialize the urllib3 PoolManager.""" ctx = ssl.create_default_context() - ctx.set_ciphers('DEFAULT@SECLEVEL=1') + ctx.set_ciphers("DEFAULT@SECLEVEL=1") self.poolmanager = PoolManager( num_pools=connections, maxsize=maxsize, block=block, ssl_version=ssl.PROTOCOL_TLSv1_2, - ssl_context=ctx + ssl_context=ctx, ) + def load_signed_url(signed_url: str, dataset: Dataset, traces: List[SystemTrace]): payload = createPayload(dataset, traces) validate_json(payload) - + session = requests.Session() adapter = TLSAdapter() - session.mount('https://', adapter) - + session.mount("https://", adapter) + try: response = session.put( - signed_url, - json=payload, - headers={'Content-Type': 'application/json'} + signed_url, json=payload, headers={"Content-Type": "application/json"} ) response.raise_for_status() except requests.exceptions.RequestException as e: @@ -83,71 +79,80 @@ def load_signed_url(signed_url: str, dataset: Dataset, traces: List[SystemTrace] raise if response.status_code != 200: - raise ValueError(f"Failed to load signed URL Status Code: {response.status_code} Response: {response.text}, Signed URL: {signed_url}") + raise ValueError( + f"Failed to load signed URL Status Code: {response.status_code} Response: {response.text}, Signed URL: {signed_url}" + ) else: - print(f"Successfully loaded signed URL Status Code: {response.status_code} Response: {response.text}, Signed URL: {signed_url}") + print( + f"Successfully loaded signed URL Status Code: {response.status_code} Response: {response.text}, Signed URL: {signed_url}" + ) -def send_system_traces_s3(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str, system_id: str, verbose: bool = False): - # Create async function that contains all async operations - async def _async_operations(): - upload_id, signed_url = await get_upload_id(base_url, api_key, system_id, verbose) - load_signed_url(signed_url, dataset, traces) +def send_system_traces_s3( + dataset: Dataset, + traces: List[SystemTrace], + base_url: str, + api_key: str, + system_id: str, + verbose: bool = False, +): + upload_id, signed_url = get_upload_id( + base_url, api_key, 
system_id, verbose + ) + load_signed_url(signed_url, dataset, traces) - token_url = f"{base_url}/v1/auth/token" - token_response = requests.get(token_url, headers={"customer_specific_api_key": api_key}) + token_url = f"{base_url}/v1/auth/token" + try: + token_response = requests.get( + token_url, headers={"customer_specific_api_key": api_key} + ) token_response.raise_for_status() access_token = token_response.json()["access_token"] + except requests.exceptions.RequestException as e: + logging.error(f"Error obtaining access token: {e}") + raise - api_url = f"{base_url}/v1/uploads/process-upload/{upload_id}" - data = {"signed_url": signed_url} - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {access_token}", - } + api_url = f"{base_url}/v1/uploads/process-upload/{upload_id}" + data = {"signed_url": signed_url} + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {access_token}", + } - try: - response = requests.post(api_url, headers=headers, json=data) - response.raise_for_status() - - upload_id = response.json()["upload_id"] - signed_url = response.json()["signed_url"] - status = response.json()["status"] - - if verbose: - print(f"Status: {status}") - print(f"Upload ID retrieved: {upload_id}") - print(f"Signed URL: {signed_url}") - - return upload_id, signed_url - except requests.exceptions.HTTPError as e: - logging.error(f"HTTP error occurred: {e}") - raise - except Exception as e: - logging.error(f"An error occurred: {e}") - raise - - # Run the async operations in an event loop - if not is_event_loop_running(): - # If no event loop is running, create a new one - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete(_async_operations()) - finally: - loop.close() - else: - # If an event loop is already running, use it - loop = asyncio.get_event_loop() - return loop.run_until_complete(_async_operations()) + try: + response = requests.post(api_url, headers=headers, json=data) + response.raise_for_status() + + response_data = response.json() + upload_id = response_data.get("upload_id") + signed_url = response_data.get("signed_url") + status = response_data.get("status") + + if verbose: + print(f"Status: {status}") + print(f"Upload ID retrieved: {upload_id}") + print(f"Signed URL: {signed_url}") + + return upload_id, signed_url + except requests.exceptions.HTTPError as e: + logging.error(f"HTTP error occurred: {e}") + raise + except Exception as e: + logging.error(f"An error occurred: {e}") + raise -async def get_upload_id(base_url: str, api_key: str, system_id, verbose: bool = False): + +def get_upload_id( + base_url: str, api_key: str, instance_system_id, verbose: bool = False +): token_url = f"{base_url}/v1/auth/token" - token_response = requests.get(token_url, headers={"customer_specific_api_key": api_key}) + token_response = requests.get( + token_url, headers={"customer_specific_api_key": api_key} + ) token_response.raise_for_status() access_token = token_response.json()["access_token"] - api_url = f"{base_url}/v1/uploads/get-upload-id-signed-url?system_id={system_id}" + api_url = f"{base_url}/v1/uploads/get-upload-id-signed-url?instance_system_id={instance_system_id}" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {access_token}", @@ -169,6 +174,7 @@ async def get_upload_id(base_url: str, api_key: str, system_id, verbose: bool = logging.error(f"An error occurred: {e}") raise + class UploadValidator(BaseModel): traces: List[Dict[str, Any]] dataset: Dict[str, 
Any]
@@ -180,8 +186,8 @@ def validate_traces(cls, traces):
 
         for trace in traces:
             # Validate required fields in each trace
-            if "system_id" not in trace:
-                raise ValueError("Each trace must have a system_id")
+            if "instance_system_id" not in trace:
+                raise ValueError("Each trace must have an instance_system_id")
 
             if "partition" not in trace:
                 raise ValueError("Each trace must have a partition")
@@ -245,7 +251,7 @@ def validate_dataset(cls, dataset):
             raise ValueError("Reward signals must be a list")
 
         for signal in reward_signals:
-            required_signal_fields = ["question_id", "system_id", "reward"]
+            required_signal_fields = ["question_id", "instance_system_id", "reward"]
             missing_fields = [f for f in required_signal_fields if f not in signal]
             if missing_fields:
                 raise ValueError(
@@ -284,7 +290,7 @@ def format_upload_output(dataset, traces):
     # Format reward signals array with error handling
     reward_signals_data = [
         {
-            "system_id": rs.system_id,
+            "instance_system_id": rs.instance_system_id,
             "reward": rs.reward,
             "question_id": rs.question_id,
             "annotation": rs.annotation if hasattr(rs, "annotation") else None,
@@ -295,7 +301,7 @@ def format_upload_output(dataset, traces):
     # Format traces array
     traces_data = [
         {
-            "system_id": t.system_id,
+            "instance_system_id": t.instance_system_id,
             "metadata": t.metadata if t.metadata else None,
             "partition": [
                 {
@@ -347,9 +353,11 @@ def upload_helper(
         for event_type, event in _local.active_events.items():
             if event and event.closed is None:
                 event.closed = time.time()
-                if hasattr(_local, "system_id"):
+                if hasattr(_local, "instance_system_id"):
                     try:
-                        event_store.add_event(_local.system_id, event)
+                        event_store.add_event(
+                            _local.instance_system_id, _local.system_id, event
+                        )
                         if verbose:
                             print(f"Closed and stored active event: {event_type}")
                     except Exception as e:
@@ -366,7 +374,7 @@ def upload_helper(
             for event in partition.events:
                 if event.closed is None:
                     event.closed = current_time
-                    event_store.add_event(trace.system_id, event)
+                    event_store.add_event(trace.instance_system_id, trace.system_id, event)
                     if verbose:
                         print(f"Closed existing unclosed event: {event.event_type}")
 
@@ -390,7 +398,7 @@ def upload_helper(
             traces=traces,
             base_url="https://agent-learning.onrender.com",
             api_key=api_key,
-            special_system_id=special_system_id,
+            system_id=traces[0].system_id,
             verbose=verbose,
         )
 
@@ -406,7 +414,9 @@ def upload_helper(
             print("Payload sent to server: ")
             pprint(payload)
 
-        questions_json, reward_signals_json, traces_json = format_upload_output(dataset, traces)
+        questions_json, reward_signals_json, traces_json = format_upload_output(
+            dataset, traces
+        )
 
         return response, questions_json, reward_signals_json, traces_json
     except ValueError as e:
diff --git a/synth_sdk/tracing/utils.py b/synth_sdk/tracing/utils.py
new file mode 100644
index 0000000..d65a8da
--- /dev/null
+++ b/synth_sdk/tracing/utils.py
@@ -0,0 +1,11 @@
+import hashlib
+
+
+def get_system_id(system_name: str) -> str:
+    """Create a deterministic system_id from system_name using SHA-256."""
+    if not system_name:
+        raise ValueError("system_name cannot be empty")
+    # Create SHA-256 hash of system_name
+    hash_object = hashlib.sha256(system_name.encode())
+    # Take the first 16 characters of the hex digest for a shorter but still unique ID
+    return hash_object.hexdigest()[:16]
diff --git a/tests/iteration/craftax/generate_data/records/episode_classic_0.json b/tests/iteration/craftax/generate_data/records/episode_classic_0.json
new file mode 100644
index 0000000..1ae3717
--- /dev/null
+++ 
b/tests/iteration/craftax/generate_data/records/episode_classic_0.json @@ -0,0 +1,24 @@ +{ + "Collect Wood": true, + "Place Table": false, + "Eat Cow": false, + "Collect Sapling": false, + "Collect Drink": false, + "Make Wood Pickaxe": false, + "Make Wood Sword": false, + "Place Plant": false, + "Defeat Zombie": false, + "Collect Stone": false, + "Place Stone": false, + "Eat Plant": false, + "Defeat Skeleton": false, + "Make Stone Pickaxe": false, + "Make Stone Sword": false, + "Wake Up": false, + "Place Furnace": false, + "Collect Coal": false, + "Collect Iron": false, + "Collect Diamond": false, + "Make Iron Pickaxe": false, + "Make Iron Sword": false +} \ No newline at end of file diff --git a/tests/iteration/craftax/generate_data/records/episode_classic_1.json b/tests/iteration/craftax/generate_data/records/episode_classic_1.json new file mode 100644 index 0000000..0b93190 --- /dev/null +++ b/tests/iteration/craftax/generate_data/records/episode_classic_1.json @@ -0,0 +1,24 @@ +{ + "Collect Wood": true, + "Place Table": false, + "Eat Cow": false, + "Collect Sapling": true, + "Collect Drink": false, + "Make Wood Pickaxe": false, + "Make Wood Sword": false, + "Place Plant": false, + "Defeat Zombie": false, + "Collect Stone": false, + "Place Stone": false, + "Eat Plant": false, + "Defeat Skeleton": false, + "Make Stone Pickaxe": false, + "Make Stone Sword": false, + "Wake Up": false, + "Place Furnace": false, + "Collect Coal": false, + "Collect Iron": false, + "Collect Diamond": false, + "Make Iron Pickaxe": false, + "Make Iron Sword": false +} \ No newline at end of file diff --git a/tests/iteration/craftax/generate_data/records/episode_classic_2.json b/tests/iteration/craftax/generate_data/records/episode_classic_2.json new file mode 100644 index 0000000..b0a8f50 --- /dev/null +++ b/tests/iteration/craftax/generate_data/records/episode_classic_2.json @@ -0,0 +1,24 @@ +{ + "Collect Wood": false, + "Place Table": false, + "Eat Cow": false, + "Collect Sapling": false, + "Collect Drink": false, + "Make Wood Pickaxe": false, + "Make Wood Sword": false, + "Place Plant": false, + "Defeat Zombie": false, + "Collect Stone": false, + "Place Stone": false, + "Eat Plant": false, + "Defeat Skeleton": false, + "Make Stone Pickaxe": false, + "Make Stone Sword": false, + "Wake Up": false, + "Place Furnace": false, + "Collect Coal": false, + "Collect Iron": false, + "Collect Diamond": false, + "Make Iron Pickaxe": false, + "Make Iron Sword": false +} \ No newline at end of file diff --git a/tutorials/AsyncAgentExample.py b/tutorials/AsyncAgentExample.py index 0b5e4ea..8c5cfb2 100644 --- a/tutorials/AsyncAgentExample.py +++ b/tutorials/AsyncAgentExample.py @@ -1,15 +1,17 @@ -from synth_sdk.tracing.decorators import trace_system, _local -from synth_sdk.tracing.trackers import SynthTracker -from synth_sdk.tracing.upload import upload -from synth_sdk.tracing.abstractions import TrainingQuestion, RewardSignal, Dataset -from synth_sdk.tracing.events.store import event_store import asyncio -import time import json import logging -from openai import AsyncOpenAI -from dotenv import load_dotenv import os +import time + +from dotenv import load_dotenv +from openai import AsyncOpenAI + +from synth_sdk.tracing.abstractions import Dataset, RewardSignal, TrainingQuestion +from synth_sdk.tracing.decorators import _local, trace_system +from synth_sdk.tracing.events.store import event_store +from synth_sdk.tracing.trackers import SynthTracker +from synth_sdk.tracing.upload import upload # Load environment variables 
load_dotenv() @@ -26,8 +28,11 @@ class TestAgent: def __init__(self): - self.system_id = "test_agent_async" - logger.debug("Initializing TestAgent with system_id: %s", self.system_id) + self.instance_system_id = "test_agent_async" + logger.debug( + "Initializing TestAgent with instance_system_id: %s", + self.instance_system_id, + ) self.client = AsyncOpenAI() logger.debug("OpenAI client initialized") @@ -40,19 +45,23 @@ def __init__(self): ) async def make_lm_call(self, user_message: str) -> str: # Only pass the user message, not self - SynthTracker.track_state(variable_name="user_message", variable_value=user_message, origin="agent") + SynthTracker.track_state( + variable_name="user_message", variable_value=user_message, origin="agent" + ) logger.debug("Starting LM call with message: %s", user_message) response = await self.client.chat.completions.create( model="gpt-4", messages=[ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": user_message} - ] + {"role": "user", "content": user_message}, + ], ) response_text = response.choices[0].message.content - SynthTracker.track_state(variable_name="response", variable_value=response_text, origin="agent") + SynthTracker.track_state( + variable_name="response", variable_value=response_text, origin="agent" + ) logger.debug("LM response received: %s", response_text) time.sleep(0.1) @@ -66,11 +75,15 @@ async def make_lm_call(self, user_message: str) -> str: ) async def process_environment(self, input_data: str) -> dict: # Only pass the input data, not self - SynthTracker.track_state(variable_name="input_data", variable_value=input_data, origin="environment") + SynthTracker.track_state( + variable_name="input_data", variable_value=input_data, origin="environment" + ) result = {"processed": input_data, "timestamp": time.time()} - SynthTracker.track_state(variable_name="result", variable_value=result, origin="environment") + SynthTracker.track_state( + variable_name="result", variable_value=result, origin="environment" + ) return result @@ -119,7 +132,7 @@ async def run_test(): reward_signals=[ RewardSignal( question_id=f"q{i}", - system_id=agent.system_id, + instance_system_id=agent.instance_system_id, reward=1.0, annotation="Test reward", ) @@ -135,7 +148,9 @@ async def run_test(): # Upload traces try: logger.info("Attempting to upload traces") - response, questions_json, reward_signals_json, traces_json = upload(dataset=dataset, verbose=True) + response, questions_json, reward_signals_json, traces_json = upload( + dataset=dataset, verbose=True + ) logger.info("Upload successful!") print("Upload successful!") @@ -171,9 +186,9 @@ async def run_test(): logger.debug("Cleaning up event: %s", event_type) if event.closed is None: event.closed = time.time() - if hasattr(_local, "system_id"): + if hasattr(_local, "instance_system_id"): try: - event_store.add_event(_local.system_id, event) + event_store.add_event(_local.instance_system_id, event) logger.debug( "Successfully cleaned up event: %s", event_type ) @@ -189,9 +204,9 @@ async def run_test(): ) logger.info("Cleanup completed") + # Run a sample agent using the async decorator and tracker if __name__ == "__main__": logger.info("Starting main execution") asyncio.run(run_test()) logger.info("Main execution completed") - diff --git a/tutorials/SimpleAgentExample.py b/tutorials/SimpleAgentExample.py index 379ba02..378d8af 100644 --- a/tutorials/SimpleAgentExample.py +++ b/tutorials/SimpleAgentExample.py @@ -1,14 +1,16 @@ -from synth_sdk.tracing.decorators import 
trace_system, trace_system_sync, _local -from synth_sdk.tracing.trackers import SynthTracker -from synth_sdk.tracing.upload import upload -from synth_sdk.tracing.abstractions import TrainingQuestion, RewardSignal, Dataset -from synth_sdk.tracing.events.store import event_store -import time import json import logging -from openai import OpenAI -from dotenv import load_dotenv import os +import time + +from dotenv import load_dotenv +from openai import OpenAI + +from synth_sdk.tracing.abstractions import Dataset, RewardSignal, TrainingQuestion +from synth_sdk.tracing.decorators import _local, trace_system +from synth_sdk.tracing.events.store import event_store +from synth_sdk.tracing.trackers import SynthTracker +from synth_sdk.tracing.upload import upload # Load environment variables load_dotenv() @@ -22,11 +24,15 @@ ) logger = logging.getLogger(__name__) + class TestAgent: def __init__(self): - self.system_id = "test_agent_sync" - logger.debug("Initializing TestAgent with system_id: %s", self.system_id) - + self.instance_system_id = "test_agent_sync" + logger.debug( + "Initializing TestAgent with instance_system_id: %s", + self.instance_system_id, + ) + # Initialize OpenAI client instead of LM self.client = OpenAI(api_key=openai_api_key) logger.debug("OpenAI client initialized") @@ -39,22 +45,26 @@ def __init__(self): verbose=True, ) def make_lm_call(self, user_message: str) -> str: - SynthTracker.track_state(variable_name="user_message", variable_value=user_message, origin="agent") + SynthTracker.track_state( + variable_name="user_message", variable_value=user_message, origin="agent" + ) logger.debug("Starting LM call with message: %s", user_message) response = self.client.chat.completions.create( model="gpt-4-turbo-preview", messages=[ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": user_message} + {"role": "user", "content": user_message}, ], - temperature=1 + temperature=1, ) # Extract the response content response_text = response.choices[0].message.content - SynthTracker.track_state(variable_name="response", variable_value=response_text, origin="agent") + SynthTracker.track_state( + variable_name="response", variable_value=response_text, origin="agent" + ) logger.debug("LM response received: %s", response_text) time.sleep(0.1) @@ -68,11 +78,15 @@ def make_lm_call(self, user_message: str) -> str: ) def process_environment(self, input_data: str) -> dict: # Only pass the input data, not self - SynthTracker.track_state(variable_name="input_data", variable_value=input_data, origin="environment") + SynthTracker.track_state( + variable_name="input_data", variable_value=input_data, origin="environment" + ) result = {"processed": input_data, "timestamp": time.time()} - SynthTracker.track_state(variable_name="result", variable_value=result, origin="environment") + SynthTracker.track_state( + variable_name="result", variable_value=result, origin="environment" + ) return result @@ -121,7 +135,7 @@ def run_test(): reward_signals=[ RewardSignal( question_id=f"q{i}", - system_id=agent.system_id, + instance_system_id=agent.instance_system_id, reward=1.0, annotation="Test reward", ) @@ -141,7 +155,9 @@ def run_test(): # Upload traces try: logger.info("Attempting to upload traces") - response, questions_json, reward_signals_json, traces_json = upload(dataset=dataset, verbose=True) + response, questions_json, reward_signals_json, traces_json = upload( + dataset=dataset, verbose=True + ) logger.info("Upload successful!") print("Upload successful!") @@ -178,9 
+194,9 @@ def run_test(): logger.debug("Cleaning up event: %s", event_type) if event.closed is None: event.closed = time.time() - if hasattr(_local, "system_id"): + if hasattr(_local, "instance_system_id"): try: - event_store.add_event(_local.system_id, event) + event_store.add_event(_local.instance_system_id, event) logger.debug( "Successfully cleaned up event: %s", event_type ) @@ -196,9 +212,9 @@ def run_test(): ) logger.info("Cleanup completed") + # Run a sample agent using the sync decorator and tracker if __name__ == "__main__": logger.info("Starting main execution") run_test() logger.info("Main execution completed") - diff --git a/tutorials/SyncInAsyncExample.py b/tutorials/SyncInAsyncExample.py index 53639bb..2025db0 100644 --- a/tutorials/SyncInAsyncExample.py +++ b/tutorials/SyncInAsyncExample.py @@ -1,15 +1,17 @@ -from synth_sdk.tracing.decorators import trace_system, trace_system_sync, _local -from synth_sdk.tracing.trackers import SynthTrackerSync, SynthTrackerAsync, SynthTracker -from synth_sdk.tracing.upload import upload -from synth_sdk.tracing.abstractions import TrainingQuestion, RewardSignal, Dataset -from synth_sdk.tracing.events.store import event_store -import time +import asyncio import json import logging -from openai import OpenAI -from dotenv import load_dotenv import os -import asyncio +import time + +from dotenv import load_dotenv +from openai import OpenAI + +from synth_sdk.tracing.abstractions import Dataset, RewardSignal, TrainingQuestion +from synth_sdk.tracing.decorators import _local, trace_system, trace_system_sync +from synth_sdk.tracing.events.store import event_store +from synth_sdk.tracing.trackers import SynthTracker, SynthTrackerSync +from synth_sdk.tracing.upload import upload # Load environment variables load_dotenv() @@ -23,11 +25,15 @@ ) logger = logging.getLogger(__name__) + class TestAgent: def __init__(self): - self.system_id = "test_agent_sync" - logger.debug("Initializing TestAgent with system_id: %s", self.system_id) - + self.instance_system_id = "test_agent_sync" + logger.debug( + "Initializing TestAgent with instance_system_id: %s", + self.instance_system_id, + ) + # Initialize OpenAI client instead of LM self.client = OpenAI(api_key=openai_api_key) logger.debug("OpenAI client initialized") @@ -40,20 +46,24 @@ def __init__(self): verbose=True, ) def make_lm_call(self, user_message: str) -> str: - SynthTrackerSync.track_state(variable_name="user_message", variable_value=user_message, origin="agent") + SynthTrackerSync.track_state( + variable_name="user_message", variable_value=user_message, origin="agent" + ) logger.debug("Starting LM call with message: %s", user_message) response = self.client.chat.completions.create( model="gpt-4-turbo-preview", messages=[ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": user_message} + {"role": "user", "content": user_message}, ], - temperature=1 + temperature=1, ) # Extract the response content response_text = response.choices[0].message.content - SynthTrackerSync.track_state(variable_name="response", variable_value=response_text, origin="agent") + SynthTrackerSync.track_state( + variable_name="response", variable_value=response_text, origin="agent" + ) logger.debug("LM response received: %s", response_text) time.sleep(0.1) @@ -67,11 +77,15 @@ def make_lm_call(self, user_message: str) -> str: ) def process_environment(self, input_data: str) -> dict: # Only pass the input data, not self - SynthTracker.track_state(variable_name="input_data", 
variable_value=input_data, origin="environment") + SynthTracker.track_state( + variable_name="input_data", variable_value=input_data, origin="environment" + ) result = {"processed": input_data, "timestamp": time.time()} - SynthTracker.track_state(variable_name="result", variable_value=result, origin="environment") + SynthTracker.track_state( + variable_name="result", variable_value=result, origin="environment" + ) return result @@ -120,7 +134,7 @@ async def run_test(): reward_signals=[ RewardSignal( question_id=f"q{i}", - system_id=agent.system_id, + instance_system_id=agent.instance_system_id, reward=1.0, annotation="Test reward", ) @@ -140,7 +154,9 @@ async def run_test(): # Upload traces try: logger.info("Attempting to upload traces") - response, questions_json, reward_signals_json, traces_json = upload(dataset=dataset, verbose=True) + response, questions_json, reward_signals_json, traces_json = upload( + dataset=dataset, verbose=True + ) logger.info("Upload successful!") print("Upload successful!") @@ -177,9 +193,9 @@ async def run_test(): logger.debug("Cleaning up event: %s", event_type) if event.closed is None: event.closed = time.time() - if hasattr(_local, "system_id"): + if hasattr(_local, "instance_system_id"): try: - event_store.add_event(_local.system_id, event) + event_store.add_event(_local.instance_system_id, event) logger.debug( "Successfully cleaned up event: %s", event_type ) @@ -195,9 +211,9 @@ async def run_test(): ) logger.info("Cleanup completed") + # Run a sample agent using the sync decorator and tracker if __name__ == "__main__": logger.info("Starting main execution") asyncio.run(run_test()) logger.info("Main execution completed") - diff --git a/tutorials/reward_signals.json b/tutorials/reward_signals.json index 5087bc5..32f8fe3 100644 --- a/tutorials/reward_signals.json +++ b/tutorials/reward_signals.json @@ -1 +1,20 @@ -[{"system_id": "test_agent_sync", "reward": 1.0, "question_id": "q0", "annotation": "Test reward"}, {"system_id": "test_agent_sync", "reward": 1.0, "question_id": "q1", "annotation": "Test reward"}, {"system_id": "test_agent_sync", "reward": 1.0, "question_id": "q2", "annotation": "Test reward"}] \ No newline at end of file +[ + { + "instance_system_id": "test_agent_sync", + "reward": 1.0, + "question_id": "q0", + "annotation": "Test reward" + }, + { + "instance_system_id": "test_agent_sync", + "reward": 1.0, + "question_id": "q1", + "annotation": "Test reward" + }, + { + "instance_system_id": "test_agent_sync", + "reward": 1.0, + "question_id": "q2", + "annotation": "Test reward" + } +] \ No newline at end of file diff --git a/tutorials/reward_signals_async.json b/tutorials/reward_signals_async.json index 5f5c0ba..ba17f5a 100644 --- a/tutorials/reward_signals_async.json +++ b/tutorials/reward_signals_async.json @@ -1 +1,20 @@ -[{"system_id": "test_agent_async", "reward": 1.0, "question_id": "q0", "annotation": "Test reward"}, {"system_id": "test_agent_async", "reward": 1.0, "question_id": "q1", "annotation": "Test reward"}, {"system_id": "test_agent_async", "reward": 1.0, "question_id": "q2", "annotation": "Test reward"}] \ No newline at end of file +[ + { + "instance_system_id": "test_agent_async", + "reward": 1.0, + "question_id": "q0", + "annotation": "Test reward" + }, + { + "instance_system_id": "test_agent_async", + "reward": 1.0, + "question_id": "q1", + "annotation": "Test reward" + }, + { + "instance_system_id": "test_agent_async", + "reward": 1.0, + "question_id": "q2", + "annotation": "Test reward" + } +] \ No newline at end of file 
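For reference, a minimal standalone sketch of how a reward-signals file with the shape shown above can be produced from the tutorial objects introduced in this patch. It is illustrative only: the tutorials themselves obtain reward_signals_json from upload(), and the json.dump call here is just an assumption for reproducing the indented layout.

import json

from synth_sdk.tracing.abstractions import RewardSignal

# Build the three reward signals that appear in tutorials/reward_signals.json.
reward_signals = [
    RewardSignal(
        question_id=f"q{i}",
        instance_system_id="test_agent_sync",
        reward=1.0,
        annotation="Test reward",
    )
    for i in range(3)
]

# Serialize using the field names the SDK emits via to_dict().
with open("tutorials/reward_signals.json", "w") as f:
    json.dump([rs.to_dict() for rs in reward_signals], f, indent=2)
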
diff --git a/tutorials/traces.json b/tutorials/traces.json index 2cd2954..8f548ee 100644 --- a/tutorials/traces.json +++ b/tutorials/traces.json @@ -1 +1,240 @@ -[{"system_id": "test_agent_sync", "partition": [{"partition_index": 0, "events": [{"event_type": "environment_processing", "opened": 1732238453.367889, "closed": 1732238454.127567, "partition_index": 0, "agent_compute_steps": [], "environment_compute_steps": [{"event_order": 1, "compute_ended": 1732238453.367924, "compute_began": 1732238453.367889, "compute_input": [], "compute_output": [{"outputs": {"input_data": "What's the capital of France?"}}, {"outputs": {"input_data": "What's the capital of France?"}}, {"outputs": {"result": {"processed": "What's the capital of France?", "timestamp": 1732238453.367918}}}, {"outputs": {"result": {"processed": "What's the capital of France?", "timestamp": 1732238453.367918}}}]}]}, {"event_type": "environment_processing", "opened": 1732238454.127528, "closed": 1732238454.836017, "partition_index": 0, "agent_compute_steps": [], "environment_compute_steps": [{"event_order": 1, "compute_ended": 1732238454.127908, "compute_began": 1732238454.127528, "compute_input": [], "compute_output": [{"outputs": {"input_data": "What's 2+2?"}}, {"outputs": {"input_data": "What's 2+2?"}}, {"outputs": {"result": {"processed": "What's 2+2?", "timestamp": 1732238454.127887}}}, {"outputs": {"result": {"processed": "What's 2+2?", "timestamp": 1732238454.127887}}}]}]}]}, {"partition_index": 1, "events": [{"event_type": "lm_call", "opened": 1732238453.367946, "closed": 1732238454.128041, "partition_index": 1, "agent_compute_steps": [{"event_order": 1, "compute_ended": 1732238454.1273339, "compute_began": 1732238453.367946, "compute_input": [{"messages": [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What's the capital of France?"}, {"role": "assistant", "content": "The capital of France is Paris."}]}, {"inputs": {"model_name": "gpt-4-0125-preview"}}, {"inputs": {"finetune": true}}], "compute_output": [{"outputs": {"user_message": "What's the capital of France?"}}, {"outputs": {"user_message": "What's the capital of France?"}}, {"outputs": {"response": "The capital of France is Paris."}}, {"outputs": {"result": "The capital of France is Paris."}}]}], "environment_compute_steps": []}]}, {"partition_index": 2, "events": [{"event_type": "lm_call", "opened": 1732238454.127991, "closed": 1732238454.836467, "partition_index": 2, "agent_compute_steps": [{"event_order": 1, "compute_ended": 1732238454.835826, "compute_began": 1732238454.127991, "compute_input": [{"messages": [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What's 2+2?"}, {"role": "assistant", "content": "2+2 equals 4."}]}, {"inputs": {"model_name": "gpt-4-0125-preview"}}, {"inputs": {"finetune": true}}], "compute_output": [{"outputs": {"user_message": "What's 2+2?"}}, {"outputs": {"user_message": "What's 2+2?"}}, {"outputs": {"response": "2+2 equals 4."}}, {"outputs": {"result": "2+2 equals 4."}}]}], "environment_compute_steps": []}]}, {"partition_index": 3, "events": []}]}] \ No newline at end of file +[ + { + "instance_system_id": "test_agent_sync", + "partition": [ + { + "partition_index": 0, + "events": [ + { + "event_type": "environment_processing", + "opened": 1732238453.367889, + "closed": 1732238454.127567, + "partition_index": 0, + "agent_compute_steps": [], + "environment_compute_steps": [ + { + "event_order": 1, + "compute_ended": 1732238453.367924, + 
"compute_began": 1732238453.367889, + "compute_input": [], + "compute_output": [ + { + "outputs": { + "input_data": "What's the capital of France?" + } + }, + { + "outputs": { + "input_data": "What's the capital of France?" + } + }, + { + "outputs": { + "result": { + "processed": "What's the capital of France?", + "timestamp": 1732238453.367918 + } + } + }, + { + "outputs": { + "result": { + "processed": "What's the capital of France?", + "timestamp": 1732238453.367918 + } + } + } + ] + } + ] + }, + { + "event_type": "environment_processing", + "opened": 1732238454.127528, + "closed": 1732238454.836017, + "partition_index": 0, + "agent_compute_steps": [], + "environment_compute_steps": [ + { + "event_order": 1, + "compute_ended": 1732238454.127908, + "compute_began": 1732238454.127528, + "compute_input": [], + "compute_output": [ + { + "outputs": { + "input_data": "What's 2+2?" + } + }, + { + "outputs": { + "input_data": "What's 2+2?" + } + }, + { + "outputs": { + "result": { + "processed": "What's 2+2?", + "timestamp": 1732238454.127887 + } + } + }, + { + "outputs": { + "result": { + "processed": "What's 2+2?", + "timestamp": 1732238454.127887 + } + } + } + ] + } + ] + } + ] + }, + { + "partition_index": 1, + "events": [ + { + "event_type": "lm_call", + "opened": 1732238453.367946, + "closed": 1732238454.128041, + "partition_index": 1, + "agent_compute_steps": [ + { + "event_order": 1, + "compute_ended": 1732238454.1273339, + "compute_began": 1732238453.367946, + "compute_input": [ + { + "messages": [ + { + "role": "system", + "content": "You are a helpful assistant." + }, + { + "role": "user", + "content": "What's the capital of France?" + }, + { + "role": "assistant", + "content": "The capital of France is Paris." + } + ] + }, + { + "inputs": { + "model_name": "gpt-4-0125-preview" + } + }, + { + "inputs": { + "finetune": true + } + } + ], + "compute_output": [ + { + "outputs": { + "user_message": "What's the capital of France?" + } + }, + { + "outputs": { + "user_message": "What's the capital of France?" + } + }, + { + "outputs": { + "response": "The capital of France is Paris." + } + }, + { + "outputs": { + "result": "The capital of France is Paris." + } + } + ] + } + ], + "environment_compute_steps": [] + } + ] + }, + { + "partition_index": 2, + "events": [ + { + "event_type": "lm_call", + "opened": 1732238454.127991, + "closed": 1732238454.836467, + "partition_index": 2, + "agent_compute_steps": [ + { + "event_order": 1, + "compute_ended": 1732238454.835826, + "compute_began": 1732238454.127991, + "compute_input": [ + { + "messages": [ + { + "role": "system", + "content": "You are a helpful assistant." + }, + { + "role": "user", + "content": "What's 2+2?" + }, + { + "role": "assistant", + "content": "2+2 equals 4." + } + ] + }, + { + "inputs": { + "model_name": "gpt-4-0125-preview" + } + }, + { + "inputs": { + "finetune": true + } + } + ], + "compute_output": [ + { + "outputs": { + "user_message": "What's 2+2?" + } + }, + { + "outputs": { + "user_message": "What's 2+2?" + } + }, + { + "outputs": { + "response": "2+2 equals 4." + } + }, + { + "outputs": { + "result": "2+2 equals 4." 
+ } + } + ] + } + ], + "environment_compute_steps": [] + } + ] + }, + { + "partition_index": 3, + "events": [] + } + ] + } +] \ No newline at end of file diff --git a/tutorials/traces_async.json b/tutorials/traces_async.json index ebe9223..6b8c52a 100644 --- a/tutorials/traces_async.json +++ b/tutorials/traces_async.json @@ -1 +1,351 @@ -[{"system_id": "test_agent_async", "partition": [{"partition_index": 0, "events": [{"event_type": "environment_processing", "opened": 1732235276.8035352, "closed": 1732235277.7255611, "partition_index": 0, "agent_compute_steps": [], "environment_compute_steps": [{"event_order": 1, "compute_ended": 1732235276.803606, "compute_began": 1732235276.8035352, "compute_input": [{"inputs": {"input_data": "What's the capital of France?"}}], "compute_output": [{"outputs": {"input_data": "What's the capital of France?"}}, {"outputs": {"result": {"processed": "What's the capital of France?", "timestamp": 1732235276.803597}}}, {"outputs": {"result": {"processed": "What's the capital of France?", "timestamp": 1732235276.803597}}}]}]}, {"event_type": "environment_processing", "opened": 1732235277.725513, "closed": 1732235278.565709, "partition_index": 0, "agent_compute_steps": [], "environment_compute_steps": [{"event_order": 1, "compute_ended": 1732235277.72592, "compute_began": 1732235277.725513, "compute_input": [{"inputs": {"input_data": "What's 2+2?"}}], "compute_output": [{"outputs": {"input_data": "What's 2+2?"}}, {"outputs": {"result": {"processed": "What's 2+2?", "timestamp": 1732235277.725895}}}, {"outputs": {"result": {"processed": "What's 2+2?", "timestamp": 1732235277.725895}}}]}]}, {"event_type": "environment_processing", "opened": 1732235278.5656729, "closed": 1732235279.19231, "partition_index": 0, "agent_compute_steps": [], "environment_compute_steps": [{"event_order": 1, "compute_ended": 1732235278.566205, "compute_began": 1732235278.5656729, "compute_input": [{"inputs": {"input_data": "Who wrote Romeo and Juliet?"}}], "compute_output": [{"outputs": {"input_data": "Who wrote Romeo and Juliet?"}}, {"outputs": {"result": {"processed": "Who wrote Romeo and Juliet?", "timestamp": 1732235278.566155}}}, {"outputs": {"result": {"processed": "Who wrote Romeo and Juliet?", "timestamp": 1732235278.566155}}}]}]}]}, {"partition_index": 1, "events": [{"event_type": "lm_call", "opened": 1732235276.803635, "closed": 1732235277.726039, "partition_index": 1, "agent_compute_steps": [{"event_order": 1, "compute_ended": 1732235277.725227, "compute_began": 1732235276.803635, "compute_input": [{"inputs": {"user_message": "What's the capital of France?"}}, {"messages": [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What's the capital of France?"}, {"role": "assistant", "content": "The capital of France is Paris."}]}, {"inputs": {"model_name": "gpt-4-0613"}}, {"inputs": {"finetune": true}}], "compute_output": [{"outputs": {"user_message": "What's the capital of France?"}}, {"outputs": {"response": "The capital of France is Paris."}}, {"outputs": {"result": "The capital of France is Paris."}}]}], "environment_compute_steps": []}]}, {"partition_index": 2, "events": [{"event_type": "lm_call", "opened": 1732235277.725993, "closed": 1732235278.566485, "partition_index": 2, "agent_compute_steps": [{"event_order": 1, "compute_ended": 1732235278.565533, "compute_began": 1732235277.725993, "compute_input": [{"inputs": {"user_message": "What's 2+2?"}}, {"messages": [{"role": "system", "content": "You are a helpful assistant."}, {"role": 
"user", "content": "What's 2+2?"}, {"role": "assistant", "content": "2+2 equals 4."}]}, {"inputs": {"model_name": "gpt-4-0613"}}, {"inputs": {"finetune": true}}], "compute_output": [{"outputs": {"user_message": "What's 2+2?"}}, {"outputs": {"response": "2+2 equals 4."}}, {"outputs": {"result": "2+2 equals 4."}}]}], "environment_compute_steps": []}]}, {"partition_index": 3, "events": [{"event_type": "lm_call", "opened": 1732235278.5664, "closed": 1732235279.192375, "partition_index": 3, "agent_compute_steps": [{"event_order": 1, "compute_ended": 1732235279.1919549, "compute_began": 1732235278.5664, "compute_input": [{"inputs": {"user_message": "Who wrote Romeo and Juliet?"}}, {"messages": [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Who wrote Romeo and Juliet?"}, {"role": "assistant", "content": "William Shakespeare wrote Romeo and Juliet."}]}, {"inputs": {"model_name": "gpt-4-0613"}}, {"inputs": {"finetune": true}}], "compute_output": [{"outputs": {"user_message": "Who wrote Romeo and Juliet?"}}, {"outputs": {"response": "William Shakespeare wrote Romeo and Juliet."}}, {"outputs": {"result": "William Shakespeare wrote Romeo and Juliet."}}]}], "environment_compute_steps": []}]}]}] \ No newline at end of file +[ + { + "instance_system_id": "test_agent_async", + "partition": [ + { + "partition_index": 0, + "events": [ + { + "event_type": "environment_processing", + "opened": 1732235276.8035352, + "closed": 1732235277.7255611, + "partition_index": 0, + "agent_compute_steps": [], + "environment_compute_steps": [ + { + "event_order": 1, + "compute_ended": 1732235276.803606, + "compute_began": 1732235276.8035352, + "compute_input": [ + { + "inputs": { + "input_data": "What's the capital of France?" + } + } + ], + "compute_output": [ + { + "outputs": { + "input_data": "What's the capital of France?" + } + }, + { + "outputs": { + "result": { + "processed": "What's the capital of France?", + "timestamp": 1732235276.803597 + } + } + }, + { + "outputs": { + "result": { + "processed": "What's the capital of France?", + "timestamp": 1732235276.803597 + } + } + } + ] + } + ] + }, + { + "event_type": "environment_processing", + "opened": 1732235277.725513, + "closed": 1732235278.565709, + "partition_index": 0, + "agent_compute_steps": [], + "environment_compute_steps": [ + { + "event_order": 1, + "compute_ended": 1732235277.72592, + "compute_began": 1732235277.725513, + "compute_input": [ + { + "inputs": { + "input_data": "What's 2+2?" + } + } + ], + "compute_output": [ + { + "outputs": { + "input_data": "What's 2+2?" + } + }, + { + "outputs": { + "result": { + "processed": "What's 2+2?", + "timestamp": 1732235277.725895 + } + } + }, + { + "outputs": { + "result": { + "processed": "What's 2+2?", + "timestamp": 1732235277.725895 + } + } + } + ] + } + ] + }, + { + "event_type": "environment_processing", + "opened": 1732235278.5656729, + "closed": 1732235279.19231, + "partition_index": 0, + "agent_compute_steps": [], + "environment_compute_steps": [ + { + "event_order": 1, + "compute_ended": 1732235278.566205, + "compute_began": 1732235278.5656729, + "compute_input": [ + { + "inputs": { + "input_data": "Who wrote Romeo and Juliet?" + } + } + ], + "compute_output": [ + { + "outputs": { + "input_data": "Who wrote Romeo and Juliet?" 
+ } + }, + { + "outputs": { + "result": { + "processed": "Who wrote Romeo and Juliet?", + "timestamp": 1732235278.566155 + } + } + }, + { + "outputs": { + "result": { + "processed": "Who wrote Romeo and Juliet?", + "timestamp": 1732235278.566155 + } + } + } + ] + } + ] + } + ] + }, + { + "partition_index": 1, + "events": [ + { + "event_type": "lm_call", + "opened": 1732235276.803635, + "closed": 1732235277.726039, + "partition_index": 1, + "agent_compute_steps": [ + { + "event_order": 1, + "compute_ended": 1732235277.725227, + "compute_began": 1732235276.803635, + "compute_input": [ + { + "inputs": { + "user_message": "What's the capital of France?" + } + }, + { + "messages": [ + { + "role": "system", + "content": "You are a helpful assistant." + }, + { + "role": "user", + "content": "What's the capital of France?" + }, + { + "role": "assistant", + "content": "The capital of France is Paris." + } + ] + }, + { + "inputs": { + "model_name": "gpt-4-0613" + } + }, + { + "inputs": { + "finetune": true + } + } + ], + "compute_output": [ + { + "outputs": { + "user_message": "What's the capital of France?" + } + }, + { + "outputs": { + "response": "The capital of France is Paris." + } + }, + { + "outputs": { + "result": "The capital of France is Paris." + } + } + ] + } + ], + "environment_compute_steps": [] + } + ] + }, + { + "partition_index": 2, + "events": [ + { + "event_type": "lm_call", + "opened": 1732235277.725993, + "closed": 1732235278.566485, + "partition_index": 2, + "agent_compute_steps": [ + { + "event_order": 1, + "compute_ended": 1732235278.565533, + "compute_began": 1732235277.725993, + "compute_input": [ + { + "inputs": { + "user_message": "What's 2+2?" + } + }, + { + "messages": [ + { + "role": "system", + "content": "You are a helpful assistant." + }, + { + "role": "user", + "content": "What's 2+2?" + }, + { + "role": "assistant", + "content": "2+2 equals 4." + } + ] + }, + { + "inputs": { + "model_name": "gpt-4-0613" + } + }, + { + "inputs": { + "finetune": true + } + } + ], + "compute_output": [ + { + "outputs": { + "user_message": "What's 2+2?" + } + }, + { + "outputs": { + "response": "2+2 equals 4." + } + }, + { + "outputs": { + "result": "2+2 equals 4." + } + } + ] + } + ], + "environment_compute_steps": [] + } + ] + }, + { + "partition_index": 3, + "events": [ + { + "event_type": "lm_call", + "opened": 1732235278.5664, + "closed": 1732235279.192375, + "partition_index": 3, + "agent_compute_steps": [ + { + "event_order": 1, + "compute_ended": 1732235279.1919549, + "compute_began": 1732235278.5664, + "compute_input": [ + { + "inputs": { + "user_message": "Who wrote Romeo and Juliet?" + } + }, + { + "messages": [ + { + "role": "system", + "content": "You are a helpful assistant." + }, + { + "role": "user", + "content": "Who wrote Romeo and Juliet?" + }, + { + "role": "assistant", + "content": "William Shakespeare wrote Romeo and Juliet." + } + ] + }, + { + "inputs": { + "model_name": "gpt-4-0613" + } + }, + { + "inputs": { + "finetune": true + } + } + ], + "compute_output": [ + { + "outputs": { + "user_message": "Who wrote Romeo and Juliet?" + } + }, + { + "outputs": { + "response": "William Shakespeare wrote Romeo and Juliet." + } + }, + { + "outputs": { + "result": "William Shakespeare wrote Romeo and Juliet." 
+ } + } + ] + } + ], + "environment_compute_steps": [] + } + ] + } + ] + } +] \ No newline at end of file From 2eebf7ee04e5b01e12239da3bf0d48334c938248 Mon Sep 17 00:00:00 2001 From: Josh Purtell Date: Tue, 17 Dec 2024 11:43:11 -0800 Subject: [PATCH 3/7] rename i_s_id to s_i_id --- openapi.json | 4 +-- pyproject.toml | 2 +- setup.py | 2 +- synth_sdk/tracing/abstractions.py | 15 ++++---- synth_sdk/tracing/decorators.py | 56 +++++++++++++++-------------- synth_sdk/tracing/events/manage.py | 43 ++++++++++++---------- synth_sdk/tracing/events/scope.py | 24 ++++++------- synth_sdk/tracing/events/store.py | 44 +++++++++++------------ synth_sdk/tracing/local.py | 5 ++- synth_sdk/tracing/upload.py | 27 +++++++------- synth_sdk/tracing/utils.py | 2 +- tutorials/AsyncAgentExample.py | 12 +++---- tutorials/SimpleAgentExample.py | 12 +++---- tutorials/SyncInAsyncExample.py | 12 +++---- tutorials/reward_signals.json | 6 ++-- tutorials/reward_signals_async.json | 6 ++-- tutorials/traces.json | 2 +- tutorials/traces_async.json | 2 +- 18 files changed, 136 insertions(+), 140 deletions(-) diff --git a/openapi.json b/openapi.json index 8a47144..043213a 100644 --- a/openapi.json +++ b/openapi.json @@ -204,7 +204,7 @@ "Classes" ], "summary": "abstractions.TrainingQuestion", - "description": "A training question is a question that an agent (instance_system_id) is trying to answer.\nIt contains an intent and criteria that the agent is trying to meet.\n\n[View source on GitHub](https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py)", + "description": "A training question is a question that an agent (system_instance_id) is trying to answer.\nIt contains an intent and criteria that the agent is trying to meet.\n\n[View source on GitHub](https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py)", "externalDocs": { "description": "View source on GitHub", "url": "https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py" @@ -235,7 +235,7 @@ "Classes" ], "summary": "abstractions.RewardSignal", - "description": "A reward signal tells us how well an agent (instance_system_id) is doing on a particular question (question_id).\n\n[View source on GitHub](https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py)", + "description": "A reward signal tells us how well an agent (system_instance_id) is doing on a particular question (question_id).\n\n[View source on GitHub](https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py)", "externalDocs": { "description": "View source on GitHub", "url": "https://github.com/synth-laboratories/synth-sdk/blob/main/synth_sdk/tracing/abstractions.py" diff --git a/pyproject.toml b/pyproject.toml index 83e2041..68f813e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "synth-sdk" -version = "0.2.97" +version = "0.2.99" description = "" authors = [{name = "Synth AI", email = "josh@usesynth.ai"}] license = {text = "MIT"} diff --git a/setup.py b/setup.py index 9de5cd3..88af083 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="synth-sdk", - version="0.2.97", + version="0.2.99", packages=find_packages(), install_requires=[ "opentelemetry-api", diff --git a/synth_sdk/tracing/abstractions.py b/synth_sdk/tracing/abstractions.py index 9918264..146dfbf 100644 --- a/synth_sdk/tracing/abstractions.py +++ b/synth_sdk/tracing/abstractions.py @@ -5,7 +5,6 @@ from pydantic import 
BaseModel - logger = logging.getLogger(__name__) @@ -93,7 +92,7 @@ class EnvironmentComputeStep(ComputeStep): @dataclass class Event: - instance_system_id: str + system_instance_id: str event_type: str opened: Any # timestamp closed: Any # timestamp @@ -135,7 +134,7 @@ def to_dict(self): @dataclass class SystemTrace: system_id: str - instance_system_id: str + system_instance_id: str metadata: Optional[Dict[str, Any]] partition: List[EventPartitionElement] current_partition_index: int = 0 # Track current partition @@ -143,7 +142,7 @@ class SystemTrace: def to_dict(self): return { "system_id": self.system_id, - "instance_system_id": self.instance_system_id, + "system_instance_id": self.system_instance_id, "partition": [element.to_dict() for element in self.partition], "current_partition_index": self.current_partition_index, "metadata": self.metadata if self.metadata else None, @@ -152,7 +151,7 @@ def to_dict(self): class TrainingQuestion(BaseModel): """ - A training question is a question that an agent (instance_system_id) is trying to answer. + A training question is a question that an agent (system_instance_id) is trying to answer. It contains an intent and criteria that the agent is trying to meet. """ @@ -170,18 +169,18 @@ def to_dict(self): class RewardSignal(BaseModel): """ - A reward signal tells us how well an agent (instance_system_id) is doing on a particular question (question_id). + A reward signal tells us how well an agent (system_instance_id) is doing on a particular question (question_id). """ question_id: str - instance_system_id: str + system_instance_id: str reward: Union[float, int, bool] annotation: Optional[str] = None def to_dict(self): return { "question_id": self.question_id, - "instance_system_id": self.instance_system_id, + "system_instance_id": self.system_instance_id, "reward": self.reward, "annotation": self.annotation, } diff --git a/synth_sdk/tracing/decorators.py b/synth_sdk/tracing/decorators.py index 309a0c7..ccdb102 100644 --- a/synth_sdk/tracing/decorators.py +++ b/synth_sdk/tracing/decorators.py @@ -5,7 +5,6 @@ from functools import wraps from typing import Any, Callable, Dict, List, Literal - from synth_sdk.tracing.abstractions import ( AgentComputeStep, ArbitraryInputs, @@ -20,16 +19,16 @@ from synth_sdk.tracing.local import ( _local, active_events_var, - instance_system_id_var, logger, - system_name_var, system_id_var, + system_instance_id_var, + system_name_var, ) -from synth_sdk.tracing.utils import get_system_id from synth_sdk.tracing.trackers import ( synth_tracker_async, synth_tracker_sync, ) +from synth_sdk.tracing.utils import get_system_id logger = logging.getLogger(__name__) @@ -63,17 +62,17 @@ def wrapper(*args, **kwargs): self_instance = func.__self__ # Ensure required attributes are present - required_attrs = ["instance_system_id", "system_name"] + required_attrs = ["system_instance_id", "system_name"] for attr in required_attrs: if not hasattr(self_instance, attr): raise ValueError(f"Instance missing required attribute '{attr}'") # Set thread-local variables - _local.instance_system_id = self_instance.instance_system_id + _local.system_instance_id = self_instance.system_instance_id _local.system_name = self_instance.system_name _local.system_id = get_system_id( self_instance.system_name - )#self_instance.system_id + ) # self_instance.system_id # Initialize Trace synth_tracker_sync.initialize() @@ -89,7 +88,7 @@ def wrapper(*args, **kwargs): if manage_event == "create": # logger.debug("Creating new event") event = Event( - 
instance_system_id=_local.instance_system_id, + system_instance_id=_local.system_instance_id, event_type=event_type, opened=compute_began, closed=None, @@ -99,7 +98,7 @@ def wrapper(*args, **kwargs): ) if increment_partition: event.partition_index = event_store.increment_partition( - _local.instance_system_id, + _local.system_instance_id, _local.system_id, ) logger.debug( @@ -226,10 +225,12 @@ def wrapper(*args, **kwargs): current_event = _local.active_events[event_type] current_event.closed = compute_ended # Store the event - if hasattr(_local, "instance_system_id"): - event_store.add_event(_local.instance_system_id, _local.system_id, current_event) + if hasattr(_local, "system_instance_id"): + event_store.add_event( + _local.system_instance_id, _local.system_id, current_event + ) # logger.debug( - # f"Stored and closed event {event_type} for system {_local.instance_system_id}" + # f"Stored and closed event {event_type} for system {_local.system_instance_id}" # ) del _local.active_events[event_type] @@ -239,9 +240,9 @@ def wrapper(*args, **kwargs): raise finally: # synth_tracker_sync.finalize() - if hasattr(_local, "instance_system_id"): - # logger.debug(f"Cleaning up instance_system_id: {_local.instance_system_id}") - delattr(_local, "instance_system_id") + if hasattr(_local, "system_instance_id"): + # logger.debug(f"Cleaning up system_instance_id: {_local.system_instance_id}") + delattr(_local, "system_instance_id") return wrapper @@ -276,17 +277,19 @@ async def async_wrapper(*args, **kwargs): self_instance = func.__self__ # Ensure required attributes are present - required_attrs = ["instance_system_id", "system_name"] + required_attrs = ["system_instance_id", "system_name"] for attr in required_attrs: if not hasattr(self_instance, attr): raise ValueError(f"Instance missing required attribute '{attr}'") # Set context variables - instance_system_id_token = instance_system_id_var.set( - self_instance.instance_system_id + system_instance_id_token = system_instance_id_var.set( + self_instance.system_instance_id ) system_name_token = system_name_var.set(self_instance.system_name) - system_id_token = system_id_var.set(get_system_id(self_instance.system_name)) + system_id_token = system_id_var.set( + get_system_id(self_instance.system_name) + ) # Initialize AsyncTrace synth_tracker_async.initialize() @@ -303,7 +306,7 @@ async def async_wrapper(*args, **kwargs): if manage_event == "create": # logger.debug("Creating new event") event = Event( - instance_system_id=self_instance.instance_system_id, + system_instance_id=self_instance.system_instance_id, event_type=event_type, opened=compute_began, closed=None, @@ -313,8 +316,7 @@ async def async_wrapper(*args, **kwargs): ) if increment_partition: event.partition_index = event_store.increment_partition( - instance_system_id_var.get(), - system_id_var.get() + system_instance_id_var.get(), system_id_var.get() ) logger.debug( f"Incremented partition to: {event.partition_index}" @@ -442,11 +444,11 @@ async def async_wrapper(*args, **kwargs): current_event = active_events_var.get()[event_type] current_event.closed = compute_ended # Store the event - if instance_system_id_var.get(): + if system_instance_id_var.get(): event_store.add_event( - instance_system_id_var.get(), + system_instance_id_var.get(), system_id_var.get(), - current_event + current_event, ) active_events = active_events_var.get() del active_events[event_type] @@ -459,10 +461,10 @@ async def async_wrapper(*args, **kwargs): finally: # synth_tracker_async.finalize() # Reset context variables 
- instance_system_id_var.reset(instance_system_id_token) + system_instance_id_var.reset(system_instance_id_token) system_name_var.reset(system_name_token) system_id_var.reset(system_id_token) - # logger.debug("Cleaning up instance_system_id from context vars") + # logger.debug("Cleaning up system_instance_id from context vars") return async_wrapper diff --git a/synth_sdk/tracing/events/manage.py b/synth_sdk/tracing/events/manage.py index e5026c0..4b183eb 100644 --- a/synth_sdk/tracing/events/manage.py +++ b/synth_sdk/tracing/events/manage.py @@ -1,7 +1,6 @@ import time from typing import Literal, Optional - from synth_sdk.tracing.abstractions import Event from synth_sdk.tracing.events.store import event_store from synth_sdk.tracing.local import _local, logger @@ -48,8 +47,8 @@ def set_current_event( # If there's an existing event of the same type, end it if event.event_type in _local.active_events: if ( - _local.active_events[event.event_type].instance_system_id - == event.instance_system_id + _local.active_events[event.event_type].system_instance_id + == event.system_instance_id ): logger.debug(f"Found existing event of type {event.event_type}") existing_event = _local.active_events[event.event_type] @@ -58,14 +57,14 @@ def set_current_event( f"Closed existing event of type {event.event_type} at {existing_event.closed}" ) - # Store the closed event if instance_system_id is present - if hasattr(_local, "instance_system_id"): + # Store the closed event if system_instance_id is present + if hasattr(_local, "system_instance_id"): logger.debug( - f"Storing closed event for system {_local.instance_system_id}" + f"Storing closed event for system {_local.system_instance_id}" ) try: event_store.add_event( - _local.instance_system_id, _local.system_id, existing_event + _local.system_instance_id, _local.system_id, existing_event ) logger.debug("Successfully stored closed event") except Exception as e: @@ -77,7 +76,11 @@ def set_current_event( # logger.debug("New event set as current in thread local") else: - from synth_sdk.tracing.local import active_events_var, instance_system_id_var, system_id_var + from synth_sdk.tracing.local import ( + active_events_var, + system_id_var, + system_instance_id_var, + ) # Get current active events from context var active_events = active_events_var.get() @@ -86,24 +89,24 @@ def set_current_event( if event.event_type in active_events: existing_event = active_events[event.event_type] - # Check that the active event has the same instance_system_id as the one we're settting - if existing_event.instance_system_id == event.instance_system_id: + # Check that the active event has the same system_instance_id as the one we're settting + if existing_event.system_instance_id == event.system_instance_id: logger.debug(f"Found existing event of type {event.event_type}") existing_event.closed = time.time() logger.debug( f"Closed existing event of type {event.event_type} at {existing_event.closed}" ) - # Store the closed event if instance_system_id is present - instance_system_id = instance_system_id_var.get() - if instance_system_id: + # Store the closed event if system_instance_id is present + system_instance_id = system_instance_id_var.get() + if system_instance_id: logger.debug( - f"Storing closed event for system {instance_system_id}" + f"Storing closed event for system {system_instance_id}" ) try: - event_store.add_event(instance_system_id, - system_id_var.get(), - existing_event) + event_store.add_event( + system_instance_id, system_id_var.get(), existing_event + ) 
logger.debug("Successfully stored closed event") except Exception as e: logger.error(f"Failed to store closed event: {str(e)}") @@ -127,7 +130,9 @@ def end_event(event_type: str) -> Optional[Event]: if current_event: current_event.closed = time.time() # Store the event - if hasattr(_local, "instance_system_id"): - event_store.add_event(_local.instance_system_id, _local.system_id, current_event) + if hasattr(_local, "system_instance_id"): + event_store.add_event( + _local.system_instance_id, _local.system_id, current_event + ) clear_current_event(event_type) return current_event diff --git a/synth_sdk/tracing/events/scope.py b/synth_sdk/tracing/events/scope.py index ba0a3c8..6715327 100644 --- a/synth_sdk/tracing/events/scope.py +++ b/synth_sdk/tracing/events/scope.py @@ -4,7 +4,7 @@ from synth_sdk.tracing.abstractions import Event from synth_sdk.tracing.decorators import _local, clear_current_event, set_current_event from synth_sdk.tracing.events.store import event_store -from synth_sdk.tracing.local import instance_system_id_var, system_id_var +from synth_sdk.tracing.local import system_id_var, system_instance_id_var @contextmanager @@ -25,20 +25,16 @@ def event_scope(event_type: str): except RuntimeError: is_async = False - # Get instance_system_id from appropriate source - instance_system_id = ( - instance_system_id_var.get() + # Get system_instance_id from appropriate source + system_instance_id = ( + system_instance_id_var.get() if is_async - else getattr(_local, "instance_system_id", None) - ) - system_id = ( - system_id_var.get() - if is_async - else getattr(_local, "system_id", None) + else getattr(_local, "system_instance_id", None) ) + system_id = system_id_var.get() if is_async else getattr(_local, "system_id", None) event = Event( - instance_system_id=instance_system_id, + system_instance_id=system_instance_id, event_type=event_type, opened=time.time(), closed=None, @@ -53,6 +49,6 @@ def event_scope(event_type: str): finally: event.closed = time.time() clear_current_event(event_type) - # Store the event if instance_system_id is available - if instance_system_id: - event_store.add_event(instance_system_id, system_id, event) + # Store the event if system_instance_id is available + if system_instance_id: + event_store.add_event(system_instance_id, system_id, event) diff --git a/synth_sdk/tracing/events/store.py b/synth_sdk/tracing/events/store.py index 7371b50..12a2c12 100644 --- a/synth_sdk/tracing/events/store.py +++ b/synth_sdk/tracing/events/store.py @@ -8,9 +8,7 @@ from synth_sdk.tracing.local import ( # Import context variables _local, active_events_var, - instance_system_id_var, system_id_var, - system_name_var, ) logger = logging.getLogger(__name__) @@ -23,25 +21,25 @@ def __init__(self): self.logger = logging.getLogger(__name__) def get_or_create_system_trace( - self, instance_system_id: str, system_id: str, _already_locked: bool = False + self, system_instance_id: str, system_id: str, _already_locked: bool = False ) -> SystemTrace: - """Get or create a SystemTrace for the given instance_system_id.""" + """Get or create a SystemTrace for the given system_instance_id.""" logger = logging.getLogger(__name__) - # logger.debug(f"Starting get_or_create_system_trace for {instance_system_id}") + # logger.debug(f"Starting get_or_create_system_trace for {system_instance_id}") def _get_or_create(): # logger.debug("Inside _get_or_create") - if instance_system_id not in self._traces: - # logger.debug(f"Creating new system trace for {instance_system_id}") - 
self._traces[instance_system_id] = SystemTrace( + if system_instance_id not in self._traces: + # logger.debug(f"Creating new system trace for {system_instance_id}") + self._traces[system_instance_id] = SystemTrace( system_id=system_id, - instance_system_id=instance_system_id, + system_instance_id=system_instance_id, metadata={}, partition=[EventPartitionElement(partition_index=0, events=[])], current_partition_index=0, ) # logger.debug("Returning system trace") - return self._traces[instance_system_id] + return self._traces[system_instance_id] if _already_locked: return _get_or_create() @@ -50,15 +48,15 @@ def _get_or_create(): # logger.debug("Lock acquired in get_or_create_system_trace") return _get_or_create() - def increment_partition(self, instance_system_id: str, system_id: str) -> int: + def increment_partition(self, system_instance_id: str, system_id: str) -> int: """Increment the partition index for a system and create new partition element.""" logger = logging.getLogger(__name__) - # logger.debug(f"Starting increment_partition for system {instance_system_id}") + # logger.debug(f"Starting increment_partition for system {system_instance_id}") with self._lock: # logger.debug("Lock acquired in increment_partition") system_trace = self.get_or_create_system_trace( - instance_system_id, system_id, _already_locked=True + system_instance_id, system_id, _already_locked=True ) # logger.debug( # f"Got system trace, current index: {system_trace.current_partition_index}" @@ -78,9 +76,9 @@ def increment_partition(self, instance_system_id: str, system_id: str) -> int: return system_trace.current_partition_index - def add_event(self, instance_system_id: str, system_id: str, event: Event): + def add_event(self, system_instance_id: str, system_id: str, event: Event): """Add an event to the appropriate partition of the system trace.""" - # self.#logger.debug(f"Adding event type {event.event_type} to system {instance_system_id}") + # self.#logger.debug(f"Adding event type {event.event_type} to system {system_instance_id}") # self.#logger.debug( # f"Event details: opened={event.opened}, closed={event.closed}, partition={event.partition_index}" # ) @@ -93,7 +91,7 @@ def add_event(self, instance_system_id: str, system_id: str, event: Event): try: system_trace = self.get_or_create_system_trace( - instance_system_id, system_id + system_instance_id, system_id ) # self.#logger.debug( # f"Got system trace with {len(system_trace.partition)} partitions" @@ -140,13 +138,13 @@ def end_all_active_events(self): # For synchronous code if hasattr(_local, "active_events"): active_events = _local.active_events - instance_system_id = getattr(_local, "instance_system_id", None) + system_instance_id = getattr(_local, "system_instance_id", None) system_id = getattr(_local, "system_id", None) - if active_events: # and instance_system_id: + if active_events: # and system_instance_id: for event_type, event in list(active_events.items()): if event.closed is None: event.closed = time.time() - self.add_event(event.instance_system_id, system_id, event) + self.add_event(event.system_instance_id, system_id, event) # self.#logger.debug(f"Stored and closed event {event_type}") _local.active_events.clear() @@ -154,14 +152,12 @@ def end_all_active_events(self): # For asynchronous code active_events_async = active_events_var.get() - - - if active_events_async: # and instance_system_id_async: + if active_events_async: # and system_instance_id_async: for event_type, event in list(active_events_async.items()): system_id = 
system_id_var.get() if event.closed is None: event.closed = time.time() - self.add_event(event.instance_system_id, system_id, event) + self.add_event(event.system_instance_id, system_id, event) # self.#logger.debug(f"Stored and closed event {event_type}") active_events_var.set({}) @@ -171,7 +167,7 @@ def get_system_traces_json(self) -> str: return json.dumps( [ { - "instance_system_id": trace.instance_system_id, + "system_instance_id": trace.system_instance_id, "current_partition_index": trace.current_partition_index, "partition": [ { diff --git a/synth_sdk/tracing/local.py b/synth_sdk/tracing/local.py index 1780177..1240789 100644 --- a/synth_sdk/tracing/local.py +++ b/synth_sdk/tracing/local.py @@ -2,14 +2,13 @@ import threading from contextvars import ContextVar - logger = logging.getLogger(__name__) -# Thread-local storage for active events and instance_system_id +# Thread-local storage for active events and system_instance_id # Used for synchronous tracing _local = threading.local() # Used for asynchronous tracing system_name_var: ContextVar[str] = ContextVar("system_name") system_id_var: ContextVar[str] = ContextVar("system_id") -instance_system_id_var: ContextVar[str] = ContextVar("instance_system_id") +system_instance_id_var: ContextVar[str] = ContextVar("system_instance_id") active_events_var: ContextVar[dict] = ContextVar("active_events", default={}) diff --git a/synth_sdk/tracing/upload.py b/synth_sdk/tracing/upload.py index 04c83fb..8463399 100644 --- a/synth_sdk/tracing/upload.py +++ b/synth_sdk/tracing/upload.py @@ -4,7 +4,6 @@ import os import ssl import time -import uuid from pprint import pprint from typing import Any, Dict, List @@ -96,9 +95,7 @@ def send_system_traces_s3( system_id: str, verbose: bool = False, ): - upload_id, signed_url = get_upload_id( - base_url, api_key, system_id, verbose - ) + upload_id, signed_url = get_upload_id(base_url, api_key, system_id, verbose) load_signed_url(signed_url, dataset, traces) token_url = f"{base_url}/v1/auth/token" @@ -143,7 +140,7 @@ def send_system_traces_s3( def get_upload_id( - base_url: str, api_key: str, instance_system_id, verbose: bool = False + base_url: str, api_key: str, system_instance_id, verbose: bool = False ): token_url = f"{base_url}/v1/auth/token" token_response = requests.get( @@ -152,7 +149,7 @@ def get_upload_id( token_response.raise_for_status() access_token = token_response.json()["access_token"] - api_url = f"{base_url}/v1/uploads/get-upload-id-signed-url?instance_system_id={instance_system_id}" + api_url = f"{base_url}/v1/uploads/get-upload-id-signed-url?system_instance_id={system_instance_id}" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {access_token}", @@ -186,8 +183,8 @@ def validate_traces(cls, traces): for trace in traces: # Validate required fields in each trace - if "instance_system_id" not in trace: - raise ValueError("Each trace must have a instance_system_id") + if "system_instance_id" not in trace: + raise ValueError("Each trace must have a system_instance_id") if "partition" not in trace: raise ValueError("Each trace must have a partition") @@ -251,7 +248,7 @@ def validate_dataset(cls, dataset): raise ValueError("Reward signals must be a list") for signal in reward_signals: - required_signal_fields = ["question_id", "instance_system_id", "reward"] + required_signal_fields = ["question_id", "system_instance_id", "reward"] missing_fields = [f for f in required_signal_fields if f not in signal] if missing_fields: raise ValueError( @@ -290,7 +287,7 @@ def 
format_upload_output(dataset, traces): # Format reward signals array with error handling reward_signals_data = [ { - "instance_system_id": rs.instance_system_id, + "system_instance_id": rs.system_instance_id, "reward": rs.reward, "question_id": rs.question_id, "annotation": rs.annotation if hasattr(rs, "annotation") else None, @@ -301,7 +298,7 @@ def format_upload_output(dataset, traces): # Format traces array traces_data = [ { - "instance_system_id": t.instance_system_id, + "system_instance_id": t.system_instance_id, "metadata": t.metadata if t.metadata else None, "partition": [ { @@ -353,10 +350,10 @@ def upload_helper( for event_type, event in _local.active_events.items(): if event and event.closed is None: event.closed = time.time() - if hasattr(_local, "instance_system_id"): + if hasattr(_local, "system_instance_id"): try: event_store.add_event( - _local.instance_system_id, _local.system_id, event + _local.system_instance_id, _local.system_id, event ) if verbose: print(f"Closed and stored active event: {event_type}") @@ -374,7 +371,9 @@ def upload_helper( for event in partition.events: if event.closed is None: event.closed = current_time - event_store.add_event(trace.instance_system_id, trace.system_id, event) + event_store.add_event( + trace.system_instance_id, trace.system_id, event + ) if verbose: print(f"Closed existing unclosed event: {event.event_type}") diff --git a/synth_sdk/tracing/utils.py b/synth_sdk/tracing/utils.py index d65a8da..5c5e6e7 100644 --- a/synth_sdk/tracing/utils.py +++ b/synth_sdk/tracing/utils.py @@ -2,7 +2,7 @@ def get_system_id(system_name: str) -> str: - """Create a deterministic instance_system_id from system_name using SHA-256.""" + """Create a deterministic system_instance_id from system_name using SHA-256.""" if not system_name: raise ValueError("system_name cannot be empty") # Create SHA-256 hash of system_name diff --git a/tutorials/AsyncAgentExample.py b/tutorials/AsyncAgentExample.py index 8c5cfb2..cc215e1 100644 --- a/tutorials/AsyncAgentExample.py +++ b/tutorials/AsyncAgentExample.py @@ -28,10 +28,10 @@ class TestAgent: def __init__(self): - self.instance_system_id = "test_agent_async" + self.system_instance_id = "test_agent_async" logger.debug( - "Initializing TestAgent with instance_system_id: %s", - self.instance_system_id, + "Initializing TestAgent with system_instance_id: %s", + self.system_instance_id, ) self.client = AsyncOpenAI() logger.debug("OpenAI client initialized") @@ -132,7 +132,7 @@ async def run_test(): reward_signals=[ RewardSignal( question_id=f"q{i}", - instance_system_id=agent.instance_system_id, + system_instance_id=agent.system_instance_id, reward=1.0, annotation="Test reward", ) @@ -186,9 +186,9 @@ async def run_test(): logger.debug("Cleaning up event: %s", event_type) if event.closed is None: event.closed = time.time() - if hasattr(_local, "instance_system_id"): + if hasattr(_local, "system_instance_id"): try: - event_store.add_event(_local.instance_system_id, event) + event_store.add_event(_local.system_instance_id, event) logger.debug( "Successfully cleaned up event: %s", event_type ) diff --git a/tutorials/SimpleAgentExample.py b/tutorials/SimpleAgentExample.py index 378d8af..751bc4c 100644 --- a/tutorials/SimpleAgentExample.py +++ b/tutorials/SimpleAgentExample.py @@ -27,10 +27,10 @@ class TestAgent: def __init__(self): - self.instance_system_id = "test_agent_sync" + self.system_instance_id = "test_agent_sync" logger.debug( - "Initializing TestAgent with instance_system_id: %s", - self.instance_system_id, + 
"Initializing TestAgent with system_instance_id: %s", + self.system_instance_id, ) # Initialize OpenAI client instead of LM @@ -135,7 +135,7 @@ def run_test(): reward_signals=[ RewardSignal( question_id=f"q{i}", - instance_system_id=agent.instance_system_id, + system_instance_id=agent.system_instance_id, reward=1.0, annotation="Test reward", ) @@ -194,9 +194,9 @@ def run_test(): logger.debug("Cleaning up event: %s", event_type) if event.closed is None: event.closed = time.time() - if hasattr(_local, "instance_system_id"): + if hasattr(_local, "system_instance_id"): try: - event_store.add_event(_local.instance_system_id, event) + event_store.add_event(_local.system_instance_id, event) logger.debug( "Successfully cleaned up event: %s", event_type ) diff --git a/tutorials/SyncInAsyncExample.py b/tutorials/SyncInAsyncExample.py index 2025db0..f74979d 100644 --- a/tutorials/SyncInAsyncExample.py +++ b/tutorials/SyncInAsyncExample.py @@ -28,10 +28,10 @@ class TestAgent: def __init__(self): - self.instance_system_id = "test_agent_sync" + self.system_instance_id = "test_agent_sync" logger.debug( - "Initializing TestAgent with instance_system_id: %s", - self.instance_system_id, + "Initializing TestAgent with system_instance_id: %s", + self.system_instance_id, ) # Initialize OpenAI client instead of LM @@ -134,7 +134,7 @@ async def run_test(): reward_signals=[ RewardSignal( question_id=f"q{i}", - instance_system_id=agent.instance_system_id, + system_instance_id=agent.system_instance_id, reward=1.0, annotation="Test reward", ) @@ -193,9 +193,9 @@ async def run_test(): logger.debug("Cleaning up event: %s", event_type) if event.closed is None: event.closed = time.time() - if hasattr(_local, "instance_system_id"): + if hasattr(_local, "system_instance_id"): try: - event_store.add_event(_local.instance_system_id, event) + event_store.add_event(_local.system_instance_id, event) logger.debug( "Successfully cleaned up event: %s", event_type ) diff --git a/tutorials/reward_signals.json b/tutorials/reward_signals.json index 32f8fe3..14c3d99 100644 --- a/tutorials/reward_signals.json +++ b/tutorials/reward_signals.json @@ -1,18 +1,18 @@ [ { - "instance_system_id": "test_agent_sync", + "system_instance_id": "test_agent_sync", "reward": 1.0, "question_id": "q0", "annotation": "Test reward" }, { - "instance_system_id": "test_agent_sync", + "system_instance_id": "test_agent_sync", "reward": 1.0, "question_id": "q1", "annotation": "Test reward" }, { - "instance_system_id": "test_agent_sync", + "system_instance_id": "test_agent_sync", "reward": 1.0, "question_id": "q2", "annotation": "Test reward" diff --git a/tutorials/reward_signals_async.json b/tutorials/reward_signals_async.json index ba17f5a..0b0485b 100644 --- a/tutorials/reward_signals_async.json +++ b/tutorials/reward_signals_async.json @@ -1,18 +1,18 @@ [ { - "instance_system_id": "test_agent_async", + "system_instance_id": "test_agent_async", "reward": 1.0, "question_id": "q0", "annotation": "Test reward" }, { - "instance_system_id": "test_agent_async", + "system_instance_id": "test_agent_async", "reward": 1.0, "question_id": "q1", "annotation": "Test reward" }, { - "instance_system_id": "test_agent_async", + "system_instance_id": "test_agent_async", "reward": 1.0, "question_id": "q2", "annotation": "Test reward" diff --git a/tutorials/traces.json b/tutorials/traces.json index 8f548ee..2984189 100644 --- a/tutorials/traces.json +++ b/tutorials/traces.json @@ -1,6 +1,6 @@ [ { - "instance_system_id": "test_agent_sync", + "system_instance_id": 
"test_agent_sync", "partition": [ { "partition_index": 0, diff --git a/tutorials/traces_async.json b/tutorials/traces_async.json index 6b8c52a..9a8693e 100644 --- a/tutorials/traces_async.json +++ b/tutorials/traces_async.json @@ -1,6 +1,6 @@ [ { - "instance_system_id": "test_agent_async", + "system_instance_id": "test_agent_async", "partition": [ { "partition_index": 0, From f5b4a7618cf4a442114b7197f9fed5cf65e641ba Mon Sep 17 00:00:00 2001 From: Josh Purtell Date: Tue, 17 Dec 2024 16:19:02 -0800 Subject: [PATCH 4/7] Easier local testing! --- pyproject.toml | 2 +- setup.py | 2 +- synth_sdk/tracing/upload.py | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 68f813e..8e69376 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "synth-sdk" -version = "0.2.99" +version = "0.2.100" description = "" authors = [{name = "Synth AI", email = "josh@usesynth.ai"}] license = {text = "MIT"} diff --git a/setup.py b/setup.py index 88af083..4b0cabe 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="synth-sdk", - version="0.2.99", + version="0.2.100", packages=find_packages(), install_requires=[ "opentelemetry-api", diff --git a/synth_sdk/tracing/upload.py b/synth_sdk/tracing/upload.py index 8463399..33d64d4 100644 --- a/synth_sdk/tracing/upload.py +++ b/synth_sdk/tracing/upload.py @@ -342,6 +342,9 @@ def upload_helper( api_key = os.getenv("SYNTH_API_KEY") if not api_key: raise ValueError("SYNTH_API_KEY environment variable not set") + base_url = os.getenv( + "SYNTH_ENDPOINT_OVERRIDE", "https://agent-learning.onrender.com" + ) # End all active events before uploading from synth_sdk.tracing.decorators import _local @@ -395,7 +398,7 @@ def upload_helper( response, payload = send_system_traces_s3( dataset=dataset, traces=traces, - base_url="https://agent-learning.onrender.com", + base_url=base_url, api_key=api_key, system_id=traces[0].system_id, verbose=verbose, From 4b76c584b008aef64b5780a46ffd903d6a72fc25 Mon Sep 17 00:00:00 2001 From: Josh Purtell Date: Tue, 17 Dec 2024 16:58:21 -0800 Subject: [PATCH 5/7] remove typo in upload_id --- pyproject.toml | 2 +- setup.py | 2 +- synth_sdk/tracing/upload.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8e69376..7ca1911 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "synth-sdk" -version = "0.2.100" +version = "0.2.102" description = "" authors = [{name = "Synth AI", email = "josh@usesynth.ai"}] license = {text = "MIT"} diff --git a/setup.py b/setup.py index 4b0cabe..bba0397 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="synth-sdk", - version="0.2.100", + version="0.2.102", packages=find_packages(), install_requires=[ "opentelemetry-api", diff --git a/synth_sdk/tracing/upload.py b/synth_sdk/tracing/upload.py index 33d64d4..00ecc7a 100644 --- a/synth_sdk/tracing/upload.py +++ b/synth_sdk/tracing/upload.py @@ -140,7 +140,7 @@ def send_system_traces_s3( def get_upload_id( - base_url: str, api_key: str, system_instance_id, verbose: bool = False + base_url: str, api_key: str, system_id, verbose: bool = False ): token_url = f"{base_url}/v1/auth/token" token_response = requests.get( @@ -149,7 +149,7 @@ def get_upload_id( token_response.raise_for_status() access_token = token_response.json()["access_token"] - api_url = f"{base_url}/v1/uploads/get-upload-id-signed-url?system_instance_id={system_instance_id}" + api_url = 
f"{base_url}/v1/uploads/get-upload-id-signed-url?system_id={system_id}" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {access_token}", From 4cb419eab80d6d0f078d7f184f83c4bffbe4b3a1 Mon Sep 17 00:00:00 2001 From: Josh Purtell Date: Tue, 17 Dec 2024 17:02:42 -0800 Subject: [PATCH 6/7] produce uuid ' --- synth_sdk/tracing/utils.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/synth_sdk/tracing/utils.py b/synth_sdk/tracing/utils.py index 5c5e6e7..9c7bb1e 100644 --- a/synth_sdk/tracing/utils.py +++ b/synth_sdk/tracing/utils.py @@ -1,11 +1,9 @@ -import hashlib +import uuid def get_system_id(system_name: str) -> str: - """Create a deterministic system_instance_id from system_name using SHA-256.""" + """Create a deterministic system_instance_id from system_name using UUID5.""" if not system_name: raise ValueError("system_name cannot be empty") - # Create SHA-256 hash of system_name - hash_object = hashlib.sha256(system_name.encode()) - # Take the first 16 characters of the hex digest for a shorter but still unique ID - return hash_object.hexdigest()[:16] + system_id = uuid.uuid5(uuid.NAMESPACE_DNS, system_name) + return str(system_id) From 823506855a4460e569241f237d8873d04d3d4eee Mon Sep 17 00:00:00 2001 From: Josh Purtell Date: Tue, 17 Dec 2024 19:19:13 -0800 Subject: [PATCH 7/7] changes --- pyproject.toml | 2 +- setup.py | 2 +- synth_sdk/tracing/abstractions.py | 2 + synth_sdk/tracing/decorators.py | 47 +++++++++++++------ synth_sdk/tracing/events/manage.py | 7 +-- synth_sdk/tracing/events/scope.py | 5 +- synth_sdk/tracing/events/store.py | 22 +++++---- synth_sdk/tracing/trackers.py | 26 +++++----- synth_sdk/tracing/upload.py | 43 +++++++++-------- .../records/episode_classic_2.json | 2 +- tutorials/AsyncAgentExample.py | 2 +- tutorials/SimpleAgentExample.py | 2 +- tutorials/SyncInAsyncExample.py | 2 +- 13 files changed, 98 insertions(+), 66 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7ca1911..6b08933 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "synth-sdk" -version = "0.2.102" +version = "0.2.103" description = "" authors = [{name = "Synth AI", email = "josh@usesynth.ai"}] license = {text = "MIT"} diff --git a/setup.py b/setup.py index bba0397..1c6d20c 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="synth-sdk", - version="0.2.102", + version="0.2.103", packages=find_packages(), install_requires=[ "opentelemetry-api", diff --git a/synth_sdk/tracing/abstractions.py b/synth_sdk/tracing/abstractions.py index 146dfbf..fc29b73 100644 --- a/synth_sdk/tracing/abstractions.py +++ b/synth_sdk/tracing/abstractions.py @@ -133,6 +133,7 @@ def to_dict(self): @dataclass class SystemTrace: + system_name: str system_id: str system_instance_id: str metadata: Optional[Dict[str, Any]] @@ -141,6 +142,7 @@ class SystemTrace: def to_dict(self): return { + "system_name": self.system_name, "system_id": self.system_id, "system_instance_id": self.system_instance_id, "partition": [element.to_dict() for element in self.partition], diff --git a/synth_sdk/tracing/decorators.py b/synth_sdk/tracing/decorators.py index ccdb102..96e43c1 100644 --- a/synth_sdk/tracing/decorators.py +++ b/synth_sdk/tracing/decorators.py @@ -98,14 +98,15 @@ def wrapper(*args, **kwargs): ) if increment_partition: event.partition_index = event_store.increment_partition( - _local.system_instance_id, + _local.system_name, _local.system_id, + _local.system_instance_id, ) - logger.debug( - f"Incremented partition to: 
{event.partition_index}" - ) + # logger.debug( + # f"Incremented partition to: {event.partition_index}" + # ) set_current_event(event, decorator_type="sync") - logger.debug(f"Created and set new event: {event_type}") + # logger.debug(f"Created and set new event: {event_type}") # Automatically trace function inputs bound_args = inspect.signature(func).bind(*args, **kwargs) @@ -227,11 +228,8 @@ def wrapper(*args, **kwargs): # Store the event if hasattr(_local, "system_instance_id"): event_store.add_event( - _local.system_instance_id, _local.system_id, current_event + _local.system_name, _local.system_id, _local.system_instance_id, current_event ) - # logger.debug( - # f"Stored and closed event {event_type} for system {_local.system_instance_id}" - # ) del _local.active_events[event_type] return result @@ -316,14 +314,16 @@ async def async_wrapper(*args, **kwargs): ) if increment_partition: event.partition_index = event_store.increment_partition( - system_instance_id_var.get(), system_id_var.get() - ) - logger.debug( - f"Incremented partition to: {event.partition_index}" + system_name_var.get(), + system_id_var.get(), + system_instance_id_var.get(), ) + # logger.debug( + # f"Incremented partition to: {event.partition_index}" + # ) set_current_event(event, decorator_type="async") - logger.debug(f"Created and set new event: {event_type}") + # logger.debug(f"Created and set new event: {event_type}") # Automatically trace function inputs bound_args = inspect.signature(func).bind(*args, **kwargs) @@ -446,14 +446,31 @@ async def async_wrapper(*args, **kwargs): # Store the event if system_instance_id_var.get(): event_store.add_event( - system_instance_id_var.get(), + system_name_var.get(), system_id_var.get(), + system_instance_id_var.get(), current_event, ) active_events = active_events_var.get() del active_events[event_type] active_events_var.set(active_events) + # Auto-close and store events created with manage_event="create" + if manage_event == "create" and event is not None and event.closed is None: + event.closed = time.time() + active_events_dict = active_events_var.get() + if event_type in active_events_dict: + # Store the event while context vars are still valid + event_store.add_event( + system_name_var.get(), + system_id_var.get(), + system_instance_id_var.get(), + event, + ) + # Remove from active events + active_events_dict.pop(event_type, None) + active_events_var.set(active_events_dict) + return result except Exception as e: logger.error(f"Exception in traced function '{func.__name__}': {e}") diff --git a/synth_sdk/tracing/events/manage.py b/synth_sdk/tracing/events/manage.py index 4b183eb..6158efe 100644 --- a/synth_sdk/tracing/events/manage.py +++ b/synth_sdk/tracing/events/manage.py @@ -64,7 +64,7 @@ def set_current_event( ) try: event_store.add_event( - _local.system_instance_id, _local.system_id, existing_event + _local.system_name, _local.system_id, _local.system_instance_id, existing_event ) logger.debug("Successfully stored closed event") except Exception as e: @@ -78,6 +78,7 @@ def set_current_event( else: from synth_sdk.tracing.local import ( active_events_var, + system_name_var, system_id_var, system_instance_id_var, ) @@ -105,7 +106,7 @@ def set_current_event( ) try: event_store.add_event( - system_instance_id, system_id_var.get(), existing_event + system_name_var.get(), system_id_var.get(), system_instance_id, existing_event ) logger.debug("Successfully stored closed event") except Exception as e: @@ -132,7 +133,7 @@ def end_event(event_type: str) -> Optional[Event]: # 
Store the event if hasattr(_local, "system_instance_id"): event_store.add_event( - _local.system_instance_id, _local.system_id, current_event + _local.system_name, _local.system_id, _local.system_instance_id, current_event ) clear_current_event(event_type) return current_event diff --git a/synth_sdk/tracing/events/scope.py b/synth_sdk/tracing/events/scope.py index 6715327..ffa8779 100644 --- a/synth_sdk/tracing/events/scope.py +++ b/synth_sdk/tracing/events/scope.py @@ -4,7 +4,7 @@ from synth_sdk.tracing.abstractions import Event from synth_sdk.tracing.decorators import _local, clear_current_event, set_current_event from synth_sdk.tracing.events.store import event_store -from synth_sdk.tracing.local import system_id_var, system_instance_id_var +from synth_sdk.tracing.local import system_name_var, system_id_var, system_instance_id_var @contextmanager @@ -32,6 +32,7 @@ def event_scope(event_type: str): else getattr(_local, "system_instance_id", None) ) system_id = system_id_var.get() if is_async else getattr(_local, "system_id", None) + system_name = system_name_var.get() if is_async else getattr(_local, "system_name", None) event = Event( system_instance_id=system_instance_id, @@ -51,4 +52,4 @@ def event_scope(event_type: str): clear_current_event(event_type) # Store the event if system_instance_id is available if system_instance_id: - event_store.add_event(system_instance_id, system_id, event) + event_store.add_event(system_name, system_id, system_instance_id, event) diff --git a/synth_sdk/tracing/events/store.py b/synth_sdk/tracing/events/store.py index 12a2c12..74308f6 100644 --- a/synth_sdk/tracing/events/store.py +++ b/synth_sdk/tracing/events/store.py @@ -21,7 +21,11 @@ def __init__(self): self.logger = logging.getLogger(__name__) def get_or_create_system_trace( - self, system_instance_id: str, system_id: str, _already_locked: bool = False + self, + system_name: str, + system_id: str, + system_instance_id: str, + _already_locked: bool = False ) -> SystemTrace: """Get or create a SystemTrace for the given system_instance_id.""" logger = logging.getLogger(__name__) @@ -32,6 +36,7 @@ def _get_or_create(): if system_instance_id not in self._traces: # logger.debug(f"Creating new system trace for {system_instance_id}") self._traces[system_instance_id] = SystemTrace( + system_name=system_name, system_id=system_id, system_instance_id=system_instance_id, metadata={}, @@ -48,7 +53,7 @@ def _get_or_create(): # logger.debug("Lock acquired in get_or_create_system_trace") return _get_or_create() - def increment_partition(self, system_instance_id: str, system_id: str) -> int: + def increment_partition(self, system_name: str, system_id: str, system_instance_id: str) -> int: """Increment the partition index for a system and create new partition element.""" logger = logging.getLogger(__name__) # logger.debug(f"Starting increment_partition for system {system_instance_id}") @@ -56,7 +61,7 @@ def increment_partition(self, system_instance_id: str, system_id: str) -> int: with self._lock: # logger.debug("Lock acquired in increment_partition") system_trace = self.get_or_create_system_trace( - system_instance_id, system_id, _already_locked=True + system_name, system_id, system_instance_id, _already_locked=True ) # logger.debug( # f"Got system trace, current index: {system_trace.current_partition_index}" @@ -76,7 +81,7 @@ def increment_partition(self, system_instance_id: str, system_id: str) -> int: return system_trace.current_partition_index - def add_event(self, system_instance_id: str, system_id: str, 
event: Event): + def add_event(self, system_name: str, system_id: str, system_instance_id: str, event: Event): """Add an event to the appropriate partition of the system trace.""" # self.#logger.debug(f"Adding event type {event.event_type} to system {system_instance_id}") # self.#logger.debug( @@ -91,7 +96,7 @@ def add_event(self, system_instance_id: str, system_id: str, event: Event): try: system_trace = self.get_or_create_system_trace( - system_instance_id, system_id + system_name, system_id, system_instance_id, _already_locked=True ) # self.#logger.debug( # f"Got system trace with {len(system_trace.partition)} partitions" @@ -139,25 +144,26 @@ def end_all_active_events(self): if hasattr(_local, "active_events"): active_events = _local.active_events system_instance_id = getattr(_local, "system_instance_id", None) + system_name = getattr(_local, "system_name", None) system_id = getattr(_local, "system_id", None) if active_events: # and system_instance_id: for event_type, event in list(active_events.items()): if event.closed is None: event.closed = time.time() - self.add_event(event.system_instance_id, system_id, event) + self.add_event(system_name, system_id, event.system_instance_id, event) # self.#logger.debug(f"Stored and closed event {event_type}") _local.active_events.clear() else: # For asynchronous code active_events_async = active_events_var.get() - + print("Active events async:", active_events_async.items()) if active_events_async: # and system_instance_id_async: for event_type, event in list(active_events_async.items()): system_id = system_id_var.get() if event.closed is None: event.closed = time.time() - self.add_event(event.system_instance_id, system_id, event) + self.add_event(system_name, system_id, event.system_instance_id, event) # self.#logger.debug(f"Stored and closed event {event_type}") active_events_var.set({}) diff --git a/synth_sdk/tracing/trackers.py b/synth_sdk/tracing/trackers.py index ce5887b..c348ce7 100644 --- a/synth_sdk/tracing/trackers.py +++ b/synth_sdk/tracing/trackers.py @@ -38,7 +38,7 @@ def track_lm( "model_name": model_name, "finetune": finetune, }) - #logger.debug("Tracked LM interaction") + # logger.debug("Tracked LM interaction") else: pass # raise RuntimeError( @@ -67,7 +67,7 @@ def track_state( "variable_value": variable_value, "annotation": annotation, }) - #logger.debug(f"Tracked state: {variable_name}") + # logger.debug(f"Tracked state: {variable_name}") else: pass # raise RuntimeError( @@ -84,7 +84,7 @@ def finalize(cls): cls._local.initialized = False cls._local.inputs = [] cls._local.outputs = [] - #logger.debug("Finalized trace data") + # logger.debug("Finalized trace data") # Context variables for asynchronous tracing @@ -104,7 +104,7 @@ def initialize(cls): trace_initialized_var.set(True) trace_inputs_var.set([]) # List of tuples: (origin, var) trace_outputs_var.set([]) # List of tuples: (origin, var) - #logger.debug("AsyncTrace initialized") + # logger.debug("AsyncTrace initialized") @classmethod def track_lm( @@ -122,7 +122,7 @@ def track_lm( "finetune": finetune, }) trace_inputs_var.set(trace_inputs) - #logger.debug("Tracked LM interaction") + # logger.debug("Tracked LM interaction") else: pass # raise RuntimeError( @@ -164,7 +164,7 @@ def track_state( "annotation": annotation, }) trace_outputs_var.set(trace_outputs) - #logger.debug(f"Tracked state: {variable_name}") + # logger.debug(f"Tracked state: {variable_name}") else: pass # raise RuntimeError( @@ -182,7 +182,7 @@ def finalize(cls): trace_initialized_var.set(False) 
trace_inputs_var.set([]) trace_outputs_var.set([]) - logger.debug("Finalized async trace data") + # logger.debug("Finalized async trace data") # Make traces available globally @@ -246,10 +246,10 @@ def process_chat(self, user_input: str): ``` """ if cls.is_called_by_async() and trace_initialized_var.get(): - logger.debug("Using async tracker to track LM") + # logger.debug("Using async tracker to track LM") synth_tracker_async.track_lm(messages, model_name, finetune) elif getattr(synth_tracker_sync._local, "initialized", False): - logger.debug("Using sync tracker to track LM") + # logger.debug("Using sync tracker to track LM") synth_tracker_sync.track_lm(messages, model_name, finetune) else: # raise RuntimeError("Trace not initialized in track_lm.") @@ -302,10 +302,10 @@ def update_state(self, new_value: dict): ``` """ if cls.is_called_by_async() and trace_initialized_var.get(): - logger.debug("Using async tracker to track state") + # logger.debug("Using async tracker to track state") synth_tracker_async.track_state(variable_name, variable_value, origin, annotation) elif getattr(synth_tracker_sync._local, "initialized", False): - logger.debug("Using sync tracker to track state") + # logger.debug("Using sync tracker to track state") synth_tracker_sync.track_state(variable_name, variable_value, origin, annotation) else: #raise RuntimeError("Trace not initialized in track_state.") @@ -319,13 +319,13 @@ def get_traced_data( traced_inputs, traced_outputs = [], [] if async_sync in ["async", ""]: - #logger.debug("Getting traced data from async tracker") + # logger.debug("Getting traced data from async tracker") traced_inputs_async, traced_outputs_async = synth_tracker_async.get_traced_data() traced_inputs.extend(traced_inputs_async) traced_outputs.extend(traced_outputs_async) if async_sync in ["sync", ""]: - #logger.debug("Getting traced data from sync tracker") + # logger.debug("Getting traced data from sync tracker") traced_inputs_sync, traced_outputs_sync = synth_tracker_sync.get_traced_data() traced_inputs.extend(traced_inputs_sync) traced_outputs.extend(traced_outputs_sync) diff --git a/synth_sdk/tracing/upload.py b/synth_sdk/tracing/upload.py index 00ecc7a..6090eda 100644 --- a/synth_sdk/tracing/upload.py +++ b/synth_sdk/tracing/upload.py @@ -93,9 +93,12 @@ def send_system_traces_s3( base_url: str, api_key: str, system_id: str, + system_name: str, verbose: bool = False, ): - upload_id, signed_url = get_upload_id(base_url, api_key, system_id, verbose) + upload_id, signed_url = get_upload_id( + base_url, api_key, system_id, system_name, verbose + ) load_signed_url(signed_url, dataset, traces) token_url = f"{base_url}/v1/auth/token" @@ -140,8 +143,11 @@ def send_system_traces_s3( def get_upload_id( - base_url: str, api_key: str, system_id, verbose: bool = False + base_url: str, api_key: str, system_id: str, system_name: str, verbose: bool = False ): + """ + Modified client-side function to send both system_id and system_name. + """ token_url = f"{base_url}/v1/auth/token" token_response = requests.get( token_url, headers={"customer_specific_api_key": api_key} @@ -149,7 +155,11 @@ def get_upload_id( token_response.raise_for_status() access_token = token_response.json()["access_token"] - api_url = f"{base_url}/v1/uploads/get-upload-id-signed-url?system_id={system_id}" + # Include system_name in the query parameters + api_url = ( + f"{base_url}/v1/uploads/get-upload-id-signed-url?" 
+ f"system_id={system_id}&system_name={system_name}" + ) headers = { "Content-Type": "application/json", "Authorization": f"Bearer {access_token}", @@ -356,7 +366,10 @@ def upload_helper( if hasattr(_local, "system_instance_id"): try: event_store.add_event( - _local.system_instance_id, _local.system_id, event + _local.system_name, + _local.system_id, + _local.system_instance_id, + event, ) if verbose: print(f"Closed and stored active event: {event_type}") @@ -375,7 +388,10 @@ def upload_helper( if event.closed is None: event.closed = current_time event_store.add_event( - trace.system_instance_id, trace.system_id, event + trace.system_name, + trace.system_id, + trace.system_instance_id, + event, ) if verbose: print(f"Closed existing unclosed event: {event.event_type}") @@ -395,31 +411,20 @@ def upload_helper( print("Upload format validation successful") # Send to server - response, payload = send_system_traces_s3( + upload_id, signed_url = send_system_traces_s3( dataset=dataset, traces=traces, base_url=base_url, api_key=api_key, system_id=traces[0].system_id, + system_name=traces[0].system_name, verbose=verbose, ) - if verbose: - print("Response status code:", response.status_code) - if response.status_code == 202: - print(f"Upload successful - sent {len(traces)} system traces.") - print( - f"Dataset included {len(dataset.questions)} questions and {len(dataset.reward_signals)} reward signals." - ) - - if show_payload: - print("Payload sent to server: ") - pprint(payload) - questions_json, reward_signals_json, traces_json = format_upload_output( dataset, traces ) - return response, questions_json, reward_signals_json, traces_json + return upload_id, questions_json, reward_signals_json, traces_json except ValueError as e: if verbose: diff --git a/tests/iteration/craftax/generate_data/records/episode_classic_2.json b/tests/iteration/craftax/generate_data/records/episode_classic_2.json index b0a8f50..1ae3717 100644 --- a/tests/iteration/craftax/generate_data/records/episode_classic_2.json +++ b/tests/iteration/craftax/generate_data/records/episode_classic_2.json @@ -1,5 +1,5 @@ { - "Collect Wood": false, + "Collect Wood": true, "Place Table": false, "Eat Cow": false, "Collect Sapling": false, diff --git a/tutorials/AsyncAgentExample.py b/tutorials/AsyncAgentExample.py index cc215e1..e7f7293 100644 --- a/tutorials/AsyncAgentExample.py +++ b/tutorials/AsyncAgentExample.py @@ -188,7 +188,7 @@ async def run_test(): event.closed = time.time() if hasattr(_local, "system_instance_id"): try: - event_store.add_event(_local.system_instance_id, event) + event_store.add_event(_local.system_name, _local.system_id, _local.system_instance_id, event) logger.debug( "Successfully cleaned up event: %s", event_type ) diff --git a/tutorials/SimpleAgentExample.py b/tutorials/SimpleAgentExample.py index 751bc4c..9d538a6 100644 --- a/tutorials/SimpleAgentExample.py +++ b/tutorials/SimpleAgentExample.py @@ -196,7 +196,7 @@ def run_test(): event.closed = time.time() if hasattr(_local, "system_instance_id"): try: - event_store.add_event(_local.system_instance_id, event) + event_store.add_event(_local.system_name, _local.system_id, _local.system_instance_id, event) logger.debug( "Successfully cleaned up event: %s", event_type ) diff --git a/tutorials/SyncInAsyncExample.py b/tutorials/SyncInAsyncExample.py index f74979d..79d2556 100644 --- a/tutorials/SyncInAsyncExample.py +++ b/tutorials/SyncInAsyncExample.py @@ -195,7 +195,7 @@ async def run_test(): event.closed = time.time() if hasattr(_local, "system_instance_id"): 
                try: -                    event_store.add_event(_local.system_instance_id, event) +                    event_store.add_event(_local.system_name, _local.system_id, _local.system_instance_id, event) logger.debug( "Successfully cleaned up event: %s", event_type )
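
For reference, a minimal sketch (not part of the patches themselves) of how the pieces introduced in this series fit together once synth-sdk 0.2.103 is installed; the local override URL is illustrative, and "test_agent_sync" is just the system name used in the tutorials above:

    import os
    import uuid

    from synth_sdk.tracing.utils import get_system_id

    # upload_helper now reads SYNTH_ENDPOINT_OVERRIDE, falling back to
    # https://agent-learning.onrender.com, so a local server can be targeted
    # without code changes (the URL below is illustrative).
    os.environ["SYNTH_ENDPOINT_OVERRIDE"] = "http://localhost:8000"

    # system_id is now a UUID5 derived deterministically from the system name,
    # so the same name always maps to the same id.
    system_id = get_system_id("test_agent_sync")
    assert system_id == str(uuid.uuid5(uuid.NAMESPACE_DNS, "test_agent_sync"))

    # event_store.add_event now takes
    # (system_name, system_id, system_instance_id, event),
    # matching the call sites updated in the tutorials above.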