diff --git a/.gitignore b/.gitignore index e6ec1d1..d5de451 100644 --- a/.gitignore +++ b/.gitignore @@ -163,3 +163,7 @@ cython_debug/ # uv.lock is annoying uv.lock + + +dev/** */ +dev \ No newline at end of file diff --git a/testing/__init__.py b/testing/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/testing/ai_agent_async.py b/testing/ai_agent_async.py deleted file mode 100644 index c9dc06d..0000000 --- a/testing/ai_agent_async.py +++ /dev/null @@ -1,196 +0,0 @@ -from zyk import LM -from synth_sdk.tracing.decorators import trace_system, _local -from synth_sdk.tracing.trackers import SynthTracker -from synth_sdk.tracing.upload import upload -from synth_sdk.tracing.abstractions import TrainingQuestion, RewardSignal, Dataset -from synth_sdk.tracing.events.store import event_store -import asyncio -import synth_sdk.config.settings -import time -import json -import logging - -# Configure logging -logging.basicConfig( - level=logging.DEBUG, # Changed from CRITICAL to DEBUG - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", -) -logger = logging.getLogger(__name__) - - -class TestAgent: - def __init__(self): - self.system_id = "test_agent_async" - logger.debug("Initializing TestAgent with system_id: %s", self.system_id) - self.lm = LM( - model_name="gpt-4o-mini-2024-07-18", - formatting_model_name="gpt-4o-mini-2024-07-18", - temperature=1, - ) - logger.debug("LM initialized") - - @trace_system( - origin="agent", - event_type="lm_call", - manage_event="create", - increment_partition=True, - verbose=True, - ) - async def make_lm_call(self, user_message: str) -> str: - # Only pass the user message, not self - #SynthTracker.track_input([user_message], variable_name="user_message", origin="agent") - - logger.debug("Starting LM call with message: %s", user_message) - response = await self.lm.respond_async( - system_message="You are a helpful assistant.", user_message=user_message - ) - SynthTracker.track_lm( - messages = [{"role": "user", "content": user_message}, {"role": "assistant", "content": response}], - model_name = self.lm.model_name, - finetune = False - ) - SynthTracker.track_state( - variable_name = "minecraft_screen_description", - variable_value = None, - origin = "environment", - annotation = "Minecraft screen description" - ) - - #SynthTracker.track_output(response, variable_name="response", origin="agent") - - logger.debug("LM response received: %s", response) - time.sleep(0.1) - return response - - @trace_system( - origin="environment", - event_type="environment_processing", - manage_event="create", - verbose=True, - ) - async def process_environment(self, input_data: str) -> dict: - # Only pass the input data, not self - SynthTracker.track_state( - variable_name="input_data", - variable_value=input_data, - origin="environment", - annotation=None # Optional: you can add an annotation if needed - ) - - result = {"processed": input_data, "timestamp": time.time()} - - SynthTracker.track_state( - variable_name="result", - variable_value=result, - origin="environment" - ) - return result - - -async def run_test(): - logger.info("Starting run_test") - # Create test agent - agent = TestAgent() - - try: - # List of test questions - questions = [ - "What's the capital of France?", - "What's 2+2?", - "Who wrote Romeo and Juliet?", - ] - logger.debug("Test questions initialized: %s", questions) - - # Make multiple LM calls with environment processing - responses = [] - for i, question in enumerate(questions): - logger.info("Processing question %d: %s", i, 
question) - try: - # First process in environment - env_result = await agent.process_environment(question) - logger.debug("Environment processing result: %s", env_result) - - # Then make LM call - response = await agent.make_lm_call(question) - responses.append(response) - logger.debug("Response received and stored: %s", response) - except Exception as e: - logger.error("Error during processing: %s", str(e), exc_info=True) - continue - - logger.info("Creating dataset for upload") - # Create dataset for upload - dataset = Dataset( - questions=[ - TrainingQuestion( - intent="Test question", - criteria="Testing tracing functionality", - question_id=f"q{i}", - ) - for i in range(len(questions)) - ], - reward_signals=[ - RewardSignal( - question_id=f"q{i}", - system_id=agent.system_id, - reward=1.0, - annotation="Test reward", - ) - for i in range(len(questions)) - ], - ) - logger.debug( - "Dataset created with %d questions and %d reward signals", - len(dataset.questions), - len(dataset.reward_signals), - ) - - # Upload traces - try: - logger.info("Attempting to upload traces") - response, payload, dataset, traces = await upload(dataset=dataset, verbose=True) - logger.info("Upload successful!") - print("Upload successful!") - except Exception as e: - logger.error("Upload failed: %s", str(e), exc_info=True) - print(f"Upload failed: {str(e)}") - - # Print debug information - traces = event_store.get_system_traces() - logger.debug("Retrieved %d system traces", len(traces)) - print("\nTraces:") - print(json.dumps([trace.to_dict() for trace in traces], indent=2)) - - print("\nDataset:") - print(json.dumps(dataset.to_dict(), indent=2)) - finally: - logger.info("Starting cleanup") - # Cleanup - if hasattr(_local, "active_events"): - for event_type, event in _local.active_events.items(): - logger.debug("Cleaning up event: %s", event_type) - if event.closed is None: - event.closed = time.time() - if hasattr(_local, "system_id"): - try: - event_store.add_event(_local.system_id, event) - logger.debug( - "Successfully cleaned up event: %s", event_type - ) - except Exception as e: - logger.error( - "Error during cleanup of event %s: %s", - event_type, - str(e), - exc_info=True, - ) - print( - f"Error during cleanup of event {event_type}: {str(e)}" - ) - logger.info("Cleanup completed") - -# Run a sample agent using the async decorator and tracker -if __name__ == "__main__": - logger.info("Starting main execution") - asyncio.run(run_test()) - logger.info("Main execution completed") diff --git a/testing/ai_agent_sync.py b/testing/ai_agent_sync.py deleted file mode 100644 index 0164fc3..0000000 --- a/testing/ai_agent_sync.py +++ /dev/null @@ -1,193 +0,0 @@ -from zyk import LM -from synth_sdk.tracing.decorators import trace_system, _local -from synth_sdk.tracing.trackers import SynthTracker -from synth_sdk.tracing.upload import upload -from synth_sdk.tracing.abstractions import TrainingQuestion, RewardSignal, Dataset -from synth_sdk.tracing.events.store import event_store -import asyncio -import synth_sdk.config.settings -import time -import json -import logging - -# Configure logging -logging.basicConfig( - level=logging.DEBUG, # Changed from CRITICAL to DEBUG - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", -) -logger = logging.getLogger(__name__) - - -class TestAgent: - def __init__(self): - self.system_id = "test_agent_sync" - logger.debug("Initializing TestAgent with system_id: %s", self.system_id) - self.lm = LM( - model_name="gpt-4o-mini-2024-07-18", - 
formatting_model_name="gpt-4o-mini-2024-07-18", - temperature=1, - ) - logger.debug("LM initialized") - - @trace_system( - origin="agent", - event_type="lm_call", - manage_event="create", - increment_partition=True, - verbose=True, - ) - def make_lm_call(self, user_message: str) -> str: - logger.debug("Starting LM call with message: %s", user_message) - response = self.lm.respond_sync( - system_message="You are a helpful assistant.", user_message=user_message - ) - - # Track LM interaction - SynthTracker.track_lm( - messages=[{"role": "user", "content": user_message}, - {"role": "assistant", "content": response}], - model_name=self.lm.model_name, - finetune=False - ) - - # Track additional state if needed - SynthTracker.track_state( - variable_name="minecraft_screen_description", - variable_value=None, - origin="environment", - annotation="Minecraft screen description" - ) - - logger.debug("LM response received: %s", response) - time.sleep(0.1) - return response - - @trace_system( - origin="environment", - event_type="environment_processing", - manage_event="create", - verbose=True, - ) - def process_environment(self, input_data: str) -> dict: - # Track input state - SynthTracker.track_state( - variable_name="input_data", - variable_value=input_data, - origin="environment", - annotation=None - ) - - result = {"processed": input_data, "timestamp": time.time()} - - # Track result state - SynthTracker.track_state( - variable_name="result", - variable_value=result, - origin="environment" - ) - return result - - -async def run_test(): - logger.info("Starting run_test") - agent = TestAgent() - - try: - questions = [ - "What's the capital of France?", - "What's 2+2?", - "Who wrote Romeo and Juliet?", - ] - logger.debug("Test questions initialized: %s", questions) - - responses = [] - for i, question in enumerate(questions): - logger.info("Processing question %d: %s", i, question) - try: - env_result = agent.process_environment(question) - logger.debug("Environment processing result: %s", env_result) - - response = agent.make_lm_call(question) - responses.append(response) - logger.debug("Response received and stored: %s", response) - except Exception as e: - logger.error("Error during processing: %s", str(e), exc_info=True) - continue - - logger.info("Creating dataset for upload") - # Create dataset for upload - dataset = Dataset( - questions=[ - TrainingQuestion( - intent="Test question", - criteria="Testing tracing functionality", - question_id=f"q{i}", - ) - for i in range(len(questions)) - ], - reward_signals=[ - RewardSignal( - question_id=f"q{i}", - system_id=agent.system_id, - reward=1.0, - annotation="Test reward", - ) - for i in range(len(questions)) - ], - ) - logger.debug( - "Dataset created with %d questions and %d reward signals", - len(dataset.questions), - len(dataset.reward_signals), - ) - - # Upload traces - try: - logger.info("Attempting to upload traces") - response, payload, dataset, traces= await upload(dataset=dataset, verbose=True) - logger.info("Upload successful!") - print("Upload successful!") - except Exception as e: - logger.error("Upload failed: %s", str(e), exc_info=True) - print(f"Upload failed: {str(e)}") - - # Print debug information - traces = event_store.get_system_traces() - logger.debug("Retrieved %d system traces", len(traces)) - print("\nTraces:") - print(json.dumps([trace.to_dict() for trace in traces], indent=2)) - - print("\nDataset:") - print(json.dumps(dataset.to_dict(), indent=2)) - finally: - logger.info("Starting cleanup") - # Cleanup - if 
hasattr(_local, "active_events"): - for event_type, event in _local.active_events.items(): - logger.debug("Cleaning up event: %s", event_type) - if event.closed is None: - event.closed = time.time() - if hasattr(_local, "system_id"): - try: - event_store.add_event(_local.system_id, event) - logger.debug( - "Successfully cleaned up event: %s", event_type - ) - except Exception as e: - logger.error( - "Error during cleanup of event %s: %s", - event_type, - str(e), - exc_info=True, - ) - print( - f"Error during cleanup of event {event_type}: {str(e)}" - ) - logger.info("Cleanup completed") - - -# Run a sample agent using the sync decorator and tracker -if __name__ == "__main__": - logger.info("Starting main execution") - asyncio.run(run_test()) - logger.info("Main execution completed") diff --git a/testing/circular.py b/testing/circular.py deleted file mode 100644 index 797711d..0000000 --- a/testing/circular.py +++ /dev/null @@ -1,82 +0,0 @@ -import ast -import synth_sdk.config.settings -import os -from collections import defaultdict - - -def get_imports(file_path): - """Extract all imports from a Python file.""" - with open(file_path) as f: - tree = ast.parse(f.read()) - - imports = [] - for node in ast.walk(tree): - if isinstance(node, ast.Import): - for name in node.names: - imports.append(name.name) - elif isinstance(node, ast.ImportFrom): - module = node.module if node.module else "" - for name in node.names: - full_name = f"{module}.{name.name}" if module else name.name - imports.append(full_name) - return imports - - -def find_circular_imports(directory): - """Find potential circular imports in a directory.""" - # Map files to their imports - file_imports = {} - # Map modules to the files that import them - imported_by = defaultdict(list) - - # Walk through all Python files - for root, _, files in os.walk(directory): - for file in files: - if file.endswith(".py"): - file_path = os.path.join(root, file) - module_path = os.path.relpath(file_path, directory).replace("/", ".")[ - :-3 - ] - - try: - imports = get_imports(file_path) - file_imports[module_path] = imports - - # Record which files import this module - for imp in imports: - imported_by[imp].append(module_path) - except SyntaxError: - print(f"Syntax error in {file_path}") - continue - - # Check for circular dependencies - def check_circular(module, visited, path): - if module in path: - cycle = path[path.index(module) :] + [module] - return f"Circular import detected: {' -> '.join(cycle)}" - - if module in visited: - return None - - visited.add(module) - path.append(module) - - if module in file_imports: - for imp in file_imports[module]: - result = check_circular(imp, visited.copy(), path.copy()) - if result: - return result - - return None - - # Check each module - for module in file_imports: - result = check_circular(module, set(), []) - if result: - print(result) - - -if __name__ == "__main__": - # Use the current directory, or specify your package directory - directory = "synth_sdk" - find_circular_imports(directory) diff --git a/testing/decorator.py b/testing/decorator.py deleted file mode 100644 index 8310792..0000000 --- a/testing/decorator.py +++ /dev/null @@ -1,34 +0,0 @@ -from synth_sdk.tracing.decorators import trace_system_async -import synth_sdk.config.settings -import time -import logging -import asyncio - -# Configure logging -logging.basicConfig( - level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) - - -class TestClass: - def __init__(self): - self.system_id = "test_system" - - 
@trace_system_async( - origin="agent", event_type="test", manage_event="create", - increment_partition=True, verbose=True - ) - async def test_method(self, x): - time.sleep(0.1) # Simulate work - return x * 2 - - -async def test_decorator(): - test = TestClass() - result = await test.test_method(5) - print(f"Result: {result}") - - -if __name__ == "__main__": - - asyncio.run(test_decorator()) diff --git a/testing/openai_autologging.py b/testing/openai_autologging.py deleted file mode 100644 index c1f81ff..0000000 --- a/testing/openai_autologging.py +++ /dev/null @@ -1,67 +0,0 @@ -import asyncio -import logging -import json -import os -from typing import List -from synth_sdk.provider_support.openai_lf import AsyncOpenAI -from synth_sdk.tracing.decorators import trace_system -from synth_sdk.tracing.events.store import event_store -from synth_sdk.tracing.abstractions import Event, SystemTrace - -from dotenv import load_dotenv - -# Configure logging -logging.basicConfig( - level=logging.DEBUG, # Set to DEBUG to capture all logs - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", -) -logger = logging.getLogger(__name__) - -class OpenAIAgent: - def __init__(self): - self.system_id = "openai_agent_async_test" - logger.debug("Initializing OpenAIAgent with system_id: %s", self.system_id) - self.openai = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) # Replace with your actual API key - load_dotenv() - @trace_system( - origin="agent", - event_type="openai_completion", - manage_event="create", - increment_partition=True, - verbose=True, - ) - async def get_completion(self, prompt: str) -> str: - logger.debug("Sending prompt to OpenAI: %s", prompt) - try: - response = await self.openai.chat.completions.create( - model="gpt-4o-mini-2024-07-18", - messages = [{"role": "user", "content": prompt}], - max_tokens=50, - ) - completion_text = response.choices[0].message.content - logger.debug("Received completion: %s", completion_text) - return completion_text - except Exception as e: - logger.error("Error during OpenAI call: %s", str(e), exc_info=True) - raise - -async def run_test(): - logger.info("Starting OpenAI Agent Async Test") - agent = OpenAIAgent() - prompt = "Explain the theory of relativity in simple terms." 
- - try: - completion = await agent.get_completion(prompt) - print(f"OpenAI Completion:\n{completion}") - except Exception as e: - print(f"An error occurred: {str(e)}") - - # Retrieve and display traces from the event store - logger.info("Retrieving system traces from event store") - traces: List[SystemTrace] = event_store.get_system_traces() - print("\nRetrieved System Traces:") - for trace in traces: - print(json.dumps(trace.to_dict(), indent=2)) - -if __name__ == "__main__": - asyncio.run(run_test()) diff --git a/testing/pytest_test.py b/testing/pytest_test.py deleted file mode 100644 index 809cb45..0000000 --- a/testing/pytest_test.py +++ /dev/null @@ -1,13 +0,0 @@ -import pytest - -def test_example(): - assert 1 + 1 == 2 - -def test_string(): - assert "hello".upper() == "HELLO" - -try: - def test_list(): - assert len([1, 2, 3]) == 3 -except Exception as e: - print(f"Error: {str(e)}") \ No newline at end of file diff --git a/testing/traces_test.py b/testing/traces_test.py deleted file mode 100644 index bad92b1..0000000 --- a/testing/traces_test.py +++ /dev/null @@ -1,252 +0,0 @@ -from zyk import LM -from synth_sdk.tracing.decorators import trace_system, _local -from synth_sdk.tracing.trackers import SynthTracker -from synth_sdk.tracing.upload import upload, validate_json, createPayload -from synth_sdk.tracing.abstractions import ( - TrainingQuestion, RewardSignal, Dataset, SystemTrace, EventPartitionElement, - MessageInputs, MessageOutputs, ArbitraryInputs, ArbitraryOutputs -) -from synth_sdk.tracing.events.store import event_store -from typing import Dict, List -import asyncio -import time -import logging -import pytest -from unittest.mock import MagicMock, patch -import requests - -# Configure logging -logging.basicConfig( - level=logging.DEBUG, # Changed from CRITICAL to DEBUG - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", -) -logger = logging.getLogger(__name__) - -# Unit Test Configuration: -# =============================== -questions = ["What's the capital of France?"] -mock_llm_response = "The capital of France is Paris." 
- -eventPartition_test = [ - EventPartitionElement(0, []), - EventPartitionElement(1, []) -] - -trace_test = [ - SystemTrace( - system_id="test_agent_upload", - partition=eventPartition_test, - current_partition_index=1 - ) -] - -# This function generates a payload from the data in the dataset to compare the sent payload against -def generate_payload_from_data(dataset: Dataset, traces: List[SystemTrace]) -> Dict: - payload = { - "traces": [trace.to_dict() for trace in traces], - "dataset": dataset.to_dict(), - } - return payload - -def createPayload_wrapper(dataset: Dataset, traces: str, base_url: str, api_key: str) -> Dict: - payload = createPayload(dataset, traces) - response = requests.Response() - response.status_code = 200 - return response, payload - -# =============================== -# Utility Functions -def createUploadDataset(agent): - dataset = Dataset( - questions=[ - TrainingQuestion( - intent="Test question", - criteria="Testing tracing functionality", - question_id=f"q{i}", - ) - for i in range(len(questions)) - ], - reward_signals=[ - RewardSignal( - question_id=f"q{i}", - system_id=agent.system_id, - reward=1.0, - annotation="Test reward", - ) - for i in range(len(questions)) - ], - ) - logger.debug( - "Dataset created with %d questions and %d reward signals", - len(dataset.questions), - len(dataset.reward_signals), - ) - return dataset - -def ask_questions(agent): - # Make multiple LM calls with environment processing - responses = [] - for i, question in enumerate(questions): - logger.info("Processing question %d: %s", i, question) - env_result = agent.process_environment(question) - logger.debug("Environment processing result: %s", env_result) - response = agent.make_lm_call(question) - responses.append(response) - logger.debug("Response received and stored: %s", response) - return responses - -# =============================== - -class TestAgent: - def __init__(self): - self.system_id = "test_agent_upload" - logger.debug("Initializing TestAgent with system_id: %s", self.system_id) - self.lm = MagicMock() - self.lm.model_name = "gpt-4o-mini-2024-07-18" - self.lm.respond_sync.return_value = mock_llm_response - logger.debug("LM initialized") - - @trace_system( - origin="agent", - event_type="lm_call", - manage_event="create", - increment_partition=True, - verbose=False, - ) - def make_lm_call(self, user_message: str) -> str: - # Create MessageInputs - message_input = MessageInputs(messages=[{"role": "user", "content": user_message}]) - # Track LM interaction using the new SynthTracker form - SynthTracker.track_lm( - messages=message_input.messages, - model_name=self.lm.model_name, - finetune=False - ) - - logger.debug("Starting LM call with message: %s", user_message) - response = self.lm.respond_sync( - system_message="You are a helpful assistant.", user_message=user_message - ) - - # Create MessageOutputs - message_output = MessageOutputs(messages=[{"role": "assistant", "content": response}]) - # Track state using the new SynthTracker form - SynthTracker.track_state( - variable_name="response", - variable_value=message_output.messages, - origin="agent", - annotation="LLM response" - ) - - logger.debug("LM response received: %s", response) - return response - - @trace_system( - origin="environment", - event_type="environment_processing", - manage_event="create", - verbose=False, - ) - def process_environment(self, input_data: str) -> dict: - # Create ArbitraryInputs - arbitrary_input = ArbitraryInputs(inputs={"input_data": input_data}) - # Track state using the new 
SynthTracker form - SynthTracker.track_state( - variable_name="input_data", - variable_value=arbitrary_input.inputs, - origin="environment", - annotation="Environment input data" - ) - - result = {"processed": input_data, "timestamp": time.time()} - - # Create ArbitraryOutputs - arbitrary_output = ArbitraryOutputs(outputs=result) - # Track state using the new SynthTracker form - SynthTracker.track_state( - variable_name="result", - variable_value=arbitrary_output.outputs, - origin="environment", - annotation="Environment processing result" - ) - return result - -# Use the new SynthTracker finalize method appropriately -@patch("synth_sdk.tracing.upload.send_system_traces", side_effect=createPayload_wrapper) -def test_generate_traces_sync(mock_send_system_traces): - logger.info("Starting test_generate_traces_sync") - agent = TestAgent() # Create test agent - logger.debug("Test questions initialized: %s", questions) # List of test questions - - # Ask questions - responses = ask_questions(agent) - - logger.info("Creating dataset for upload") - # Create dataset for upload - dataset = createUploadDataset(agent) - - # Upload traces - logger.info("Attempting to upload traces (sync version)") - # Pytest assertion - payload_ground_truth = generate_payload_from_data(dataset, trace_test) - - _, payload_default_trace = upload(dataset=dataset, verbose=True, show_payload=True) - assert payload_ground_truth == payload_default_trace - - _, payload_trace_passed = upload( - dataset=dataset, - traces=trace_test, - verbose=True, - show_payload=True - ) - assert payload_ground_truth == payload_trace_passed - - # Finalize the tracker - SynthTracker.finalize() - logger.info("Resetting event store 0") - event_store.__init__() - -@pytest.mark.asyncio -@patch("synth_sdk.tracing.upload.send_system_traces", side_effect=createPayload_wrapper) -async def test_generate_traces_async(mock_send_system_traces): - logger.info("Starting test_generate_traces_async") - agent = TestAgent() - - # Ask questions - responses = ask_questions(agent) - - logger.info("Creating dataset for upload") - # Create dataset for upload - dataset = createUploadDataset(agent) - - # Upload traces - logger.info("Attempting to upload traces (async version)") - # Pytest assertion - payload_ground_truth = generate_payload_from_data(dataset, trace_test) - - _, payload_default_trace = await upload(dataset=dataset, verbose=True, show_payload=True) - assert payload_ground_truth == payload_default_trace - - _, payload_trace_passed = await upload( - dataset=dataset, - traces=trace_test, - verbose=True, - show_payload=True - ) - assert payload_ground_truth == payload_trace_passed - - # Finalize the tracker - SynthTracker.finalize() - logger.info("Resetting event store 1") - event_store.__init__() - -# Run the tests -if __name__ == "__main__": - logger.info("Starting main execution") - asyncio.run(test_generate_traces_async()) - logger.info("Async test completed") - print("=============================================") - print("=============================================") - test_generate_traces_sync() - logger.info("Sync test completed") - logger.info("Main execution completed") \ No newline at end of file diff --git a/testing/upload_payload_test.py b/testing/upload_payload_test.py deleted file mode 100644 index 18bf99f..0000000 --- a/testing/upload_payload_test.py +++ /dev/null @@ -1,210 +0,0 @@ -from zyk import LM -from synth_sdk.tracing.decorators import trace_system, _local -from synth_sdk.tracing.trackers import SynthTracker -from 
synth_sdk.tracing.upload import upload, validate_json, createPayload -from synth_sdk.tracing.abstractions import ( - TrainingQuestion, RewardSignal, Dataset, SystemTrace, -) -from synth_sdk.tracing.events.store import event_store -from typing import Dict, List -import asyncio -import time -import logging -import pytest -from unittest.mock import MagicMock, patch -import requests - -# Configure logging -logging.basicConfig( - level=logging.DEBUG, # Changed from CRITICAL to DEBUG - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", -) -logger = logging.getLogger(__name__) - -# Unit Test Configuration: -# =============================== -questions = ["What's the capital of France?"] -mock_llm_response = "The capital of France is Paris." - -# This function generates a payload from the data in the dataset to compare the sent payload against -def generate_payload_from_data(dataset: Dataset, traces: List[SystemTrace]) -> Dict: - payload = { - "traces": [trace.to_dict() for trace in traces], - "dataset": dataset.to_dict(), - } - return payload - -def createPayload_wrapper(dataset: Dataset, traces: str, base_url: str, api_key: str) -> Dict: - payload = createPayload(dataset, traces) - response = requests.Response() - response.status_code = 200 - return response, payload - -# =============================== -# Utility Functions -def createUploadDataset(agent): - dataset = Dataset( - questions=[ - TrainingQuestion( - intent="Test question", - criteria="Testing tracing functionality", - question_id=f"q{i}", - ) - for i in range(len(questions)) - ], - reward_signals=[ - RewardSignal( - question_id=f"q{i}", - system_id=agent.system_id, - reward=1.0, - annotation="Test reward", - ) - for i in range(len(questions)) - ], - ) - logger.debug( - "Dataset created with %d questions and %d reward signals", - len(dataset.questions), - len(dataset.reward_signals), - ) - return dataset - -def ask_questions(agent): - # Make multiple LM calls with environment processing - responses = [] - for i, question in enumerate(questions): - logger.info("Processing question %d: %s", i, question) - env_result = agent.process_environment(question) - logger.debug("Environment processing result: %s", env_result) - response = agent.make_lm_call(question) - responses.append(response) - logger.debug("Response received and stored: %s", response) - return responses - -# =============================== - -class TestAgent: - def __init__(self): - self.system_id = "test_agent_upload" - logger.debug("Initializing TestAgent with system_id: %s", self.system_id) - self.lm = MagicMock() - self.lm.model_name = "gpt-4o-mini-2024-07-18" - self.lm.respond_sync.return_value = mock_llm_response - logger.debug("LM initialized") - - @trace_system( - origin="agent", - event_type="lm_call", - manage_event="create", - increment_partition=True, - verbose=False, - ) - def make_lm_call(self, user_message: str) -> str: - # Track LM interaction using the new SynthTracker form - SynthTracker.track_lm( - messages=[{"role": "user", "content": user_message}], - model_name=self.lm.model_name, - finetune=False - ) - - logger.debug("Starting LM call with message: %s", user_message) - response = self.lm.respond_sync( - system_message="You are a helpful assistant.", user_message=user_message - ) - - # Track state using the new SynthTracker form - SynthTracker.track_state( - variable_name="response", - variable_value={"role": "assistant", "content": response}, - origin="agent", - annotation="LLM response" - ) - - logger.debug("LM response received: %s", 
response) - return response - - @trace_system( - origin="environment", - event_type="environment_processing", - manage_event="create", - verbose=False, - ) - def process_environment(self, input_data: str) -> dict: - # Track state using the new SynthTracker form - SynthTracker.track_state( - variable_name="input_data", - variable_value=input_data, - origin="environment", - annotation="Environment input data" - ) - - result = {"processed": input_data, "timestamp": time.time()} - - # Track state using the new SynthTracker form - SynthTracker.track_state( - variable_name="result", - variable_value=result, - origin="environment", - annotation="Environment processing result" - ) - return result - -@pytest.mark.asyncio -@patch("synth_sdk.tracing.upload.send_system_traces", side_effect=createPayload_wrapper) -async def test_upload_async(mock_send_system_traces): - logger.info("Starting test_upload_async") - agent = TestAgent() # Create test agent - - # Ask questions - responses = ask_questions(agent) - - logger.info("Creating dataset for upload") - # Create dataset for upload - dataset = createUploadDataset(agent) - - # Upload traces - logger.info("Attempting to upload traces, async version") - response, payload, dataset, traces = await upload(dataset=dataset, verbose=True, show_payload=True) - logger.info("Upload successful!") - - # Pytest assertion - expected_payload = generate_payload_from_data(dataset, event_store.get_system_traces()) - assert payload == expected_payload - - logger.info("Resetting event store") - event_store.__init__() - -@patch("synth_sdk.tracing.upload.send_system_traces", side_effect=createPayload_wrapper) -def test_upload_sync(mock_send_system_traces): - logger.info("Starting test_upload_sync") - agent = TestAgent() - - # Ask questions - responses = ask_questions(agent) - - logger.info("Creating dataset for upload") - # Create dataset for upload - dataset = createUploadDataset(agent) - - # Upload traces - logger.info("Attempting to upload traces, non-async version") - response, payload, dataset, traces = upload(dataset=dataset, verbose=True, show_payload=True) - logger.info("Upload successful!") - - # Pytest assertion - expected_payload = generate_payload_from_data(dataset, event_store.get_system_traces()) - assert payload == expected_payload - - logger.info("Resetting event store") - event_store.__init__() - -# Run the tests -if __name__ == "__main__": - logger.info("Starting main execution") - asyncio.run(test_upload_async()) - logger.info("Async test completed") - print("=============================================") - print("=============================================") - test_upload_sync() - logger.info("Sync test completed") - logger.info("Main execution completed") \ No newline at end of file
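
Note: the files removed above all exercise the same synth_sdk tracing flow. For reference only, the minimal sketch below condenses that flow; it is not part of the diff, and it assumes the synth_sdk interfaces exactly as they appear in the deleted tests (trace_system, SynthTracker.track_lm, Dataset/TrainingQuestion/RewardSignal, upload, event_store). The LM is mocked, as in the removed testing/traces_test.py, so no external model call is made.

# Minimal sketch of the tracing pattern used by the removed tests.
# Assumes the synth_sdk API as shown in the deleted files; the LM is a
# MagicMock stand-in for zyk.LM (as in testing/traces_test.py).
import asyncio
from unittest.mock import MagicMock

from synth_sdk.tracing.decorators import trace_system
from synth_sdk.tracing.trackers import SynthTracker
from synth_sdk.tracing.upload import upload
from synth_sdk.tracing.abstractions import TrainingQuestion, RewardSignal, Dataset
from synth_sdk.tracing.events.store import event_store


class MiniAgent:
    def __init__(self):
        self.system_id = "mini_agent"  # identifies this system in the traces
        self.lm = MagicMock()          # stand-in for zyk.LM
        self.lm.model_name = "gpt-4o-mini-2024-07-18"
        self.lm.respond_sync.return_value = "Paris."

    @trace_system(
        origin="agent",
        event_type="lm_call",
        manage_event="create",
        increment_partition=True,
        verbose=False,
    )
    def make_lm_call(self, user_message: str) -> str:
        response = self.lm.respond_sync(
            system_message="You are a helpful assistant.", user_message=user_message
        )
        # Record the exchange against the current event.
        SynthTracker.track_lm(
            messages=[
                {"role": "user", "content": user_message},
                {"role": "assistant", "content": response},
            ],
            model_name=self.lm.model_name,
            finetune=False,
        )
        return response


async def main():
    agent = MiniAgent()
    agent.make_lm_call("What's the capital of France?")

    dataset = Dataset(
        questions=[
            TrainingQuestion(
                intent="Test question",
                criteria="Testing tracing functionality",
                question_id="q0",
            )
        ],
        reward_signals=[
            RewardSignal(
                question_id="q0",
                system_id=agent.system_id,
                reward=1.0,
                annotation="Test reward",
            )
        ],
    )
    # In the deleted async tests, upload() is awaited and returns
    # (response, payload, dataset, traces).
    response, payload, dataset, traces = await upload(dataset=dataset, verbose=True)
    print(len(event_store.get_system_traces()), "trace(s) recorded")


if __name__ == "__main__":
    asyncio.run(main())

The deleted scripts follow this same shape end to end: decorate agent/environment methods with trace_system, record interactions through SynthTracker, then bundle TrainingQuestion and RewardSignal objects into a Dataset and upload it alongside the traces held in event_store.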