HIVE Partitions in AWS Athena #555

Open
4 of 14 tasks
sh-rp opened this issue Aug 14, 2023 · 8 comments

Comments

@sh-rp
Collaborator

sh-rp commented Aug 14, 2023

We want to enable the user to make use of AWS Athena partitions. For this we will most likely need to change the schema a bit and add the option for the user to define partitions on the resources.

Analyzing the discussion on this topic, we can implement this in 3 steps of increasing complexity (3 PRs expected)

support iceberg tables

here partitioning is available out of the box on the create table command. partitions are created based on the data in the tables

support regular, hive tables

after loading a file, ADD PARTITION must be executed to map the file into a partition. partitions are created based on metadata that must be available via file layout placeholders,
so if a given partition column is declared in the adapter - the same column name must be a placeholder in the file layout (or be present in the placeholders list)

  • we use ADD PARTITION as in Scenario 2 here https://docs.aws.amazon.com/athena/latest/ug/partitions.html - after adding any new file to the bucket
  • we need to render the partition column value using path_utils.py, which is used to render file names. see how filesystem does that. we may need to give athena access to rendering file names via staging_config
    note: I'm not sure whether the table must have a partition column physically or whether we may just declare it and fill the data from the layout placeholder. that needs to be tested
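
A minimal sketch of the ADD PARTITION step described above, assuming the placeholder values used to render the file path are available as a plain dict. The function names, the table name and the dict keys are hypothetical and are not part of dlt or path_utils.py; only the `ALTER TABLE ... ADD IF NOT EXISTS PARTITION ... LOCATION` form comes from the Athena docs.

```python
from typing import Mapping, Sequence


def quote_literal(value: str) -> str:
    """Single-quote a value for SQL, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def add_partition_sql(
    table: str,
    partition_columns: Sequence[str],
    placeholders: Mapping[str, str],
    location: str,
) -> str:
    """Build an ADD PARTITION statement from layout placeholder values.

    Hypothetical helper: every declared partition column must also be
    present among the layout placeholders, otherwise we cannot know
    which partition the loaded file belongs to.
    """
    missing = [c for c in partition_columns if c not in placeholders]
    if missing:
        raise ValueError(
            f"partition columns missing from layout placeholders: {missing}"
        )
    spec = ", ".join(
        f"`{c}` = {quote_literal(placeholders[c])}" for c in partition_columns
    )
    return (
        f"ALTER TABLE `{table}` ADD IF NOT EXISTS PARTITION ({spec}) "
        f"LOCATION {quote_literal(location)}"
    )
```

The statement would be executed once per newly loaded file (or once per new partition folder), which avoids a full MSCK REPAIR TABLE scan.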

hive partitioning based on table data

when #1382 is merged we can easily add more table formats (i.e. hive) and use it for partitioning. from our discussion we know that MSCK REPAIR TABLE costs too much for incremental loads so I do not yet see value in it


this is the old discussion
~Tasks:

  • Alter schema to have an order of partition keys (possibly replace the "bool" hint with an "int" hint which is null or 0 if this column should not be a partition, and a number if it should which then defines the order?)
  • Expose partition config on the resource as array of strings which are the column names?
  • When creating parquet files, respect the partitions and create a file for every partition key combination and upload that to the right path. Pyarrow can do this for us, so we need to change our load jobs to also take folders, not only file paths.
  • Make the partitions known to Athena. We can either run explicit DDL commands or infer the partitions with a repair command; the latter is more costly, so doing this with our schema makes sense. This means we will have schema migration operations that not only add columns but also partition definitions.

To think about:

  • How to guard against the user uploading to the same bucket with different partition key settings?
  • How to migrate existing schemas if we change the type of the partition settings?
  • How to behave if there are partitions defined, but the yielded resource item does not have a value for this column? Will there always be a "None" path for those cases? Will this be queryable by Athena?
  • If we use hive style partitions, will the resulting paths be more or less universal, or do we need to use different patterns for different data lake providers?~
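
For reference, the "one file per partition key combination" idea from the old task list can be sketched in plain Python, independent of pyarrow. The function name and the row shape are assumptions for illustration only. A missing value is rendered as the literal None here purely to make the open question above concrete, not because that is the right behaviour.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Mapping, Sequence


def group_by_partition(
    rows: Iterable[Mapping[str, Any]],
    partition_keys: Sequence[str],
) -> Dict[str, List[Mapping[str, Any]]]:
    """Group rows by their hive-style partition path (key=value/key=value).

    The order of partition_keys defines the folder nesting, which is why
    the schema would need to record an order and not just a boolean hint.
    """
    groups: Dict[str, List[Mapping[str, Any]]] = defaultdict(list)
    for row in rows:
        # row.get() yields None for a missing column, giving e.g. month=None
        path = "/".join(f"{key}={row.get(key)}" for key in partition_keys)
        groups[path].append(row)
    return dict(groups)
```

Each key of the returned dict would become a folder and each value one parquet file inside it.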
@rudolfix
Collaborator

@sh-rp let's take a look at the native partitioning that pyarrow has. OFC in that case we'll need to deal with several files in folders, so our jobs mechanism gets more complicated (but why not have jobs that are folders ;> OMG)

in the Weaviate PR we are adding a concept of a destination adapter that injects additional information into the schema to be interpreted by the loader/normalizer (and the data writer, which also has access to it).

so it all fits together but still looks like bigger project

@rudolfix rudolfix moved this from Todo to Planned in dlt core library Aug 14, 2023
@rudolfix rudolfix moved this from Planned to Todo in dlt core library Sep 27, 2023
@gamgi

gamgi commented Nov 14, 2023

Documenting a potential use-case. I'm loading files from S3 + SFTP + API sources to S3 destination.

I'd like to use properties from the source (not data itself) to design the partition layout in the destination. The properties may be resource arguments, or indirectly source metadata like object path and/or file name.

Examples:

  • Single daily object from SFTP
    • sftp://example.com/foo/bar/report_20231114000000.csv --> s3://destination-bucket/baz/quz/yyyy=2023/mm=11/dd=14/report.csv
  • Multiple daily objects from S3
    • s3://example-bucket/foo/bar/report_20231114000000.csv --> s3://destination-bucket/baz/quz/yyyy=2023/mm=11/dd=14/report_20231114000000.csv
    • s3://example-bucket/foo/bar/report_20231114005000.csv --> s3://destination-bucket/baz/quz/yyyy=2023/mm=11/dd=14/report_20231114005000.csv
  • Single daily object from API call
    • https://example.com/api/report?date=2023-11-14 --> s3://destination-bucket/baz/quz/yyyy=2023/mm=11/dd=14/report.csv
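
A rough sketch of the mapping in the file-based examples above, assuming the date is taken from a 14-digit timestamp suffix in the source file name. The function name, the regex and the `keep_timestamp` flag are illustrative assumptions, not an existing dlt feature.

```python
import re
from datetime import datetime
from posixpath import basename

# Matches "_YYYYMMDDHHMMSS" directly before the file extension.
TIMESTAMP_RE = re.compile(r"_(\d{14})(?=\.[^.]+$)")


def partitioned_key(source_url: str, prefix: str, keep_timestamp: bool = True) -> str:
    """Derive a yyyy=/mm=/dd= destination path from the source file name.

    Uses only source metadata (the object name), never the file contents.
    """
    name = basename(source_url)
    match = TIMESTAMP_RE.search(name)
    if match is None:
        raise ValueError(f"no timestamp found in file name: {name}")
    ts = datetime.strptime(match.group(1), "%Y%m%d%H%M%S")
    if not keep_timestamp:
        # Single daily object: drop the timestamp, e.g. report.csv
        name = name[: match.start()] + name[match.end():]
    return f"{prefix.rstrip('/')}/yyyy={ts:%Y}/mm={ts:%m}/dd={ts:%d}/{name}"
```

With `keep_timestamp=True` this reproduces the "multiple daily objects" layout, and with `keep_timestamp=False` the "single daily object" layout. The API case would need the date passed in from the resource argument instead of a file name.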

How others have (not) done it:

Implications for this ticket:

  • Partition config should support resource parameters
    • For example, if date (or year, month, date) is a parameter to a resource, that should be available in the partition layout.
  • Partition config should support resource metadata, such as object path or creation date.
    • For example, in order to replicate the partitioning from the source to destination.
  • Consider: Is partitioning a generic issue, or something purely related to filesystem source/destination, see API case above?

TL;DR I'm looking to partition based on metadata, not the data itself.

@MRocholl

Just wanted to add a thing to think about.

Iceberg tables make use of hidden partitioning and keep track of partitioning changes over time.
This might be much simpler to implement than the classical hive table.

https://trino.io/blog/2021/05/03/a-gentle-introduction-to-iceberg#hidden-partitions

Considering this issue is about AWS Athena partitions I do not believe this is out of scope and might be the "simpler" part of the issue.

Best

@Pipboyguy
Collaborator

Pipboyguy commented Feb 26, 2024

(Partly) subsumed by #930

@nicor88

nicor88 commented Apr 15, 2024

In case native iceberg tables are used in Athena, the partition implementation can be delegated to iceberg directly (hidden partitioning), and it's possible to use an implementation similar to what is done in BigQuery, where the PARTITION BY clause is added via SQL. Pretty much as @MRocholl mentioned.

For pure parquet writing, of course, what I mentioned doesn't work, because partitions are part of the object path of the data written to S3.

@sh-rp @rudolfix @sultaniman as I'm interested in using Athena/Iceberg destination with partitions, do you see anything against what I proposed above? it's just about adding the right SQL for iceberg managed tables.

@sultaniman
Contributor

@nicor88 I've been working on extending layout placeholders for filesystem destination, atm we don't yet support syncing or taking date & time from parquet files for partitioning.

@nicor88

nicor88 commented Apr 15, 2024

@sultaniman From what I can see, layout placeholders for the filesystem won't work with Athena-managed Iceberg tables.

For example an iceberg table can be created like that:

CREATE TABLE events (id bigint, event_date date)
  PARTITIONED BY (event_date, bucket(16, id))
  LOCATION 's3://DOC-EXAMPLE-BUCKET/your-folder/'
  TBLPROPERTIES ( 'table_type' = 'ICEBERG' )

Writing to the file system is handled under the hood by Athena itself in the case of an Iceberg table, not by the writer (dlt), because of Iceberg hidden partitioning.

Therefore the only thing to do with Athena Iceberg tables is to specify the partition definition on table creation, or as a post-hook (e.g. using pyiceberg) to modify the partition spec of a table.
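
To illustrate "specify the partition definition on table creation", here is a small sketch that renders the statement above from a list of partition expressions. The function and its parameters are hypothetical and not dlt's API, and identifiers are inserted without quoting or validation, so this is illustration only.

```python
from typing import Sequence, Tuple


def iceberg_create_table_sql(
    table: str,
    columns: Sequence[Tuple[str, str]],
    partition_by: Sequence[str],
    location: str,
) -> str:
    """Render an Athena CREATE TABLE statement for an Iceberg table.

    partition_by holds raw partition expressions, either plain column
    names or Iceberg transforms such as "bucket(16, id)".
    """
    cols = ", ".join(f"{name} {dtype}" for name, dtype in columns)
    lines = [f"CREATE TABLE {table} ({cols})"]
    if partition_by:
        # Only emit the clause when the user declared partitions.
        lines.append(f"  PARTITIONED BY ({', '.join(partition_by)})")
    lines.append(f"  LOCATION '{location}'")
    lines.append("  TBLPROPERTIES ( 'table_type' = 'ICEBERG' )")
    return "\n".join(lines)
```

Called with `partition_by=["event_date", "bucket(16, id)"]` it reproduces the example statement. With an empty list the PARTITIONED BY clause is simply omitted, so unpartitioned tables keep working.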

That said, I was planning to propose a PR for that, because it's the only limitation blocking us from using dlt in production.

All our tables are partitioned by ingestion date, for example, to reduce data scans downstream.

Happy to have a chat with more details if the above is not clear.

@sultaniman
Contributor

@nicor88 sure, we can have a chat about it. I think once we merge the original PR I will be more than happy to allow iceberg support 🙂

@rudolfix rudolfix moved this from Planned to In Progress in dlt core library May 22, 2024
@rudolfix rudolfix moved this from In Progress to Done in dlt core library Jun 2, 2024
@rudolfix rudolfix moved this from Done to Planned in dlt core library Jun 2, 2024
@rudolfix rudolfix changed the title Partitions in AWS Athena HIVE Partitions in AWS Athena Aug 21, 2024