PyIceberg is based around catalogs to load tables. The first step is to instantiate a catalog that loads tables. Let's use the following configuration to define a catalog called prod:
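For example, a configuration along these lines (a sketch for a REST catalog; the URI and credential shown here are placeholder values):

catalog:
  prod:
    uri: http://rest-catalog/ws/
    credential: t-1234:secret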
This information must be placed inside a file called .pyiceberg.yaml, located either in the $HOME or %USERPROFILE% directory (depending on whether the operating system is Unix-based or Windows-based, respectively) or in the $PYICEBERG_HOME directory (if the corresponding environment variable is set). For more details on possible configurations, refer to the configuration page.
Then load the prod catalog:
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "prod",
    **{
        # These properties are merged with the ones from .pyiceberg.yaml
        "uri": "http://127.0.0.1:8181",
        "s3.endpoint": "http://127.0.0.1:9000",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }
)
If the catalog has not been initialized before, you need to run:
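What exactly needs to run depends on the catalog implementation; as a sketch, a SQL-backed catalog (pyiceberg.catalog.sql.SqlCatalog) provides create_tables() to create the database tables that back the catalog:

catalog.create_tables()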
Let's create a namespace:
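A minimal sketch, using the docs_example namespace that the rest of this page works with:

catalog.create_namespace("docs_example")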
And then list them:
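For example (list_namespaces returns the namespaces as tuples):

ns = catalog.list_namespaces()
print(ns)  # e.g. [("docs_example",)]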
And then list tables in the namespace:
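For example (a sketch using the same namespace):

print(catalog.list_tables("docs_example"))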
To create a table from a catalog:
from pyiceberg.schema import Schema
from pyiceberg.types import (
    TimestampType,
    FloatType,
    DoubleType,
    StringType,
    NestedField,
    StructType,
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
    NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
    NestedField(
        field_id=5,
        name="details",
        field_type=StructType(
            NestedField(
                field_id=6, name="created_by", field_type=StringType(), required=False
            ),
        ),
        required=False,
    ),
)

from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)

from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

# Sort on the symbol
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

catalog.create_table(
    identifier="docs_example.bids",
    schema=schema,
    location="s3://pyiceberg",
    partition_spec=partition_spec,
    sort_order=sort_order,
)
Loading the bids table:

table = catalog.load_table("docs_example.bids")
# Equivalent to:
table = catalog.load_table(("docs_example", "bids"))
# The tuple syntax can be used if the namespace or table contains a dot.
This returns a Table that represents an Iceberg table that can be queried and altered.
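For example, you can inspect the loaded table with the standard accessors on Table (a small illustrative sketch):

print(table.schema())     # the current schema
print(table.spec())       # the partition spec
print(table.properties)   # the table properties as a dict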
To load a table directly from a metadata file (i.e., without using a catalog), you can use a StaticTable as follows:

from pyiceberg.table import StaticTable

static_table = StaticTable.from_metadata(
    "s3://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)
The static-table is considered read-only.
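Reads still work as on any other table; for instance (a sketch, assuming pyarrow is installed):

static_table.scan().to_arrow()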
PyIceberg supports full schema evolution through the Python API. It takes care of setting the field IDs and makes sure that only non-breaking changes are done (this can be overridden). In the examples below, .update_schema() is called on the table itself. You can also initiate a transaction if you want to make more changes than just evolving the schema:
from pyiceberg.types import IntegerType

with table.transaction() as transaction:
    with transaction.update_schema() as update_schema:
        update_schema.add_column("some_other_field", IntegerType(), "doc")
    # ... Update properties etc
Using add_column you can add a column without having to worry about the field-id:
with table.update_schema() as update:
    update.add_column("retries", IntegerType(), "Number of retries to place the bid")
    # In a struct
    update.add_column("details.confirmed_by", StringType(), "Name of the exchange")
Renaming a field in an Iceberg table is simple:
with table.update_schema() as update:
    update.rename_column("retries", "num_retries")
    # This will rename `confirmed_by` to `exchange`
    update.rename_column("details.confirmed_by", "exchange")
Move fields around, also inside a struct:
with table.update_schema() as update:
    update.move_first("symbol")
    update.move_after("bid", "ask")
    # This will move `created_by` before `exchange`
    update.move_before("details.created_by", "details.exchange")
Update a field's type, documentation, or whether it is required:
with table.update_schema() as update:
    # Promote a float to a double
    update.update_column("bid", field_type=DoubleType())
    # Make a field optional
    update.update_column("symbol", required=False)
    # Update the documentation
    update.update_column("symbol", doc="Name of the share on the exchange")
Be careful: some operations are not compatible, but they can still be done at your own risk by setting allow_incompatible_changes:
with table.update_schema(allow_incompatible_changes=True) as update:
    # Incompatible change, cannot require an optional field
    update.update_column("symbol", required=True)
Delete a field; be careful, this is an incompatible change (readers/writers might expect this field):
with table.update_schema(allow_incompatible_changes=True) as update:
    update.delete_column("some_field")
Set and remove properties through the Transaction API:
with table.transaction() as transaction:
    transaction.set_properties(abc="def")

assert table.properties == {"abc": "def"}

with table.transaction() as transaction:
    transaction.remove_properties("abc")

assert table.properties == {}
Or, without a context manager:
table = table.transaction().set_properties(abc="def").commit_transaction()

assert table.properties == {"abc": "def"}

table = table.transaction().remove_properties("abc").commit_transaction()

assert table.properties == {}
To query a table, a table scan is needed. A table scan accepts a filter, columns, and optionally a limit and a snapshot ID:
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

catalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")

scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
    limit=100,
)

# Or filter using a string predicate
scan = table.scan(
    row_filter="trip_distance > 10.0",
)

[task.file.file_path for task in scan.plan_files()]
The low-level plan_files API returns a set of tasks that provide the files that might contain matching rows:
[
    "s3://warehouse/wh/nyc/taxis/data/00003-4-42464649-92dd-41ad-b83b-dea1a2fe4b58-00001.parquet"
]
In this case it is up to the engine itself to filter the files. Below, to_arrow() and to_duckdb() already do this for you.
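To illustrate what "filtering the file" means, an engine would read each planned data file and apply the same predicate. A rough sketch with plain pyarrow, assuming one of the planned files has been copied to a local, hypothetical path (real engines additionally handle Iceberg delete files and schema projection):

import pyarrow.dataset as ds

# Hypothetical local copy of one of the planned data files
dataset = ds.dataset("/tmp/00003-4-example.parquet", format="parquet")
filtered = dataset.to_table(filter=ds.field("trip_distance") >= 10.0)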
Requirements
This requires pyarrow to be installed.
Using PyIceberg it is easy to filter out data from a huge table and pull it into a PyArrow table:
table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow()
This will return a PyArrow table:
pyarrow.Table
VendorID: int64
tpep_pickup_datetime: timestamp[us, tz=+00:00]
tpep_dropoff_datetime: timestamp[us, tz=+00:00]
----
VendorID: [[2,1,2,1,1,...,2,2,2,2,2],[2,1,1,1,2,...,1,1,2,1,2],...,[2,2,2,2,2,...,2,6,6,2,2],[2,2,2,2,2,...,2,2,2,2,2]]
tpep_pickup_datetime: [[2021-04-01 00:28:05.000000,...,2021-04-30 23:44:25.000000]]
tpep_dropoff_datetime: [[2021-04-01 00:47:59.000000,...,2021-05-01 00:14:47.000000]]
This will only pull in the files that might contain matching rows.
Requirements
This requires pandas to be installed.
PyIceberg makes it easy to filter out data from a huge table and pull it into a Pandas dataframe locally. This will only fetch the relevant Parquet files for the query and apply the filter. This will reduce IO and therefore improve performance and reduce cost.
table.scan(
    row_filter="trip_distance >= 10.0",
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_pandas()
This will return a Pandas dataframe:
        VendorID       tpep_pickup_datetime      tpep_dropoff_datetime
0              2  2021-04-01 00:28:05+00:00  2021-04-01 00:47:59+00:00
1              1  2021-04-01 00:39:01+00:00  2021-04-01 00:57:39+00:00
2              2  2021-04-01 00:14:42+00:00  2021-04-01 00:42:59+00:00
3              1  2021-04-01 00:17:17+00:00  2021-04-01 00:43:38+00:00
4              1  2021-04-01 00:24:04+00:00  2021-04-01 00:56:20+00:00
...          ...                        ...                        ...
116976         2  2021-04-30 23:56:18+00:00  2021-05-01 00:29:13+00:00
116977         2  2021-04-30 23:07:41+00:00  2021-04-30 23:37:18+00:00
116978         2  2021-04-30 23:38:28+00:00  2021-05-01 00:12:04+00:00
116979         2  2021-04-30 23:33:00+00:00  2021-04-30 23:59:00+00:00
116980         2  2021-04-30 23:44:25+00:00  2021-05-01 00:14:47+00:00

[116981 rows x 3 columns]
It is recommended to use Pandas 2 or later, because it stores the data in an Apache Arrow backend, which avoids copying the data.
Requirements
This requires DuckDB to be installed.
A table scan can also be converted into an in-memory DuckDB table:
con = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_duckdb(table_name="distant_taxi_trips")
Using the cursor, we can run queries on the DuckDB table:
print(
    con.execute(
        "SELECT tpep_dropoff_datetime - tpep_pickup_datetime AS duration FROM distant_taxi_trips LIMIT 4"
    ).fetchall()
)
[
    (datetime.timedelta(seconds=1194),),
    (datetime.timedelta(seconds=1118),),
    (datetime.timedelta(seconds=1697),),
    (datetime.timedelta(seconds=1581),),
]
Requirements
This requires Ray to be installed.
A table scan can also be converted into a Ray dataset:
ray_dataset = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_ray()
This will return a Ray dataset:
Dataset(
    num_blocks=1,
    num_rows=1168798,
    schema={
        VendorID: int64,
        tpep_pickup_datetime: timestamp[us, tz=UTC],
        tpep_dropoff_datetime: timestamp[us, tz=UTC]
    }
)
Using the Ray Dataset API to interact with the dataset:
print(ray_dataset.take(2))
[
    {
        "VendorID": 2,
        "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 23, 50),
        "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 0, 34, 31),
    },
    {
        "VendorID": 2,
        "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 5, 3),
        "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 16, 10, 18),
    },
]
{"use strict";/*!
+ * escape-html
+ * Copyright(c) 2012-2013 TJ Holowaychuk
+ * Copyright(c) 2015 Andreas Lubbe
+ * Copyright(c) 2015 Tiancheng "Timothy" Gu
+ * MIT Licensed
+ */var $a=/["'&<>]/;Un.exports=Ra;function Ra(e){var t=""+e,r=$a.exec(t);if(!r)return t;var o,n="",i=0,s=0;for(i=r.index;i