AMP-97070 Move Spark metadata upon transfer completion #8
base: AMP-96980
Conversation
print(f'Identified bucket: {bucket}, prefix: {prefix}')

# List all files in the s3_path directory
response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter='/')
Will this return folders as well? If only files are returned, then we are good.
Only files. The same holds for listing objects in our Java repo using Amplitude's S3 wrapper. The example job mentioned in the description has logs of what was discovered (note that the /meta folder exists at the point this method executes and is not listed).
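To illustrate the point above: when ListObjectsV2 is called with Delimiter='/', S3 reports folder-like entries under CommonPrefixes rather than Contents, so iterating Contents yields only objects at that level. A minimal sketch against a hand-built sample response (the keys and prefix names here are hypothetical, not from the actual job):

```python
# Sample shape of a ListObjectsV2 response when Delimiter='/' is used:
# "subfolders" show up as CommonPrefixes, not as Contents entries.
sample_response = {
    'Contents': [
        {'Key': 'exports/part-00000.parquet'},
        {'Key': 'exports/_SUCCESS'},
    ],
    'CommonPrefixes': [
        {'Prefix': 'exports/meta/'},  # folder-like entry, absent from Contents
    ],
}

def file_keys(response):
    """Return only object keys (files) from a ListObjectsV2-style response."""
    return [obj['Key'] for obj in response.get('Contents', [])]

print(file_keys(sample_response))
# ['exports/part-00000.parquet', 'exports/_SUCCESS']
```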
if '://' not in s3_uri_with_spark_metadata:
    raise ValueError(f'Invalid s3 URI: {s3_uri_with_spark_metadata}. Expected to contain "://".')
bucket, prefix = s3_uri_with_spark_metadata.split('://')[1].split('/', 1)
bucket = replace_double_slashes_with_single_slash(bucket)
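The parsing above can be exercised standalone. This sketch reuses the split logic from the diff; the `replace_double_slashes_with_single_slash` helper is assumed to collapse repeated slashes (its actual implementation is not shown in this diff):

```python
import re

def replace_double_slashes_with_single_slash(s):
    # Assumed behavior: collapse any run of slashes into a single slash.
    return re.sub(r'/{2,}', '/', s)

def parse_s3_uri(s3_uri):
    """Split an s3://bucket/prefix URI into (bucket, prefix), as in the diff."""
    if '://' not in s3_uri:
        raise ValueError(f'Invalid s3 URI: {s3_uri}. Expected to contain "://".')
    bucket, prefix = s3_uri.split('://')[1].split('/', 1)
    return replace_double_slashes_with_single_slash(bucket), prefix

print(parse_s3_uri('s3://my-bucket/path/to/exports/'))
# ('my-bucket', 'path/to/exports/')
```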
curious where double slashes are coming from?
For the bucket, there shouldn't be any. It's just sanitization in case an unnormalized s3 URI is provided (e.g. s3:////bucket////prefix////).
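A minimal sketch of the sanitization helper discussed here, assuming it simply collapses runs of slashes (the real implementation is not shown in this diff):

```python
import re

def replace_double_slashes_with_single_slash(s):
    # Collapse any run of two or more slashes into a single slash.
    return re.sub(r'/{2,}', '/', s)

print(replace_double_slashes_with_single_slash('/path//to///file/'))
# '/path/to/file/'
```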
    expected_output = '/path/to/file/with/double/slashes/end/'
    self.assertEqual(expected_output, replace_double_slashes_with_single_slash(input_string))

def test_move_spark_metadata_to_separate_s3_folder(self):
thanks for adding tests!
LGTM. Let's hold on merge until AMP-96980 is approved so the other PR can still focus on event mutation stuff.
Description
Move spark transfer metadata to a subdirectory of the target export S3 location
Testing
DatabricksToS3WorkerServiceIntegrationTest.testUnloadData
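As a rough illustration of the move described above: Spark writes its streaming/commit tracking files under a _spark_metadata directory next to the output, and this PR relocates them into a metadata subfolder of the export location. The key-mapping below is a hypothetical sketch (the meta/ folder name is taken from the review discussion; in the real job each mapped key would be an S3 copy followed by a delete):

```python
def destination_key(key, metadata_dir='_spark_metadata', meta_folder='meta'):
    """Map a Spark metadata key into a meta/ subfolder; None if not metadata."""
    if f'/{metadata_dir}' not in f'/{key}':
        return None  # not a Spark metadata file; leave it in place
    prefix, _, rest = key.partition(metadata_dir)
    return f'{prefix}{meta_folder}/{metadata_dir}{rest}'

print(destination_key('exports/run1/_spark_metadata/0'))
# 'exports/run1/meta/_spark_metadata/0'
print(destination_key('exports/run1/part-00000.parquet'))
# None
```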