Skip to content

Commit

Permalink
Feature/import filter by prefix (#1085)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnyaug authored Dec 20, 2020
1 parent 6d4d81d commit 7c2c305
Show file tree
Hide file tree
Showing 15 changed files with 486 additions and 197 deletions.
2 changes: 1 addition & 1 deletion block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (a *Adapter) ValidateConfiguration(_ string) error {
return nil
}

func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool) (block.Inventory, error) {
func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool, _ []string) (block.Inventory, error) {
return nil, fmt.Errorf("inventory %w", ErrNotImplemented)
}

Expand Down
2 changes: 1 addition & 1 deletion block/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type InventoryGenerator interface {
GenerateInventory(ctx context.Context, logger logging.Logger, inventoryURL string, shouldSort bool) (Inventory, error)
GenerateInventory(ctx context.Context, logger logging.Logger, inventoryURL string, shouldSort bool, prefixes []string) (Inventory, error)
}

// Inventory represents a snapshot of the storage space
Expand Down
2 changes: 1 addition & 1 deletion block/local/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (l *Adapter) ValidateConfiguration(_ string) error {
return nil
}

func (l *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool) (block.Inventory, error) {
func (l *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool, _ []string) (block.Inventory, error) {
return nil, ErrInventoryNotSupported
}

Expand Down
2 changes: 1 addition & 1 deletion block/mem/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (a *Adapter) ValidateConfiguration(_ string) error {
return nil
}

func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool) (block.Inventory, error) {
func (a *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string, _ bool, _ []string) (block.Inventory, error) {
return nil, ErrInventoryNotImplemented
}

Expand Down
67 changes: 52 additions & 15 deletions block/s3/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/url"
"sort"
"strings"

"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/service/s3"
Expand All @@ -29,35 +30,43 @@ type Manifest struct {
}

type inventoryFile struct {
Key string `json:"key"` // an s3 key for an inventory list file
Key string `json:"key"` // an s3 key for an inventory list file
firstKey string
lastKey string
}

func (a *Adapter) GenerateInventory(ctx context.Context, logger logging.Logger, manifestURL string, shouldSort bool) (block.Inventory, error) {
return GenerateInventory(logger, manifestURL, a.s3, s3inventory.NewReader(ctx, a.s3, logger), shouldSort)
func (a *Adapter) GenerateInventory(ctx context.Context, logger logging.Logger, manifestURL string, shouldSort bool, prefixes []string) (block.Inventory, error) {
return GenerateInventory(logger, manifestURL, a.s3, s3inventory.NewReader(ctx, a.s3, logger), shouldSort, prefixes)
}

func GenerateInventory(logger logging.Logger, manifestURL string, s3 s3iface.S3API, inventoryReader s3inventory.IReader, shouldSort bool) (block.Inventory, error) {
func GenerateInventory(logger logging.Logger, manifestURL string, s3 s3iface.S3API, inventoryReader s3inventory.IReader, shouldSort bool, prefixes []string) (block.Inventory, error) {
if logger == nil {
logger = logging.Default()
}
m, err := loadManifest(manifestURL, s3)
if err != nil {
return nil, err
}
if shouldSort {
if shouldSort || len(prefixes) > 0 {
err = sortManifest(m, logger, inventoryReader)
}
if err != nil {
return nil, err
}
return &Inventory{Manifest: m, logger: logger, shouldSort: shouldSort, reader: inventoryReader}, nil
if len(prefixes) > 0 {
manifestFileCount := len(m.Files)
m.Files = filterFiles(m.Files, prefixes)
logger.Debugf("manifest filtered from %d to %d files", manifestFileCount, len(m.Files))
}
return &Inventory{Manifest: m, logger: logger, shouldSort: shouldSort, reader: inventoryReader, prefixes: prefixes}, nil
}

type Inventory struct {
Manifest *Manifest
logger logging.Logger
shouldSort bool
reader s3inventory.IReader
prefixes []string
}

func (inv *Inventory) Iterator() block.InventoryIterator {
Expand Down Expand Up @@ -98,30 +107,58 @@ func loadManifest(manifestURL string, s3svc s3iface.S3API) (*Manifest, error) {
return &m, nil
}

func filterFiles(files []inventoryFile, prefixes []string) []inventoryFile {
sort.Strings(prefixes)
currentPrefixIdx := 0
filteredFiles := make([]inventoryFile, 0)
for i := 0; i < len(files); i++ {
for {
// find a prefix that may have suitable keys in the current file
if prefixes[currentPrefixIdx] >= files[i].firstKey {
// prefix may be in scope of current file
break
}
if strings.HasPrefix(files[i].firstKey, prefixes[currentPrefixIdx]) {
// first object in file starts with prefix
break
}
// current prefix ends before this file - move to next prefix
currentPrefixIdx++
if currentPrefixIdx == len(prefixes) {
// no more prefixes - other files are irrelevant
return filteredFiles
}
}
if strings.HasPrefix(files[i].firstKey, prefixes[currentPrefixIdx]) ||
(prefixes[currentPrefixIdx] >= files[i].firstKey && prefixes[currentPrefixIdx] < files[i].lastKey) {
// file may contain keys starting with this prefix
filteredFiles = append(filteredFiles, files[i])
}
}
return filteredFiles
}

func sortManifest(m *Manifest, logger logging.Logger, reader s3inventory.IReader) error {
firstKeyByInventoryFile := make(map[string]string)
lastKeyByInventoryFile := make(map[string]string)
for _, f := range m.Files {
for i, f := range m.Files {
mr, err := reader.GetMetadataReader(m.Format, m.inventoryBucket, f.Key)
if err != nil {
return fmt.Errorf("failed to sort inventory files in manifest: %w", err)
}
firstKeyByInventoryFile[f.Key] = mr.FirstObjectKey()
lastKeyByInventoryFile[f.Key] = mr.LastObjectKey()
m.Files[i].firstKey = mr.FirstObjectKey()
m.Files[i].lastKey = mr.LastObjectKey()
err = mr.Close()
if err != nil {
logger.Errorf("failed to close inventory file. file=%s, err=%w", f, err)
}
}
sort.Slice(m.Files, func(i, j int) bool {
return firstKeyByInventoryFile[m.Files[i].Key] < firstKeyByInventoryFile[m.Files[j].Key] ||
(firstKeyByInventoryFile[m.Files[i].Key] == firstKeyByInventoryFile[m.Files[j].Key] &&
lastKeyByInventoryFile[m.Files[i].Key] < lastKeyByInventoryFile[m.Files[j].Key])
return m.Files[i].firstKey < m.Files[j].firstKey ||
(m.Files[i].firstKey == m.Files[j].firstKey && m.Files[i].lastKey < m.Files[j].lastKey)
})
// validate sorting: if a file begins before the next one ends - the files cover overlapping ranges,
// which we don't know how to handle.
for i := 0; i < len(m.Files)-1; i++ {
if firstKeyByInventoryFile[m.Files[i+1].Key] < lastKeyByInventoryFile[m.Files[i].Key] {
if m.Files[i+1].firstKey < m.Files[i].lastKey {
return ErrInventoryFilesRangesOverlap
}
}
Expand Down
20 changes: 20 additions & 0 deletions block/s3/inventory_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/treeverse/lakefs/block"
Expand All @@ -22,6 +23,7 @@ type InventoryIterator struct {
valIndexInBuffer int
inventoryFileProgress *cmdutils.Progress
currentFileProgress *cmdutils.Progress
currentPrefix int
}

func NewInventoryIterator(inv *Inventory) *InventoryIterator {
Expand Down Expand Up @@ -112,6 +114,24 @@ func (it *InventoryIterator) nextFromBuffer() *block.InventoryObject {
if !obj.IsLatest || obj.IsDeleteMarker {
continue
}
if len(it.prefixes) > 0 {
// check file against prefix filter
// loop while current prefix may fit current object
for it.prefixes[it.currentPrefix] < obj.Key {
if strings.HasPrefix(obj.Key, it.prefixes[it.currentPrefix]) {
// found current prefix
break
}
it.currentPrefix++
if it.currentPrefix == len(it.prefixes) {
// no more prefixes
return nil
}
}
if !strings.HasPrefix(obj.Key, it.prefixes[it.currentPrefix]) {
continue
}
}
res := block.InventoryObject{
Bucket: obj.Bucket,
Key: obj.Key,
Expand Down
Loading

0 comments on commit 7c2c305

Please sign in to comment.