Skip to content

Commit

Permalink
Initial implementation of AWS CRT S3 transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
nateprewitt committed Nov 23, 2023
1 parent 8413dfe commit a5f36d3
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 1 deletion.
129 changes: 129 additions & 0 deletions boto3/crt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""
This file contains private functionality for interacting with the AWS
Common Runtime library (awscrt) in boto3.
All code contained within this file is for internal usage within this
project and is not intended for external consumption. All interfaces
contained within are subject to abrupt breaking changes.
"""

import threading

from botocore.session import Session
from s3transfer.crt import (
BotocoreCRTCredentialsWrapper,
BotocoreCRTRequestSerializer,
CRTTransferManager,
acquire_crt_s3_process_lock,
create_s3_crt_client,
)

# Singletons for CRT-backed transfers
_CRT_S3_CLIENT = None
_BOTOCORE_CRT_SERIALIZER = None

_CLIENT_CREATION_LOCK = threading.Lock()


def _create_crt_client(session, config, region_name, cred_provider):
"""Create a CRT S3 Client for file transfer.
Instantiating many of these may lead to degraded performance or
system resource exhaustion.
"""
create_crt_client_kwargs = {
'region': region_name,
'use_ssl': True,
'crt_credentials_provider': cred_provider,
}
return create_s3_crt_client(**create_crt_client_kwargs)


def _create_crt_request_serializer(session, region_name):
return BotocoreCRTRequestSerializer(
session, {'region_name': region_name, 'endpoint_url': None}
)


def _create_crt_s3_client(session, config, region_name, credentials, **kwargs):
"""Create boto3 wrapper class to manage crt lock reference and S3 client."""
lock = acquire_crt_s3_process_lock('boto3')
if lock is None:
# If we're unable to acquire the lock, we cannot
# use the CRT in this process and should default to
# the classic s3transfer manager.
return None

cred_wrapper = BotocoreCRTCredentialsWrapper(credentials)
cred_provider = cred_wrapper.to_crt_credentials_provider()
return CRTS3Client(
_create_crt_client(session, config, region_name, cred_provider),
lock,
region_name,
)


def _initialize_crt_transfer_primatives(client, config):
session = Session()
region_name = client.meta.region_name
credentials = client._get_credentials()

serializer = _create_crt_request_serializer(session, region_name)
s3_client = _create_crt_s3_client(
session, config, region_name, credentials
)
return serializer, s3_client


def get_crt_s3_client(client, config):
global _CRT_S3_CLIENT
global _BOTOCORE_CRT_SERIALIZER

with _CLIENT_CREATION_LOCK:
if _CRT_S3_CLIENT is None:
serializer, s3_client = _initialize_crt_transfer_primatives(
client, config
)
_BOTOCORE_CRT_SERIALIZER = serializer
_CRT_S3_CLIENT = s3_client

return _CRT_S3_CLIENT


class CRTS3Client:
"""
This wrapper keeps track of our underlying CRT client, the lock used to
acquire it and the region we've used to instantiate the client.
Due to limitations in the existing CRT interfaces, we can only make calls
in a single region and does not support redirects. We track the region to
ensure we don't use the CRT client when a successful request cannot be made.
"""

def __init__(self, crt_client, process_lock, region):
self.crt_client = crt_client
self.process_lock = process_lock
self.region = region


def create_crt_transfer_manager(client, config):
"""Create a CRTTransferManager for optimized data transfer."""
crt_s3_client = get_crt_s3_client(client, config)
called_region = client.meta.region_name
if crt_s3_client is not None and crt_s3_client.region == called_region:
return CRTTransferManager(
crt_s3_client.crt_client, _BOTOCORE_CRT_SERIALIZER
)
return None
27 changes: 26 additions & 1 deletion boto3/s3/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,11 @@ def __call__(self, bytes_amount):
"""
from os import PathLike, fspath
import logging
import threading
from os import PathLike, fspath, getpid

