diff --git a/dlt/common/data_writers/buffered.py b/dlt/common/data_writers/buffered.py index 945fca6580..e2b6c9a442 100644 --- a/dlt/common/data_writers/buffered.py +++ b/dlt/common/data_writers/buffered.py @@ -45,7 +45,7 @@ def __init__( file_max_items: int = None, file_max_bytes: int = None, disable_compression: bool = False, - _caps: DestinationCapabilitiesContext = None + _caps: DestinationCapabilitiesContext = None, ): self.writer_spec = writer_spec if self.writer_spec.requires_destination_capabilities and not _caps: @@ -99,29 +99,17 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> int # until the first chunk is written we can change the columns schema freely if columns is not None: self._current_columns = dict(columns) - - new_rows_count: int - if isinstance(item, List): - # items coming in single list will be written together, not matter how many are there - self._buffered_items.extend(item) - # update row count, if item supports "num_rows" it will be used to count items - if len(item) > 0 and hasattr(item[0], "num_rows"): - new_rows_count = sum(tbl.num_rows for tbl in item) - else: - new_rows_count = len(item) - else: - self._buffered_items.append(item) - # update row count, if item supports "num_rows" it will be used to count items - if hasattr(item, "num_rows"): - new_rows_count = item.num_rows - else: - new_rows_count = 1 + # add item to buffer and count new rows + new_rows_count = self._buffer_items_with_row_count(item) self._buffered_items_count += new_rows_count - # flush if max buffer exceeded - if self._buffered_items_count >= self.buffer_max_items: - self._flush_items() # set last modification date self._last_modified = time.time() + # flush if max buffer exceeded, the second path of the expression prevents empty data frames to pile up in the buffer + if ( + self._buffered_items_count >= self.buffer_max_items + or len(self._buffered_items) >= self.buffer_max_items + ): + self._flush_items() # rotate the file if max_bytes exceeded if self._file: # rotate on max file size @@ -218,6 +206,26 @@ def __exit__(self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb if not in_exception: raise + def _buffer_items_with_row_count(self, item: TDataItems) -> int: + """Adds `item` to in-memory buffer and counts new rows, depending in item type""" + new_rows_count: int + if isinstance(item, List): + # update row count, if item supports "num_rows" it will be used to count items + if len(item) > 0 and hasattr(item[0], "num_rows"): + new_rows_count = sum(tbl.num_rows for tbl in item) + else: + new_rows_count = len(item) + # items coming in a single list will be written together, no matter how many there are + self._buffered_items.extend(item) + else: + self._buffered_items.append(item) + # update row count, if item supports "num_rows" it will be used to count items + if hasattr(item, "num_rows"): + new_rows_count = item.num_rows + else: + new_rows_count = 1 + return new_rows_count + def _rotate_file(self, allow_empty_file: bool = False) -> DataWriterMetrics: metrics = self._flush_and_close_file(allow_empty_file) self._file_name = ( diff --git a/dlt/common/data_writers/writers.py b/dlt/common/data_writers/writers.py index e1b3a5bbe3..22df7ecea4 100644 --- a/dlt/common/data_writers/writers.py +++ b/dlt/common/data_writers/writers.py @@ -37,7 +37,7 @@ from dlt.common.metrics import DataWriterMetrics from dlt.common.schema.typing import TTableSchemaColumns from dlt.common.schema.utils import is_nullable_column -from dlt.common.typing import StrAny +from dlt.common.typing import StrAny, TDataItem if TYPE_CHECKING: @@ -73,8 +73,8 @@ def __init__(self, f: IO[Any], caps: DestinationCapabilitiesContext = None) -> N def write_header(self, columns_schema: TTableSchemaColumns) -> None: # noqa pass - def write_data(self, rows: Sequence[Any]) -> None: - self.items_count += len(rows) + def write_data(self, items: Sequence[TDataItem]) -> None: + self.items_count += len(items) def write_footer(self) -> None: # noqa pass @@ -82,9 +82,9 @@ def write_footer(self) -> None: # noqa def close(self) -> None: # noqa pass - def write_all(self, columns_schema: TTableSchemaColumns, rows: Sequence[Any]) -> None: + def write_all(self, columns_schema: TTableSchemaColumns, items: Sequence[TDataItem]) -> None: self.write_header(columns_schema) - self.write_data(rows) + self.write_data(items) self.write_footer() @classmethod @@ -157,9 +157,9 @@ def writer_spec(cls) -> FileWriterSpec: class JsonlWriter(DataWriter): - def write_data(self, rows: Sequence[Any]) -> None: - super().write_data(rows) - for row in rows: + def write_data(self, items: Sequence[TDataItem]) -> None: + super().write_data(items) + for row in items: json.dump(row, self._f) self._f.write(b"\n") @@ -176,12 +176,12 @@ def writer_spec(cls) -> FileWriterSpec: class TypedJsonlListWriter(JsonlWriter): - def write_data(self, rows: Sequence[Any]) -> None: + def write_data(self, items: Sequence[TDataItem]) -> None: # skip JsonlWriter when calling super - super(JsonlWriter, self).write_data(rows) + super(JsonlWriter, self).write_data(items) # write all rows as one list which will require to write just one line # encode types with PUA characters - json.typed_dump(rows, self._f) + json.typed_dump(items, self._f) self._f.write(b"\n") @classmethod @@ -223,11 +223,11 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None: if self.writer_type == "default": self._f.write("VALUES\n") - def write_data(self, rows: Sequence[Any]) -> None: - super().write_data(rows) + def write_data(self, items: Sequence[TDataItem]) -> None: + super().write_data(items) # do not write empty rows, such things may be produced by Arrow adapters - if len(rows) == 0: + if len(items) == 0: return def write_row(row: StrAny, last_row: bool = False) -> None: @@ -245,11 +245,11 @@ def write_row(row: StrAny, last_row: bool = False) -> None: self._f.write(self.sep) # write rows - for row in rows[:-1]: + for row in items[:-1]: write_row(row) # write last row without separator so we can write footer eventually - write_row(rows[-1], last_row=True) + write_row(items[-1], last_row=True) self._chunks_written += 1 def write_footer(self) -> None: @@ -343,19 +343,19 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None: ] self.writer = self._create_writer(self.schema) - def write_data(self, rows: Sequence[Any]) -> None: - super().write_data(rows) + def write_data(self, items: Sequence[TDataItem]) -> None: + super().write_data(items) from dlt.common.libs.pyarrow import pyarrow # replace complex types with json for key in self.complex_indices: - for row in rows: + for row in items: if (value := row.get(key)) is not None: # TODO: make this configurable if value is not None and not isinstance(value, str): row[key] = json.dumps(value) - table = pyarrow.Table.from_pylist(rows, schema=self.schema) + table = pyarrow.Table.from_pylist(items, schema=self.schema) # Write self.writer.write_table(table, row_group_size=self.parquet_row_group_size) @@ -424,10 +424,10 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None: i for i, field in columns_schema.items() if field["data_type"] == "binary" ] - def write_data(self, rows: Sequence[Any]) -> None: + def write_data(self, items: Sequence[TDataItem]) -> None: # convert bytes and json if self.complex_indices or self.bytes_indices: - for row in rows: + for row in items: for key in self.complex_indices: if (value := row.get(key)) is not None: row[key] = json.dumps(value) @@ -446,9 +446,9 @@ def write_data(self, rows: Sequence[Any]) -> None: " type as binary.", ) - self.writer.writerows(rows) + self.writer.writerows(items) # count rows that got written - self.items_count += sum(len(row) for row in rows) + self.items_count += sum(len(row) for row in items) def close(self) -> None: self.writer = None @@ -472,20 +472,21 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None: # Schema will be written as-is from the arrow table self._column_schema = columns_schema - def write_data(self, rows: Sequence[Any]) -> None: - from dlt.common.libs.pyarrow import pyarrow + def write_data(self, items: Sequence[TDataItem]) -> None: + from dlt.common.libs.pyarrow import concat_batches_and_tables_in_order - for row in rows: - if not self.writer: - self.writer = self._create_writer(row.schema) - if isinstance(row, pyarrow.Table): - self.writer.write_table(row, row_group_size=self.parquet_row_group_size) - elif isinstance(row, pyarrow.RecordBatch): - self.writer.write_batch(row, row_group_size=self.parquet_row_group_size) - else: - raise ValueError(f"Unsupported type {type(row)}") - # count rows that got written - self.items_count += row.num_rows + if not items: + return + # concat batches and tables into a single one, preserving order + # pyarrow writer starts a row group for each item it writes (even with 0 rows) + # it also converts batches into tables internally. by creating a single table + # we allow the user rudimentary control over row group size via max buffered items + table = concat_batches_and_tables_in_order(items) + self.items_count += table.num_rows + if not self.writer: + self.writer = self._create_writer(table.schema) + # write concatenated tables + self.writer.write_table(table, row_group_size=self.parquet_row_group_size) def write_footer(self) -> None: if not self.writer: @@ -529,12 +530,12 @@ def __init__( def write_header(self, columns_schema: TTableSchemaColumns) -> None: self._columns_schema = columns_schema - def write_data(self, rows: Sequence[Any]) -> None: + def write_data(self, items: Sequence[TDataItem]) -> None: from dlt.common.libs.pyarrow import pyarrow import pyarrow.csv - for row in rows: - if isinstance(row, (pyarrow.Table, pyarrow.RecordBatch)): + for item in items: + if isinstance(item, (pyarrow.Table, pyarrow.RecordBatch)): if not self.writer: if self.quoting == "quote_needed": quoting = "needed" @@ -545,14 +546,14 @@ def write_data(self, rows: Sequence[Any]) -> None: try: self.writer = pyarrow.csv.CSVWriter( self._f, - row.schema, + item.schema, write_options=pyarrow.csv.WriteOptions( include_header=self.include_header, delimiter=self._delimiter_b, quoting_style=quoting, ), ) - self._first_schema = row.schema + self._first_schema = item.schema except pyarrow.ArrowInvalid as inv_ex: if "Unsupported Type" in str(inv_ex): raise InvalidDataItem( @@ -564,18 +565,18 @@ def write_data(self, rows: Sequence[Any]) -> None: ) raise # make sure that Schema stays the same - if not row.schema.equals(self._first_schema): + if not item.schema.equals(self._first_schema): raise InvalidDataItem( "csv", "arrow", "Arrow schema changed without rotating the file. This may be internal" " error or misuse of the writer.\nFirst" - f" schema:\n{self._first_schema}\n\nCurrent schema:\n{row.schema}", + f" schema:\n{self._first_schema}\n\nCurrent schema:\n{item.schema}", ) # write headers only on the first write try: - self.writer.write(row) + self.writer.write(item) except pyarrow.ArrowInvalid as inv_ex: if "Invalid UTF8 payload" in str(inv_ex): raise InvalidDataItem( @@ -596,9 +597,9 @@ def write_data(self, rows: Sequence[Any]) -> None: ) raise else: - raise ValueError(f"Unsupported type {type(row)}") + raise ValueError(f"Unsupported type {type(item)}") # count rows that got written - self.items_count += row.num_rows + self.items_count += item.num_rows def write_footer(self) -> None: if self.writer is None and self.include_header: @@ -634,8 +635,8 @@ def writer_spec(cls) -> FileWriterSpec: class ArrowToObjectAdapter: """A mixin that will convert object writer into arrow writer.""" - def write_data(self, rows: Sequence[Any]) -> None: - for batch in rows: + def write_data(self, items: Sequence[TDataItem]) -> None: + for batch in items: # convert to object data item format super().write_data(batch.to_pylist()) # type: ignore[misc] diff --git a/dlt/common/libs/pyarrow.py b/dlt/common/libs/pyarrow.py index 9b318245f2..3f047e275a 100644 --- a/dlt/common/libs/pyarrow.py +++ b/dlt/common/libs/pyarrow.py @@ -506,6 +506,30 @@ def cast_arrow_schema_types( return schema +def concat_batches_and_tables_in_order( + tables_or_batches: Iterable[Union[pyarrow.Table, pyarrow.RecordBatch]] +) -> pyarrow.Table: + """Concatenate iterable of tables and batches into a single table, preserving row order. Zero copy is used during + concatenation so schemas must be identical. + """ + batches = [] + tables = [] + for item in tables_or_batches: + if isinstance(item, pyarrow.RecordBatch): + batches.append(item) + elif isinstance(item, pyarrow.Table): + if batches: + tables.append(pyarrow.Table.from_batches(batches)) + batches = [] + tables.append(item) + else: + raise ValueError(f"Unsupported type {type(item)}") + if batches: + tables.append(pyarrow.Table.from_batches(batches)) + # "none" option ensures 0 copy concat + return pyarrow.concat_tables(tables, promote_options="none") + + class NameNormalizationCollision(ValueError): def __init__(self, reason: str) -> None: msg = f"Arrow column name collision after input data normalization. {reason}" diff --git a/docs/website/docs/dlt-ecosystem/file-formats/parquet.md b/docs/website/docs/dlt-ecosystem/file-formats/parquet.md index 5d85b7a557..30f7051386 100644 --- a/docs/website/docs/dlt-ecosystem/file-formats/parquet.md +++ b/docs/website/docs/dlt-ecosystem/file-formats/parquet.md @@ -35,6 +35,7 @@ Under the hood, `dlt` uses the [pyarrow parquet writer](https://arrow.apache.org - `flavor`: Sanitize schema or set other compatibility options to work with various target systems. Defaults to None which is **pyarrow** default. - `version`: Determine which Parquet logical types are available for use, whether the reduced set from the Parquet 1.x.x format or the expanded logical types added in later format versions. Defaults to "2.6". - `data_page_size`: Set a target threshold for the approximate encoded size of data pages within a column chunk (in bytes). Defaults to None which is **pyarrow** default. +- `row_group_size`: Set the number of rows in a row group. [See here](#row-group-size) how this can optimize parallel processing of queries on your destination over the default setting of `pyarrow`. - `timestamp_timezone`: A string specifying timezone, default is UTC. - `coerce_timestamps`: resolution to which coerce timestamps, choose from **s**, **ms**, **us**, **ns** - `allow_truncated_timestamps` - will raise if precision is lost on truncated timestamp. @@ -76,3 +77,19 @@ You can generate parquet files without timezone adjustment information in two wa 2. Set the **timestamp_timezone** to empty string (ie. `DATA_WRITER__TIMESTAMP_TIMEZONE=""`) to generate logical type without UTC adjustment. To our best knowledge, arrow will convert your timezone aware DateTime(s) to UTC and store them in parquet without timezone information. + + +### Row group size +The `pyarrow` parquet writer writes each item, i.e. table or record batch, in a separate row group. +This may lead to many small row groups which may not be optimal for certain query engines. For example, `duckdb` parallelizes on a row group. +`dlt` allows controlling the size of the row group by +[buffering and concatenating tables](../../reference/performance.md#controlling-in-memory-buffers) and batches before they are written. The concatenation is done as a zero-copy to save memory. +You can control the size of the row group by setting the maximum number of rows kept in the buffer. +```toml +[extract.data_writer] +buffer_max_items=10e6 +``` +Mind that `dlt` holds the tables in memory. Thus, 1,000,000 rows in the example above may consume a significant amount of RAM. + +`row_group_size` configuration setting has limited utility with `pyarrow` writer. It may be useful when you write single very large pyarrow tables +or when your in memory buffer is really large. \ No newline at end of file diff --git a/tests/libs/test_parquet_writer.py b/tests/libs/test_parquet_writer.py index 158ed047d8..5fdc6d6cc2 100644 --- a/tests/libs/test_parquet_writer.py +++ b/tests/libs/test_parquet_writer.py @@ -7,7 +7,7 @@ from dlt.common import pendulum, Decimal, json from dlt.common.configuration import inject_section -from dlt.common.data_writers.writers import ParquetDataWriter +from dlt.common.data_writers.writers import ArrowToParquetWriter, ParquetDataWriter from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.schema.utils import new_column from dlt.common.configuration.specs.config_section_context import ConfigSectionContext @@ -313,3 +313,84 @@ def _assert_pq_column(col: int, prec: str) -> None: _assert_pq_column(1, "milliseconds") _assert_pq_column(2, "microseconds") _assert_pq_column(3, "nanoseconds") + + +def test_arrow_parquet_row_group_size() -> None: + import pyarrow as pa + + c1 = {"col1": new_column("col1", "bigint")} + + id_ = -1 + + def get_id_() -> int: + nonlocal id_ + id_ += 1 + return id_ + + single_elem_table = lambda: pa.Table.from_pylist([{"col1": get_id_()}]) + single_elem_batch = lambda: pa.RecordBatch.from_pylist([{"col1": get_id_()}]) + + with get_writer(ArrowToParquetWriter, file_max_bytes=2**8, buffer_max_items=2) as writer: + writer.write_data_item(single_elem_table(), columns=c1) + writer._flush_items() + assert writer._writer.items_count == 1 + + with pa.parquet.ParquetFile(writer.closed_files[0].file_path) as reader: + assert reader.num_row_groups == 1 + assert reader.metadata.row_group(0).num_rows == 1 + + # should be packages into single group + with get_writer(ArrowToParquetWriter, file_max_bytes=2**8, buffer_max_items=2) as writer: + writer.write_data_item( + [ + single_elem_table(), + single_elem_batch(), + single_elem_batch(), + single_elem_table(), + single_elem_batch(), + ], + columns=c1, + ) + writer._flush_items() + assert writer._writer.items_count == 5 + + with pa.parquet.ParquetFile(writer.closed_files[0].file_path) as reader: + assert reader.num_row_groups == 1 + assert reader.metadata.row_group(0).num_rows == 5 + + with open(writer.closed_files[0].file_path, "rb") as f: + table = pq.read_table(f) + # all ids are there and in order + assert table["col1"].to_pylist() == list(range(1, 6)) + + # pass also empty and make it to be written with a separate call to parquet writer (by buffer_max_items) + with get_writer(ArrowToParquetWriter, file_max_bytes=2**8, buffer_max_items=1) as writer: + pq_batch = single_elem_batch() + writer.write_data_item(pq_batch, columns=c1) + # writer._flush_items() + # assert writer._writer.items_count == 5 + # this will also create arrow schema + print(pq_batch.schema) + writer.write_data_item(pa.RecordBatch.from_pylist([], schema=pq_batch.schema), columns=c1) + + with pa.parquet.ParquetFile(writer.closed_files[0].file_path) as reader: + assert reader.num_row_groups == 2 + assert reader.metadata.row_group(0).num_rows == 1 + # row group with size 0 for an empty item + assert reader.metadata.row_group(1).num_rows == 0 + + +def test_empty_tables_get_flushed() -> None: + c1 = {"col1": new_column("col1", "bigint")} + single_elem_table = pa.Table.from_pylist([{"col1": 1}]) + empty_batch = pa.RecordBatch.from_pylist([], schema=single_elem_table.schema) + + with get_writer(ArrowToParquetWriter, file_max_bytes=2**8, buffer_max_items=2) as writer: + writer.write_data_item(empty_batch, columns=c1) + writer.write_data_item(empty_batch, columns=c1) + # written + assert len(writer._buffered_items) == 0 + writer.write_data_item(empty_batch, columns=c1) + assert len(writer._buffered_items) == 1 + writer.write_data_item(single_elem_table, columns=c1) + assert len(writer._buffered_items) == 0 diff --git a/tests/libs/test_pyarrow.py b/tests/libs/test_pyarrow.py deleted file mode 100644 index 68541e96e0..0000000000 --- a/tests/libs/test_pyarrow.py +++ /dev/null @@ -1,111 +0,0 @@ -from copy import deepcopy -from datetime import timezone, datetime, timedelta # noqa: I251 -import pyarrow as pa - -from dlt.common import pendulum -from dlt.common.libs.pyarrow import ( - from_arrow_scalar, - get_py_arrow_timestamp, - py_arrow_to_table_schema_columns, - get_py_arrow_datatype, - to_arrow_scalar, -) -from dlt.common.destination import DestinationCapabilitiesContext - -from tests.cases import TABLE_UPDATE_COLUMNS_SCHEMA - - -def test_py_arrow_to_table_schema_columns(): - dlt_schema = deepcopy(TABLE_UPDATE_COLUMNS_SCHEMA) - - caps = DestinationCapabilitiesContext.generic_capabilities() - # The arrow schema will add precision - dlt_schema["col4"]["precision"] = caps.timestamp_precision - dlt_schema["col6"]["precision"], dlt_schema["col6"]["scale"] = caps.decimal_precision - dlt_schema["col11"]["precision"] = caps.timestamp_precision - dlt_schema["col4_null"]["precision"] = caps.timestamp_precision - dlt_schema["col6_null"]["precision"], dlt_schema["col6_null"]["scale"] = caps.decimal_precision - dlt_schema["col11_null"]["precision"] = caps.timestamp_precision - - # Ignoring wei as we can't distinguish from decimal - dlt_schema["col8"]["precision"], dlt_schema["col8"]["scale"] = (76, 0) - dlt_schema["col8"]["data_type"] = "decimal" - dlt_schema["col8_null"]["precision"], dlt_schema["col8_null"]["scale"] = (76, 0) - dlt_schema["col8_null"]["data_type"] = "decimal" - # No json type - dlt_schema["col9"]["data_type"] = "text" - del dlt_schema["col9"]["variant"] - dlt_schema["col9_null"]["data_type"] = "text" - del dlt_schema["col9_null"]["variant"] - - # arrow string fields don't have precision - del dlt_schema["col5_precision"]["precision"] - - # Convert to arrow schema - arrow_schema = pa.schema( - [ - pa.field( - column["name"], - get_py_arrow_datatype(column, caps, "UTC"), - nullable=column["nullable"], - ) - for column in dlt_schema.values() - ] - ) - - result = py_arrow_to_table_schema_columns(arrow_schema) - - # Resulting schema should match the original - assert result == dlt_schema - - -def test_to_arrow_scalar() -> None: - naive_dt = get_py_arrow_timestamp(6, tz=None) - # print(naive_dt) - # naive datetimes are converted as UTC when time aware python objects are used - assert to_arrow_scalar(datetime(2021, 1, 1, 5, 2, 32), naive_dt).as_py() == datetime( - 2021, 1, 1, 5, 2, 32 - ) - assert to_arrow_scalar( - datetime(2021, 1, 1, 5, 2, 32, tzinfo=timezone.utc), naive_dt - ).as_py() == datetime(2021, 1, 1, 5, 2, 32) - assert to_arrow_scalar( - datetime(2021, 1, 1, 5, 2, 32, tzinfo=timezone(timedelta(hours=-8))), naive_dt - ).as_py() == datetime(2021, 1, 1, 5, 2, 32) + timedelta(hours=8) - - # naive datetimes are treated like UTC - utc_dt = get_py_arrow_timestamp(6, tz="UTC") - dt_converted = to_arrow_scalar( - datetime(2021, 1, 1, 5, 2, 32, tzinfo=timezone(timedelta(hours=-8))), utc_dt - ).as_py() - assert dt_converted.utcoffset().seconds == 0 - assert dt_converted == datetime(2021, 1, 1, 13, 2, 32, tzinfo=timezone.utc) - - berlin_dt = get_py_arrow_timestamp(6, tz="Europe/Berlin") - dt_converted = to_arrow_scalar( - datetime(2021, 1, 1, 5, 2, 32, tzinfo=timezone(timedelta(hours=-8))), berlin_dt - ).as_py() - # no dst - assert dt_converted.utcoffset().seconds == 60 * 60 - assert dt_converted == datetime(2021, 1, 1, 13, 2, 32, tzinfo=timezone.utc) - - -def test_from_arrow_scalar() -> None: - naive_dt = get_py_arrow_timestamp(6, tz=None) - sc_dt = to_arrow_scalar(datetime(2021, 1, 1, 5, 2, 32), naive_dt) - - # this value is like UTC - py_dt = from_arrow_scalar(sc_dt) - assert isinstance(py_dt, pendulum.DateTime) - # and we convert to explicit UTC - assert py_dt == datetime(2021, 1, 1, 5, 2, 32, tzinfo=timezone.utc) - - # converts to UTC - berlin_dt = get_py_arrow_timestamp(6, tz="Europe/Berlin") - sc_dt = to_arrow_scalar( - datetime(2021, 1, 1, 5, 2, 32, tzinfo=timezone(timedelta(hours=-8))), berlin_dt - ) - py_dt = from_arrow_scalar(sc_dt) - assert isinstance(py_dt, pendulum.DateTime) - assert py_dt.tzname() == "UTC" - assert py_dt == datetime(2021, 1, 1, 13, 2, 32, tzinfo=timezone.utc)