Skip to content

Commit

Permalink
feat(ingest/s3): support .gzip and fix decompression bug (#8990)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Oct 12, 2023
1 parent c381806 commit 8813ae2
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@
logger: logging.Logger = logging.getLogger(__name__)

SUPPORTED_FILE_TYPES: List[str] = ["csv", "tsv", "json", "parquet", "avro"]
SUPPORTED_COMPRESSIONS: List[str] = ["gz", "bz2"]

# These come from the smart_open library.
SUPPORTED_COMPRESSIONS: List[str] = [
"gz",
"bz2",
# We have a monkeypatch on smart_open that aliases .gzip to .gz.
"gzip",
]


class PathSpec(ConfigModel):
Expand Down
8 changes: 7 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/s3/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pathlib import PurePath
from typing import Any, Dict, Iterable, List, Optional, Tuple

import smart_open.compression as so_compression
from more_itertools import peekable
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
Expand Down Expand Up @@ -120,6 +121,9 @@
}
PAGE_SIZE = 1000

# Hack to support the .gzip extension with smart_open.
so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])


def get_column_type(
report: SourceReport, dataset_name: str, column_type: str
Expand Down Expand Up @@ -407,7 +411,9 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List:
table_data.full_path, "rb", transport_params={"client": s3_client}
)
else:
file = open(table_data.full_path, "rb")
# We still use smart_open here to take advantage of the compression
# capabilities of smart_open.
file = smart_open(table_data.full_path, "rb")

fields = []

Expand Down

0 comments on commit 8813ae2

Please sign in to comment.