from botocore.compat import HAS_CRT
from botocore.exceptions import ClientError
from s3transfer.exceptions import (
RetriesExceededError as S3TransferRetriesExceededError,
Expand All @@ -136,9 +139,14 @@ def __call__(self, bytes_amount):

from boto3.exceptions import RetriesExceededError, S3UploadFailedError

if HAS_CRT:
from boto3.crt import create_crt_transfer_manager

KB = 1024
MB = KB * KB

logger = logging.getLogger(__name__)


def create_transfer_manager(client, config, osutil=None):
"""Creates a transfer manager based on configuration
Expand All @@ -155,6 +163,23 @@ def create_transfer_manager(client, config, osutil=None):
:rtype: s3transfer.manager.TransferManager
:returns: A transfer manager based on parameters provided
"""
if HAS_CRT:
crt_transfer_manager = create_crt_transfer_manager(client, config)
if crt_transfer_manager is not None:
logger.debug(
f"Using CRT client. pid: {getpid()}, thread: {threading.get_ident()}"
)
return crt_transfer_manager

# If we don't resolve something above, fallback to the default.
logger.debug(
f"Using default client. pid: {getpid()}, thread: {threading.get_ident()}"
)
return _create_default_transfer_manager(client, config, osutil)


def _create_default_transfer_manager(client, config, osutil):
"""Create the default TransferManager implementation for s3transfer."""
executor_cls = None
if not config.use_threads:
executor_cls = NonThreadedExecutor
Expand Down
12 changes: 12 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import unittest
from unittest import mock

from botocore.compat import HAS_CRT


def unique_id(name):
"""
Expand Down Expand Up @@ -50,3 +52,13 @@ def setUp(self):

def tearDown(self):
self.bc_session_patch.stop()


def requires_crt(reason=None):
if reason is None:
reason = "Test requires awscrt to be installed"

def decorator(func):
return unittest.skipIf(not HAS_CRT, reason)(func)

return decorator
70 changes: 70 additions & 0 deletions tests/unit/test_crt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import boto3
import s3transfer
from botocore.compat import HAS_CRT

from tests import mock, requires_crt

if HAS_CRT:
import boto3.crt


USW2_S3_CLIENT = boto3.client('s3', region_name='us-west-2')
USE1_S3_CLIENT = boto3.client('s3', region_name='us-east-1')


@requires_crt()
def test_create_crt_transfer_manager_with_lock_in_use():
with mock.patch('boto3.crt.acquire_crt_s3_process_lock') as lock:
lock.return_value = None

# Verify we can't create a second CRT client
tm = boto3.crt.create_crt_transfer_manager(USW2_S3_CLIENT, None)
assert tm is None


@requires_crt()
def test_create_crt_transfer_manager():
tm = boto3.crt.create_crt_transfer_manager(USW2_S3_CLIENT, None)
assert isinstance(tm, s3transfer.crt.CRTTransferManager)


@requires_crt()
def test_crt_singleton_is_returned_every_call():
first_s3_client = boto3.crt.get_crt_s3_client(USW2_S3_CLIENT, None)
second_s3_client = boto3.crt.get_crt_s3_client(USW2_S3_CLIENT, None)

assert isinstance(first_s3_client, boto3.crt.CRTS3Client)
assert first_s3_client is second_s3_client
assert first_s3_client.crt_client is second_s3_client.crt_client


@requires_crt()
def test_create_crt_transfer_manager_w_client_in_wrong_region():
"""Ensure we don't return the crt transfer manager if client is in
different region. The CRT isn't able to handle region redirects and
will consistently fail.
We can remove this test once we have this fixed on the CRT side.
"""
usw2_s3_client = boto3.crt.create_crt_transfer_manager(
USW2_S3_CLIENT, None
)
assert isinstance(usw2_s3_client, boto3.crt.CRTTransferManager)

use1_s3_client = boto3.crt.create_crt_transfer_manager(
USE1_S3_CLIENT, None
)
assert use1_s3_client is None

0 comments on commit a5f36d3

Please sign in to comment.