Skip to content

Commit

Permalink
Refactor with lock for client initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
nateprewitt committed Nov 15, 2023
1 parent 6000077 commit 01a79a0
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 33 deletions.
7 changes: 0 additions & 7 deletions boto3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
# The default Boto3 session; autoloaded when needed.
DEFAULT_SESSION = None

# The default CRT
_DEFAULT_CRT_CLIENT = None


def setup_default_session(**kwargs):
"""
Expand All @@ -37,10 +34,6 @@ def setup_default_session(**kwargs):
DEFAULT_SESSION = Session(**kwargs)


def _get_default_crt_client(**kwargs):
return _DEFAULT_CRT_CLIENT


def set_stream_logger(name='boto3', level=logging.DEBUG, format_string=None):
"""
Add a stream handler for the given name and level to the logging module.
Expand Down
44 changes: 27 additions & 17 deletions boto3/crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,33 @@

from botocore.session import Session

# The default CRT
_CRT_S3_TRANSFER_MANAGER = None


def _setup_crt_s3_transfer_manager(client, config, **kwargs):
try:
lock = CrossProcessLock('boto3')
lock.acquire()
except RuntimeError:
# If we're unable to acquire the lock, we cannot
# use the CRT in this process and should default to
# the default s3transfer manager.
return None

return CRTS3TransferManager(
create_crt_transfer_manager(client, config),
lock,
client.meta.region_name
)


def get_crt_s3_transfer_manager(client, config):
global _CRT_S3_TRANSFER_MANAGER
if _CRT_S3_TRANSFER_MANAGER is None:
_CRT_S3_TRANSFER_MANAGER = _setup_crt_s3_transfer_manager(client, config)
return _CRT_S3_TRANSFER_MANAGER


class CRTS3TransferManager:

Expand All @@ -33,23 +60,6 @@ def _get_credentials(self):

def load_credentials(self):
return self._get_credentials()


def initialize_crt_s3_transfer_manager(client, config):
try:
lock = CrossProcessLock('boto3')
lock.acquire()
except RuntimeError:
# If we're unable to acquire the lock, we cannot
# use the CRT in this process and should default to
# the default s3transfer manager.
return None

return CRTS3TransferManager(
create_crt_transfer_manager(client, config),
lock,
client.meta.region_name
)


def create_crt_transfer_manager(client, config):
Expand Down
24 changes: 15 additions & 9 deletions boto3/s3/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ def __call__(self, bytes_amount):
"""
import logging
import threading
from os import PathLike, fspath, getpid

from botocore.compat import HAS_CRT
Expand All @@ -137,15 +139,16 @@ def __call__(self, bytes_amount):
from s3transfer.utils import OSUtils

from boto3.exceptions import RetriesExceededError, S3UploadFailedError
from boto3 import _DEFAULT_CRT_CLIENT

if HAS_CRT:
from boto3 import _get_default_crt_client
from boto3.crt import initialize_crt_s3_transfer_manager
from boto3.crt import get_crt_s3_transfer_manager
client_creation_lock = threading.Lock()

KB = 1024
MB = KB * KB

logger = logging.getLogger(__name__)


def create_transfer_manager(client, config, osutil=None):
"""Creates a transfer manager based on configuration
Expand All @@ -163,15 +166,18 @@ def create_transfer_manager(client, config, osutil=None):
:returns: A transfer manager based on parameters provided
"""
if HAS_CRT:
crt_transfer_manager = _get_default_crt_client()
if crt_transfer_manager is None:
global _DEFAULT_CRT_CLIENT
_DEFAULT_CRT_CLIENT = initialize_crt_s3_transfer_manager(client, config)
crt_transfer_manager = _DEFAULT_CRT_CLIENT
if crt_transfer_manager is not None and client.meta.region_name == crt_transfer_manager._region:
with client_creation_lock:
crt_transfer_manager = get_crt_s3_transfer_manager(client, config)
if (
crt_transfer_manager is not None
and crt_transfer_manager._region == client.meta.region_name
):
logger.debug(f"Using CRT client. pid: {getpid()}, thread: {threading.get_ident()}")
return crt_transfer_manager._crt_s3_client


# If we don't resolve something above, fallback to the default.
logger.debug(f"Using default client. pid: {getpid()}, thread: {threading.get_ident()}")
return _create_default_transfer_manager(client, config, osutil)


Expand Down

0 comments on commit 01a79a0

Please sign in to comment.