From 1112e3d960f7dc9df3612e9273f7f7e586d95093 Mon Sep 17 00:00:00 2001 From: doku88 Date: Sun, 15 Dec 2024 16:59:50 -0800 Subject: [PATCH] updates to handle passing url --- synth_sdk/tracing/upload.py | 82 ++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 34 deletions(-) diff --git a/synth_sdk/tracing/upload.py b/synth_sdk/tracing/upload.py index 3a35b54..00ea21e 100644 --- a/synth_sdk/tracing/upload.py +++ b/synth_sdk/tracing/upload.py @@ -41,7 +41,7 @@ def createPayload(dataset: Dataset, traces: List[SystemTrace]) -> Dict[str, Any] } return payload -async def send_system_traces_s3(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str, upload_id: str): +async def send_system_traces_s3(dataset: Dataset, traces: List[SystemTrace]): # 1. Create S3 client s3_client = boto3.client( "s3", @@ -56,7 +56,7 @@ async def send_system_traces_s3(dataset: Dataset, traces: List[SystemTrace], bas # 3. Create bucket path with datetime bucket_name = os.getenv("WASABI_BUCKET_NAME") - current_time = datetime.now().strftime("%Y%m%d_%H%M%S") + current_time = datetime.now().strftime("%Y_%m_%d_%H%M%S") bucket_path = f"uploads/upload_{current_time}.json" # 4. Upload payload to Wasabi @@ -81,46 +81,60 @@ async def send_system_traces_s3(dataset: Dataset, traces: List[SystemTrace], bas 'signed_url': signed_url } -async def send_system_traces_s3_wrapper(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str, upload_id: str): - result = await send_system_traces_s3(dataset, traces, base_url, api_key, upload_id) - bucket_path, signed_url = result['bucket_path'], result['signed_url'] +def send_system_traces_s3_wrapper(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str): + # Create async function that contains all async operations + async def _async_operations(): - upload_id = await get_upload_id(base_url, api_key) + result = await send_system_traces_s3(dataset, traces) + bucket_path, signed_url = result['bucket_path'], result['signed_url'] - token_url = f"{base_url}/v1/auth/token" - token_response = requests.get(token_url, headers={"customer_specific_api_key": api_key}) - token_response.raise_for_status() - access_token = token_response.json()["access_token"] - - api_url = f"{base_url}/v1/uploads/process-upload/{upload_id}/{signed_url}" - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {access_token}", - } + upload_id = await get_upload_id(base_url, api_key) - try: - response = requests.get(api_url, headers=headers) - response.raise_for_status() + token_url = f"{base_url}/v1/auth/token" + token_response = requests.get(token_url, headers={"customer_specific_api_key": api_key}) + token_response.raise_for_status() + access_token = token_response.json()["access_token"] - upload_id = response.json()["upload_id"] - signed_url = response.json()["signed_url"] - message = response.json()["message"] + api_url = f"{base_url}/v1/uploads/process-upload/{upload_id}" + data = {"signed_url": signed_url} + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {access_token}", + } - print(f"Message: {message}") - print(f"Upload ID retrieved: {upload_id}") - print(f"Signed URL: {signed_url}") + try: + response = requests.post(api_url, headers=headers, json=data) + response.raise_for_status() - return upload_id, signed_url - except requests.exceptions.HTTPError as e: - logging.error(f"HTTP error occurred: {e}") - raise - except Exception as e: - logging.error(f"An error occurred: {e}") - raise + upload_id = response.json()["upload_id"] + signed_url = response.json()["signed_url"] + message = response.json()["message"] + print(f"Message: {message}") + print(f"Upload ID retrieved: {upload_id}") + print(f"Signed URL: {signed_url}") + return upload_id, signed_url + except requests.exceptions.HTTPError as e: + logging.error(f"HTTP error occurred: {e}") + raise + except Exception as e: + logging.error(f"An error occurred: {e}") + raise - return bucket_path, signed_url + # Run the async operations in an event loop + if not is_event_loop_running(): + # If no event loop is running, create a new one + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(_async_operations()) + finally: + loop.close() + else: + # If an event loop is already running, use it + loop = asyncio.get_event_loop() + return loop.run_until_complete(_async_operations()) async def send_system_traces(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str, upload_id: str): async with aiohttp.ClientSession() as session: @@ -463,7 +477,7 @@ def upload_helper(dataset: Dataset, traces: List[SystemTrace]=[], verbose: bool print("Upload format validation successful") # Send to server - response, payload = send_system_traces_chunked( + response, payload = send_system_traces_s3_wrapper( dataset=dataset, traces=traces, base_url="https://agent-learning.onrender.com",