Skip to content

Commit

Permalink
Implement test feedback, including previous test
Browse files Browse the repository at this point in the history
  • Loading branch information
SamRemis committed Feb 13, 2024
1 parent 61fbb61 commit 44e1cb6
Showing 1 changed file with 73 additions and 64 deletions.
137 changes: 73 additions & 64 deletions tests/unit/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@
SSOTokenLoader,
datetime2timestamp,
)
from tests import BaseEnvVar, IntegerRefresher, mock, skip_if_windows, unittest
from tests import (
BaseEnvVar,
IntegerRefresher,
mock,
skip_if_windows,
temporary_file,
unittest,
)

# Passed to session to keep it from finding default config file
TESTENVVARS = {'config_file': (None, 'AWS_CONFIG_FILE', None)}
Expand Down Expand Up @@ -3138,72 +3145,74 @@ def test_can_pass_basic_auth_token(self):
self.assertEqual(creds.method, 'container-role')

def test_can_pass_auth_token_from_file(self):
token_file_path = os.path.join(self.tempdir, 'token.jwt')
with open(token_file_path, 'w') as token_file:
token_file.write('Basic auth-token')
environ = {
'AWS_CONTAINER_CREDENTIALS_FULL_URI': 'http://localhost/foo',
'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE': token_file_path,
}
fetcher = self.create_fetcher()
timeobj = datetime.now(tzlocal())
timestamp = (timeobj + timedelta(hours=24)).isoformat()
fetcher.retrieve_full_uri.return_value = {
"AccessKeyId": "access_key",
"SecretAccessKey": "secret_key",
"Token": "token",
"Expiration": timestamp,
}
provider = credentials.ContainerProvider(environ, fetcher)
creds = provider.load()
with temporary_file('w') as f:
f.write('Basic auth-token')
f.flush()
environ = {
'AWS_CONTAINER_CREDENTIALS_FULL_URI': 'http://localhost/foo',
'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE': f.name,
}
fetcher = self.create_fetcher()
timeobj = datetime.now(tzlocal())
timestamp = (timeobj + timedelta(hours=24)).isoformat()
fetcher.retrieve_full_uri.return_value = {
"AccessKeyId": "access_key",
"SecretAccessKey": "secret_key",
"Token": "token",
"Expiration": timestamp,
}
provider = credentials.ContainerProvider(environ, fetcher)
creds = provider.load()

fetcher.retrieve_full_uri.assert_called_with(
'http://localhost/foo',
headers={'Authorization': 'Basic auth-token'},
)
self.assertEqual(creds.access_key, 'access_key')
self.assertEqual(creds.secret_key, 'secret_key')
self.assertEqual(creds.token, 'token')
self.assertEqual(creds.method, 'container-role')
fetcher.retrieve_full_uri.assert_called_with(
'http://localhost/foo',
headers={'Authorization': 'Basic auth-token'},
)
self.assertEqual(creds.access_key, 'access_key')
self.assertEqual(creds.secret_key, 'secret_key')
self.assertEqual(creds.token, 'token')
self.assertEqual(creds.method, 'container-role')

def test_reloads_auth_token_from_file(self):
token_file_path = os.path.join(self.tempdir, 'token.jwt')
with open(token_file_path, 'w') as token_file:
token_file.write('First auth token')
environ = {
'AWS_CONTAINER_CREDENTIALS_FULL_URI': 'http://localhost/foo',
'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE': token_file_path,
}
fetcher = self.create_fetcher()
timeobj = datetime.now(tzlocal())
fetcher.retrieve_full_uri.side_effect = [
{
"AccessKeyId": "first_key",
"SecretAccessKey": "first_secret",
"Token": "first_token",
"Expiration": (timeobj + timedelta(seconds=2)).isoformat(),
},
{
"AccessKeyId": "second_key",
"SecretAccessKey": "second_secret",
"Token": "second_token",
"Expiration": (timeobj + timedelta(minutes=5)).isoformat(),
},
]
provider = credentials.ContainerProvider(environ, fetcher)
creds = provider.load()
fetcher.retrieve_full_uri.assert_called_with(
'http://localhost/foo',
headers={'Authorization': 'First auth token'},
)
time.sleep(3)
with open(token_file_path, 'w') as token_file:
token_file.write('Second auth token')
creds._refresh()
fetcher.retrieve_full_uri.assert_called_with(
'http://localhost/foo',
headers={'Authorization': 'Second auth token'},
)
with temporary_file('w') as f:
f.write('First auth token')
f.flush()
environ = {
'AWS_CONTAINER_CREDENTIALS_FULL_URI': 'http://localhost/foo',
'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE': f.name,
}
fetcher = self.create_fetcher()
timeobj = datetime.now(tzlocal())
fetcher.retrieve_full_uri.side_effect = [
{
"AccessKeyId": "first_key",
"SecretAccessKey": "first_secret",
"Token": "first_token",
"Expiration": (timeobj + timedelta(seconds=1)).isoformat(),
},
{
"AccessKeyId": "second_key",
"SecretAccessKey": "second_secret",
"Token": "second_token",
"Expiration": (timeobj + timedelta(minutes=5)).isoformat(),
},
]
provider = credentials.ContainerProvider(environ, fetcher)
creds = provider.load()
fetcher.retrieve_full_uri.assert_called_with(
'http://localhost/foo',
headers={'Authorization': 'First auth token'},
)
time.sleep(1.5)
f.seek(0)
f.truncate()
f.write('Second auth token')
f.flush()
creds._refresh()
fetcher.retrieve_full_uri.assert_called_with(
'http://localhost/foo',
headers={'Authorization': 'Second auth token'},
)

def test_throws_error_on_invalid_token_file(self):
token_file_path = '/some/path/token.jwt'
Expand Down

0 comments on commit 44e1cb6

Please sign in to comment.