Skip to content

Commit

Permalink
updates to handle passing url
Browse files Browse the repository at this point in the history
  • Loading branch information
DoKu88 committed Dec 16, 2024
1 parent f5a202c commit 1112e3d
Showing 1 changed file with 48 additions and 34 deletions.
82 changes: 48 additions & 34 deletions synth_sdk/tracing/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def createPayload(dataset: Dataset, traces: List[SystemTrace]) -> Dict[str, Any]
}
return payload

async def send_system_traces_s3(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str, upload_id: str):
async def send_system_traces_s3(dataset: Dataset, traces: List[SystemTrace]):
# 1. Create S3 client
s3_client = boto3.client(
"s3",
Expand All @@ -56,7 +56,7 @@ async def send_system_traces_s3(dataset: Dataset, traces: List[SystemTrace], bas

# 3. Create bucket path with datetime
bucket_name = os.getenv("WASABI_BUCKET_NAME")
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
current_time = datetime.now().strftime("%Y_%m_%d_%H%M%S")
bucket_path = f"uploads/upload_{current_time}.json"

# 4. Upload payload to Wasabi
Expand All @@ -81,46 +81,60 @@ async def send_system_traces_s3(dataset: Dataset, traces: List[SystemTrace], bas
'signed_url': signed_url
}

async def send_system_traces_s3_wrapper(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str, upload_id: str):
result = await send_system_traces_s3(dataset, traces, base_url, api_key, upload_id)
bucket_path, signed_url = result['bucket_path'], result['signed_url']
def send_system_traces_s3_wrapper(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str):
# Create async function that contains all async operations
async def _async_operations():

upload_id = await get_upload_id(base_url, api_key)
result = await send_system_traces_s3(dataset, traces)
bucket_path, signed_url = result['bucket_path'], result['signed_url']

token_url = f"{base_url}/v1/auth/token"
token_response = requests.get(token_url, headers={"customer_specific_api_key": api_key})
token_response.raise_for_status()
access_token = token_response.json()["access_token"]

api_url = f"{base_url}/v1/uploads/process-upload/{upload_id}/{signed_url}"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}",
}
upload_id = await get_upload_id(base_url, api_key)

try:
response = requests.get(api_url, headers=headers)
response.raise_for_status()
token_url = f"{base_url}/v1/auth/token"
token_response = requests.get(token_url, headers={"customer_specific_api_key": api_key})
token_response.raise_for_status()
access_token = token_response.json()["access_token"]

upload_id = response.json()["upload_id"]
signed_url = response.json()["signed_url"]
message = response.json()["message"]
api_url = f"{base_url}/v1/uploads/process-upload/{upload_id}"
data = {"signed_url": signed_url}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}",
}

print(f"Message: {message}")
print(f"Upload ID retrieved: {upload_id}")
print(f"Signed URL: {signed_url}")
try:
response = requests.post(api_url, headers=headers, json=data)
response.raise_for_status()

return upload_id, signed_url
except requests.exceptions.HTTPError as e:
logging.error(f"HTTP error occurred: {e}")
raise
except Exception as e:
logging.error(f"An error occurred: {e}")
raise
upload_id = response.json()["upload_id"]
signed_url = response.json()["signed_url"]
message = response.json()["message"]

print(f"Message: {message}")
print(f"Upload ID retrieved: {upload_id}")
print(f"Signed URL: {signed_url}")

return upload_id, signed_url
except requests.exceptions.HTTPError as e:
logging.error(f"HTTP error occurred: {e}")
raise
except Exception as e:
logging.error(f"An error occurred: {e}")
raise

return bucket_path, signed_url
# Run the async operations in an event loop
if not is_event_loop_running():
# If no event loop is running, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(_async_operations())
finally:
loop.close()
else:
# If an event loop is already running, use it
loop = asyncio.get_event_loop()
return loop.run_until_complete(_async_operations())

async def send_system_traces(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str, upload_id: str):
async with aiohttp.ClientSession() as session:
Expand Down Expand Up @@ -463,7 +477,7 @@ def upload_helper(dataset: Dataset, traces: List[SystemTrace]=[], verbose: bool
print("Upload format validation successful")

# Send to server
response, payload = send_system_traces_chunked(
response, payload = send_system_traces_s3_wrapper(
dataset=dataset,
traces=traces,
base_url="https://agent-learning.onrender.com",
Expand Down

0 comments on commit 1112e3d

Please sign in to comment.