feat(ingestion/s3): ignore depth mismatched path
eagle-25 committed Jan 14, 2025
1 parent ebbadf6 commit 651892a
Showing 3 changed files with 97 additions and 17 deletions.
18 changes: 13 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -866,18 +866,26 @@ def get_folder_info(
         Returns:
             List[Folder]: A list of Folder objects representing the partitions found.
         """
+
+        def _is_allowed_path(path_spec_: PathSpec, s3_uri: str) -> bool:
+            allowed = path_spec_.allowed(s3_uri)
+            if not allowed:
+                logger.debug(f"File {s3_uri} not allowed and skipping")
+            return allowed
+
+        s3_objects = (
+            obj
+            for obj in bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
+            if _is_allowed_path(path_spec, f"s3://{obj.bucket_name}/{obj.key}")
+        )
+
         partitions: List[Folder] = []
-        s3_objects = bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
         for key, group in group_s3_objects_by_dirname(s3_objects).items():
             file_size = 0
             creation_time = None
             modification_time = None
 
             for item in group:
-                file_path = self.create_s3_path(item.bucket_name, item.key)
-                if not path_spec.allowed(file_path):
-                    logger.debug(f"File {file_path} not allowed and skipping")
-                    continue
                 file_size += item.size
                 if creation_time is None or item.last_modified < creation_time:
                     creation_time = item.last_modified

Codecov (codecov/patch): added lines source.py#L870-L874 and #L876 were not covered by tests.
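For readers skimming the diff, here is a minimal, self-contained sketch of why the patch filters objects before grouping; allowed and expected_depth below are hypothetical stand-ins for PathSpec.allowed and the include pattern's component count, not DataHub code:

    from itertools import groupby
    from posixpath import dirname

    def allowed(uri: str, expected_depth: int) -> bool:
        # Stand-in for PathSpec.allowed: accept only keys whose depth under
        # the bucket matches the include pattern ({table}/{partition0}/*.csv).
        # str.removeprefix requires Python 3.9+.
        return uri.removeprefix("s3://bucket/").count("/") == expected_depth

    keys = [
        "s3://bucket/table/p1/a.csv",     # depth 2: matches the pattern
        "s3://bucket/table/p1/p2/b.csv",  # depth 3: mismatched, skipped
    ]

    # Filtering before grouping, as the patch does, means a mismatched key
    # never reaches the dirname grouping, so no spurious folder is emitted.
    filtered = (k for k in keys if allowed(k, expected_depth=2))
    folders = {d: list(g) for d, g in groupby(filtered, key=dirname)}
    print(folders)  # {'s3://bucket/table/p1': ['s3://bucket/table/p1/a.csv']}

Before the change, disallowed files were only skipped inside the per-folder loop, so their parent directory could still surface as a partition; filtering at the source removes the whole group.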
31 changes: 31 additions & 0 deletions metadata-ingestion/tests/unit/data_lake/test_path_spec.py
@@ -0,0 +1,31 @@
+import pytest
+
+from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
+
+
+@pytest.mark.parametrize(
+    "include, s3_uri, expected",
+    [
+        (
+            "s3://bucket/{table}/{partition0}/*.csv",
+            "s3://bucket/table/p1/test.csv",
+            True,
+        ),
+        (
+            "s3://bucket/{table}/{partition0}/*.csv",
+            "s3://bucket/table/p1/p2/test.csv",
+            False,
+        ),
+    ],
+)
+def test_allowed_ignores_depth_mismatch(
+    include: str, s3_uri: str, expected: bool
+) -> None:
+    # arrange
+    path_spec = PathSpec(
+        include=include,
+        table_name="{table}",
+    )
+
+    # act, assert
+    assert path_spec.allowed(s3_uri) == expected
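The two cases mirror the fix's intent: an include pattern pins the number of path components, so a URI with an extra component must not match. As an illustration only (assumed behavior, not PathSpec's actual matcher), the check can be pictured like this:

    import fnmatch

    include = "s3://bucket/{table}/{partition0}/*.csv"
    glob = include.replace("{table}", "*").replace("{partition0}", "*")

    def matches(uri: str) -> bool:
        # fnmatch's "*" can cross "/" boundaries, so compare component
        # counts first; this mirrors the depth-mismatch behavior under test.
        return uri.count("/") == glob.count("/") and fnmatch.fnmatch(uri, glob)

    print(matches("s3://bucket/table/p1/test.csv"))     # True
    print(matches("s3://bucket/table/p1/p2/test.csv"))  # False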
65 changes: 53 additions & 12 deletions metadata-ingestion/tests/unit/s3/test_s3_source.py
@@ -12,6 +12,18 @@
 from datahub.ingestion.source.s3.source import S3Source, partitioned_folder_comparator
 
 
+def _get_s3_source(path_spec_: PathSpec) -> S3Source:
+    return S3Source.create(
+        config_dict={
+            "path_spec": {
+                "include": path_spec_.include,
+                "table_name": path_spec_.table_name,
+            },
+        },
+        ctx=PipelineContext(run_id="test-s3"),
+    )
+
+
 def test_partition_comparator_numeric_folder_name():
     folder1 = "3"
     folder2 = "12"
@@ -249,18 +261,6 @@ def test_get_folder_info():
     """
     Test S3Source.get_folder_info returns the latest file in each folder
     """
-
-    def _get_s3_source(path_spec_: PathSpec) -> S3Source:
-        return S3Source.create(
-            config_dict={
-                "path_spec": {
-                    "include": path_spec_.include,
-                    "table_name": path_spec_.table_name,
-                },
-            },
-            ctx=PipelineContext(run_id="test-s3"),
-        )
-
     # arrange
     path_spec = PathSpec(
         include="s3://my-bucket/{table}/{partition0}/*.csv",
@@ -303,3 +303,44 @@ def _get_s3_source(path_spec_: PathSpec) -> S3Source:
     assert len(res) == 2
     assert res[0].sample_file == "s3://my-bucket/my-folder/dir1/0002.csv"
     assert res[1].sample_file == "s3://my-bucket/my-folder/dir2/0001.csv"
+
+
+def test_get_folder_info_ignores_disallowed_path(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """
+    Test S3Source.get_folder_info skips disallowed files and logs a message
+    """
+    # arrange
+    path_spec = Mock(
+        spec=PathSpec,
+        include="s3://my-bucket/{table}/{partition0}/*.csv",
+        table_name="{table}",
+    )
+
+    bucket = Mock()
+    bucket.objects.filter().page_size = Mock(
+        return_value=[
+            Mock(
+                bucket_name="my-bucket",
+                key="my-folder/ignore/this/path/0001.csv",
+                creation_time=datetime(2025, 1, 1, 1),
+                last_modified=datetime(2025, 1, 1, 1),
+                size=100,
+            ),
+        ]
+    )
+
+    # act
+    path_spec.allowed = Mock(return_value=False)
+
+    res = _get_s3_source(path_spec).get_folder_info(
+        path_spec, bucket, prefix="/my-folder"
+    )
+
+    # assert
+    expected_called_s3_uri = "s3://my-bucket/my-folder/ignore/this/path/0001.csv"
+
+    path_spec.allowed.assert_called_once_with(expected_called_s3_uri)
+    assert f"File {expected_called_s3_uri} not allowed and skipping" in caplog.text
+    assert not any(res)
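The final assertion depends on the DEBUG record reaching caplog. As a general pytest pattern (a sketch independent of this repository, using a local logger), capturing at DEBUG explicitly keeps such assertions robust:

    import logging

    import pytest

    logger = logging.getLogger(__name__)

    def skip_file() -> None:
        # Emits at DEBUG, like the logger.debug(...) call in get_folder_info.
        logger.debug("File s3://bucket/skipped.csv not allowed and skipping")

    def test_debug_message_is_captured(caplog: pytest.LogCaptureFixture) -> None:
        with caplog.at_level(logging.DEBUG):
            skip_file()
        assert "not allowed and skipping" in caplog.text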
