diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 5f9178ebf9..c9b6316f59 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -30,7 +30,7 @@ Tuple, TypeVar, ) -from urllib.parse import quote +from urllib.parse import quote_plus from pydantic import ( BeforeValidator, @@ -234,9 +234,11 @@ def partition_to_path(self, data: Record, schema: Schema) -> str: partition_field = self.fields[pos] value_str = partition_field.transform.to_human_string(field_types[pos].field_type, value=data[pos]) - value_str = quote(value_str, safe="") + value_str = quote_plus(value_str, safe="") value_strs.append(value_str) - field_strs.append(partition_field.name) + + field_str = quote_plus(partition_field.name, safe="") + field_strs.append(field_str) path = "/".join([field_str + "=" + value_str for field_str, value_str in zip(field_strs, value_strs)]) return path diff --git a/tests/integration/test_partitioning_key.py b/tests/integration/test_partitioning_key.py index 29f664909c..1ac808c7d0 100644 --- a/tests/integration/test_partitioning_key.py +++ b/tests/integration/test_partitioning_key.py @@ -18,7 +18,7 @@ import uuid from datetime import date, datetime, timedelta, timezone from decimal import Decimal -from typing import Any, List +from typing import Any, Callable, List, Optional import pytest from pyspark.sql import SparkSession @@ -70,6 +70,7 @@ NestedField(field_id=12, name="fixed_field", field_type=FixedType(16), required=False), NestedField(field_id=13, name="decimal_field", field_type=DecimalType(5, 2), required=False), NestedField(field_id=14, name="uuid_field", field_type=UUIDType(), required=False), + NestedField(field_id=15, name="special#string+field", field_type=StringType(), required=False), ) @@ -77,7 +78,7 @@ @pytest.mark.parametrize( - "partition_fields, partition_values, expected_partition_record, expected_hive_partition_path_slice, spark_create_table_sql_for_justification, spark_data_insert_sql_for_justification", + "partition_fields, partition_values, expected_partition_record, expected_hive_partition_path_slice, spark_create_table_sql_for_justification, spark_data_insert_sql_for_justification, make_compatible_name", [ # # Identity Transform ( @@ -98,6 +99,7 @@ VALUES (false, 'Boolean field set to false'); """, + None, ), ( [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], @@ -117,6 +119,7 @@ VALUES ('sample_string', 'Another string value') """, + None, ), ( [PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], @@ -136,6 +139,7 @@ VALUES (42, 'Associated string value for int 42') """, + None, ), ( [PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], @@ -155,6 +159,7 @@ VALUES (1234567890123456789, 'Associated string value for long 1234567890123456789') """, + None, ), ( [PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], @@ -178,6 +183,7 @@ # VALUES # (3.14, 'Associated string value for float 3.14') # """ + None, ), ( [PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], @@ -201,6 +207,7 @@ # VALUES # (6.282, 'Associated string value for double 6.282') # """ + None, ), ( [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], @@ -220,6 +227,7 @@ VALUES (CAST('2023-01-01 12:00:01.000999' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') """, + None, ), ( [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], @@ -239,6 +247,7 @@ VALUES (CAST('2023-01-01 12:00:01' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') """, + None, ), ( [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], @@ -263,6 +272,7 @@ # VALUES # (CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') # """ + None, ), ( [PartitionField(source_id=9, field_id=1001, transform=IdentityTransform(), name="timestamptz_field")], @@ -287,6 +297,7 @@ # VALUES # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01 12:00:01.000999+03:00') # """ + None, ), ( [PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], @@ -306,6 +317,7 @@ VALUES (CAST('2023-01-01' AS DATE), 'Associated string value for date 2023-01-01') """, + None, ), ( [PartitionField(source_id=14, field_id=1001, transform=IdentityTransform(), name="uuid_field")], @@ -325,6 +337,7 @@ VALUES ('f47ac10b-58cc-4372-a567-0e02b2c3d479', 'Associated string value for UUID f47ac10b-58cc-4372-a567-0e02b2c3d479') """, + None, ), ( [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], @@ -344,6 +357,7 @@ VALUES (CAST('example' AS BINARY), 'Associated string value for binary `example`') """, + None, ), ( [PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), name="decimal_field")], @@ -363,6 +377,7 @@ VALUES (123.45, 'Associated string value for decimal 123.45') """, + None, ), # # Year Month Day Hour Transform # Month Transform @@ -384,6 +399,7 @@ VALUES (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP_NTZ), 'Event at 2023-01-01 11:55:59.999999'); """, + None, ), ( [PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_field_month")], @@ -403,6 +419,7 @@ VALUES (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); """, + None, ), ( [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], @@ -422,6 +439,7 @@ VALUES (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); """, + None, ), # Year Transform ( @@ -442,6 +460,7 @@ VALUES (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); """, + None, ), ( [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], @@ -461,6 +480,7 @@ VALUES (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); """, + None, ), ( [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], @@ -480,6 +500,7 @@ VALUES (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); """, + None, ), # # Day Transform ( @@ -500,6 +521,7 @@ VALUES (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); """, + None, ), ( [PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], @@ -519,6 +541,7 @@ VALUES (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); """, + None, ), ( [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], @@ -538,6 +561,7 @@ VALUES (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); """, + None, ), # Hour Transform ( @@ -558,6 +582,7 @@ VALUES (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event within the 11th hour of 2023-01-01'); """, + None, ), ( [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], @@ -577,6 +602,7 @@ VALUES (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); """, + None, ), # Truncate Transform ( @@ -597,6 +623,7 @@ VALUES (12345, 'Sample data for int'); """, + None, ), ( [PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], @@ -616,6 +643,7 @@ VALUES (4294967297, 'Sample data for long'); """, + None, ), ( [PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), name="string_field_trunc")], @@ -635,6 +663,7 @@ VALUES ('abcdefg', 'Another sample for string'); """, + None, ), ( [PartitionField(source_id=13, field_id=1001, transform=TruncateTransform(width=5), name="decimal_field_trunc")], @@ -654,6 +683,7 @@ VALUES (678.90, 'Associated string value for decimal 678.90') """, + None, ), ( [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], @@ -673,6 +703,7 @@ VALUES (binary('HELLOICEBERG'), 'Sample data for binary'); """, + None, ), # Bucket Transform ( @@ -693,6 +724,7 @@ VALUES (10, 'Integer with value 10'); """, + None, ), # Test multiple field combinations could generate the Partition record and hive partition path correctly ( @@ -721,6 +753,27 @@ VALUES (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), CAST('2023-01-01' AS DATE), 'some data'); """, + None, + ), + # Test that special characters are URL-encoded + ( + [PartitionField(source_id=15, field_id=1001, transform=IdentityTransform(), name="special#string+field")], + ["special string"], + Record(**{"special#string+field": "special string"}), # type: ignore + "special%23string%2Bfield=special+string", + f"""CREATE TABLE {identifier} ( + `special#string+field` string + ) + USING iceberg + PARTITIONED BY ( + identity(`special#string+field`) + ) + """, + f"""INSERT INTO {identifier} + VALUES + ('special string') + """, + lambda name: name.replace("#", "_x23").replace("+", "_x2B"), ), ], ) @@ -734,6 +787,7 @@ def test_partition_key( expected_hive_partition_path_slice: str, spark_create_table_sql_for_justification: str, spark_data_insert_sql_for_justification: str, + make_compatible_name: Optional[Callable[[str], str]], ) -> None: partition_field_values = [PartitionFieldValue(field, value) for field, value in zip(partition_fields, partition_values)] spec = PartitionSpec(*partition_fields) @@ -768,5 +822,12 @@ def test_partition_key( spark_path_for_justification = ( snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path ) - assert spark_partition_for_justification == expected_partition_record + # Special characters in partition value are sanitized when written to the data file's partition field + # Use `make_compatible_name` to match the sanitize behavior + sanitized_record = ( + Record(**{make_compatible_name(k): v for k, v in vars(expected_partition_record).items()}) + if make_compatible_name + else expected_partition_record + ) + assert spark_partition_for_justification == sanitized_record assert expected_hive_partition_path_slice in spark_path_for_justification diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py index d7425bc351..127d57a798 100644 --- a/tests/table/test_partitioning.py +++ b/tests/table/test_partitioning.py @@ -16,7 +16,8 @@ # under the License. from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.transforms import BucketTransform, TruncateTransform +from pyiceberg.transforms import BucketTransform, IdentityTransform, TruncateTransform +from pyiceberg.typedef import Record from pyiceberg.types import ( IntegerType, NestedField, @@ -118,6 +119,27 @@ def test_deserialize_partition_spec() -> None: ) +def test_partition_spec_to_path() -> None: + schema = Schema( + NestedField(field_id=1, name="str", field_type=StringType(), required=False), + NestedField(field_id=2, name="other_str", field_type=StringType(), required=False), + NestedField(field_id=3, name="int", field_type=IntegerType(), required=True), + ) + + spec = PartitionSpec( + PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="my#str%bucket"), + PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="other str+bucket"), + PartitionField(source_id=3, field_id=1002, transform=BucketTransform(num_buckets=25), name="my!int:bucket"), + spec_id=3, + ) + + record = Record(**{"my#str%bucket": "my+str", "other str+bucket": "( )", "my!int:bucket": 10}) # type: ignore + + # Both partition field names and values should be URL encoded, with spaces mapping to plus signs, to match the Java + # behaviour: https://github.com/apache/iceberg/blob/ca3db931b0f024f0412084751ac85dd4ef2da7e7/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L198-L204 + assert spec.partition_to_path(record, schema) == "my%23str%25bucket=my%2Bstr/other+str%2Bbucket=%28+%29/my%21int%3Abucket=10" + + def test_partition_type(table_schema_simple: Schema) -> None: spec = PartitionSpec( PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate"),