
Gate async HTTP on a config (#4377)
The async HTTP usage has been shown to cause OOMs in OSS-Fuzz, which lead to significant problems, in particular NTP failing to sync.
jonathanmetzman authored Nov 7, 2024
1 parent de8fbac commit cfd2180
Showing 3 changed files with 56 additions and 34 deletions.
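
For orientation, and not part of the commit itself: the new behavior is opt-in. storage.py reads async_http.enabled from the project config with a default of False, so deployments whose project.yaml omits the key keep the existing synchronous download path. A minimal sketch of that decision, with fetch_project_config as a hypothetical stand-in for local_config.ProjectConfig():

from typing import Any, Dict

def fetch_project_config() -> Dict[str, Any]:
  # Hypothetical stand-in; the real values come from project.yaml.
  return {'async_http': {'enabled': True}}

def use_async_http() -> bool:
  # Mirrors the new check in storage.py: a missing key defaults to False,
  # so deployments that never set async_http keep the synchronous path.
  return fetch_project_config().get('async_http', {}).get('enabled', False)

print(use_async_http())  # True with the test config above; False if the key is absent.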
4 changes: 4 additions & 0 deletions configs/test/project.yaml
@@ -42,6 +42,10 @@ monitoring:
   # Flag to indicate if Stackdriver monitoring is enabled or not (disabled by default).
   enabled: false
 
+async_http:
+  # WARNING: This can cause OOMs.
+  enabled: true
+
 firebase:
   # API key for Firebase (public).
   api_key: firebase-api-key
40 changes: 31 additions & 9 deletions src/clusterfuzz/_internal/google_cloud_utils/storage.py
@@ -33,11 +33,13 @@
 import requests
 import requests.exceptions
 
+from clusterfuzz._internal.base import memoize
 from clusterfuzz._internal.base import retry
 from clusterfuzz._internal.base import utils
 from clusterfuzz._internal.config import local_config
 from clusterfuzz._internal.metrics import logs
 from clusterfuzz._internal.system import environment
+from clusterfuzz._internal.system import fast_http
 from clusterfuzz._internal.system import shell
 
 from . import credentials
@@ -91,6 +93,8 @@
 # Timeout for HTTP operations.
 HTTP_TIMEOUT_SECONDS = 15
 
+# TODO(metzman): Figure out why this works on oss-fuzz and doesn't destroy
+# memory usage?
 _POOL_SIZE = 16
 
 _TRANSIENT_ERRORS = [
@@ -1153,6 +1157,13 @@ def blobs_bucket():
   return local_config.ProjectConfig().get('blobs.bucket')
 
 
+@memoize.wrap(memoize.FifoInMemory(1))
+def use_async_http():
+  enabled = local_config.ProjectConfig().get('async_http.enabled', False)
+  logs.info(f'Using async HTTP: {enabled}.')
+  return enabled
+
+
 def uworker_input_bucket():
   """Returns the bucket where uworker input is done."""
   test_uworker_input_bucket = environment.get_value('TEST_UWORKER_INPUT_BUCKET')
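
An aside, not part of the diff: memoize.wrap(memoize.FifoInMemory(1)) caches the single return value in process memory, so the project config is read and the "Using async HTTP" line is logged once per process rather than on every download call. A rough sketch of the same effect, using functools.lru_cache as a stand-in for the ClusterFuzz memoize helper:

import functools

@functools.lru_cache(maxsize=1)
def use_async_http_sketch() -> bool:
  # Pretend this reads project config; it only runs on the first call.
  print('Using async HTTP: True.')
  return True

use_async_http_sketch()  # reads the "config" and logs once
use_async_http_sketch()  # served from the cache; no second log line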
@@ -1247,13 +1258,13 @@ def get_signed_download_url(remote_path, minutes=SIGNED_URL_EXPIRATION_MINUTES):
   return provider.sign_download_url(remote_path, minutes=minutes)
 
 
-def _error_tolerant_download_signed_url_to_file(url_and_path):
+def _error_tolerant_download_signed_url_to_file(url_and_path) -> bool:
   url, path = url_and_path
   try:
     download_signed_url_to_file(url, path)
-    return url
+    return True
   except Exception:
-    return None
+    return False
 
 
 def _error_tolerant_upload_signed_url(url_and_path) -> bool:
@@ -1308,13 +1319,24 @@ def download_signed_urls(signed_urls: List[str],
       for idx in range(len(signed_urls))
   ]
   logs.info('Downloading URLs.')
-  with _pool() as pool:
-    urls = list(
-        pool.map(_error_tolerant_download_signed_url_to_file,
-                 zip(signed_urls, filepaths)))
+
+  urls_and_filepaths = list(zip(signed_urls, filepaths))
+
+  def synchronous_download_urls(urls_and_filepaths):
+    with _pool() as pool:
+      return list(
+          pool.map(_error_tolerant_download_signed_url_to_file,
+                   urls_and_filepaths))
+
+  if use_async_http():
+    results = fast_http.download_urls(urls_and_filepaths)
+  else:
+    results = synchronous_download_urls(urls_and_filepaths)
+
   download_results = [
-      SignedUrlDownloadResult(url, filepaths[idx])
-      for idx, url in enumerate(urls)
+      SignedUrlDownloadResult(*urls_and_filepaths[idx])
+      for idx, result in enumerate(results)
+      if result
   ]
   return download_results
 
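For clarity, and not part of the diff: both download paths now return one bool per (url, filepath) pair, in input order, and the final comprehension keeps only the pairs that succeeded. A small illustration with made-up values:

urls_and_filepaths = [('u0', '/tmp/f0'), ('u1', '/tmp/f1'), ('u2', '/tmp/f2')]
results = [True, False, True]  # say the download of 'u1' failed

# SignedUrlDownloadResult(*pair) in the real code; plain tuples here.
download_results = [
    urls_and_filepaths[idx]
    for idx, result in enumerate(results)
    if result
]
assert download_results == [('u0', '/tmp/f0'), ('u2', '/tmp/f2')]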
46 changes: 21 additions & 25 deletions src/clusterfuzz/_internal/system/fast_http.py
@@ -18,11 +18,11 @@
 import itertools
 import multiprocessing
 from typing import List
-from typing import Optional
 from typing import Tuple
 
 import aiohttp
 
+from clusterfuzz._internal.base import utils
 from clusterfuzz._internal.metrics import logs
 from clusterfuzz._internal.system import environment
@@ -41,36 +41,35 @@ def _pool(pool_size=_POOL_SIZE):
   yield futures.ProcessPoolExecutor(pool_size)
 
 
-def download_urls(urls: List[str], filepaths: List[str]) -> List[Optional[str]]:
-  """Downloads multiple |urls| to |filepaths| in parallel and
-  asynchronously. Tolerates errors. Returns a list of whether each
-  download was successful."""
-  assert len(urls) == len(filepaths)
-  if len(urls) == 0:
+def download_urls(urls_and_filepaths: List[Tuple[str, str]]) -> List[bool]:
+  """Downloads multiple urls to filepaths in parallel and asynchronously.
+  Tolerates errors. Returns a list of whether each download was successful."""
+  utils.python_gc()
+  if len(urls_and_filepaths) == 0:
     # Do this to avoid issues with the range function.
     return []
-  url_batches = []
-  url_batch_size = len(urls) // _POOL_SIZE
+  batches = []
 
+  batch_size = len(urls_and_filepaths) // _POOL_SIZE
   # Avoid issues with range when urls is less than _POOL_SIZE.
-  url_batch_size = max(url_batch_size, len(urls))
+  batch_size = max(batch_size, len(urls_and_filepaths))
+  # Avoid OOMs by limiting the amount of concurrent downloads.
+  batch_size = min(5, batch_size)
 
-  urls_and_filepaths = list(zip(urls, filepaths))
-  for idx in range(0, len(urls), url_batch_size):
-    url_batch = urls_and_filepaths[idx:idx + url_batch_size]
-    url_batches.append(url_batch)
+  for idx in range(0, len(urls_and_filepaths), batch_size):
+    batch = urls_and_filepaths[idx:idx + batch_size]
+    batches.append(batch)
   with _pool() as pool:
-    return list(itertools.chain(*pool.map(_download_files, url_batches)))
+    return list(itertools.chain(*pool.map(_download_files, batches)))
 
 
-def _download_files(
-    urls_and_paths: List[Tuple[str, str]]) -> List[Optional[str]]:
+def _download_files(urls_and_paths: List[Tuple[str, str]]) -> List[bool]:
   urls, paths = list(zip(*urls_and_paths))
   return asyncio.run(_async_download_files(list(urls), list(paths)))
 
 
 async def _async_download_files(urls: List[str],
-                                paths: List[str]) -> List[Optional[str]]:
+                                paths: List[str]) -> List[bool]:
   async with aiohttp.ClientSession() as session:
     tasks = [
         asyncio.create_task(_error_tolerant_download_file(session, url, path))
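
A quick worked example of the new batch sizing above, not part of the diff: the max() call effectively overrides the pool-based split with the full length, and min(5, ...) then caps every batch at five downloads, which is what bounds concurrent memory use:

num_pairs = 100                          # (url, filepath) tuples to download
batch_size = num_pairs // 16             # _POOL_SIZE is 16  -> 6
batch_size = max(batch_size, num_pairs)  # -> 100
batch_size = min(5, batch_size)          # -> 5
assert batch_size == 5                   # 100 pairs become 20 batches of 5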
@@ -80,13 +79,13 @@ async def _async_download_files(urls: List[str],
 
 
 async def _error_tolerant_download_file(session: aiohttp.ClientSession,
-                                        url: str, path: str) -> Optional[str]:
+                                        url: str, path: str) -> bool:
   try:
     await _async_download_file(session, url, path)
-    return url
+    return True
   except:
     logs.warning(f'Failed to download {url}.')
-    return None
+    return False
 
 
 async def _async_download_file(session: aiohttp.ClientSession, url: str,
@@ -102,8 +101,5 @@ async def _async_download_file(session: aiohttp.ClientSession, url: str,
           status=response.status,
       )
     with open(path, 'wb') as fp:
-      while True:
-        chunk = await response.content.read(1024)
-        if not chunk:
-          return
+      async for chunk in response.content.iter_any(1024):
         fp.write(chunk)
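
A side note on the streaming change, not part of the diff: the manual read() loop is replaced by iterating the response body asynchronously. For reference, aiohttp's StreamReader also offers iter_chunked(n), which takes an explicit chunk size; a minimal standalone sketch of a download written that way, with the URL and path as placeholder values:

import asyncio

import aiohttp


async def download(url: str, path: str) -> None:
  async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
      response.raise_for_status()
      with open(path, 'wb') as fp:
        # Stream the body in 1 KiB chunks instead of buffering it all in memory.
        async for chunk in response.content.iter_chunked(1024):
          fp.write(chunk)


# Example usage (placeholder URL/path):
# asyncio.run(download('https://example.com/', '/tmp/example.html'))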
