
Commit 60291c4

Merge pull request #4 from rishiraj/fix-upload
Update main.py to fix uploads
rishiraj authored Oct 10, 2024
2 parents e3cc465 + 9c3550f, commit 60291c4
Showing 1 changed file with 37 additions and 15 deletions.
firerequests/main.py: 52 changes (37 additions & 15 deletions)
@@ -114,16 +114,28 @@ async def upload_file(
         parallel_failures: int = 3, max_retries: int = 5, callback: Optional[Any] = None
     ):
         file_size = os.path.getsize(file_path)
-        tasks = []
+        part_size = file_size // len(parts_urls)
+        last_part_size = file_size - part_size * (len(parts_urls) - 1)  # To handle any remaining bytes
+
         semaphore = asyncio.Semaphore(max_files)
+        tasks = []
         try:
             async with aiohttp.ClientSession() as session:
                 for part_number, part_url in enumerate(parts_urls):
-                    start = part_number * chunk_size
+                    # Calculate start and stop positions for each part
+                    if part_number == len(parts_urls) - 1:  # For the last part, ensure we include the remaining bytes
+                        start = part_number * part_size
+                        size = last_part_size
+                    else:
+                        start = part_number * part_size
+                        size = part_size
+
+                    # Start uploading the chunks for the given part
                     tasks.append(self.upload_chunk_with_retries(
-                        session, part_url, file_path, start, chunk_size, semaphore, parallel_failures, max_retries
+                        session, part_url, file_path, start, size, chunk_size, semaphore, parallel_failures, max_retries
                     ))
 
+                # Track progress using a progress bar
                 progress_bar = tqdm(total=file_size, unit="B", unit_scale=True, desc="Uploading on 🔥")
                 for chunk_result in asyncio.as_completed(tasks):
                     uploaded = await chunk_result
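
This first hunk replaces the old per-chunk offset math (start = part_number * chunk_size) with per-part offsets, so each presigned URL receives one contiguous slice of the file and the last part absorbs any remainder. A minimal sketch of that arithmetic, with made-up numbers (file_size and num_parts are illustrative, not taken from the diff):

# Illustrative check of the part-splitting arithmetic from the hunk above.
file_size = 10_000_001                                      # bytes; made-up value
num_parts = 4                                               # stands in for len(parts_urls)

part_size = file_size // num_parts                          # 2_500_000
last_part_size = file_size - part_size * (num_parts - 1)    # 2_500_001, absorbs the remainder

# Each part starts at part_number * part_size; only the last part's size differs.
sizes = [part_size] * (num_parts - 1) + [last_part_size]
assert sum(sizes) == file_size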
@@ -135,33 +147,43 @@ async def upload_file(
             print(f"Error in upload_file: {e}")
 
     async def upload_chunk_with_retries(
-        self, session: ClientSession, url: str, file_path: str, start: int, chunk_size: int,
+        self, session: ClientSession, url: str, file_path: str, start: int, part_size: int, chunk_size: int,
         semaphore: asyncio.Semaphore, parallel_failures: int, max_retries: int
     ):
         async with semaphore:
             attempt = 0
             while attempt <= max_retries:
                 try:
-                    return await self.upload_chunk(session, url, file_path, start, chunk_size)
+                    # Adjust chunk upload for each part
+                    return await self.upload_chunks(session, url, file_path, start, part_size, chunk_size)
                 except Exception as e:
                     if attempt == max_retries:
                         raise e
                     await asyncio.sleep(self.exponential_backoff(BASE_WAIT_TIME, attempt, MAX_WAIT_TIME))
                     attempt += 1
 
-    async def upload_chunk(
-        self, session: ClientSession, url: str, file_path: str, start: int, chunk_size: int
+    async def upload_chunks(
+        self, session: ClientSession, url: str, file_path: str, start: int, part_size: int, chunk_size: int
     ):
         try:
+            # Upload in smaller chunks within each part range
+            total_uploaded = 0
             async with aiofiles.open(file_path, 'rb') as f:
-                await f.seek(start)
-                chunk = await f.read(chunk_size)
-                headers = {'Content-Length': str(len(chunk))}
-                async with session.put(url, data=chunk, headers=headers) as response:
-                    response.raise_for_status()
-                return len(chunk)
+                while total_uploaded < part_size:
+                    await f.seek(start + total_uploaded)
+                    chunk = await f.read(min(chunk_size, part_size - total_uploaded))
+                    if not chunk:
+                        break
+
+                    headers = {'Content-Length': str(len(chunk))}
+                    async with session.put(url, data=chunk, headers=headers) as response:
+                        response.raise_for_status()
+
+                    total_uploaded += len(chunk)
+            return total_uploaded
         except Exception as e:
-            print(f"Error in upload_chunk: {e}")
+            print(f"Error in upload_chunks: {e}")
             return 0
 
     def download(self, url: str, filename: Optional[str] = None, max_files: int = 10, chunk_size: int = 2 * 1024 * 1024):
         """
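For context, a hypothetical call to the fixed upload_file, with one presigned URL per part. The FireRequests class name, the import path, the URL values, and any keyword names beyond those visible in the diff are assumptions, not shown in this commit:

import asyncio
from firerequests import FireRequests  # import path assumed

async def main():
    fr = FireRequests()
    # One presigned URL per part; generating these is outside this commit's scope.
    parts_urls = [
        "https://example.com/upload/part-1",
        "https://example.com/upload/part-2",
    ]
    await fr.upload_file(
        file_path="model.bin",           # file split across the part URLs
        parts_urls=parts_urls,
        chunk_size=2 * 1024 * 1024,      # 2 MB PUTs within each part
        max_files=10,                    # concurrent parts (semaphore size)
    )

asyncio.run(main())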
