Add plan tasks for TableScan #1427

Open · wants to merge 2 commits into base: main

Conversation

ConeyLiu (Contributor) commented:

Currently, we only support planning files. Planning tasks (splitting large files based on split_offsets) would be more useful when we want to read in parallel.
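
A rough usage sketch of the difference (hedged: plan_task and CombinedFileScanTask are the names added by this PR's diff; the filter and variable names are illustrative only):

scan = table.scan(row_filter="id > 100")
file_tasks = list(scan.plan_files())     # existing API: one FileScanTask per matching data file
combined_tasks = list(scan.plan_task())  # this PR: balanced CombinedFileScanTasks, with large files
                                         # split into byte ranges at their split_offsets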

kevinjqliu self-requested a review on December 13, 2024 at 16:25.
kevinjqliu (Contributor) left a comment:

Thanks for the contribution! This is awesome.

This PR modifies DataScan.plan_files so that reads will default to using the new CombinedFileScanTask.

Never mind, it does not, but we might want to default to CombinedFileScanTask:
https://github.com/apache/iceberg-python/pull/1427/files#diff-23e8153e0fd497a9212215bd2067068f3b56fa071770c7ef326db3d3d03cee9bR1380-R1452

Review comments were left on pyiceberg/manifest.py, pyiceberg/table/__init__.py, and tests/integration/test_reads.py.

@dataclass(init=False)
class CombinedFileScanTask(ScanTask):
"""Task representing combined multiple file scan tasks."""


I think this could use a better docstring.

ConeyLiu (Contributor, Author) replied:

Updated the docstring, please take another look.

)
return property_as_int(self.options, TableProperties.READ_SPLIT_OPEN_FILE_COST, table_value) # type: ignore

def plan_task(self) -> Iterable[CombinedFileScanTask]:
corleyma commented on Dec 13, 2024:

I assume it's intentional that we're not actually calling this in any of the methods (like to_arrow, etc) that actually execute a scan?

If the plan is to call this in to_arrow eventually, it would be good to have some benchmarks with realistic latencies (e.g., against actual object storage).

If there is no plan to call this directly in pyiceberg, I wonder who the intended consumers of this API would be? I would expect most query engines -- distributed or otherwise -- to have their own notions for how to optimize scan planning.

corleyma commented on Dec 13, 2024:

Also, assuming there's still some future plan to offload expensive stuff like data scanning to a rust core, I do wonder if we want to commit to exposing additional scan planning APIs that may not align with whatever query engine gets embedded in pyiceberg?

In the case that this is primarily intended to be for the benefit of pyiceberg's own scan execution I would consider making this a private API.

kevinjqliu (Contributor) replied:

Good catch, I originally thought the plan_task function was being used. It looks like I confused plan_files with plan_task. Currently, plan_files is used by the read path:
https://grep.app/search?q=plan_files&filter[repo][0]=apache/iceberg-python&filter[path][0]=pyiceberg/table/
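
For reference, a minimal sketch of that read path (hedged; the catalog name and table identifier are placeholders):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
table = catalog.load_table("db.tbl")
arrow_table = table.scan().to_arrow()  # plans whole files via plan_files() before reading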

ConeyLiu (Contributor, Author) replied:

I assume it's intentional that we're not actually calling this in any of the methods (like to_arrow, etc) that actually execute a scan?

Yes. I am not sure of the original intention behind reading the full file, so I kept that behavior unchanged.

If there is no plan to call this directly in pyiceberg, I wonder who the intended consumers of this API would be? I would expect most query engines -- distributed or otherwise -- to have their own notions for how to optimize scan planning.

I recently ran into OOM errors when reading an Iceberg table with Ray in a distributed setting. Also, when planning with files, the parallelism is limited by the number of files (files are rewritten into larger ones, e.g. 512 MB or 1 GB). As in Spark/Flink, I think plan_tasks would be useful for distributed reading.

In the case that this is primarily intended to be for the benefit of pyiceberg's own scan execution I would consider making this a private API.

Distributed engines such as Ray/Daft would have to implement this themselves if they want plan_tasks. Wouldn't it be better to put it into pyiceberg?

Another contributor commented:

I have the same concern as @corleyma. For example, Java splits on row groups to make full use of the parallelism of Spark, Trino, Hive, etc. On a local machine, it makes more sense to just plow through the file itself, preferably using a native reader like PyArrow or Iceberg-Rust in the future.

There are a lot of details around this API; for example, a task might point to a row group that doesn't contain any relevant information, and we won't know that until we open the file itself.
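
To make the row-group point concrete, here is a small illustration of what split_offsets encode (for Parquet they are the row-group start offsets, sorted ascending, per the Iceberg spec); the numbers below are made up:

split_offsets = [4, 1_048_580, 2_097_156]  # hypothetical row-group start offsets
file_size_in_bytes = 3_000_000

# consecutive offsets delimit one row group; the last range runs to the end of the file
ranges = [
    (split_offsets[i], split_offsets[i + 1] - split_offsets[i])
    for i in range(len(split_offsets) - 1)
]
ranges.append((split_offsets[-1], file_size_in_bytes - split_offsets[-1]))
# -> three (start, length) byte ranges, but whether a given row group actually
#    contains matching rows is only known after opening the file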

samster25 replied:

Yes, we perform our own plan-files splitting and merging. We create an iterator from pyiceberg at the code @kevinjqliu linked, but during our own optimization passes we perform fusion of files as well as splitting by row group.

The logic for that can be found here
https://github.com/Eventual-Inc/Daft/blob/e148248dae8af90c8993d2ec6b2f471521c0a7f2/src/daft-scan/src/scan_task_iters.rs#L181

Fokko (Contributor) replied:

Thanks @samster25. One question: do you plan purely on the data provided by PyIceberg, or do you first fetch the footers of the Parquet files?

samster25 replied:

@Fokko If we have metadata from pyiceberg like num_rows, file_size, and num_row_groups (from split_offsets), we can skip fetching the Parquet footer and speed up planning.

On that note, we would also love to have distinct_counts in the DataFile. This would help us estimate memory usage when planning a query in Daft.
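
A hedged sketch of the kind of footer-free estimate such metadata enables (the helper name and its even-distribution assumption are illustrative, not Daft's actual logic):

def estimate_split_rows(record_count: int, file_size_in_bytes: int, split_length: int) -> int:
    """Approximate rows in a byte range, assuming rows are spread evenly across the file."""
    return max(1, record_count * split_length // file_size_in_bytes)

# e.g. a 64 MB split of a 512 MB file holding 10M rows is roughly 1.25M rows
estimate_split_rows(10_000_000, 512 * 1024 * 1024, 64 * 1024 * 1024)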


Re what @ConeyLiu was mentioning about the issues he encountered with Ray: I think it would be reasonable for pyiceberg to have this more nuanced distributed planning logic in a private API and (if we wanted to) expose a RayDatasets implementation that uses the private API. But I'd vote against committing to this planning logic in a public API, for the reasons outlined above (engines want to bring their own logic, and pyiceberg may well want to significantly change this logic for itself in the future, to the extent that we continue to embed more query engine functionality for convenience).

ConeyLiu (Contributor, Author) replied:

Thanks for the feedback.

Fokko self-requested a review on December 14, 2024 at 11:09.
read_split_open_file_cost = 1
read_split_lookback = 5

plan_tasks = list(
ConeyLiu (Contributor, Author) commented:

@kevinjqliu I added more tests here, please take a look and share any further advice.

ConeyLiu (Contributor, Author) commented:

Thanks @kevinjqliu @corleyma for your review. Please take another look.

kevinjqliu (Contributor) commented:

Thanks everyone for the great discussion here! To summarize the thread above, I think the main concern here is around exposing this functionality as part of PyIceberg's DataScan public API.

Currently, PyIceberg's read path assumes it is run on a single-node machine. This assumption is embedded in the way we plan and execute the read path. For example, we use multi-threading and not (yet) multi-processing.

As an Iceberg library for the Python ecosystem, I do believe there's value in PyIceberg providing helper methods for distributed processing.
I'd like to propose a path forward for this PR. Instead of integrating the feature directly into DataScan, what if we create a new class (or subclass DataScan) specifically for distributed processing? We can encapsulate the planning and execution logic inside this new class. The goal is to provide primitives that allow work to be distributed.

I imagine something like this if I want to integrate with Ray.

table = catalog.load_table("blah")
tasks = table.distributed_scan()
futures = [process_task_remote.remote(task) for task in tasks] # Submit tasks to Ray for parallel processing
results = ray.get(futures)
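
A hedged sketch of the worker used in the example above (process_task_remote is just the placeholder name from the snippet; the body, and how the bytes actually get decoded, are assumptions, since each engine brings its own reader):

import ray

@ray.remote
def process_task_remote(task):
    # Each task carries one or more data files plus (start, length) byte ranges
    # and any associated delete files; decode them with PyArrow, a native
    # reader, or the engine's own scan operator.
    ...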

The distributed_scan is a helper function in the Table class. Alternatively, we could avoid exposing this at all and have users call the new class directly.

tasks = DistributedTableScan(table).plan_files()

Looking forward to hearing what people think!

ConeyLiu (Contributor, Author) commented:

@kevinjqliu, thanks for the summary and the great proposal. Another option would be to provide a plan_util to support plan tasks like the Java-side implementation.

Fokko (Contributor) commented on Dec 23, 2024:

Just to add some context:

Currently, PyIceberg's read path assumes it is run on a single-node machine. This assumption is embedded in the way we plan and execute the read path. For example, we use multi-threading and not (yet) multi-processing.

We explored multi-processing, but it didn't give any advantages in terms of performance and introduced a lot of issues around pickling between the processes. Therefore we rely on multi-threading; most of the Arrow work happens with the GIL released anyway (diminishing the upside of multi-processing).

tasks = DistributedTableScan(table).plan_files()

I don't think this is the right path forward, as it creates more confusion: it is still part of the public API, but it introduces another way of doing the same thing. Also, would this return the same set of files? I agree with what @corleyma said, and I think we can add this API, just not make it public. I think this might create a lot of confusion for the user; for example, PyIceberg itself uses plan_files, Daft uses plan_files with its own logic to combine certain tasks/files, and Ray would use plan_tasks, which respects the newly added configuration. Why not add this logic to combine the tasks directly with Ray?

Another option would be to provide a plan_util to support plan tasks like the Java-side implementation.

I like that idea. Could you elaborate on that? Are you suggesting something like:

def convert_files_to_tasks(
    files: Iterable[FileScanTask],
    target_split_size: int,
    split_file_open_cost: int,
    loop_back: int
) -> List[CombinedFileScanTask]:
    ...

I think I like that idea.

kevinjqliu (Contributor) commented:

Another option would be to provide a plan_util to support plan tasks like the Java-side implementation.

That's interesting, I like that too. "util" suggests that it's optional and an augmentation of existing functionality.

The distributed_scan is a helper function in the Table class.

My suggestion was also around an optional/helper function, but I think utils provides better separation.

ConeyLiu (Contributor, Author) commented:

I like that idea. Could you elaborate on that?

Yes, the following is what I implemented in the internal repo:

# Imports assumed for this snippet: PackingIterator ships with pyiceberg
# (pyiceberg.utils.bin_packing); FileScanTask is the existing scan task class,
# while CombinedFileScanTask and _merge_split_task come from this PR's changes.
import itertools
from typing import Iterable, List

from pyiceberg.utils.bin_packing import PackingIterator


def plan_scan_tasks(
    files: Iterable[FileScanTask], split_size: int, loop_back: int, open_file_cost: int
) -> List[CombinedFileScanTask]:
    """Plan balanced combined tasks for this scan by splitting large and combining small tasks.

    Returns:
        List of CombinedFileScanTasks
    """

    def split(task: FileScanTask) -> List[FileScanTask]:
        data_file = task.file
        if not data_file.file_format.is_splittable() or not data_file.split_offsets:
            return [task]

        split_offsets = data_file.split_offsets
        if not all(split_offsets[i] <= split_offsets[i + 1] for i in range(len(split_offsets) - 1)):
            # split offsets must be sorted ascending; otherwise fall back to the whole-file task
            return [task]

        all_tasks = []
        for i in range(len(split_offsets) - 1):
            all_tasks.append(
                FileScanTask(data_file, task.delete_files, split_offsets[i], split_offsets[i + 1] - split_offsets[i])
            )

        all_tasks.append(
            FileScanTask(data_file, task.delete_files, split_offsets[-1], data_file.file_size_in_bytes - split_offsets[-1])
        )

        return all_tasks

    def weight_func(task: FileScanTask) -> int:
        return max(task.size_in_bytes(), (1 + len(task.delete_files)) * open_file_cost)

    split_file_tasks = list(itertools.chain.from_iterable(map(split, files)))
    packing_iterator = PackingIterator(split_file_tasks, split_size, loop_back, weight_func, False)

    return list(map(_merge_split_task, packing_iterator))
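
A hedged usage example for the snippet above (obtaining the input via plan_files is my assumption; the numeric values follow Iceberg's documented read.split.* defaults):

file_tasks = list(table.scan().plan_files())
combined = plan_scan_tasks(
    file_tasks,
    split_size=128 * 1024 * 1024,      # read.split.target-size
    loop_back=10,                      # read.split.planning-lookback
    open_file_cost=4 * 1024 * 1024,    # read.split.open-file-cost
)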
