Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add plan tasks for TableScan #1427

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ For example, `PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID`, sets `s3.access-ke

Iceberg tables support table properties to configure table behavior.

### Read options

| Key | Options | Default | Description |
|--------------------------------|---------------|--------------------|-----------------------------------------------------------------------------------------------------|
| `read.split.target-size` | Size in bytes | 134217728 (128 MB) | Target size when combining data input splits with `plan_tasks` |
| `read.split.planning-lookback` | Integer | 10 | Number of bins to consider when combining input splits with `plan_tasks` |
| `read.split.open-file-cost` | Integer | 4194304 (4 MB) | The estimated cost to open a file, used as a minimum weight when combining splits with `plan_tasks` |

### Write options

| Key | Options | Default | Description |
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ def _missing_(cls, value: object) -> Union[None, str]:
return member
return None

def is_splittable(self) -> bool:
return self == FileFormat.AVRO or self == FileFormat.PARQUET or self == FileFormat.ORC

def __repr__(self) -> str:
"""Return the string representation of the FileFormat class."""
return f"FileFormat.{self.name}"
Expand Down
96 changes: 94 additions & 2 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,12 @@
from pyiceberg.types import (
strtobool,
)
from pyiceberg.utils.bin_packing import ListPacker as ListPacker
from pyiceberg.utils.bin_packing import PackingIterator
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.deprecated import deprecated, deprecation_message
from pyiceberg.utils.properties import property_as_bool
from pyiceberg.utils.properties import property_as_bool, property_as_int

if TYPE_CHECKING:
import daft
Expand Down Expand Up @@ -191,6 +193,15 @@ class TableProperties:
DELETE_MODE_MERGE_ON_READ = "merge-on-read"
DELETE_MODE_DEFAULT = DELETE_MODE_COPY_ON_WRITE

READ_SPLIT_SIZE = "read.split.target-size"
ConeyLiu marked this conversation as resolved.
Show resolved Hide resolved
READ_SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024 # 128 MB

READ_SPLIT_LOOKBACK = "read.split.planning-lookback"
READ_SPLIT_LOOKBACK_DEFAULT = 10

READ_SPLIT_OPEN_FILE_COST = "read.split.open-file-cost"
READ_SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024 # 4 MB

DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
FORMAT_VERSION = "format-version"
DEFAULT_FORMAT_VERSION = 2
Expand Down Expand Up @@ -1229,7 +1240,8 @@ def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:


class ScanTask(ABC):
pass
@abstractmethod
def size_in_bytes(self) -> int: ...


@dataclass(init=False)
Expand All @@ -1253,6 +1265,26 @@ def __init__(
self.start = start or 0
self.length = length or data_file.file_size_in_bytes

def size_in_bytes(self) -> int:
return self.length + sum(f.file_size_in_bytes for f in self.delete_files)


@dataclass(init=False)
class CombinedFileScanTask(ScanTask):
"""Task representing combined multiple file scan tasks.

Used in plan_tasks. File can be split into multiple FileScanTask based on
split_offsets and then combined into read.split.target-size.
"""

tasks: List[FileScanTask]

def __init__(self, tasks: List[FileScanTask]) -> None:
self.tasks = tasks

def size_in_bytes(self) -> int:
return sum(f.size_in_bytes() for f in self.tasks)


def _open_manifest(
io: FileIO,
Expand Down Expand Up @@ -1423,6 +1455,66 @@ def plan_files(self) -> Iterable[FileScanTask]:
for data_entry in data_entries
]

def _target_split_size(self) -> int:
table_value = property_as_int(
self.table_metadata.properties, TableProperties.READ_SPLIT_SIZE, TableProperties.READ_SPLIT_SIZE_DEFAULT
)
return property_as_int(self.options, TableProperties.READ_SPLIT_SIZE, table_value) # type: ignore

def _loop_back(self) -> int:
table_value = property_as_int(
self.table_metadata.properties, TableProperties.READ_SPLIT_LOOKBACK, TableProperties.READ_SPLIT_LOOKBACK_DEFAULT
)
return property_as_int(self.options, TableProperties.READ_SPLIT_LOOKBACK, table_value) # type: ignore

def _split_open_file_cost(self) -> int:
table_value = property_as_int(
self.table_metadata.properties,
TableProperties.READ_SPLIT_OPEN_FILE_COST,
TableProperties.READ_SPLIT_OPEN_FILE_COST_DEFAULT,
)
return property_as_int(self.options, TableProperties.READ_SPLIT_OPEN_FILE_COST, table_value) # type: ignore

def plan_task(self) -> Iterable[CombinedFileScanTask]:
Copy link

@corleyma corleyma Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume it's intentional that we're not actually calling this in any of the methods (like to_arrow, etc) that actually execute a scan?

If the plan is to call this in to_arrow eventually, it would be good to have some benchmarks with realistic latencies (e.g., against actual object storage).

If there is no plan to call this directly in pyiceberg, I wonder who the intended consumers of this API would be? I would expect most query engines -- distributed or otherwise -- to have their own notions for how to optimize scan planning.

Copy link

@corleyma corleyma Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, assuming there's still some future plan to offload expensive stuff like data scanning to a rust core, I do wonder if we want to commit to exposing additional scan planning APIs that may not align with whatever query engine gets embedded in pyiceberg?

In the case that this is primarily intended to be for the benefit of pyiceberg's own scan execution I would consider making this a private API.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, I originally thought the plan_task function was being used. Looks like I confused plan_files with plan_task. Currently plan_files is used by the read path
https://grep.app/search?q=plan_files&filter[repo][0]=apache/iceberg-python&filter[path][0]=pyiceberg/table/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume it's intentional that we're not actually calling this in any of the methods (like to_arrow, etc) that actually execute a scan?

Yes, I am not sure of the initial intention why we read the full file. So keep it not changing.

If there is no plan to call this directly in pyiceberg, I wonder who the intended consumers of this API would be? I would expect most query engines -- distributed or otherwise -- to have their own notions for how to optimize scan planning.

I met OOM errors recently when we read the iceberg table with ray in distributed. Also, the parallelism is limited by the number of files when planning with files(files are rewritten into larger one, eg 512MB or 1GB). Like Spark/Flink, I think the plan_tasks should be useful for distributed reading.

In the case that this is primarily intended to be for the benefit of pyiceberg's own scan execution I would consider making this a private API.

The distributed engine such as ray/daft would have to implement themself if they want to do plan_tasks. It would be better to put it into pyiceberg?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the same concern as @corleyma. For example, Java splits on row-groups to make full use of the parallelism of Spark, Trino, Hive etc. On a local machine, it makes more sense to just plow through the file itself, preferably using a native reader like PyArrow or Iceberg-Rust in the future.

There are a lot of details around this API, for example, a task might point to a row-group that doesn't contain any relevant information, and we don't know until we open the file itself.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we perform our own Plan Files splitting and merging. We create an iterator from pyiceberg where @kevinjqliu linked the code. But during our own optimization passes we perform fusion of files as well as splitting by row group.

The logic for that can be found here
https://github.com/Eventual-Inc/Daft/blob/e148248dae8af90c8993d2ec6b2f471521c0a7f2/src/daft-scan/src/scan_task_iters.rs#L181

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @samster25 One question, do you plan purely on the data provided by PyIceberg, or do first fetch the footers of the Parquet files?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko If have metadata from pyiceberg like num_rows, file_size, num_row_groups (from split_offsets), we skip fetching the Parquet Footer and speed up planning.

On that note, we would also love to have distinct_counts in the DataFile as well. This would help us estimate memory usage when planning a query in Daft.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re what @ConeyLiu was mentioning about the issues he encountered with Ray, I think it would be reasonable for pyiceberg to have this more nuanced distributed planning logic in a private API and (if we wanted to), to expose a RayDatasets implementation that uses the private API... but I'd vote against committing to this planning logic in a public API for the reasons outlined above (engines want to bring their own logic, and pyiceberg may well want to significantly change this logic for itself in the future to the extent that we do continue to embed more query engine functionality for convenience).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback.

"""Plan balanced combined tasks for this scan by splitting large and combining small tasks.

Returns:
List of CombinedFileScanTasks
"""
split_size = self._target_split_size()
loop_back = self._loop_back()
open_file_cost = self._split_open_file_cost()

def split(task: FileScanTask) -> List[FileScanTask]:
data_file = task.file
if not data_file.file_format.is_splittable() or not data_file.split_offsets:
return [task]

split_offsets = data_file.split_offsets
if not all(split_offsets[i] <= split_offsets[i + 1] for i in range(len(split_offsets) - 1)):
# split offsets must be strictly ascending
return [task]

all_tasks = []
for i in range(len(split_offsets) - 1):
all_tasks.append(
FileScanTask(data_file, task.delete_files, split_offsets[i], split_offsets[i + 1] - split_offsets[i])
)

all_tasks.append(
FileScanTask(data_file, task.delete_files, split_offsets[-1], data_file.file_size_in_bytes - split_offsets[-1])
)

return all_tasks

def weight_func(task: FileScanTask) -> int:
return max(task.size_in_bytes(), (1 + len(task.delete_files)) * open_file_cost)
ConeyLiu marked this conversation as resolved.
Show resolved Hide resolved

file_tasks = self.plan_files()
split_file_tasks = list(itertools.chain.from_iterable(map(split, file_tasks)))
packing_iterator = PackingIterator(split_file_tasks, split_size, loop_back, weight_func, False)
return list(map(CombinedFileScanTask, packing_iterator))

def to_arrow(self) -> pa.Table:
"""Read an Arrow table eagerly from this DataScan.

Expand Down
74 changes: 74 additions & 0 deletions tests/integration/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
StringType,
TimestampType,
)
from pyiceberg.utils.bin_packing import PackingIterator
from pyiceberg.utils.concurrent import ExecutorFactory

DEFAULT_PROPERTIES = {"write.parquet.compression-codec": "zstd"}
Expand Down Expand Up @@ -873,3 +874,76 @@ def test_table_scan_empty_table(catalog: Catalog) -> None:
result_table = tbl.scan().to_arrow()

assert len(result_table) == 0


@pytest.mark.integration
def test_plan_tasks(session_catalog: Catalog) -> None:
ConeyLiu marked this conversation as resolved.
Show resolved Hide resolved
from pyiceberg.table import TableProperties

table_name = "default.test_plan_tasks"
try:
session_catalog.drop_table(table_name)
except NoSuchTableError:
pass # Just to make sure that the table doesn't exist

tbl = session_catalog.create_table(
table_name,
Schema(
NestedField(1, "number", LongType()),
),
properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1"},
)

# Write 10 row groups, that should end up as 10 batches
entries = 10
tbl.append(
pa.Table.from_pylist(
[
{
"number": number,
}
for number in range(entries)
],
)
)

ConeyLiu marked this conversation as resolved.
Show resolved Hide resolved
assert len(tbl.inspect.files()) == 1

plan_files = list(tbl.scan().plan_files())
assert len(plan_files) == 1
data_file = plan_files[0].file
assert data_file.split_offsets is not None and len(data_file.split_offsets) == 10

plan_tasks = list(tbl.scan(options={TableProperties.READ_SPLIT_SIZE: 1}).plan_task())
assert len(plan_tasks) == 10

split_offsets = []
for task in plan_tasks:
assert len(task.tasks) == 1
split_offsets.append(task.tasks[0].start)

assert split_offsets == plan_files[0].file.split_offsets

split_sizes = []
for i in range(1, len(data_file.split_offsets)):
split_sizes.append(data_file.split_offsets[i] - data_file.split_offsets[i - 1])

split_sizes.append(data_file.file_size_in_bytes - data_file.split_offsets[-1])

read_split_size = int(data_file.file_size_in_bytes / 4)
read_split_open_file_cost = 1
read_split_lookback = 5

plan_tasks = list(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevinjqliu added more tests here, pls give more advice.

tbl.scan(
options={
TableProperties.READ_SPLIT_SIZE: read_split_size,
TableProperties.READ_SPLIT_OPEN_FILE_COST: read_split_open_file_cost,
TableProperties.READ_SPLIT_LOOKBACK: read_split_lookback,
}
).plan_task()
)

assert len(plan_tasks) == len(
list(PackingIterator(split_sizes, read_split_size, read_split_lookback, lambda size: size, False))
)
Loading