Skip to content

Commit

Permalink
feat(partitioning): Add partition specs/fields and basic transform in…
Browse files Browse the repository at this point in the history
…terface (#2)
  • Loading branch information
zeroshade authored Sep 14, 2023
1 parent 4dc8f48 commit 39b0197
Show file tree
Hide file tree
Showing 10 changed files with 661 additions and 7 deletions.
1 change: 1 addition & 0 deletions dev/.rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ LICENSE
NOTICE
go.sum
build
rat-results.txt
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ var (
ErrNotImplemented = errors.New("not implemented")
ErrInvalidArgument = errors.New("invalid argument")
ErrInvalidSchema = errors.New("invalid schema")
ErrInvalidTransform = errors.New("invalid transform syntax")
)
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ go 1.20

require (
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
18 changes: 15 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb h1:mIKbk8weKhSeLH2GmUTrvx8CjkyJmnU1wFmg59CUjFA=
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
232 changes: 232 additions & 0 deletions partitions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package iceberg

import (
"encoding/json"
"fmt"
"strings"

"golang.org/x/exp/slices"
)

const (
// partitionDataIDStart is the base of the partition field id range;
// LastAssignedFieldID reports partitionDataIDStart-1 for a spec that
// has no fields.
partitionDataIDStart = 1000
// InitialPartitionSpecID is the spec id that NewPartitionSpec assigns
// to the specs it creates.
InitialPartitionSpecID = 0
)

// UnpartitionedSpec is a shared spec with no partition fields and the
// initial spec id. Use it for comparisons or whenever a reference to the
// same unpartitioned spec object is convenient.
var UnpartitionedSpec = &PartitionSpec{id: InitialPartitionSpecID}

// PartitionField represents how one partition value is derived from the
// source column by transformation.
//
// The struct tags give the JSON key of each field. Decoding goes through
// the custom UnmarshalJSON method, which reads the "transform" key as a
// string and converts it with ParseTransform.
type PartitionField struct {
// SourceID is the source column id of the table's schema
SourceID int `json:"source-id"`
// FieldID is the partition field id across all the table partition specs
FieldID int `json:"field-id"`
// Name is the name of the partition field itself
Name string `json:"name"`
// Transform is the transform used to produce the partition value
Transform Transform `json:"transform"`
}

// String renders the field as "<field-id>: <name>: <transform>(<source-id>)".
func (p *PartitionField) String() string {
	const layout = "%d: %s: %s(%d)"

	return fmt.Sprintf(layout, p.FieldID, p.Name, p.Transform, p.SourceID)
}

// UnmarshalJSON decodes a partition field from JSON. The "transform" key is
// read as a plain string and then converted with ParseTransform; every other
// key is decoded straight into p.
func (p *PartitionField) UnmarshalJSON(b []byte) error {
	// Alias has PartitionField's fields but none of its methods, so decoding
	// into it does not call back into this UnmarshalJSON.
	type Alias PartitionField

	var aux struct {
		// Declared at a shallower depth than the embedded Alias.Transform,
		// so encoding/json routes the "transform" key here instead.
		TransformString string `json:"transform"`
		*Alias
	}
	aux.Alias = (*Alias)(p)

	if err := json.Unmarshal(b, &aux); err != nil {
		return err
	}

	var err error
	p.Transform, err = ParseTransform(aux.TransformString)
	return err
}

// PartitionSpec captures the transformation from table data to partition values.
//
// All of its fields are unexported: a spec is built with NewPartitionSpec /
// NewPartitionSpecID or by JSON decoding, and read through its methods.
type PartitionSpec struct {
// id is the spec id; any change to a PartitionSpec will produce a new spec id
id int
// fields is the ordered list of partition fields that make up the spec
fields []PartitionField

// sourceIdToFields groups the fields by their SourceID. It is populated
// by initialize after creation, so it stays nil on a spec for which
// initialize has not run (for example a bare struct literal).
sourceIdToFields map[int][]PartitionField
}

// NewPartitionSpec builds a partition spec from the given fields using
// InitialPartitionSpecID as the spec id.
func NewPartitionSpec(fields ...PartitionField) PartitionSpec {
	spec := NewPartitionSpecID(InitialPartitionSpecID, fields...)
	return spec
}

// NewPartitionSpecID builds a partition spec with an explicit spec id from
// the given fields and prepares its source-id lookup table.
//
// The fields slice is stored as passed in; it is not copied.
func NewPartitionSpecID(id int, fields ...PartitionField) PartitionSpec {
	var spec PartitionSpec
	spec.id, spec.fields = id, fields
	spec.initialize()

	return spec
}

// CompatibleWith reports whether this partition spec is considered
// compatible with the passed in partition spec. This means that the two
// specs have equivalent field lists (same source id, name and transform at
// every position) regardless of the spec id and of the partition field ids.
//
// A spec is always compatible with itself, and two nil specs are compatible
// with each other; a nil spec is never compatible with a non-nil one.
func (ps *PartitionSpec) CompatibleWith(other *PartitionSpec) bool {
	if ps == other {
		return true
	}

	// Exactly one side is nil here; bail out instead of dereferencing it.
	if ps == nil || other == nil {
		return false
	}

	// EqualFunc already reports false when the two lengths differ, so no
	// separate length check is needed.
	return slices.EqualFunc(ps.fields, other.fields, func(left, right PartitionField) bool {
		return left.SourceID == right.SourceID && left.Name == right.Name &&
			left.Transform == right.Transform
	})
}

// Equals reports whether this partition spec and the provided one carry the
// same spec id AND identical field lists (element by element, in order).
func (ps *PartitionSpec) Equals(other PartitionSpec) bool {
	if ps.id != other.id {
		return false
	}

	return slices.Equal(ps.fields, other.fields)
}

// MarshalJSON encodes the spec as an object with the keys "spec-id" and
// "fields". A spec without fields is written with an empty array rather
// than null.
func (ps PartitionSpec) MarshalJSON() ([]byte, error) {
	type wireSpec struct {
		ID     int              `json:"spec-id"`
		Fields []PartitionField `json:"fields"`
	}

	out := wireSpec{ID: ps.id, Fields: ps.fields}
	if out.Fields == nil {
		// a nil slice would be encoded as null
		out.Fields = []PartitionField{}
	}

	return json.Marshal(out)
}

// UnmarshalJSON decodes the "spec-id" and "fields" keys into the spec and
// then rebuilds the source-id lookup table. The spec is left untouched if
// decoding fails.
func (ps *PartitionSpec) UnmarshalJSON(b []byte) error {
	var payload struct {
		ID     int              `json:"spec-id"`
		Fields []PartitionField `json:"fields"`
	}
	// Seed the payload with the current state so that a key which is absent
	// from the input keeps its existing value.
	payload.ID, payload.Fields = ps.id, ps.fields

	if err := json.Unmarshal(b, &payload); err != nil {
		return err
	}

	ps.id = payload.ID
	ps.fields = payload.Fields
	ps.initialize()

	return nil
}

// initialize (re)builds sourceIdToFields from the current field list,
// grouping the fields by SourceID in their original order. Any previous
// table is replaced.
func (ps *PartitionSpec) initialize() {
	bySource := make(map[int][]PartitionField)
	for _, field := range ps.fields {
		bySource[field.SourceID] = append(bySource[field.SourceID], field)
	}

	ps.sourceIdToFields = bySource
}

// ID returns the spec id.
func (ps *PartitionSpec) ID() int {
	return ps.id
}

// NumFields returns how many partition fields the spec holds.
func (ps *PartitionSpec) NumFields() int {
	return len(ps.fields)
}

// Field returns a copy of the i-th partition field. It panics if i is out
// of range, like any slice index.
func (ps *PartitionSpec) Field(i int) PartitionField {
	return ps.fields[i]
}

// IsUnpartitioned reports whether the spec produces no partition values:
// it either has no fields at all, or every field uses a VoidTransform.
func (ps *PartitionSpec) IsUnpartitioned() bool {
	// With an empty field list the loop body never runs and the result is
	// true, which covers the "no fields" case.
	for i := range ps.fields {
		if _, isVoid := ps.fields[i].Transform.(VoidTransform); !isVoid {
			return false
		}
	}

	return true
}

// FieldsBySourceID returns a copy of the partition fields derived from the
// given source column. Despite its name, fieldID is matched against each
// field's SourceID, which is the key of the lookup table built by
// initialize. The result is empty when no field uses that source column.
func (ps *PartitionSpec) FieldsBySourceID(fieldID int) []PartitionField {
	matches := ps.sourceIdToFields[fieldID]

	// Clone so callers cannot modify the spec's internal table.
	return slices.Clone(matches)
}

// String renders the spec as a bracketed list with one tab-indented field
// per line; a spec without fields renders as "[]".
func (ps PartitionSpec) String() string {
	var sb strings.Builder

	sb.WriteByte('[')
	if len(ps.fields) > 0 {
		// the opening bracket sits on its own line when there are fields
		sb.WriteString("\n")
	}
	for i := range ps.fields {
		sb.WriteString("\t")
		sb.WriteString(ps.fields[i].String())
		sb.WriteString("\n")
	}
	sb.WriteByte(']')

	return sb.String()
}

// LastAssignedFieldID returns the largest FieldID among the spec's fields,
// or partitionDataIDStart-1 when the spec has no fields.
func (ps *PartitionSpec) LastAssignedFieldID() int {
	if len(ps.fields) == 0 {
		return partitionDataIDStart - 1
	}

	// Start from the first field rather than a fixed floor so the result is
	// always one of the ids actually present.
	highest := ps.fields[0].FieldID
	for i := 1; i < len(ps.fields); i++ {
		if current := ps.fields[i].FieldID; current > highest {
			highest = current
		}
	}

	return highest
}

// PartitionType produces the struct type of the partition values described
// by this spec, resolving each field's source column against schema.
//
// Every resulting field is optional:
//   - All partition transforms are required to produce null if the input value
//     is null. This can happen when the source column is optional.
//   - Partition fields may be added later, in which case not all files would
//     have the result field and it may be null.
//
// There is a case where we can guarantee that a partition field in the first
// and only partition spec that uses a required source column will never be
// null, but it doesn't seem worth tracking this case.
//
// Fields whose source column cannot be found in schema are left out of the
// result.
func (ps *PartitionSpec) PartitionType(schema *Schema) *StructType {
	partitionFields := make([]NestedField, 0, len(ps.fields))

	for _, pf := range ps.fields {
		srcType, found := schema.FindTypeByID(pf.SourceID)
		if !found {
			continue
		}

		partitionFields = append(partitionFields, NestedField{
			ID:       pf.FieldID,
			Name:     pf.Name,
			Type:     pf.Transform.ResultType(srcType),
			Required: false,
		})
	}

	return &StructType{FieldList: partitionFields}
}
Loading

0 comments on commit 39b0197

Please sign in to comment.