Skip to content

Commit

Permalink
created infra uploading via wasabi & passing signedLink
Browse files Browse the repository at this point in the history
  • Loading branch information
DoKu88 committed Dec 16, 2024
1 parent 7cc11b0 commit f5a202c
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ dependencies = [
"pytest-asyncio>=0.24.0",
"apropos-ai>=0.4.5",
"craftaxlm>=0.0.5",
"boto3>=1.35.71",
"botocore>=1.35.71",
"tqdm>=4.66.4",
"aiohttp>=3.8.6"
]
classifiers = []

Expand Down
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ alembic>=1.13.3
zyk>=0.2.10
#synth_sdk>=0.2.61
#smallbench>=0.2.11
boto3>=1.35.71
botocore>=1.35.71
tqdm>=4.66.4
aiohttp>=3.8.6
84 changes: 84 additions & 0 deletions synth_sdk/tracing/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
from pympler import asizeof
from tqdm import tqdm
import aiohttp
import boto3
from datetime import datetime

# NOTE: This may cause memory issues in the future
def validate_json(data: dict) -> None:
#Validate that a dictionary contains only JSON-serializable values.

Expand All @@ -38,6 +41,87 @@ def createPayload(dataset: Dataset, traces: List[SystemTrace]) -> Dict[str, Any]
}
return payload

async def send_system_traces_s3(
    dataset: Dataset,
    traces: List[SystemTrace],
    base_url: str,
    api_key: str,
    upload_id: str,
):
    """Upload the trace payload to Wasabi and return a presigned download URL.

    The payload is built with ``createPayload`` and checked with
    ``validate_json`` before it is serialized and stored as a JSON object
    under ``uploads/`` in the bucket named by ``WASABI_BUCKET_NAME``.

    Args:
        dataset: Dataset to include in the payload.
        traces: System traces to include in the payload.
        base_url: Unused here; kept so the signature matches the wrapper.
        api_key: Unused here; kept so the signature matches the wrapper.
        upload_id: Unused here; kept so the signature matches the wrapper.

    Returns:
        A dict with ``bucket_path`` (the object key that was written) and
        ``signed_url`` (a presigned ``get_object`` URL valid for 4 hours).

    Raises:
        ValueError: If ``WASABI_BUCKET_NAME`` is not set.
    """
    # Function-scope imports keep this block self-contained; the file's
    # import header is only partially visible from here.
    import asyncio
    import functools
    import uuid

    # Fail fast with a clear message instead of a botocore parameter
    # validation error after the payload has already been built.
    bucket_name = os.getenv("WASABI_BUCKET_NAME")
    if not bucket_name:
        raise ValueError("WASABI_BUCKET_NAME environment variable is not set")

    # 1. Create S3 client (credentials come from WASABI_ACCESS_KEY /
    #    WASABI_SECRET_KEY).
    s3_client = boto3.client(
        "s3",
        endpoint_url="https://s3.wasabisys.com",
        aws_access_key_id=os.getenv("WASABI_ACCESS_KEY"),
        aws_secret_access_key=os.getenv("WASABI_SECRET_KEY"),
    )

    # 2. Create and validate payload
    payload = createPayload(dataset, traces)
    validate_json(payload)

    # 3. Create bucket path. The timestamp only has second resolution, so a
    #    random suffix is appended to stop two uploads that start in the same
    #    second from overwriting each other.
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    bucket_path = f"uploads/upload_{current_time}_{uuid.uuid4().hex}.json"

    # 4. Upload payload to Wasabi. boto3 is synchronous, so the network call
    #    is run in the default executor to avoid blocking the event loop.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            s3_client.put_object,
            Bucket=bucket_name,
            Key=bucket_path,
            Body=json.dumps(payload).encode("utf-8"),
            ContentType="application/json",
        ),
    )

    # 5. Generate a signed URL (presigning is computed locally by boto3).
    signed_url = s3_client.generate_presigned_url(
        "get_object",
        Params={
            "Bucket": bucket_name,
            "Key": bucket_path,
        },
        ExpiresIn=14400,  # URL expires in 4 hours
    )

    return {
        "bucket_path": bucket_path,
        "signed_url": signed_url,
    }

async def send_system_traces_s3_wrapper(
    dataset: Dataset,
    traces: List[SystemTrace],
    base_url: str,
    api_key: str,
    upload_id: str,
):
    """Upload traces to S3 storage and ask the backend to process the upload.

    Steps:
      1. Upload the payload via ``send_system_traces_s3`` and take the
         presigned URL from its result.
      2. Obtain a fresh upload id from ``get_upload_id`` (this replaces the
         ``upload_id`` argument, which is only forwarded to step 1).
      3. Exchange ``api_key`` for an access token at ``/v1/auth/token``.
      4. Call ``/v1/uploads/process-upload/{upload_id}/{signed_url}`` with
         the bearer token.

    Args:
        dataset: Dataset to upload.
        traces: System traces to upload.
        base_url: Base URL of the backend API.
        api_key: Customer API key used to request the access token.
        upload_id: Forwarded to ``send_system_traces_s3``; superseded by the
            id returned from ``get_upload_id`` for the processing request.

    Returns:
        A ``(upload_id, signed_url)`` tuple taken from the processing
        endpoint's JSON response.

    Raises:
        requests.exceptions.HTTPError: If either HTTP request returns an
            error status.
        Exception: Any other failure of the processing request is logged and
            re-raised unchanged.
    """
    # Function-scope imports keep this block self-contained; the file's
    # import header is only partially visible from here.
    import asyncio
    import functools
    from urllib.parse import quote

    result = await send_system_traces_s3(dataset, traces, base_url, api_key, upload_id)
    signed_url = result["signed_url"]

    upload_id = await get_upload_id(base_url, api_key)

    # `requests` is synchronous; run the calls in the default executor so the
    # event loop is not blocked while waiting on the network.
    loop = asyncio.get_running_loop()

    token_url = f"{base_url}/v1/auth/token"
    token_response = await loop.run_in_executor(
        None,
        functools.partial(
            requests.get,
            token_url,
            headers={"customer_specific_api_key": api_key},
        ),
    )
    token_response.raise_for_status()
    access_token = token_response.json()["access_token"]

    # The presigned URL contains "/", "?" and "&". Embedded raw, its query
    # string (which carries the signature) would be parsed as the query
    # string of this API request instead of staying part of the path
    # segment, so it is percent-encoded here.
    # NOTE(review): assumes the server percent-decodes the path segment --
    # confirm against the process-upload route.
    encoded_signed_url = quote(signed_url, safe="")
    api_url = f"{base_url}/v1/uploads/process-upload/{upload_id}/{encoded_signed_url}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {access_token}",
    }

    try:
        response = await loop.run_in_executor(
            None,
            functools.partial(requests.get, api_url, headers=headers),
        )
        response.raise_for_status()

        # Parse the body once rather than once per field.
        response_data = response.json()
        upload_id = response_data["upload_id"]
        signed_url = response_data["signed_url"]
        message = response_data["message"]

        # NOTE(review): the signed URL grants read access to the uploaded
        # object; consider whether it should be printed.
        print(f"Message: {message}")
        print(f"Upload ID retrieved: {upload_id}")
        print(f"Signed URL: {signed_url}")

        return upload_id, signed_url
    except requests.exceptions.HTTPError as e:
        logging.error(f"HTTP error occurred: {e}")
        raise
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        raise

async def send_system_traces(dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str, upload_id: str):
async with aiohttp.ClientSession() as session:
# Get token
Expand Down

0 comments on commit f5a202c

Please sign in to comment.