Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Count rows as a metadata only operation #1388

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

tusharchou
Copy link

closes issue: Count rows as a metadata-only operation #1223

@@ -1493,6 +1496,13 @@ def to_ray(self) -> ray.data.dataset.Dataset:

return ray.data.from_arrow(self.to_arrow())

def count(self) -> int:
res = 0
tasks = self.plan_files()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this count will not be accurate when there are deletes files

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @kevinjqliu thank you for the review. I am trying to account for positional deletes, do you have a suggestion on how that can be achieved?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this can be widely off, not just because the merge-on-read deletes, but because plan_files returns all the files that (might) contain relevant rows. For example, if it cannot be determined if has relevant data, it will be returned by plan_files.

I think there are two ways forward:

  • One is similar to how we handle deletes. For deletes, we check if the whole file matches, if this is the case, then we can simply drop the file from the metadata. You can find the code here. If a file fully matches, is is valid to use task.file.record_count. We would need to extend this to also see if there are also merge-on-read deletes as Kevin already mentioned, or just fail when there are positional deletes.
  • A cleaner option, but this is a bit more work, but pretty exciting, would be to include the residual-predicate in the FileScanTask. When we run a query, like day_added = 2024-12-01 and user_id = 10, then the day_added = 2024-12-01 might be satisfied with the partitioning already. This is the case when the table is partitioned by day, and we know that all the data in the file evaluates true for day_added = 2024-12-01, then we need to open the file, and filter for user_id = 10. If we would leave out the user_id = 10, then it would be ALWAYS_TRUE, and then we know that we can just use task.file.record_count. This way we could very easily loop over the .plan_files():
def count(self) -> int:
    res = 0
    tasks = self.plan_files()
    for task in tasks:
        if task.residual == ALWAYS_TRUE and len(task.delete_files):
            res += task.file.record_count
            else:
            # pseudocode, open the table, and apply the filter and deletes
            res += len(_table_from_scan_task(task))
    return res

To get to the second step, we first have to port the ResidualEvaluator. The java code can be found here, including some excellent tests.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Fokko I have added Residual Evaluator with Tests.
Now I am trying to create the breaking tests for count where delete has occurred and the counts should differ

@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
def test_delete_partitioned_table_positional_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
    identifier = "default.table_partitioned_delete"

    run_spark_commands(
        spark,
        [
            f"DROP TABLE IF EXISTS {identifier}",
            f"""
            CREATE TABLE {identifier} (
                number_partitioned  int,
                number              int
            )
            USING iceberg
            PARTITIONED BY (number_partitioned)
            TBLPROPERTIES(
                'format-version' = 2,
                'write.delete.mode'='merge-on-read',
                'write.update.mode'='merge-on-read',
                'write.merge.mode'='merge-on-read'
            )
        """,
            f"""
            INSERT INTO {identifier} VALUES (10, 20), (10, 30), (10, 40)
        """,
            # Generate a positional delete
            f"""
            DELETE FROM {identifier} WHERE number = 30
        """,
        ],
    )

    tbl = session_catalog.load_table(identifier)

    # Assert that there is just a single Parquet file, that has one merge on read file
    files = list(tbl.scan().plan_files())
    assert len(files) == 1
    assert len(files[0].delete_files) == 1
    # Will rewrite a data file without the positional delete
    assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 10], "number": [20, 40]}
    assert tbl.scan().count() == 2
    assert tbl.scan().count() == 1

    tbl.delete(EqualTo("number", 40))

    # One positional delete has been added, but an OVERWRITE status is set
    # https://github.com/apache/iceberg/issues/10122
    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "overwrite", "overwrite"]
    assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]}
    assert tbl.scan().count() == 1

@jayceslesar
Copy link
Contributor

Question: Does it make sense to expose this as the __len__ dunder method because python? It would just return the self.count()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants