diff --git a/go.mod b/go.mod index 6bea61d..5e1aea8 100644 --- a/go.mod +++ b/go.mod @@ -20,14 +20,43 @@ module github.com/apache/iceberg-go go 1.20 require ( + github.com/aws/aws-sdk-go-v2 v1.20.3 + github.com/aws/aws-sdk-go-v2/config v1.18.35 + github.com/aws/aws-sdk-go-v2/credentials v1.13.34 + github.com/aws/aws-sdk-go-v2/service/s3 v1.38.4 + github.com/hamba/avro/v2 v2.14.1 github.com/stretchr/testify v1.8.4 - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 + github.com/wolfeidau/s3iofs v1.2.0 + golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb ) require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.10 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.40 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.34 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.41 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.14 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.35 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.34 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.15.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.13.4 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.15.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.21.4 // indirect + github.com/aws/smithy-go v1.14.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/kr/pretty v0.3.1 // indirect + github.com/golang/mock v1.6.0 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib 
v1.0.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + github.com/stretchr/objx v0.5.0 // indirect + golang.org/x/mod v0.11.0 // indirect + golang.org/x/net v0.11.0 // indirect + golang.org/x/sys v0.9.0 // indirect + golang.org/x/tools v0.10.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 460287c..d12be7b 100644 --- a/go.sum +++ b/go.sum @@ -1,24 +1,111 @@ -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/aws/aws-sdk-go-v2 v1.20.3 h1:lgeKmAZhlj1JqN43bogrM75spIvYnRxqTAh1iupu1yE= +github.com/aws/aws-sdk-go-v2 v1.20.3/go.mod h1:/RfNgGmRxI+iFOB1OeJUyxiU+9s88k3pfHvDagGEp0M= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13 h1:OPLEkmhXf6xFPiz0bLeDArZIDx1NNS4oJyG4nv3Gct0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13/go.mod h1:gpAbvyDGQFozTEmlTFO8XcQKHzubdq0LzRyJpG6MiXM= +github.com/aws/aws-sdk-go-v2/config v1.18.35 h1:uU9rgCzrW/pVRUUlRULiwKQe8RoEDst1NQu4Qo8kOtk= +github.com/aws/aws-sdk-go-v2/config v1.18.35/go.mod h1:7xF1yr9GBMfYRQI4PLHO8iceqKLM6DpGVEvXI38HB/A= +github.com/aws/aws-sdk-go-v2/credentials v1.13.34 h1:/EYG4lzayDd5PY6HQQ2Qyj/cD6CR3kz96BjTZAO5tNo= +github.com/aws/aws-sdk-go-v2/credentials v1.13.34/go.mod h1:+wgdxCGNulHme6kTMZuDL9KOagLPloemoYkfjpQkSEU= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.10 h1:mgOrtwYfJZ4e3QJe1TrliC/xIkauafGMdLLuCExOqcs= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.10/go.mod h1:wMsSLVM2hRpDVhd+3dtLUzqwm7/fjuhNN+b1aOLDt6g= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.40 h1:CXceCS9BrDInRc74GDCQ8Qyk/Gp9VLdK+Rlve+zELSE= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.40/go.mod h1:5kKmFhLeOVy6pwPDpDNA6/hK/d6URC98pqDDqHgdBx4= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.34 h1:B+nZtd22cbko5+793hg7LEaTeLMiZwlgCLUrN5Y0uzg= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.34/go.mod h1:RZP0scceAyhMIQ9JvFp7HvkpcgqjL4l/4C+7RAeGbuM= 
+github.com/aws/aws-sdk-go-v2/internal/ini v1.3.41 h1:EcSFdpLdkF3FWizimox0qYLuorn9e4PNMR27mvshGLs= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.41/go.mod h1:mKxUXW+TuwpCKKHVlmHGVVuBi9y9LKW8AiQodg23M5E= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.3 h1:uHhWcrNBgpm9gi3o8NSQcsAqha/U9OFYzi2k4+0UVz8= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.3/go.mod h1:jYLMm3Dh0wbeV3lxth5ryks/O2M/omVXWyYm3YcEVqQ= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.14 h1:m0QTSI6pZYJTk5WSKx3fm5cNW/DCicVzULBgU/6IyD0= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.14/go.mod h1:dDilntgHy9WnHXsh7dDtUPgHKEfTJIBUTHM8OWm0f/0= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.35 h1:oCUrlTzh9GwhlYdyDGNAS6UgqJRzJp5rKoYCJWqLyZI= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.35/go.mod h1:YVHrksq36j0sbXCT6rSuQafpfYkMYqy0QTk7JTCTBIU= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.34 h1:JwvXk+1ePAD9xkFHprhHYqwsxLDcbNFsPI1IAT2sPS0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.34/go.mod h1:ytsF+t+FApY2lFnN51fJKPhH6ICKOPXKEcwwgmJEdWI= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.15.3 h1:rPDAISw3FjEhrJoaxmQjuD+GgBfv2p3AVhmAcnyqq3k= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.15.3/go.mod h1:TXBww3ANB+QRj+/dUoYDvI8d/u4F4WzTxD4mxtDoxrg= +github.com/aws/aws-sdk-go-v2/service/s3 v1.38.4 h1:P4p346B+YMTTCH9D4I/FWYl+E7BjSLQxqk1e2KYDI5w= +github.com/aws/aws-sdk-go-v2/service/s3 v1.38.4/go.mod h1:uDxTlJiuPhbtRRPMHrPYRkn1Ck7Mtk3BEJiDut+gR5Y= +github.com/aws/aws-sdk-go-v2/service/sso v1.13.4 h1:WZPZ7Zf6Yo13lsfTetFrLU/7hZ9CXESDpdIHvmLxQFQ= +github.com/aws/aws-sdk-go-v2/service/sso v1.13.4/go.mod h1:FP05hDXTLouXwAMQ1swqybHy7tHySblMkBMKSumaKg0= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.15.4 h1:pYFM2U/3/4RLrlMSYXwL1XPBCWvaePk2p+0+i/BgHOs= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.15.4/go.mod h1:4pdlNASc29u0j9bq2jIQcBghG5Lx2oQAIj91vo1u1t8= 
+github.com/aws/aws-sdk-go-v2/service/sts v1.21.4 h1:zj4jxK3L54tGyqKleKDMK4vHolENxlq11dF0v1oBkJo= +github.com/aws/aws-sdk-go-v2/service/sts v1.21.4/go.mod h1:CQRMCzYvl5eeAQW3AWkRLS+zGGXCucBnsiQlrs+tCeo= +github.com/aws/smithy-go v1.14.2 h1:MJU9hqBGbvWZdApzpvoF2WAIJDbtjK2NDJSiJP7HblQ= +github.com/aws/smithy-go v1.14.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/hamba/avro/v2 v2.14.1 h1:mRkiRKjRTTs+yx0nVuM6z/q5zg3VBZfOe/01ngAnU6A= 
+github.com/hamba/avro/v2 v2.14.1/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= 
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= +github.com/wolfeidau/s3iofs v1.2.0 h1:kqUIBKZHBLPzE4nThAHJmKhM4/EuIil4bP5U4LvGPfY= +github.com/wolfeidau/s3iofs v1.2.0/go.mod h1:y/7CrhZ5S9hBJ2psOEmvMhTj3u/sqVWYuGoIegHEP7I= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb h1:mIKbk8weKhSeLH2GmUTrvx8CjkyJmnU1wFmg59CUjFA= +golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= +golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.11.0 h1:Gi2tvZIJyBtO9SDr1q9h5hEQCp/4L2RQ+ar0qjx2oNU= +golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= +golang.org/x/sync 
v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= +golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg= +golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 
v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/avro_schemas.go b/internal/avro_schemas.go new file mode 100644 index 0000000..4628787 --- /dev/null +++ b/internal/avro_schemas.go @@ -0,0 +1,568 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package internal
+
+import "github.com/hamba/avro/v2"
+
+// Keys under which init registers the embedded manifest schemas in
+// AvroSchemaCache.
+const (
+	// ManifestListV1Key is the cache key of the v1 manifest list schema
+	// (Avro record "manifest_file").
+	ManifestListV1Key = "manifest-list-v1"
+	// ManifestListV2Key is the cache key of the v2 manifest list schema
+	// (Avro record "manifest_file").
+	ManifestListV2Key = "manifest-list-v2"
+	// ManifestEntryV1Key is the cache key of the v1 manifest entry schema
+	// (Avro record "manifest_entry").
+	ManifestEntryV1Key = "manifest-entry-v1"
+	// ManifestEntryV2Key is the cache key of the v2 manifest entry schema
+	// (Avro record "manifest_entry").
+	ManifestEntryV2Key = "manifest-entry-v2"
+)
+
+var (
+	// AvroSchemaCache holds the parsed Avro schemas that init registers
+	// under the four keys declared above.
+	AvroSchemaCache avro.SchemaCache
+)
+
+// init parses the four embedded schema definitions with avro.MustParse
+// and adds each one to AvroSchemaCache under its key.
+//
+// NOTE(review): MustParse is a Must-style helper, so a malformed schema
+// literal presumably panics during package initialization - confirm.
+//
+// NOTE(review): both manifest entry schemas declare the partition record
+// "r102" with two fixed fields, VendorID and tpep_pickup_datetime. That
+// looks specific to a single table's partition spec - confirm whether it
+// should be derived per table instead of being hard-coded here.
+func init() {
+ AvroSchemaCache.Add(ManifestListV1Key, avro.MustParse(`{ + "type": "record", + "name": "manifest_file", + "fields": [ + {"name": "manifest_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 500}, + {"name": "manifest_length", "type": "long", "doc": "Total file size in bytes", "field-id": 501}, + {"name": "partition_spec_id", "type": "int", "doc": "Spec ID used to write", "field-id": 502}, + { + "name": "added_snapshot_id", + "type": ["null", "long"], + "doc": "Snapshot ID that added the manifest", + "field-id": 503 + }, + { + "name": "added_data_files_count", + "type": ["null", "int"], + "doc": "Added entry count", + "field-id": 504 + }, + { + "name": "existing_data_files_count", + "type": ["null", "int"], + "doc": "Existing entry count", + "field-id": 505 + }, + { + "name": "deleted_data_files_count", + "type": ["null", "int"], + "doc": "Deleted entry count", + "field-id": 506 + }, + { + "name": "partitions", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "r508", + "fields": [ + { + "name": "contains_null", + "type": "boolean", + "doc": "True if any file has a null partition value", + "field-id": 509 + }, + { + "name": "contains_nan", + "type": ["null", "boolean"], + "doc": "True if any file has a nan partition value", + "field-id": 518 + }, + { + "name": "lower_bound", + "type": ["null", "bytes"], + "doc": "Partition lower bound for all files", + "field-id": 510 + }, + { + "name": "upper_bound", + "type": ["null", "bytes"], + "doc": "Partition upper bound for all files", + "field-id": 511 + } + ] + }, + "element-id": 508 + } + ], + "doc": "Summary for each partition", + "field-id": 507 + }, +
{"name": "added_rows_count", "type": ["null", "long"], "doc": "Added rows count", "field-id": 512}, + { + "name": "existing_rows_count", + "type": ["null", "long"], + "doc": "Existing rows count", + "field-id": 513 + }, + { + "name": "deleted_rows_count", + "type": ["null", "long"], + "doc": "Deleted rows count", + "field-id": 514 + } + ] + }`)) + + AvroSchemaCache.Add(ManifestListV2Key, avro.MustParse(`{ + "type": "record", + "name": "manifest_file", + "fields": [ + {"name": "manifest_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 500}, + {"name": "manifest_length", "type": "long", "doc": "Total file size in bytes", "field-id": 501}, + {"name": "partition_spec_id", "type": "int", "doc": "Spec ID used to write", "field-id": 502}, + {"name": "content", "type": "int", "doc": "Contents of the manifest: 0=data, 1=deletes", "field-id": 517}, + { + "name": "sequence_number", + "type": "long", + "doc": "Sequence number when the manifest was added", + "field-id": 515 + }, + { + "name": "min_sequence_number", + "type": "long", + "doc": "Lowest sequence number in the manifest", + "field-id": 516 + }, + {"name": "added_snapshot_id", "type": "long", "doc": "Snapshot ID that added the manifest", "field-id": 503}, + {"name": "added_files_count", "type": "int", "doc": "Added entry count", "field-id": 504}, + {"name": "existing_files_count", "type": "int", "doc": "Existing entry count", "field-id": 505}, + {"name": "deleted_files_count", "type": "int", "doc": "Deleted entry count", "field-id": 506}, + {"name": "added_rows_count", "type": "long", "doc": "Added rows count", "field-id": 512}, + {"name": "existing_rows_count", "type": "long", "doc": "Existing rows count", "field-id": 513}, + {"name": "deleted_rows_count", "type": "long", "doc": "Deleted rows count", "field-id": 514}, + { + "name": "partitions", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "r508", + "fields": [ + { + "name": "contains_null", + 
"type": "boolean", + "doc": "True if any file has a null partition value", + "field-id": 509 + }, + { + "name": "contains_nan", + "type": ["null", "boolean"], + "doc": "True if any file has a nan partition value", + "field-id": 518 + }, + { + "name": "lower_bound", + "type": ["null", "bytes"], + "doc": "Partition lower bound for all files", + "field-id": 510 + }, + { + "name": "upper_bound", + "type": ["null", "bytes"], + "doc": "Partition upper bound for all files", + "field-id": 511 + } + ] + }, + "element-id": 508 + } + ], + "doc": "Summary for each partition", + "field-id": 507 + } + ] + }`)) + + AvroSchemaCache.Add(ManifestEntryV1Key, avro.MustParse(`{ + "type": "record", + "name": "manifest_entry", + "fields": [ + {"name": "status", "type": "int", "field-id": 0}, + {"name": "snapshot_id", "type": "long", "field-id": 1}, + { + "name": "data_file", + "type": { + "type": "record", + "name": "r2", + "fields": [ + {"name": "file_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 100}, + { + "name": "file_format", + "type": "string", + "doc": "File format name: avro, orc, or parquet", + "field-id": 101 + }, + { + "name": "partition", + "type": { + "type": "record", + "name": "r102", + "fields": [ + {"field-id": 1000, "name": "VendorID", "type": ["null", "int"]}, + { + "field-id": 1001, + "name": "tpep_pickup_datetime", + "type": ["null", {"type": "int", "logicalType": "date"}] + } + ] + }, + "field-id": 102 + }, + {"name": "record_count", "type": "long", "doc": "Number of records in the file", "field-id": 103}, + {"name": "file_size_in_bytes", "type": "long", "doc": "Total file size in bytes", "field-id": 104}, + {"name": "block_size_in_bytes", "type": "long", "field-id": 105}, + { + "name": "column_sizes", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k117_v118", + "fields": [ + {"name": "key", "type": "int", "field-id": 117}, + {"name": "value", "type": "long", "field-id": 118} + ] + }, + 
"logicalType": "map" + } + ], + "doc": "Map of column id to total size on disk", + "field-id": 108 + }, + { + "name": "value_counts", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k119_v120", + "fields": [ + {"name": "key", "type": "int", "field-id": 119}, + {"name": "value", "type": "long", "field-id": 120} + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to total count, including null and NaN", + "field-id": 109 + }, + { + "name": "null_value_counts", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k121_v122", + "fields": [ + {"name": "key", "type": "int", "field-id": 121}, + {"name": "value", "type": "long", "field-id": 122} + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to null value count", + "field-id": 110 + }, + { + "name": "nan_value_counts", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k138_v139", + "fields": [ + {"name": "key", "type": "int", "field-id": 138}, + {"name": "value", "type": "long", "field-id": 139} + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to number of NaN values in the column", + "field-id": 137 + }, + { + "name": "lower_bounds", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k126_v127", + "fields": [ + {"name": "key", "type": "int", "field-id": 126}, + {"name": "value", "type": "bytes", "field-id": 127} + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to lower bound", + "field-id": 125 + }, + { + "name": "upper_bounds", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k129_v130", + "fields": [ + {"name": "key", "type": "int", "field-id": 129}, + {"name": "value", "type": "bytes", "field-id": 130} + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to upper bound", + "field-id": 128 + }, + { + "name": "key_metadata", + "type": 
["null", "bytes"], + "doc": "Encryption key metadata blob", + "field-id": 131 + }, + { + "name": "split_offsets", + "type": ["null", {"type": "array", "items": "long", "element-id": 133}], + "doc": "Splittable offsets", + "field-id": 132 + }, + { + "name": "sort_order_id", + "type": ["null", "int"], + "doc": "Sort order ID", + "field-id": 140 + } + ] + }, + "field-id": 2 + } + ] + }`)) + + AvroSchemaCache.Add(ManifestEntryV2Key, avro.MustParse(`{ + "type": "record", + "name": "manifest_entry", + "fields": [ + {"name": "status", "type": "int", "field-id": 0}, + {"name": "snapshot_id", "type": ["null", "long"], "field-id": 1}, + {"name": "sequence_number", "type": ["null", "long"], "field-id": 3}, + {"name": "file_sequence_number", "type": ["null", "long"], "field-id": 4}, + { + "name": "data_file", + "type": { + "type": "record", + "name": "r2", + "fields": [ + {"name": "content", "type": "int", "doc": "Type of content stored by the data file", "field-id": 134}, + {"name": "file_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 100}, + { + "name": "file_format", + "type": "string", + "doc": "File format name: avro, orc, or parquet", + "field-id": 101 + }, + { + "name": "partition", + "type": { + "type": "record", + "name": "r102", + "fields": [ + {"field-id": 1000, "name": "VendorID", "type": ["null", "int"]}, + { + "field-id": 1001, + "name": "tpep_pickup_datetime", + "type": ["null", {"type": "int", "logicalType": "date"}] + } + ] + }, + "field-id": 102 + }, + {"name": "record_count", "type": "long", "doc": "Number of records in the file", "field-id": 103}, + {"name": "file_size_in_bytes", "type": "long", "doc": "Total file size in bytes", "field-id": 104}, + { + "name": "column_sizes", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k117_v118", + "fields": [ + {"name": "key", "type": "int", "field-id": 117}, + {"name": "value", "type": "long", "field-id": 118} + ] + }, + "logicalType": "map" + } + 
], + "doc": "Map of column id to total size on disk", + "field-id": 108 + }, + { + "name": "value_counts", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k119_v120", + "fields": [ + {"name": "key", "type": "int", "field-id": 119}, + {"name": "value", "type": "long", "field-id": 120} + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to total count, including null and NaN", + "field-id": 109 + }, + { + "name": "null_value_counts", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k121_v122", + "fields": [ + {"name": "key", "type": "int", "field-id": 121}, + {"name": "value", "type": "long", "field-id": 122} + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to null value count", + "field-id": 110 + }, + { + "name": "nan_value_counts", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k138_v139", + "fields": [ + {"name": "key", "type": "int", "field-id": 138}, + {"name": "value", "type": "long", "field-id": 139} + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to number of NaN values in the column", + "field-id": 137 + }, + { + "name": "lower_bounds", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k126_v127", + "fields": [ + {"name": "key", "type": "int", "field-id": 126}, + {"name": "value", "type": "bytes", "field-id": 127} + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to lower bound", + "field-id": 125 + }, + { + "name": "upper_bounds", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "k129_v130", + "fields": [ + {"name": "key", "type": "int", "field-id": 129}, + {"name": "value", "type": "bytes", "field-id": 130} + ] + }, + "logicalType": "map" + } + ], + "doc": "Map of column id to upper bound", + "field-id": 128 + }, + { + "name": "key_metadata", + "type": ["null", "bytes"], + "doc": 
"Encryption key metadata blob", + "field-id": 131 + }, + { + "name": "split_offsets", + "type": ["null", {"type": "array", "items": "long", "element-id": 133}], + "doc": "Splittable offsets", + "field-id": 132 + }, + { + "name": "equality_ids", + "type": ["null", {"type": "array", "items": "int", "element-id": 136}], + "doc": "Field ids used to determine row equality for delete files", + "field-id": 135 + }, + { + "name": "sort_order_id", + "type": ["null", "int"], + "doc": "Sort order ID", + "field-id": 140 + } + ] + }, + "field-id": 2 + } + ] + }`)) +} diff --git a/internal/mock_fs.go b/internal/mock_fs.go new file mode 100644 index 0000000..95f6c3f --- /dev/null +++ b/internal/mock_fs.go @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package internal
+
+import (
+	"bytes"
+	"errors"
+	"io/fs"
+
+	"github.com/apache/iceberg-go/io"
+	"github.com/stretchr/testify/mock"
+)
+
+// errAlreadyClosed is returned by every MockFile method that is called
+// after the file has been closed.
+var errAlreadyClosed = errors.New("already closed")
+
+// MockFS is a testify mock exposing Open and Remove methods whose
+// results are configured through the embedded mock.Mock.
+type MockFS struct {
+	mock.Mock
+}
+
+// Open records the call and returns the values configured on the mock.
+//
+// A nil first return value (for example Return(nil, err)) is passed
+// through as a nil io.File rather than panicking in the type assertion.
+func (m *MockFS) Open(name string) (io.File, error) {
+	args := m.Called(name)
+	if v := args.Get(0); v != nil {
+		return v.(io.File), args.Error(1)
+	}
+	return nil, args.Error(1)
+}
+
+// Remove records the call and returns the error configured on the mock.
+func (m *MockFS) Remove(name string) error {
+	return m.Called(name).Error(0)
+}
+
+// MockFSReadFile is a MockFS that additionally provides a mocked
+// ReadFile method.
+type MockFSReadFile struct {
+	MockFS
+}
+
+// ReadFile records the call and returns the values configured on the
+// mock.
+//
+// A nil first return value (for example Return(nil, err)) is passed
+// through as a nil slice rather than panicking in the type assertion.
+func (m *MockFSReadFile) ReadFile(name string) ([]byte, error) {
+	args := m.Called(name)
+	if v := args.Get(0); v != nil {
+		return v.([]byte), args.Error(1)
+	}
+	return nil, args.Error(1)
+}
+
+// MockFile is an in-memory file backed by a bytes.Reader that tracks
+// whether Close has been called.
+type MockFile struct {
+	// Contents supplies the data served by Read, ReadAt and Seek.
+	Contents *bytes.Reader
+
+	// closed is set by the first successful Close.
+	closed bool
+}
+
+// Stat is a stub: it always returns a nil fs.FileInfo and a nil error.
+func (m *MockFile) Stat() (fs.FileInfo, error) {
+	return nil, nil
+}
+
+// Read reads from Contents. Like ReadAt and Seek, it fails once the
+// file has been closed.
+func (m *MockFile) Read(p []byte) (int, error) {
+	if m.closed {
+		return 0, errAlreadyClosed
+	}
+	return m.Contents.Read(p)
+}
+
+// Close marks the file as closed. A second Close returns an error.
+func (m *MockFile) Close() error {
+	if m.closed {
+		return errAlreadyClosed
+	}
+	m.closed = true
+	return nil
+}
+
+// ReadAt reads from Contents at offset off; it fails once the file has
+// been closed.
+func (m *MockFile) ReadAt(p []byte, off int64) (n int, err error) {
+	if m.closed {
+		return 0, errAlreadyClosed
+	}
+	return m.Contents.ReadAt(p, off)
+}
+
+// Seek repositions Contents; it fails once the file has been closed.
+func (m *MockFile) Seek(offset int64, whence int) (n int64, err error) {
+	if m.closed {
+		return 0, errAlreadyClosed
+	}
+	return m.Contents.Seek(offset, whence)
+}
diff --git a/io/io.go b/io/io.go
new file mode 100644
index 0000000..abe5971
--- /dev/null
+++ b/io/io.go
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package io + +import ( + "errors" + "fmt" + "io" + "io/fs" + "net/url" + "strings" +) + +// IO is an interface to a hierarchical file system. +// +// The IO interface is the minimum implementation required for a file +// system to utilize an iceberg table. A file system may implement +// additional interfaces, such as ReadFileIO, to provide additional or +// optimized functionality. +type IO interface { + // Open opens the named file. + // + // When Open returns an error, it should be of type *PathError + // with the Op field set to "open", the Path field set to name, + // and the Err field describing the problem. + // + // Open should reject attempts to open names that do not satisfy + // fs.ValidPath(name), returning a *PathError with Err set to + // ErrInvalid or ErrNotExist. + Open(name string) (File, error) + + // Remove removes the named file or (empty) directory. + // + // If there is an error, it will be of type *PathError. + Remove(name string) error +} + +// ReadFileIO is the interface implemented by a file system that +// provides an optimized implementation of ReadFile. +type ReadFileIO interface { + IO + + // ReadFile reads the named file and returns its contents. + // A successful call returns a nil error, not io.EOF. + // (Because ReadFile reads the whole file, the expected EOF + // from the final Read is not treated as an error to be reported.) + // + // The caller is permitted to modify the returned byte slice. + // This method should return a copy of the underlying data. 
// A File provides access to a single file. The File interface is the
// minimum implementation required for Iceberg to interact with a file.
// Directory files should also implement ReadDirFile.
type File interface {
	fs.File
	io.ReadSeekCloser
	io.ReaderAt
}
+func FSPreProcName(fsys fs.FS, fn func(string) string) IO { + if _, ok := fsys.(fs.ReadFileFS); ok { + return readFileFS{ioFS{fsys, fn}} + } + return ioFS{fsys, fn} +} + +type readFileFS struct { + ioFS +} + +func (r readFileFS) ReadFile(name string) ([]byte, error) { + if r.preProcessName != nil { + name = r.preProcessName(name) + } + + rfs, ok := r.fsys.(fs.ReadFileFS) + if !ok { + return nil, errMissingReadFile + } + return rfs.ReadFile(name) +} + +type ioFS struct { + fsys fs.FS + + preProcessName func(string) string +} + +func (f ioFS) Open(name string) (File, error) { + if f.preProcessName != nil { + name = f.preProcessName(name) + } + + if name == "/" { + name = "." + } else { + name = strings.TrimPrefix(name, "/") + } + file, err := f.fsys.Open(name) + if err != nil { + return nil, err + } + + return ioFile{file}, nil +} + +func (f ioFS) Remove(name string) error { + r, ok := f.fsys.(interface{ Remove(name string) error }) + if !ok { + return errMissingRemove + } + return r.Remove(name) +} + +var ( + errMissingReadDir = errors.New("fs.File directory missing ReadDir method") + errMissingSeek = errors.New("fs.File missing Seek method") + errMissingReadAt = errors.New("fs.File missing ReadAt") + errMissingRemove = errors.New("fs.FS missing Remove method") + errMissingReadFile = errors.New("fs.FS missing ReadFile method") +) + +type ioFile struct { + file fs.File +} + +func (f ioFile) Close() error { return f.file.Close() } +func (f ioFile) Read(b []byte) (int, error) { return f.file.Read(b) } +func (f ioFile) Stat() (fs.FileInfo, error) { return f.file.Stat() } +func (f ioFile) Seek(offset int64, whence int) (int64, error) { + s, ok := f.file.(io.Seeker) + if !ok { + return 0, errMissingSeek + } + return s.Seek(offset, whence) +} + +func (f ioFile) ReadAt(p []byte, off int64) (n int, err error) { + r, ok := f.file.(io.ReaderAt) + if !ok { + return 0, errMissingReadAt + } + return r.ReadAt(p, off) +} + +func (f ioFile) ReadDir(count int) ([]fs.DirEntry, error) 
{ + d, ok := f.file.(fs.ReadDirFile) + if !ok { + return nil, errMissingReadDir + } + + return d.ReadDir(count) +} + +func inferFileIOFromSchema(path string, props map[string]string) (IO, error) { + parsed, err := url.Parse(path) + if err != nil { + return nil, err + } + + switch parsed.Scheme { + case "s3", "s3a", "s3n": + return createS3FileIO(parsed, props) + case "file", "": + return LocalFS{}, nil + default: + return nil, fmt.Errorf("IO for file '%s' not implemented", path) + } +} + +// LoadFS takes a map of properties and an optional URI location +// and attempts to infer an IO object from it. +// +// A schema of "file://" or an empty string will result in a LocalFS +// implementation. Otherwise this will return an error if the schema +// does not yet have an implementation here. +// +// Currently only LocalFS and S3 are implemented. +func LoadFS(props map[string]string, location string) (IO, error) { + if location == "" { + location = props["warehouse"] + } + + iofs, err := inferFileIOFromSchema(location, props) + if err != nil { + return nil, err + } + + if iofs == nil { + iofs = LocalFS{} + } + + return iofs, nil +} diff --git a/io/local.go b/io/local.go new file mode 100644 index 0000000..befa831 --- /dev/null +++ b/io/local.go @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package io + +import "os" + +// LocalFS is an implementation of IO that implements interaction with +// the local file system. +type LocalFS struct{} + +func (LocalFS) Open(name string) (File, error) { + return os.Open(name) +} + +func (LocalFS) Remove(name string) error { + return os.Remove(name) +} diff --git a/io/s3.go b/io/s3.go new file mode 100644 index 0000000..19f0887 --- /dev/null +++ b/io/s3.go @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package io + +import ( + "context" + "fmt" + "net/http" + "net/url" + "os" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/wolfeidau/s3iofs" +) + +// Constants for S3 configuration options +const ( + S3Region = "s3.region" + S3SessionToken = "s3.session-token" + S3SecretAccessKey = "s3.secret-access-key" + S3AccessKeyID = "s3.access-key-id" + S3EndpointURL = "s3.endpoint" + S3ProxyURI = "s3.proxy-uri" +) + +func createS3FileIO(parsed *url.URL, props map[string]string) (IO, error) { + opts := []func(*config.LoadOptions) error{} + endpoint, ok := props[S3EndpointURL] + if !ok { + endpoint = os.Getenv("AWS_S3_ENDPOINT") + } + + if endpoint != "" { + opts = append(opts, config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + if service != s3.ServiceID { + // fallback to default resolution for the service + return aws.Endpoint{}, &aws.EndpointNotFoundError{} + } + + return aws.Endpoint{ + URL: endpoint, + SigningRegion: region, + HostnameImmutable: true, + }, nil + }))) + } + + if region, ok := props[S3Region]; ok { + opts = append(opts, config.WithRegion(region)) + } + + accessKey, secretAccessKey := props[S3AccessKeyID], props[S3SecretAccessKey] + token := props[S3SessionToken] + if accessKey != "" || secretAccessKey != "" || token != "" { + opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( + props[S3AccessKeyID], props[S3SecretAccessKey], props[S3SessionToken]))) + } + + if proxy, ok := props[S3ProxyURI]; ok { + proxyURL, err := url.Parse(proxy) + if err != nil { + return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy) + } + + opts = append(opts, config.WithHTTPClient(awshttp.NewBuildableClient().WithTransportOptions( + 
func(t *http.Transport) { + t.Proxy = http.ProxyURL(proxyURL) + }, + ))) + } + + awscfg, err := config.LoadDefaultConfig(context.Background(), opts...) + if err != nil { + return nil, err + } + + preprocess := func(n string) string { + _, after, found := strings.Cut(n, "://") + if found { + n = after + } + + return strings.TrimPrefix(n, parsed.Host) + } + + s3fs := s3iofs.New(parsed.Host, awscfg) + return FSPreProcName(s3fs, preprocess), nil +} diff --git a/manifest.go b/manifest.go new file mode 100644 index 0000000..628b721 --- /dev/null +++ b/manifest.go @@ -0,0 +1,803 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iceberg + +import ( + "io" + "sync" + + iceio "github.com/apache/iceberg-go/io" + + "github.com/hamba/avro/v2/ocf" +) + +// ManifestContent indicates the type of data inside of the files +// described by a manifest. This will indicate whether the data files +// contain active data or deleted rows. 
+type ManifestContent int32 + +const ( + ManifestContentData ManifestContent = 0 + ManifestContentDeletes ManifestContent = 1 +) + +type FieldSummary struct { + ContainsNull bool `avro:"contains_null"` + ContainsNaN *bool `avro:"contains_nan"` + LowerBound *[]byte `avro:"lower_bound"` + UpperBound *[]byte `avro:"upper_bound"` +} + +// ManifestV1Builder is a helper for building a V1 manifest file +// struct which will conform to the ManifestFile interface. +type ManifestV1Builder struct { + m *manifestFileV1 +} + +// NewManifestV1Builder is passed all of the required fields and then allows +// all of the optional fields to be set by calling the corresponding methods +// before calling [ManifestV1Builder.Build] to construct the object. +func NewManifestV1Builder(path string, length int64, partitionSpecID int32) *ManifestV1Builder { + return &ManifestV1Builder{ + m: &manifestFileV1{ + Path: path, + Len: length, + SpecID: partitionSpecID, + }, + } +} + +func (b *ManifestV1Builder) AddedSnapshotID(id int64) *ManifestV1Builder { + b.m.AddedSnapshotID = &id + return b +} + +func (b *ManifestV1Builder) AddedFiles(cnt int32) *ManifestV1Builder { + b.m.AddedFilesCount = &cnt + return b +} + +func (b *ManifestV1Builder) ExistingFiles(cnt int32) *ManifestV1Builder { + b.m.ExistingFilesCount = &cnt + return b +} + +func (b *ManifestV1Builder) DeletedFiles(cnt int32) *ManifestV1Builder { + b.m.DeletedFilesCount = &cnt + return b +} + +func (b *ManifestV1Builder) AddedRows(cnt int64) *ManifestV1Builder { + b.m.AddedRowsCount = &cnt + return b +} + +func (b *ManifestV1Builder) ExistingRows(cnt int64) *ManifestV1Builder { + b.m.ExistingRowsCount = &cnt + return b +} + +func (b *ManifestV1Builder) DeletedRows(cnt int64) *ManifestV1Builder { + b.m.DeletedRowsCount = &cnt + return b +} + +func (b *ManifestV1Builder) Partitions(p []FieldSummary) *ManifestV1Builder { + b.m.PartitionList = &p + return b +} + +func (b *ManifestV1Builder) KeyMetadata(km []byte) *ManifestV1Builder { + 
b.m.Key = km + return b +} + +// Build returns the constructed manifest file, after calling Build this +// builder should not be used further as we avoid copying by just returning +// a pointer to the constructed manifest file. Further calls to the modifier +// methods after calling build would modify the constructed ManifestFile. +func (b *ManifestV1Builder) Build() ManifestFile { + return b.m +} + +type manifestFileV1 struct { + Path string `avro:"manifest_path"` + Len int64 `avro:"manifest_length"` + SpecID int32 `avro:"partition_spec_id"` + AddedSnapshotID *int64 `avro:"added_snapshot_id"` + AddedFilesCount *int32 `avro:"added_data_files_count"` + ExistingFilesCount *int32 `avro:"existing_data_files_count"` + DeletedFilesCount *int32 `avro:"deleted_data_files_count"` + AddedRowsCount *int64 `avro:"added_rows_count"` + ExistingRowsCount *int64 `avro:"existing_rows_count"` + DeletedRowsCount *int64 `avro:"deleted_rows_count"` + PartitionList *[]FieldSummary `avro:"partitions"` + Key []byte `avro:"key_metadata"` +} + +func (*manifestFileV1) Version() int { return 1 } +func (m *manifestFileV1) FilePath() string { return m.Path } +func (m *manifestFileV1) Length() int64 { return m.Len } +func (m *manifestFileV1) PartitionSpecID() int32 { return m.SpecID } +func (m *manifestFileV1) ManifestContent() ManifestContent { + return ManifestContentData +} +func (m *manifestFileV1) SnapshotID() int64 { + if m.AddedSnapshotID == nil { + return 0 + } + return *m.AddedSnapshotID +} + +func (m *manifestFileV1) AddedDataFiles() int32 { + if m.AddedFilesCount == nil { + return 0 + } + return *m.AddedFilesCount +} + +func (m *manifestFileV1) ExistingDataFiles() int32 { + if m.ExistingFilesCount == nil { + return 0 + } + return *m.ExistingFilesCount +} + +func (m *manifestFileV1) DeletedDataFiles() int32 { + if m.DeletedFilesCount == nil { + return 0 + } + return *m.DeletedFilesCount +} + +func (m *manifestFileV1) AddedRows() int64 { + if m.AddedRowsCount == nil { + return 0 + } + 
return *m.AddedRowsCount +} + +func (m *manifestFileV1) ExistingRows() int64 { + if m.ExistingRowsCount == nil { + return 0 + } + return *m.ExistingRowsCount +} + +func (m *manifestFileV1) DeletedRows() int64 { + if m.DeletedRowsCount == nil { + return 0 + } + return *m.DeletedRowsCount +} + +func (m *manifestFileV1) HasAddedFiles() bool { + return m.AddedFilesCount == nil || *m.AddedFilesCount > 0 +} + +func (m *manifestFileV1) HasExistingFiles() bool { + return m.ExistingFilesCount == nil || *m.ExistingFilesCount > 0 +} + +func (m *manifestFileV1) SequenceNum() int64 { return 0 } +func (m *manifestFileV1) MinSequenceNum() int64 { return 0 } +func (m *manifestFileV1) KeyMetadata() []byte { return m.Key } +func (m *manifestFileV1) Partitions() []FieldSummary { + if m.PartitionList == nil { + return nil + } + return *m.PartitionList +} + +func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + return fetchManifestEntries(m, fs, discardDeleted) +} + +// ManifestV2Builder is a helper for building a V2 manifest file +// struct which will conform to the ManifestFile interface. +type ManifestV2Builder struct { + m *manifestFileV2 +} + +// NewManifestV2Builder is constructed with the primary fields, with the remaining +// fields set to their zero value unless modified by calling the corresponding +// methods of the builder. Then calling [ManifestV2Builder.Build] to retrieve the +// constructed ManifestFile. 
+func NewManifestV2Builder(path string, length int64, partitionSpecID int32, content ManifestContent, addedSnapshotID int64) *ManifestV2Builder { + return &ManifestV2Builder{ + m: &manifestFileV2{ + Path: path, + Len: length, + SpecID: partitionSpecID, + Content: content, + AddedSnapshotID: addedSnapshotID, + }, + } +} + +func (b *ManifestV2Builder) SequenceNum(num, minSeqNum int64) *ManifestV2Builder { + b.m.SeqNumber, b.m.MinSeqNumber = num, minSeqNum + return b +} + +func (b *ManifestV2Builder) AddedFiles(cnt int32) *ManifestV2Builder { + b.m.AddedFilesCount = cnt + return b +} + +func (b *ManifestV2Builder) ExistingFiles(cnt int32) *ManifestV2Builder { + b.m.ExistingFilesCount = cnt + return b +} + +func (b *ManifestV2Builder) DeletedFiles(cnt int32) *ManifestV2Builder { + b.m.DeletedFilesCount = cnt + return b +} + +func (b *ManifestV2Builder) AddedRows(cnt int64) *ManifestV2Builder { + b.m.AddedRowsCount = cnt + return b +} + +func (b *ManifestV2Builder) ExistingRows(cnt int64) *ManifestV2Builder { + b.m.ExistingRowsCount = cnt + return b +} + +func (b *ManifestV2Builder) DeletedRows(cnt int64) *ManifestV2Builder { + b.m.DeletedRowsCount = cnt + return b +} + +func (b *ManifestV2Builder) Partitions(p []FieldSummary) *ManifestV2Builder { + b.m.PartitionList = &p + return b +} + +func (b *ManifestV2Builder) KeyMetadata(km []byte) *ManifestV2Builder { + b.m.Key = km + return b +} + +// Build returns the constructed manifest file, after calling Build this +// builder should not be used further as we avoid copying by just returning +// a pointer to the constructed manifest file. Further calls to the modifier +// methods after calling build would modify the constructed ManifestFile. 
+func (b *ManifestV2Builder) Build() ManifestFile { + return b.m +} + +type manifestFileV2 struct { + Path string `avro:"manifest_path"` + Len int64 `avro:"manifest_length"` + SpecID int32 `avro:"partition_spec_id"` + Content ManifestContent `avro:"content"` + SeqNumber int64 `avro:"sequence_number"` + MinSeqNumber int64 `avro:"min_sequence_number"` + AddedSnapshotID int64 `avro:"added_snapshot_id"` + AddedFilesCount int32 `avro:"added_files_count"` + ExistingFilesCount int32 `avro:"existing_files_count"` + DeletedFilesCount int32 `avro:"deleted_files_count"` + AddedRowsCount int64 `avro:"added_rows_count"` + ExistingRowsCount int64 `avro:"existing_rows_count"` + DeletedRowsCount int64 `avro:"deleted_rows_count"` + PartitionList *[]FieldSummary `avro:"partitions"` + Key []byte `avro:"key_metadata"` +} + +func (*manifestFileV2) Version() int { return 2 } + +func (m *manifestFileV2) FilePath() string { return m.Path } +func (m *manifestFileV2) Length() int64 { return m.Len } +func (m *manifestFileV2) PartitionSpecID() int32 { return m.SpecID } +func (m *manifestFileV2) ManifestContent() ManifestContent { return m.Content } +func (m *manifestFileV2) SnapshotID() int64 { + return m.AddedSnapshotID +} + +func (m *manifestFileV2) AddedDataFiles() int32 { + return m.AddedFilesCount +} + +func (m *manifestFileV2) ExistingDataFiles() int32 { + return m.ExistingFilesCount +} + +func (m *manifestFileV2) DeletedDataFiles() int32 { + return m.DeletedFilesCount +} + +func (m *manifestFileV2) AddedRows() int64 { + return m.AddedRowsCount +} + +func (m *manifestFileV2) ExistingRows() int64 { + return m.ExistingRowsCount +} + +func (m *manifestFileV2) DeletedRows() int64 { + return m.DeletedRowsCount +} + +func (m *manifestFileV2) SequenceNum() int64 { return m.SeqNumber } +func (m *manifestFileV2) MinSequenceNum() int64 { return m.MinSeqNumber } +func (m *manifestFileV2) KeyMetadata() []byte { return m.Key } + +func (m *manifestFileV2) Partitions() []FieldSummary { + if 
m.PartitionList == nil { + return nil + } + return *m.PartitionList +} + +func (m *manifestFileV2) HasAddedFiles() bool { + return m.AddedFilesCount > 0 +} + +func (m *manifestFileV2) HasExistingFiles() bool { + return m.ExistingFilesCount > 0 +} + +func (m *manifestFileV2) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + return fetchManifestEntries(m, fs, discardDeleted) +} + +func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { + f, err := fs.Open(m.FilePath()) + if err != nil { + return nil, err + } + defer f.Close() + + dec, err := ocf.NewDecoder(f) + if err != nil { + return nil, err + } + + metadata := dec.Metadata() + isVer1 := true + if string(metadata["format-version"]) == "2" { + isVer1 = false + } + + results := make([]ManifestEntry, 0) + for dec.HasNext() { + var tmp ManifestEntry + if isVer1 { + tmp = &manifestEntryV1{} + } else { + tmp = &manifestEntryV2{} + } + + if err := dec.Decode(tmp); err != nil { + return nil, err + } + + if !discardDeleted || tmp.Status() != EntryStatusDELETED { + tmp.inheritSeqNum(m) + results = append(results, tmp) + } + } + + return results, dec.Error() +} + +// ManifestFile is the interface which covers both V1 and V2 manifest files. +type ManifestFile interface { + // Version returns the version number of this manifest file. + // It should be 1 or 2. + Version() int + // FilePath is the location URI of this manifest file. + FilePath() string + // Length is the length in bytes of the manifest file. + Length() int64 + // PartitionSpecID is the ID of the partition spec used to write + // this manifest. It must be listed in the table metadata + // partition-specs. + PartitionSpecID() int32 + // ManifestContent is the type of files tracked by this manifest, + // either data or delete files. All v1 manifests track data files. + ManifestContent() ManifestContent + // SnapshotID is the ID of the snapshot where this manifest file + // was added. 
+ SnapshotID() int64 + // AddedDataFiles returns the number of entries in the manifest that + // have the status of EntryStatusADDED. + AddedDataFiles() int32 + // ExistingDataFiles returns the number of entries in the manifest + // which have the status of EntryStatusEXISTING. + ExistingDataFiles() int32 + // DeletedDataFiles returns the number of entries in the manifest + // which have the status of EntryStatusDELETED. + DeletedDataFiles() int32 + // AddedRows returns the number of rows in all files of the manifest + // that have status EntryStatusADDED. + AddedRows() int64 + // ExistingRows returns the number of rows in all files of the manifest + // which have status EntryStatusEXISTING. + ExistingRows() int64 + // DeletedRows returns the number of rows in all files of the manifest + // which have status EntryStatusDELETED. + DeletedRows() int64 + // SequenceNum returns the sequence number when this manifest was + // added to the table. Will be 0 for v1 manifest lists. + SequenceNum() int64 + // MinSequenceNum is the minimum data sequence number of all live data + // or delete files in the manifest. Will be 0 for v1 manifest lists. + MinSequenceNum() int64 + // KeyMetadata returns implementation-specific key metadata for encryption + // if it exists in the manifest list. + KeyMetadata() []byte + // Partitions returns a list of field summaries for each partition + // field in the spec. Each field in the list corresponds to a field in + // the manifest file's partition spec. + Partitions() []FieldSummary + + // HasAddedFiles returns true if AddedDataFiles > 0 or if it was null. + HasAddedFiles() bool + // HasExistingFiles returns true if ExistingDataFiles > 0 or if it was null. + HasExistingFiles() bool + // FetchEntries reads the manifest list file to fetch the list of + // manifest entries using the provided file system IO interface. + // If discardDeleted is true, entries for files containing deleted rows + // will be skipped. 
+ FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) +} + +// ReadManifestList reads in an avro manifest list file and returns a slice +// of manifest files or an error if one is encountered. +func ReadManifestList(in io.Reader) ([]ManifestFile, error) { + dec, err := ocf.NewDecoder(in) + if err != nil { + return nil, err + } + + out := make([]ManifestFile, 0) + + for dec.HasNext() { + var file ManifestFile + if string(dec.Metadata()["format-version"]) == "2" { + file = &manifestFileV2{} + } else { + file = &manifestFileV1{} + } + + if err := dec.Decode(file); err != nil { + return nil, err + } + out = append(out, file) + } + + return out, dec.Error() +} + +// ManifestEntryStatus defines constants for the entry status of +// existing, added or deleted. +type ManifestEntryStatus int8 + +const ( + EntryStatusEXISTING ManifestEntryStatus = 0 + EntryStatusADDED ManifestEntryStatus = 1 + EntryStatusDELETED ManifestEntryStatus = 2 +) + +// ManifestEntryContent defines constants for the type of file contents +// in the file entries. Data, Position based deletes and equality based +// deletes. +type ManifestEntryContent int8 + +const ( + EntryContentData ManifestEntryContent = 0 + EntryContentPosDeletes ManifestEntryContent = 1 + EntryContentEqDeletes ManifestEntryContent = 2 +) + +// FileFormat defines constants for the format of data files. 
+type FileFormat string + +const ( + AvroFile FileFormat = "AVRO" + OrcFile FileFormat = "ORC" + ParquetFile FileFormat = "PARQUET" +) + +type colMap[K, V any] struct { + Key K `avro:"key"` + Value V `avro:"value"` +} + +func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V { + if c == nil { + return nil + } + + out := make(map[K]V) + for _, data := range *c { + out[data.Key] = data.Value + } + return out +} + +type dataFile struct { + Content ManifestEntryContent `avro:"content"` + Path string `avro:"file_path"` + Format FileFormat `avro:"file_format"` + PartitionData map[string]any `avro:"partition"` + RecordCount int64 `avro:"record_count"` + FileSize int64 `avro:"file_size_in_bytes"` + BlockSizeInBytes int64 `avro:"block_size_in_bytes"` + ColSizes *[]colMap[int, int64] `avro:"column_sizes"` + ValCounts *[]colMap[int, int64] `avro:"value_counts"` + NullCounts *[]colMap[int, int64] `avro:"null_value_counts"` + NaNCounts *[]colMap[int, int64] `avro:"nan_value_counts"` + DistinctCounts *[]colMap[int, int64] `avro:"distinct_counts"` + LowerBounds *[]colMap[int, []byte] `avro:"lower_bounds"` + UpperBounds *[]colMap[int, []byte] `avro:"upper_bounds"` + Key *[]byte `avro:"key_metadata"` + Splits *[]int64 `avro:"split_offsets"` + EqualityIDs *[]int `avro:"equality_ids"` + SortOrder *int `avro:"sort_order_id"` + + colSizeMap map[int]int64 + valCntMap map[int]int64 + nullCntMap map[int]int64 + nanCntMap map[int]int64 + distinctCntMap map[int]int64 + lowerBoundMap map[int][]byte + upperBoundMap map[int][]byte + + initMaps sync.Once +} + +func (d *dataFile) initializeMapData() { + d.initMaps.Do(func() { + d.colSizeMap = avroColMapToMap(d.ColSizes) + d.valCntMap = avroColMapToMap(d.ValCounts) + d.nullCntMap = avroColMapToMap(d.NullCounts) + d.nanCntMap = avroColMapToMap(d.NaNCounts) + d.distinctCntMap = avroColMapToMap(d.DistinctCounts) + d.lowerBoundMap = avroColMapToMap(d.LowerBounds) + d.upperBoundMap = avroColMapToMap(d.UpperBounds) + }) +} + +func (d 
*dataFile) ContentType() ManifestEntryContent { return d.Content } +func (d *dataFile) FilePath() string { return d.Path } +func (d *dataFile) FileFormat() FileFormat { return d.Format } +func (d *dataFile) Partition() map[string]any { return d.PartitionData } +func (d *dataFile) Count() int64 { return d.RecordCount } +func (d *dataFile) FileSizeBytes() int64 { return d.FileSize } + +func (d *dataFile) ColumnSizes() map[int]int64 { + d.initializeMapData() + return d.colSizeMap +} + +func (d *dataFile) ValueCounts() map[int]int64 { + d.initializeMapData() + return d.valCntMap +} + +func (d *dataFile) NullValueCounts() map[int]int64 { + d.initializeMapData() + return d.nullCntMap +} + +func (d *dataFile) NaNValueCounts() map[int]int64 { + d.initializeMapData() + return d.nanCntMap +} + +func (d *dataFile) DistinctValueCounts() map[int]int64 { + d.initializeMapData() + return d.distinctCntMap +} + +func (d *dataFile) LowerBoundValues() map[int][]byte { + d.initializeMapData() + return d.lowerBoundMap +} + +func (d *dataFile) UpperBoundValues() map[int][]byte { + d.initializeMapData() + return d.upperBoundMap +} + +func (d *dataFile) KeyMetadata() []byte { + if d.Key == nil { + return nil + } + return *d.Key +} + +func (d *dataFile) SplitOffsets() []int64 { + if d.Splits == nil { + return nil + } + return *d.Splits +} + +func (d *dataFile) EqualityFieldIDs() []int { + if d.EqualityIDs == nil { + return nil + } + return d.EqualityFieldIDs() +} + +func (d *dataFile) SortOrderID() *int { return d.SortOrder } + +type manifestEntryV1 struct { + EntryStatus ManifestEntryStatus `avro:"status"` + Snapshot int64 `avro:"snapshot_id"` + SeqNum *int64 + FileSeqNum *int64 + Data dataFile `avro:"data_file"` +} + +func (m *manifestEntryV1) inheritSeqNum(manifest ManifestFile) {} + +func (m *manifestEntryV1) Status() ManifestEntryStatus { return m.EntryStatus } +func (m *manifestEntryV1) SnapshotID() int64 { return m.Snapshot } + +func (m *manifestEntryV1) SequenceNum() int64 { + if 
m.SeqNum == nil { + return 0 + } + return *m.SeqNum +} + +func (m *manifestEntryV1) FileSequenceNum() *int64 { + return m.FileSeqNum +} + +func (m *manifestEntryV1) DataFile() DataFile { return &m.Data } + +type manifestEntryV2 struct { + EntryStatus ManifestEntryStatus `avro:"status"` + Snapshot *int64 `avro:"snapshot_id"` + SeqNum *int64 `avro:"sequence_number"` + FileSeqNum *int64 `avro:"file_sequence_number"` + Data dataFile `avro:"data_file"` +} + +func (m *manifestEntryV2) inheritSeqNum(manifest ManifestFile) { + if m.Snapshot == nil { + snap := manifest.SnapshotID() + m.Snapshot = &snap + } + + manifestSequenceNum := manifest.SequenceNum() + if m.SeqNum == nil && (manifestSequenceNum == 0 || m.EntryStatus == EntryStatusADDED) { + m.SeqNum = &manifestSequenceNum + } + + if m.FileSeqNum == nil && (manifestSequenceNum == 0 || m.EntryStatus == EntryStatusADDED) { + m.FileSeqNum = &manifestSequenceNum + } +} + +func (m *manifestEntryV2) Status() ManifestEntryStatus { return m.EntryStatus } +func (m *manifestEntryV2) SnapshotID() int64 { + if m.Snapshot == nil { + return 0 + } + return *m.Snapshot +} + +func (m *manifestEntryV2) SequenceNum() int64 { + if m.SeqNum == nil { + return 0 + } + return *m.SeqNum +} + +func (m *manifestEntryV2) FileSequenceNum() *int64 { + return m.FileSeqNum +} + +func (m *manifestEntryV2) DataFile() DataFile { return &m.Data } + +// DataFile is the interface for reading the information about a +// given data file indicated by an entry in a manifest list. +type DataFile interface { + // ContentType is the type of the content stored by the data file, + // either Data, Equality deletes, or Position deletes. All v1 files + // are Data files. + ContentType() ManifestEntryContent + // FilePath is the full URI for the file, complete with FS scheme. + FilePath() string + // FileFormat is the format of the data file, AVRO, Orc, or Parquet. 
+ FileFormat() FileFormat + // Partition returns a mapping of field name to partition value for + // each of the partition spec's fields. + Partition() map[string]any + // Count returns the number of records in this file. + Count() int64 + // FileSizeBytes is the total file size in bytes. + FileSizeBytes() int64 + // ColumnSizes is a mapping from column id to the total size on disk + // of all regions that store the column. Does not include bytes + // necessary to read other columns, like footers. Map will be nil for + // row-oriented formats (avro). + ColumnSizes() map[int]int64 + // ValueCounts is a mapping from column id to the number of values + // in the column, including null and NaN values. + ValueCounts() map[int]int64 + // NullValueCounts is a mapping from column id to the number of + // null values in the column. + NullValueCounts() map[int]int64 + // NaNValueCounts is a mapping from column id to the number of NaN + // values in the column. + NaNValueCounts() map[int]int64 + // DistinctValueCounts is a mapping from column id to the number of + // distinct values in the column. Distinct counts must be derived + // using values in the file by counting or using sketches, but not + // using methods like merging existing distinct counts. + DistinctValueCounts() map[int]int64 + // LowerBoundValues is a mapping from column id to the lower bound + // of the column, serialized as binary. Each lower bound + // must be less than or equal to all non-null, non-NaN values in the + // column for the file. + LowerBoundValues() map[int][]byte + // UpperBoundValues is a mapping from column id to the upper bound + // of the column, serialized as binary. Each upper bound + // must be greater than or equal to all non-null, non-NaN values in + // the column for the file. + UpperBoundValues() map[int][]byte + // KeyMetadata is implementation-specific key metadata for encryption.
+ KeyMetadata() []byte + // SplitOffsets are the split offsets for the data file. For example, + // all row group offsets in a Parquet file. Must be sorted ascending. + SplitOffsets() []int64 + // EqualityFieldIDs are used to determine row equality in equality + // delete files. It is required when the content type is + // EntryContentEqDeletes. + EqualityFieldIDs() []int + // SortOrderID returns the id representing the sort order for this + // file, or nil if there is no sort order. + SortOrderID() *int +} + +// ManifestEntry is an interface for both v1 and v2 manifest entries. +type ManifestEntry interface { + // Status returns the type of the file tracked by this entry. + // Deletes are informational only and not used in scans. + Status() ManifestEntryStatus + // SnapshotID is the id where the file was added, or deleted, + // if null it is inherited from the manifest list. + SnapshotID() int64 + // SequenceNum returns the data sequence number of the file. + // If it was null and the status is EntryStatusADDED then it + // is inherited from the manifest list. + SequenceNum() int64 + // FileSequenceNum returns the file sequence number indicating + // when the file was added. If it was null and the status is + // EntryStatusADDED then it is inherited from the manifest list. + FileSequenceNum() *int64 + // DataFile provides the information about the data file indicated + // by this manifest entry. + DataFile() DataFile + + inheritSeqNum(manifest ManifestFile) +} + +var PositionalDeleteSchema = NewSchema(0, + NestedField{ID: 2147483546, Type: PrimitiveTypes.String, Name: "file_path", Required: true}, + NestedField{ID: 2147483545, Type: PrimitiveTypes.Int32, Name: "pos", Required: true}, +) diff --git a/manifest_test.go b/manifest_test.go new file mode 100644 index 0000000..74d063e --- /dev/null +++ b/manifest_test.go @@ -0,0 +1,773 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iceberg + +import ( + "bytes" + "testing" + "time" + + "github.com/apache/iceberg-go/internal" + "github.com/hamba/avro/v2/ocf" + "github.com/stretchr/testify/suite" +) + +var ( + falseBool = false + snapshotID int64 = 9182715666859759686 + addedRows int64 = 237993 + manifestFileRecordsV1 = []ManifestFile{ + NewManifestV1Builder("/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro", + 7989, 0). + AddedSnapshotID(snapshotID). + AddedFiles(3). + ExistingFiles(0). + DeletedFiles(0). + AddedRows(addedRows). + ExistingRows(0). + DeletedRows(0). + Partitions([]FieldSummary{{ + ContainsNull: true, ContainsNaN: &falseBool, + LowerBound: &[]byte{0x01, 0x00, 0x00, 0x00}, + UpperBound: &[]byte{0x02, 0x00, 0x00, 0x00}, + }}).Build()} + + manifestFileRecordsV2 = []ManifestFile{ + NewManifestV2Builder("/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro", + 7989, 0, ManifestContentDeletes, snapshotID). + SequenceNum(3, 3). + AddedFiles(3). + ExistingFiles(0). + DeletedFiles(0). + AddedRows(addedRows). + ExistingRows(0). + DeletedRows(0). 
+ Partitions([]FieldSummary{{ + ContainsNull: true, + ContainsNaN: &falseBool, + LowerBound: &[]byte{0x01, 0x00, 0x00, 0x00}, + UpperBound: &[]byte{0x02, 0x00, 0x00, 0x00}, + }}).Build()} + + entrySnapshotID int64 = 8744736658442914487 + intZero = 0 + manifestEntryV1Records = []*manifestEntryV1{ + { + EntryStatus: EntryStatusADDED, + Snapshot: entrySnapshotID, + Data: dataFile{ + // bad value for Content but this field doesn't exist in V1 + // so it shouldn't get written and shouldn't be read back out + // so the roundtrip test asserts that we get the default value + // back out. + Content: EntryContentEqDeletes, + Path: "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet", + Format: ParquetFile, + PartitionData: map[string]any{"VendorID": int(1), "tpep_pickup_datetime": time.Unix(1925, 0)}, + RecordCount: 19513, + FileSize: 388872, + BlockSizeInBytes: 67108864, + ColSizes: &[]colMap[int, int64]{ + {Key: 1, Value: 53}, + {Key: 2, Value: 98153}, + {Key: 3, Value: 98693}, + {Key: 4, Value: 53}, + {Key: 5, Value: 53}, + {Key: 6, Value: 53}, + {Key: 7, Value: 17425}, + {Key: 8, Value: 18528}, + {Key: 9, Value: 53}, + {Key: 10, Value: 44788}, + {Key: 11, Value: 35571}, + {Key: 12, Value: 53}, + {Key: 13, Value: 1243}, + {Key: 14, Value: 2355}, + {Key: 15, Value: 12750}, + {Key: 16, Value: 4029}, + {Key: 17, Value: 110}, + {Key: 18, Value: 47194}, + {Key: 19, Value: 2948}, + }, + ValCounts: &[]colMap[int, int64]{ + {Key: 1, Value: 19513}, + {Key: 2, Value: 19513}, + {Key: 3, Value: 19513}, + {Key: 4, Value: 19513}, + {Key: 5, Value: 19513}, + {Key: 6, Value: 19513}, + {Key: 7, Value: 19513}, + {Key: 8, Value: 19513}, + {Key: 9, Value: 19513}, + {Key: 10, Value: 19513}, + {Key: 11, Value: 19513}, + {Key: 12, Value: 19513}, + {Key: 13, Value: 19513}, + {Key: 14, Value: 19513}, + {Key: 15, Value: 19513}, + {Key: 16, Value: 19513}, + {Key: 17, Value: 19513}, + {Key: 18, Value: 19513}, + {Key: 19, 
Value: 19513}, + }, + NullCounts: &[]colMap[int, int64]{ + {Key: 1, Value: 19513}, + {Key: 2, Value: 0}, + {Key: 3, Value: 0}, + {Key: 4, Value: 19513}, + {Key: 5, Value: 19513}, + {Key: 6, Value: 19513}, + {Key: 7, Value: 0}, + {Key: 8, Value: 0}, + {Key: 9, Value: 19513}, + {Key: 10, Value: 0}, + {Key: 11, Value: 0}, + {Key: 12, Value: 19513}, + {Key: 13, Value: 0}, + {Key: 14, Value: 0}, + {Key: 15, Value: 0}, + {Key: 16, Value: 0}, + {Key: 17, Value: 0}, + {Key: 18, Value: 0}, + {Key: 19, Value: 0}, + }, + NaNCounts: &[]colMap[int, int64]{ + {Key: 16, Value: 0}, + {Key: 17, Value: 0}, + {Key: 18, Value: 0}, + {Key: 19, Value: 0}, + {Key: 10, Value: 0}, + {Key: 11, Value: 0}, + {Key: 12, Value: 0}, + {Key: 13, Value: 0}, + {Key: 14, Value: 0}, + {Key: 15, Value: 0}, + }, + LowerBounds: &[]colMap[int, []byte]{ + {Key: 2, Value: []byte("2020-04-01 00:00")}, + {Key: 3, Value: []byte("2020-04-01 00:12")}, + {Key: 7, Value: []byte{0x03, 0x00, 0x00, 0x00}}, + {Key: 8, Value: []byte{0x01, 0x00, 0x00, 0x00}}, + {Key: 10, Value: []byte{0xf6, 0x28, 0x5c, 0x8f, 0xc2, 0x05, 'S', 0xc0}}, + {Key: 11, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 14, Value: []byte{0, 0, 0, 0, 0, 0, 0xe0, 0xbf}}, + {Key: 15, Value: []byte{')', '\\', 0x8f, 0xc2, 0xf5, '(', 0x08, 0xc0}}, + {Key: 16, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 17, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 18, Value: []byte{0xf6, '(', '\\', 0x8f, 0xc2, 0xc5, 'S', 0xc0}}, + {Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0x04, 0xc0}}, + }, + UpperBounds: &[]colMap[int, []byte]{ + {Key: 2, Value: []byte("2020-04-30 23:5:")}, + {Key: 3, Value: []byte("2020-05-01 00:41")}, + {Key: 7, Value: []byte{'\t', 0x01, 0x00, 0x00}}, + {Key: 8, Value: []byte{'\t', 0x01, 0x00, 0x00}}, + {Key: 10, Value: []byte{0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', '_', '@'}}, + {Key: 11, Value: []byte{0x1f, 0x85, 0xeb, 'Q', '\\', 0xe2, 0xfe, '@'}}, + {Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 
0x12, '@'}}, + {Key: 14, Value: []byte{0, 0, 0, 0, 0, 0, 0xe0, '?'}}, + {Key: 15, Value: []byte{'q', '=', '\n', 0xd7, 0xa3, 0xf0, '1', '@'}}, + {Key: 16, Value: []byte{0, 0, 0, 0, 0, '`', 'B', '@'}}, + {Key: 17, Value: []byte{'3', '3', '3', '3', '3', '3', 0xd3, '?'}}, + {Key: 18, Value: []byte{0, 0, 0, 0, 0, 0x18, 'b', '@'}}, + {Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0x04, '@'}}, + }, + Splits: &[]int64{4}, + SortOrder: &intZero, + }, + }, + { + EntryStatus: EntryStatusADDED, + Snapshot: 8744736658442914487, + Data: dataFile{ + Path: "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=1/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00002.parquet", + Format: ParquetFile, + PartitionData: map[string]any{"VendorID": int(1), "tpep_pickup_datetime": time.Unix(1925, 0)}, + RecordCount: 95050, + FileSize: 1265950, + BlockSizeInBytes: 67108864, + ColSizes: &[]colMap[int, int64]{ + {Key: 1, Value: 318}, + {Key: 2, Value: 329806}, + {Key: 3, Value: 331632}, + {Key: 4, Value: 15343}, + {Key: 5, Value: 2351}, + {Key: 6, Value: 3389}, + {Key: 7, Value: 71269}, + {Key: 8, Value: 76429}, + {Key: 9, Value: 16383}, + {Key: 10, Value: 86992}, + {Key: 11, Value: 89608}, + {Key: 12, Value: 265}, + {Key: 13, Value: 19377}, + {Key: 14, Value: 1692}, + {Key: 15, Value: 76162}, + {Key: 16, Value: 4354}, + {Key: 17, Value: 759}, + {Key: 18, Value: 120650}, + {Key: 19, Value: 11804}, + }, + ValCounts: &[]colMap[int, int64]{ + {Key: 1, Value: 95050}, + {Key: 2, Value: 95050}, + {Key: 3, Value: 95050}, + {Key: 4, Value: 95050}, + {Key: 5, Value: 95050}, + {Key: 6, Value: 95050}, + {Key: 7, Value: 95050}, + {Key: 8, Value: 95050}, + {Key: 9, Value: 95050}, + {Key: 10, Value: 95050}, + {Key: 11, Value: 95050}, + {Key: 12, Value: 95050}, + {Key: 13, Value: 95050}, + {Key: 14, Value: 95050}, + {Key: 15, Value: 95050}, + {Key: 16, Value: 95050}, + {Key: 17, Value: 95050}, + {Key: 18, Value: 95050}, + {Key: 19, Value: 95050}, + }, + NullCounts: &[]colMap[int, int64]{ + {Key: 1, Value: 
0}, + {Key: 2, Value: 0}, + {Key: 3, Value: 0}, + {Key: 4, Value: 0}, + {Key: 5, Value: 0}, + {Key: 6, Value: 0}, + {Key: 7, Value: 0}, + {Key: 8, Value: 0}, + {Key: 9, Value: 0}, + {Key: 10, Value: 0}, + {Key: 11, Value: 0}, + {Key: 12, Value: 95050}, + {Key: 13, Value: 0}, + {Key: 14, Value: 0}, + {Key: 15, Value: 0}, + {Key: 16, Value: 0}, + {Key: 17, Value: 0}, + {Key: 18, Value: 0}, + {Key: 19, Value: 0}, + }, + NaNCounts: &[]colMap[int, int64]{ + {Key: 16, Value: 0}, + {Key: 17, Value: 0}, + {Key: 18, Value: 0}, + {Key: 19, Value: 0}, + {Key: 10, Value: 0}, + {Key: 11, Value: 0}, + {Key: 12, Value: 0}, + {Key: 13, Value: 0}, + {Key: 14, Value: 0}, + {Key: 15, Value: 0}, + }, + LowerBounds: &[]colMap[int, []byte]{ + {Key: 1, Value: []byte{0x01, 0x00, 0x00, 0x00}}, + {Key: 2, Value: []byte("2020-04-01 00:00")}, + {Key: 3, Value: []byte("2020-04-01 00:13")}, + {Key: 4, Value: []byte{0x00, 0x00, 0x00, 0x00}}, + {Key: 5, Value: []byte{0x01, 0x00, 0x00, 0x00}}, + {Key: 6, Value: []byte("N")}, + {Key: 7, Value: []byte{0x01, 0x00, 0x00, 0x00}}, + {Key: 8, Value: []byte{0x01, 0x00, 0x00, 0x00}}, + {Key: 9, Value: []byte{0x01, 0x00, 0x00, 0x00}}, + {Key: 10, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 11, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 14, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 15, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 16, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 17, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 18, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + {Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0, 0}}, + }, + UpperBounds: &[]colMap[int, []byte]{ + {Key: 1, Value: []byte{0x01, 0x00, 0x00, 0x00}}, + {Key: 2, Value: []byte("2020-04-30 23:5:")}, + {Key: 3, Value: []byte("2020-05-01 00:1:")}, + {Key: 4, Value: []byte{0x06, 0x00, 0x00, 0x00}}, + {Key: 5, Value: []byte{'c', 0x00, 0x00, 0x00}}, + {Key: 6, Value: []byte("Y")}, + {Key: 7, Value: []byte{'\t', 0x01, 0x00, 
0x00}}, + {Key: 8, Value: []byte{'\t', 0x01, 0x00, 0x00}}, + {Key: 9, Value: []byte{0x04, 0x01, 0x00, 0x00}}, + {Key: 10, Value: []byte{'\\', 0x8f, 0xc2, 0xf5, '(', '8', 0x8c, '@'}}, + {Key: 11, Value: []byte{0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', 'f', '@'}}, + {Key: 13, Value: []byte{0, 0, 0, 0, 0, 0, 0x1c, '@'}}, + {Key: 14, Value: []byte{0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, 0xf1, '?'}}, + {Key: 15, Value: []byte{0, 0, 0, 0, 0, 0, 'Y', '@'}}, + {Key: 16, Value: []byte{0, 0, 0, 0, 0, 0xb0, 'X', '@'}}, + {Key: 17, Value: []byte{'3', '3', '3', '3', '3', '3', 0xd3, '?'}}, + {Key: 18, Value: []byte{0xc3, 0xf5, '(', '\\', 0x8f, ':', 0x8c, '@'}}, + {Key: 19, Value: []byte{0, 0, 0, 0, 0, 0, 0x04, '@'}}, + }, + Splits: &[]int64{4}, + SortOrder: &intZero, + }, + }, + } + + manifestEntryV2Records = []*manifestEntryV2{ + { + EntryStatus: EntryStatusADDED, + Snapshot: &entrySnapshotID, + Data: dataFile{ + Path: manifestEntryV1Records[0].Data.Path, + Format: manifestEntryV1Records[0].Data.Format, + PartitionData: manifestEntryV1Records[0].Data.PartitionData, + RecordCount: manifestEntryV1Records[0].Data.RecordCount, + FileSize: manifestEntryV1Records[0].Data.FileSize, + BlockSizeInBytes: manifestEntryV1Records[0].Data.BlockSizeInBytes, + ColSizes: manifestEntryV1Records[0].Data.ColSizes, + ValCounts: manifestEntryV1Records[0].Data.ValCounts, + NullCounts: manifestEntryV1Records[0].Data.NullCounts, + NaNCounts: manifestEntryV1Records[0].Data.NaNCounts, + LowerBounds: manifestEntryV1Records[0].Data.LowerBounds, + UpperBounds: manifestEntryV1Records[0].Data.UpperBounds, + Splits: manifestEntryV1Records[0].Data.Splits, + SortOrder: manifestEntryV1Records[0].Data.SortOrder, + }, + }, + { + EntryStatus: EntryStatusADDED, + Snapshot: &entrySnapshotID, + Data: dataFile{ + Path: manifestEntryV1Records[1].Data.Path, + Format: manifestEntryV1Records[1].Data.Format, + PartitionData: manifestEntryV1Records[1].Data.PartitionData, + RecordCount: manifestEntryV1Records[1].Data.RecordCount, + 
FileSize: manifestEntryV1Records[1].Data.FileSize, + BlockSizeInBytes: manifestEntryV1Records[1].Data.BlockSizeInBytes, + ColSizes: manifestEntryV1Records[1].Data.ColSizes, + ValCounts: manifestEntryV1Records[1].Data.ValCounts, + NullCounts: manifestEntryV1Records[1].Data.NullCounts, + NaNCounts: manifestEntryV1Records[1].Data.NaNCounts, + LowerBounds: manifestEntryV1Records[1].Data.LowerBounds, + UpperBounds: manifestEntryV1Records[1].Data.UpperBounds, + Splits: manifestEntryV1Records[1].Data.Splits, + SortOrder: manifestEntryV1Records[1].Data.SortOrder, + }, + }, + } +) + +type ManifestTestSuite struct { + suite.Suite + + v1ManifestList bytes.Buffer + v1ManifestEntries bytes.Buffer + + v2ManifestList bytes.Buffer + v2ManifestEntries bytes.Buffer +} + +func (m *ManifestTestSuite) writeManifestList() { + enc, err := ocf.NewEncoder(internal.AvroSchemaCache.Get(internal.ManifestListV1Key).String(), + &m.v1ManifestList, ocf.WithMetadata(map[string][]byte{ + "avro.codec": []byte("deflate"), + }), + ocf.WithCodec(ocf.Deflate)) + m.Require().NoError(err) + + m.Require().NoError(enc.Encode(manifestFileRecordsV1[0])) + enc.Close() + + enc, err = ocf.NewEncoder(internal.AvroSchemaCache.Get(internal.ManifestListV2Key).String(), + &m.v2ManifestList, ocf.WithMetadata(map[string][]byte{ + "format-version": []byte("2"), + "avro.codec": []byte("deflate"), + }), ocf.WithCodec(ocf.Deflate)) + m.Require().NoError(err) + + m.Require().NoError(enc.Encode(manifestFileRecordsV2[0])) + enc.Close() +} + +func (m *ManifestTestSuite) writeManifestEntries() { + enc, err := ocf.NewEncoder(internal.AvroSchemaCache.Get(internal.ManifestEntryV1Key).String(), &m.v1ManifestEntries, + ocf.WithMetadata(map[string][]byte{ + "format-version": []byte("1"), + }), ocf.WithCodec(ocf.Deflate)) + m.Require().NoError(err) + + for _, ent := range manifestEntryV1Records { + m.Require().NoError(enc.Encode(ent)) + } + m.Require().NoError(enc.Close()) + + enc, err = 
ocf.NewEncoder(internal.AvroSchemaCache.Get(internal.ManifestEntryV2Key).String(), + &m.v2ManifestEntries, ocf.WithMetadata(map[string][]byte{ + "format-version": []byte("2"), + "avro.codec": []byte("deflate"), + }), ocf.WithCodec(ocf.Deflate)) + m.Require().NoError(err) + + for _, ent := range manifestEntryV2Records { + m.Require().NoError(enc.Encode(ent)) + } + m.Require().NoError(enc.Close()) +} + +func (m *ManifestTestSuite) SetupSuite() { + m.writeManifestList() + m.writeManifestEntries() +} + +func (m *ManifestTestSuite) TestManifestEntriesV1() { + var mockfs internal.MockFS + manifest := manifestFileV1{ + Path: manifestFileRecordsV1[0].FilePath(), + } + + mockfs.Test(m.T()) + mockfs.On("Open", manifest.FilePath()).Return(&internal.MockFile{ + Contents: bytes.NewReader(m.v1ManifestEntries.Bytes())}, nil) + defer mockfs.AssertExpectations(m.T()) + entries, err := manifest.FetchEntries(&mockfs, false) + m.Require().NoError(err) + m.Len(entries, 2) + m.Zero(manifest.PartitionSpecID()) + m.Zero(manifest.SnapshotID()) + m.Zero(manifest.AddedDataFiles()) + m.Zero(manifest.ExistingDataFiles()) + m.Zero(manifest.DeletedDataFiles()) + m.Zero(manifest.ExistingRows()) + m.Zero(manifest.DeletedRows()) + m.Zero(manifest.AddedRows()) + + entry1 := entries[0] + + m.Equal(EntryStatusADDED, entry1.Status()) + m.EqualValues(8744736658442914487, entry1.SnapshotID()) + m.Zero(entry1.SequenceNum()) + m.Nil(entry1.FileSequenceNum()) + + datafile := entry1.DataFile() + m.Equal(EntryContentData, datafile.ContentType()) + m.Equal("/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet", datafile.FilePath()) + m.Equal(ParquetFile, datafile.FileFormat()) + m.EqualValues(19513, datafile.Count()) + m.EqualValues(388872, datafile.FileSizeBytes()) + m.Equal(map[int]int64{ + 1: 53, + 2: 98153, + 3: 98693, + 4: 53, + 5: 53, + 6: 53, + 7: 17425, + 8: 18528, + 9: 53, + 10: 44788, + 11: 35571, + 12: 53, + 13: 1243, + 14: 
2355, + 15: 12750, + 16: 4029, + 17: 110, + 18: 47194, + 19: 2948, + }, datafile.ColumnSizes()) + m.Equal(map[int]int64{ + 1: 19513, + 2: 19513, + 3: 19513, + 4: 19513, + 5: 19513, + 6: 19513, + 7: 19513, + 8: 19513, + 9: 19513, + 10: 19513, + 11: 19513, + 12: 19513, + 13: 19513, + 14: 19513, + 15: 19513, + 16: 19513, + 17: 19513, + 18: 19513, + 19: 19513, + }, datafile.ValueCounts()) + m.Equal(map[int]int64{ + 1: 19513, + 2: 0, + 3: 0, + 4: 19513, + 5: 19513, + 6: 19513, + 7: 0, + 8: 0, + 9: 19513, + 10: 0, + 11: 0, + 12: 19513, + 13: 0, + 14: 0, + 15: 0, + 16: 0, + 17: 0, + 18: 0, + 19: 0, + }, datafile.NullValueCounts()) + m.Equal(map[int]int64{ + 16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0, + }, datafile.NaNValueCounts()) + + m.Equal(map[int][]byte{ + 2: []byte("2020-04-01 00:00"), + 3: []byte("2020-04-01 00:12"), + 7: {0x03, 0x00, 0x00, 0x00}, + 8: {0x01, 0x00, 0x00, 0x00}, + 10: {0xf6, '(', '\\', 0x8f, 0xc2, 0x05, 'S', 0xc0}, + 11: {0, 0, 0, 0, 0, 0, 0, 0}, + 13: {0, 0, 0, 0, 0, 0, 0, 0}, + 14: {0, 0, 0, 0, 0, 0, 0xe0, 0xbf}, + 15: {')', '\\', 0x8f, 0xc2, 0xf5, '(', 0x08, 0xc0}, + 16: {0, 0, 0, 0, 0, 0, 0, 0}, + 17: {0, 0, 0, 0, 0, 0, 0, 0}, + 18: {0xf6, '(', '\\', 0x8f, 0xc2, 0xc5, 'S', 0xc0}, + 19: {0, 0, 0, 0, 0, 0, 0x04, 0xc0}, + }, datafile.LowerBoundValues()) + + m.Equal(map[int][]byte{ + 2: []byte("2020-04-30 23:5:"), + 3: []byte("2020-05-01 00:41"), + 7: {'\t', 0x01, 0, 0}, + 8: {'\t', 0x01, 0, 0}, + 10: {0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', '_', '@'}, + 11: {0x1f, 0x85, 0xeb, 'Q', '\\', 0xe2, 0xfe, '@'}, + 13: {0, 0, 0, 0, 0, 0, 0x12, '@'}, + 14: {0, 0, 0, 0, 0, 0, 0xe0, '?'}, + 15: {'q', '=', '\n', 0xd7, 0xa3, 0xf0, '1', '@'}, + 16: {0, 0, 0, 0, 0, '`', 'B', '@'}, + 17: {'3', '3', '3', '3', '3', '3', 0xd3, '?'}, + 18: {0, 0, 0, 0, 0, 0x18, 'b', '@'}, + 19: {0, 0, 0, 0, 0, 0, 0x04, '@'}, + }, datafile.UpperBoundValues()) + + m.Nil(datafile.KeyMetadata()) + m.Equal([]int64{4}, datafile.SplitOffsets()) + 
m.Nil(datafile.EqualityFieldIDs()) + m.Zero(*datafile.SortOrderID()) +} + +func (m *ManifestTestSuite) TestReadManifestListV1() { + list, err := ReadManifestList(&m.v1ManifestList) + m.Require().NoError(err) + + m.Len(list, 1) + m.Equal(1, list[0].Version()) + m.EqualValues(7989, list[0].Length()) + m.Equal(ManifestContentData, list[0].ManifestContent()) + m.Zero(list[0].SequenceNum()) + m.Zero(list[0].MinSequenceNum()) + m.EqualValues(9182715666859759686, list[0].SnapshotID()) + m.EqualValues(3, list[0].AddedDataFiles()) + m.True(list[0].HasAddedFiles()) + m.Zero(list[0].ExistingDataFiles()) + m.False(list[0].HasExistingFiles()) + m.Zero(list[0].DeletedDataFiles()) + m.Equal(addedRows, list[0].AddedRows()) + m.Zero(list[0].ExistingRows()) + m.Zero(list[0].DeletedRows()) + m.Nil(list[0].KeyMetadata()) + m.Zero(list[0].PartitionSpecID()) + m.Equal(snapshotID, list[0].SnapshotID()) + + part := list[0].Partitions()[0] + m.True(part.ContainsNull) + m.False(*part.ContainsNaN) + m.Equal([]byte{0x01, 0x00, 0x00, 0x00}, *part.LowerBound) + m.Equal([]byte{0x02, 0x00, 0x00, 0x00}, *part.UpperBound) +} + +func (m *ManifestTestSuite) TestReadManifestListV2() { + list, err := ReadManifestList(&m.v2ManifestList) + m.Require().NoError(err) + + m.Equal("/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro", list[0].FilePath()) + m.Len(list, 1) + m.Equal(2, list[0].Version()) + m.EqualValues(7989, list[0].Length()) + m.Equal(ManifestContentDeletes, list[0].ManifestContent()) + m.EqualValues(3, list[0].SequenceNum()) + m.EqualValues(3, list[0].MinSequenceNum()) + m.EqualValues(9182715666859759686, list[0].SnapshotID()) + m.EqualValues(3, list[0].AddedDataFiles()) + m.True(list[0].HasAddedFiles()) + m.Zero(list[0].ExistingDataFiles()) + m.False(list[0].HasExistingFiles()) + m.Zero(list[0].DeletedDataFiles()) + m.Equal(addedRows, list[0].AddedRows()) + m.Zero(list[0].ExistingRows()) + m.Zero(list[0].DeletedRows()) + 
m.Nil(list[0].KeyMetadata()) + m.Zero(list[0].PartitionSpecID()) + + part := list[0].Partitions()[0] + m.True(part.ContainsNull) + m.False(*part.ContainsNaN) + m.Equal([]byte{0x01, 0x00, 0x00, 0x00}, *part.LowerBound) + m.Equal([]byte{0x02, 0x00, 0x00, 0x00}, *part.UpperBound) +} + +func (m *ManifestTestSuite) TestManifestEntriesV2() { + var mockfs internal.MockFS + manifest := manifestFileV2{ + Path: manifestFileRecordsV2[0].FilePath(), + } + + mockfs.Test(m.T()) + mockfs.On("Open", manifest.FilePath()).Return(&internal.MockFile{ + Contents: bytes.NewReader(m.v2ManifestEntries.Bytes())}, nil) + defer mockfs.AssertExpectations(m.T()) + entries, err := manifest.FetchEntries(&mockfs, false) + m.Require().NoError(err) + m.Len(entries, 2) + m.Zero(manifest.PartitionSpecID()) + m.Zero(manifest.SnapshotID()) + m.Zero(manifest.AddedDataFiles()) + m.Zero(manifest.ExistingDataFiles()) + m.Zero(manifest.DeletedDataFiles()) + m.Zero(manifest.ExistingRows()) + m.Zero(manifest.DeletedRows()) + m.Zero(manifest.AddedRows()) + + entry1 := entries[0] + + m.Equal(EntryStatusADDED, entry1.Status()) + m.Equal(entrySnapshotID, entry1.SnapshotID()) + m.Zero(entry1.SequenceNum()) + m.Zero(*entry1.FileSequenceNum()) + + datafile := entry1.DataFile() + m.Equal(EntryContentData, datafile.ContentType()) + m.Equal("/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet", datafile.FilePath()) + m.Equal(ParquetFile, datafile.FileFormat()) + m.EqualValues(19513, datafile.Count()) + m.EqualValues(388872, datafile.FileSizeBytes()) + m.Equal(map[int]int64{ + 1: 53, + 2: 98153, + 3: 98693, + 4: 53, + 5: 53, + 6: 53, + 7: 17425, + 8: 18528, + 9: 53, + 10: 44788, + 11: 35571, + 12: 53, + 13: 1243, + 14: 2355, + 15: 12750, + 16: 4029, + 17: 110, + 18: 47194, + 19: 2948, + }, datafile.ColumnSizes()) + m.Equal(map[int]int64{ + 1: 19513, + 2: 19513, + 3: 19513, + 4: 19513, + 5: 19513, + 6: 19513, + 7: 19513, + 8: 19513, + 9: 19513, + 
10: 19513, + 11: 19513, + 12: 19513, + 13: 19513, + 14: 19513, + 15: 19513, + 16: 19513, + 17: 19513, + 18: 19513, + 19: 19513, + }, datafile.ValueCounts()) + m.Equal(map[int]int64{ + 1: 19513, + 2: 0, + 3: 0, + 4: 19513, + 5: 19513, + 6: 19513, + 7: 0, + 8: 0, + 9: 19513, + 10: 0, + 11: 0, + 12: 19513, + 13: 0, + 14: 0, + 15: 0, + 16: 0, + 17: 0, + 18: 0, + 19: 0, + }, datafile.NullValueCounts()) + m.Equal(map[int]int64{ + 16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0, + }, datafile.NaNValueCounts()) + + m.Equal(map[int][]byte{ + 2: []byte("2020-04-01 00:00"), + 3: []byte("2020-04-01 00:12"), + 7: {0x03, 0x00, 0x00, 0x00}, + 8: {0x01, 0x00, 0x00, 0x00}, + 10: {0xf6, '(', '\\', 0x8f, 0xc2, 0x05, 'S', 0xc0}, + 11: {0, 0, 0, 0, 0, 0, 0, 0}, + 13: {0, 0, 0, 0, 0, 0, 0, 0}, + 14: {0, 0, 0, 0, 0, 0, 0xe0, 0xbf}, + 15: {')', '\\', 0x8f, 0xc2, 0xf5, '(', 0x08, 0xc0}, + 16: {0, 0, 0, 0, 0, 0, 0, 0}, + 17: {0, 0, 0, 0, 0, 0, 0, 0}, + 18: {0xf6, '(', '\\', 0x8f, 0xc2, 0xc5, 'S', 0xc0}, + 19: {0, 0, 0, 0, 0, 0, 0x04, 0xc0}, + }, datafile.LowerBoundValues()) + + m.Equal(map[int][]byte{ + 2: []byte("2020-04-30 23:5:"), + 3: []byte("2020-05-01 00:41"), + 7: {'\t', 0x01, 0, 0}, + 8: {'\t', 0x01, 0, 0}, + 10: {0xcd, 0xcc, 0xcc, 0xcc, 0xcc, ',', '_', '@'}, + 11: {0x1f, 0x85, 0xeb, 'Q', '\\', 0xe2, 0xfe, '@'}, + 13: {0, 0, 0, 0, 0, 0, 0x12, '@'}, + 14: {0, 0, 0, 0, 0, 0, 0xe0, '?'}, + 15: {'q', '=', '\n', 0xd7, 0xa3, 0xf0, '1', '@'}, + 16: {0, 0, 0, 0, 0, '`', 'B', '@'}, + 17: {'3', '3', '3', '3', '3', '3', 0xd3, '?'}, + 18: {0, 0, 0, 0, 0, 0x18, 'b', '@'}, + 19: {0, 0, 0, 0, 0, 0, 0x04, '@'}, + }, datafile.UpperBoundValues()) + + m.Nil(datafile.KeyMetadata()) + m.Equal([]int64{4}, datafile.SplitOffsets()) + m.Nil(datafile.EqualityFieldIDs()) + m.Zero(*datafile.SortOrderID()) +} + +func TestManifests(t *testing.T) { + suite.Run(t, new(ManifestTestSuite)) +}