From d4f0c19e2f656b6e55de6c45fcb18c8f6924ef44 Mon Sep 17 00:00:00 2001 From: N-o-Z Date: Thu, 31 Oct 2024 16:40:10 -0400 Subject: [PATCH] Fix: Get mtime from storage server (#8329) * Fix: Get mtime from storage server * CR Fixes --- pkg/api/controller.go | 11 ++++---- pkg/block/adapter.go | 13 +++++++++- pkg/block/azure/adapter.go | 15 ++++++----- pkg/block/blocktest/adapter.go | 6 ++--- pkg/block/blocktest/basic_suite.go | 13 +++++----- pkg/block/gs/adapter.go | 12 ++++----- pkg/block/local/adapter.go | 19 ++++++++------ pkg/block/mem/adapter.go | 10 +++---- pkg/block/metrics.go | 2 +- pkg/block/s3/adapter.go | 26 ++++++++++++++----- pkg/block/transient/adapter.go | 9 ++++--- pkg/catalog/actions_output_writer.go | 3 ++- pkg/catalog/catalog.go | 2 +- pkg/gateway/operations/putobject.go | 2 +- .../retention/garbage_collection_manager.go | 8 +++--- pkg/pyramid/tier_fs.go | 2 +- pkg/samplerepo/samplecontent.go | 3 +-- pkg/testutil/adapter.go | 6 ++--- pkg/upload/write_blob.go | 5 +++- 19 files changed, 103 insertions(+), 64 deletions(-) diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 1623dc74cfd..9eb95edde03 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -2108,7 +2108,7 @@ func (c *Controller) ensureStorageNamespace(ctx context.Context, storageNamespac return err } - if err := c.BlockAdapter.Put(ctx, obj, objLen, strings.NewReader(dummyData), block.PutOpts{}); err != nil { + if _, err := c.BlockAdapter.Put(ctx, obj, objLen, strings.NewReader(dummyData), block.PutOpts{}); err != nil { return err } @@ -3227,11 +3227,10 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi } } // write metadata - writeTime := time.Now() entryBuilder := catalog.NewDBEntryBuilder(). Path(params.Path). PhysicalAddress(blob.PhysicalAddress). - CreationDate(writeTime). + CreationDate(blob.CreationDate). Size(blob.Size). Checksum(blob.Checksum). ContentType(contentType) @@ -3268,7 +3267,7 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi response := apigen.ObjectStats{ Checksum: blob.Checksum, - Mtime: writeTime.Unix(), + Mtime: blob.CreationDate.Unix(), Path: params.Path, PathType: entryTypeObject, PhysicalAddress: qk.Format(), @@ -3825,7 +3824,7 @@ func (c *Controller) DumpRefs(w http.ResponseWriter, r *http.Request, repository writeError(w, r, http.StatusInternalServerError, err) return } - err = c.BlockAdapter.Put(ctx, block.ObjectPointer{ + _, err = c.BlockAdapter.Put(ctx, block.ObjectPointer{ StorageNamespace: repo.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: fmt.Sprintf("%s/refs_manifest.json", c.Config.Committed.BlockStoragePrefix), @@ -4165,7 +4164,7 @@ func (c *Controller) CreateSymlinkFile(w http.ResponseWriter, r *http.Request, r func writeSymlink(ctx context.Context, repo *catalog.Repository, branch, path string, addresses []string, adapter block.Adapter) error { address := fmt.Sprintf("%s/%s/%s/%s/symlink.txt", lakeFSPrefix, repo.Name, branch, path) data := strings.Join(addresses, "\n") - err := adapter.Put(ctx, block.ObjectPointer{ + _, err := adapter.Put(ctx, block.ObjectPointer{ StorageNamespace: repo.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: address, diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index 4945c8ac517..77f82e6822b 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -150,11 +150,22 @@ type BlockstoreMetadata struct { Region *string } +type PutResponse struct { + ModTime *time.Time +} + +func (r *PutResponse) GetMtime() time.Time { + if r != nil && r.ModTime != nil { + return *r.ModTime + } + return time.Now() +} + // Adapter abstract Storage Adapter for persistence of version controlled data. The methods generally map to S3 API methods // - Generally some type of Object Storage // - Can also be block storage or even in-memory type Adapter interface { - Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) error + Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) (*PutResponse, error) Get(ctx context.Context, obj ObjectPointer) (io.ReadCloser, error) // GetWalker is never called on the server side. diff --git a/pkg/block/azure/adapter.go b/pkg/block/azure/adapter.go index 8138d0ff05c..3c9a332d6e8 100644 --- a/pkg/block/azure/adapter.go +++ b/pkg/block/azure/adapter.go @@ -194,20 +194,23 @@ func (a *Adapter) log(ctx context.Context) logging.Logger { return logging.FromContext(ctx) } -func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) error { +func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) (*block.PutResponse, error) { var err error defer reportMetrics("Put", time.Now(), &sizeBytes, &err) qualifiedKey, err := resolveBlobURLInfo(obj) if err != nil { - return err + return nil, err } o := a.translatePutOpts(ctx, opts) containerClient, err := a.clientCache.NewContainerClient(qualifiedKey.StorageAccountName, qualifiedKey.ContainerName) if err != nil { - return err + return nil, err } - _, err = containerClient.NewBlockBlobClient(qualifiedKey.BlobURL).UploadStream(ctx, reader, &o) - return err + res, err := containerClient.NewBlockBlobClient(qualifiedKey.BlobURL).UploadStream(ctx, reader, &o) + if err != nil { + return nil, err + } + return &block.PutResponse{ModTime: res.LastModified}, nil } func (a *Adapter) Get(ctx context.Context, obj block.ObjectPointer) (io.ReadCloser, error) { @@ -583,7 +586,7 @@ func (a *Adapter) BlockstoreType() string { return block.BlockstoreTypeAzure } -func (a *Adapter) BlockstoreMetadata(ctx context.Context) (*block.BlockstoreMetadata, error) { +func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetadata, error) { return nil, block.ErrOperationNotSupported } diff --git a/pkg/block/blocktest/adapter.go b/pkg/block/blocktest/adapter.go index c1a6301afef..9db4c0f18a8 100644 --- a/pkg/block/blocktest/adapter.go +++ b/pkg/block/blocktest/adapter.go @@ -38,7 +38,7 @@ func testAdapterGetRange(t *testing.T, adapter block.Adapter, storageNamespace s ctx := context.Background() part1 := "this is the first part " part2 := "this is the last part" - err := adapter.Put(ctx, block.ObjectPointer{ + _, err := adapter.Put(ctx, block.ObjectPointer{ StorageNamespace: storageNamespace, Identifier: "test_file", IdentifierType: block.IdentifierTypeRelative, @@ -88,7 +88,7 @@ func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace str for i := 0; i < filesAndFolders; i++ { for j := 0; j < filesAndFolders; j++ { - err := adapter.Put(ctx, block.ObjectPointer{ + _, err := adapter.Put(ctx, block.ObjectPointer{ StorageNamespace: storageNamespace, Identifier: fmt.Sprintf("%s/folder_%d/test_file_%d", testPrefix, filesAndFolders-i-1, filesAndFolders-j-1), IdentifierType: block.IdentifierTypeRelative, @@ -97,7 +97,7 @@ func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace str } } - err := adapter.Put(ctx, block.ObjectPointer{ + _, err := adapter.Put(ctx, block.ObjectPointer{ StorageNamespace: storageNamespace, Identifier: fmt.Sprintf("%s/folder_0.txt", testPrefix), IdentifierType: block.IdentifierTypeRelative, diff --git a/pkg/block/blocktest/basic_suite.go b/pkg/block/blocktest/basic_suite.go index fc4708e5811..e4541be51b2 100644 --- a/pkg/block/blocktest/basic_suite.go +++ b/pkg/block/blocktest/basic_suite.go @@ -44,7 +44,7 @@ func testAdapterPutGet(t *testing.T, adapter block.Adapter, storageNamespace, ex IdentifierType: c.identifierType, } - err := adapter.Put(ctx, obj, size, strings.NewReader(contents), block.PutOpts{}) + _, err := adapter.Put(ctx, obj, size, strings.NewReader(contents), block.PutOpts{}) require.NoError(t, err) reader, err := adapter.Get(ctx, obj) @@ -73,8 +73,8 @@ func testAdapterCopy(t *testing.T, adapter block.Adapter, storageNamespace strin Identifier: "export/to/dst", IdentifierType: block.IdentifierTypeRelative, } - - require.NoError(t, adapter.Put(ctx, src, int64(len(contents)), strings.NewReader(contents), block.PutOpts{})) + _, err := adapter.Put(ctx, src, int64(len(contents)), strings.NewReader(contents), block.PutOpts{}) + require.NoError(t, err) require.NoError(t, adapter.Copy(ctx, src, dst)) reader, err := adapter.Get(ctx, dst) @@ -134,7 +134,8 @@ func testAdapterRemove(t *testing.T, adapter block.Adapter, storageNamespace str Identifier: tt.name + "/" + p, IdentifierType: block.IdentifierTypeRelative, } - require.NoError(t, adapter.Put(ctx, obj, int64(len(content)), strings.NewReader(content), block.PutOpts{})) + _, err := adapter.Put(ctx, obj, int64(len(content)), strings.NewReader(content), block.PutOpts{}) + require.NoError(t, err) } // test Remove @@ -163,14 +164,14 @@ func testAdapterExists(t *testing.T, adapter block.Adapter, storageNamespace str // TODO (niro): Test abs paths const contents = "exists" ctx := context.Background() - err := adapter.Put(ctx, block.ObjectPointer{ + _, err := adapter.Put(ctx, block.ObjectPointer{ StorageNamespace: storageNamespace, Identifier: contents, IdentifierType: block.IdentifierTypeRelative, }, int64(len(contents)), strings.NewReader(contents), block.PutOpts{}) require.NoError(t, err) - err = adapter.Put(ctx, block.ObjectPointer{ + _, err = adapter.Put(ctx, block.ObjectPointer{ StorageNamespace: storageNamespace, Identifier: "nested/and/" + contents, IdentifierType: block.IdentifierTypeRelative, diff --git a/pkg/block/gs/adapter.go b/pkg/block/gs/adapter.go index 704e8922658..8695532d642 100644 --- a/pkg/block/gs/adapter.go +++ b/pkg/block/gs/adapter.go @@ -154,24 +154,24 @@ func (o *storageObjectHandle) newComposer(a *Adapter, srcs ...*storage.ObjectHan return c } -func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, _ block.PutOpts) error { +func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, _ block.PutOpts) (*block.PutResponse, error) { var err error defer reportMetrics("Put", time.Now(), &sizeBytes, &err) bucket, key, err := a.extractParamsFromObj(obj) if err != nil { - return err + return nil, err } h := storageObjectHandle{a.client.Bucket(bucket).Object(key)} w := h.withWriteHandle(a).newWriter(ctx, a) _, err = io.Copy(w, reader) if err != nil { - return fmt.Errorf("io.Copy: %w", err) + return nil, fmt.Errorf("io.Copy: %w", err) } err = w.Close() if err != nil { - return fmt.Errorf("writer.Close: %w", err) + return nil, fmt.Errorf("writer.Close: %w", err) } - return nil + return &block.PutResponse{}, nil } func (a *Adapter) Get(ctx context.Context, obj block.ObjectPointer) (io.ReadCloser, error) { @@ -654,7 +654,7 @@ func (a *Adapter) BlockstoreType() string { return block.BlockstoreTypeGS } -func (a *Adapter) BlockstoreMetadata(ctx context.Context) (*block.BlockstoreMetadata, error) { +func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetadata, error) { return nil, block.ErrOperationNotSupported } diff --git a/pkg/block/local/adapter.go b/pkg/block/local/adapter.go index 6c7427d433c..0b6fb31dd54 100644 --- a/pkg/block/local/adapter.go +++ b/pkg/block/local/adapter.go @@ -152,21 +152,24 @@ func (l *Adapter) Path() string { return l.path } -func (l *Adapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, _ block.PutOpts) error { +func (l *Adapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, _ block.PutOpts) (*block.PutResponse, error) { p, err := l.extractParamsFromObj(obj) if err != nil { - return err + return nil, err } p = filepath.Clean(p) f, err := l.maybeMkdir(p, os.Create) if err != nil { - return err + return nil, err } defer func() { _ = f.Close() }() _, err = io.Copy(f, reader) - return err + if err != nil { + return nil, err + } + return &block.PutResponse{}, nil } func (l *Adapter) Remove(_ context.Context, obj block.ObjectPointer) error { @@ -243,7 +246,7 @@ func (l *Adapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj } md5Read := block.NewHashingReader(r, block.HashFunctionMD5) fName := uploadID + fmt.Sprintf("-%05d", partNumber) - err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{}) + _, err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{}) if err != nil { return nil, fmt.Errorf("copy put: %w", err) } @@ -263,7 +266,7 @@ func (l *Adapter) UploadCopyPartRange(ctx context.Context, sourceObj, destinatio } md5Read := block.NewHashingReader(r, block.HashFunctionMD5) fName := uploadID + fmt.Sprintf("-%05d", partNumber) - err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{}) + _, err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{}) if err != nil { return nil, fmt.Errorf("copy range put: %w", err) } @@ -395,7 +398,7 @@ func (l *Adapter) UploadPart(ctx context.Context, obj block.ObjectPointer, _ int } md5Read := block.NewHashingReader(reader, block.HashFunctionMD5) fName := uploadID + fmt.Sprintf("-%05d", partNumber) - err := l.Put(ctx, block.ObjectPointer{StorageNamespace: obj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{}) + _, err := l.Put(ctx, block.ObjectPointer{StorageNamespace: obj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{}) etag := hex.EncodeToString(md5Read.Md5.Sum(nil)) return &block.UploadPartResponse{ ETag: etag, @@ -522,7 +525,7 @@ func (l *Adapter) BlockstoreType() string { return block.BlockstoreTypeLocal } -func (l *Adapter) BlockstoreMetadata(ctx context.Context) (*block.BlockstoreMetadata, error) { +func (l *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetadata, error) { return nil, block.ErrOperationNotSupported } diff --git a/pkg/block/mem/adapter.go b/pkg/block/mem/adapter.go index 5782977839d..d394945bbf2 100644 --- a/pkg/block/mem/adapter.go +++ b/pkg/block/mem/adapter.go @@ -78,20 +78,20 @@ func getKey(obj block.ObjectPointer) string { return fmt.Sprintf("%s:%s", obj.StorageNamespace, obj.Identifier) } -func (a *Adapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, opts block.PutOpts) error { +func (a *Adapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, opts block.PutOpts) (*block.PutResponse, error) { if err := verifyObjectPointer(obj); err != nil { - return err + return nil, err } a.mutex.Lock() defer a.mutex.Unlock() data, err := io.ReadAll(reader) if err != nil { - return err + return nil, err } key := getKey(obj) a.data[key] = data a.properties[key] = block.Properties(opts) - return nil + return &block.PutResponse{}, nil } func (a *Adapter) Get(_ context.Context, obj block.ObjectPointer) (io.ReadCloser, error) { @@ -339,7 +339,7 @@ func (a *Adapter) BlockstoreType() string { return block.BlockstoreTypeMem } -func (a *Adapter) BlockstoreMetadata(ctx context.Context) (*block.BlockstoreMetadata, error) { +func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetadata, error) { return nil, block.ErrOperationNotSupported } diff --git a/pkg/block/metrics.go b/pkg/block/metrics.go index 2e2907cbe27..36c697f330e 100644 --- a/pkg/block/metrics.go +++ b/pkg/block/metrics.go @@ -22,7 +22,7 @@ func (m *MetricsAdapter) InnerAdapter() Adapter { return m.adapter } -func (m *MetricsAdapter) Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) error { +func (m *MetricsAdapter) Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) (*PutResponse, error) { ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) return m.adapter.Put(ctx, obj, sizeBytes, reader, opts) } diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index d9c12bbd24f..254d8064b28 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -13,6 +13,7 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" + awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware" v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" @@ -217,19 +218,31 @@ func (a *Adapter) log(ctx context.Context) logging.Logger { return logging.FromContext(ctx) } -func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) error { +func getServerTimeFromResponseMetadata(metadata middleware.Metadata) time.Time { + value, ok := awsmiddleware.GetServerTime(metadata) + if ok { + return value + } + + return time.Now() +} + +func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) (*block.PutResponse, error) { var err error defer reportMetrics("Put", time.Now(), &sizeBytes, &err) // for unknown size, we assume we like to stream content, will use s3manager to perform the request. // we assume the caller may not have 1:1 request to s3 put object in this case as it may perform multipart upload if sizeBytes == -1 { - return a.managerUpload(ctx, obj, reader, opts) + if err = a.managerUpload(ctx, obj, reader, opts); err != nil { + return nil, err + } + return &block.PutResponse{}, nil } bucket, key, _, err := a.extractParamsFromObj(obj) if err != nil { - return err + return nil, err } putObject := s3.PutObjectInput{ @@ -258,13 +271,14 @@ func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes in a.registerCaptureServerMiddleware(), ) if err != nil { - return err + return nil, err } etag := aws.ToString(resp.ETag) if etag == "" { - return ErrMissingETag + return nil, ErrMissingETag } - return nil + mtime := getServerTimeFromResponseMetadata(resp.ResultMetadata) + return &block.PutResponse{ModTime: &mtime}, nil } // retryMaxAttemptsByReader return s3 options function diff --git a/pkg/block/transient/adapter.go b/pkg/block/transient/adapter.go index 05d3dd18b6e..1d98dd1834b 100644 --- a/pkg/block/transient/adapter.go +++ b/pkg/block/transient/adapter.go @@ -24,9 +24,12 @@ func New(_ context.Context) *Adapter { return &Adapter{} } -func (a *Adapter) Put(_ context.Context, _ block.ObjectPointer, _ int64, reader io.Reader, _ block.PutOpts) error { +func (a *Adapter) Put(_ context.Context, _ block.ObjectPointer, _ int64, reader io.Reader, _ block.PutOpts) (*block.PutResponse, error) { _, err := io.Copy(io.Discard, reader) - return err + if err != nil { + return nil, err + } + return &block.PutResponse{}, nil } func (a *Adapter) Get(_ context.Context, _ block.ObjectPointer) (io.ReadCloser, error) { @@ -144,7 +147,7 @@ func (a *Adapter) BlockstoreType() string { return block.BlockstoreTypeTransient } -func (a *Adapter) BlockstoreMetadata(ctx context.Context) (*block.BlockstoreMetadata, error) { +func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetadata, error) { return nil, block.ErrOperationNotSupported } diff --git a/pkg/catalog/actions_output_writer.go b/pkg/catalog/actions_output_writer.go index 6a82f224b77..e9fc56f44d9 100644 --- a/pkg/catalog/actions_output_writer.go +++ b/pkg/catalog/actions_output_writer.go @@ -18,9 +18,10 @@ func NewActionsOutputWriter(blockAdapter block.Adapter) *ActionsOutputWriter { } func (o *ActionsOutputWriter) OutputWrite(ctx context.Context, storageNamespace, name string, reader io.Reader, size int64) error { - return o.adapter.Put(ctx, block.ObjectPointer{ + _, err := o.adapter.Put(ctx, block.ObjectPointer{ StorageNamespace: storageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: name, }, size, reader, block.PutOpts{}) + return err } diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index 0f08cce38e6..b0b6155db5b 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -2629,7 +2629,7 @@ func (c *Catalog) uploadFile(ctx context.Context, ns graveler.StorageNamespace, Identifier: identifier, IdentifierType: block.IdentifierTypeFull, } - err = c.BlockAdapter.Put(ctx, obj, size, fd, block.PutOpts{}) + _, err = c.BlockAdapter.Put(ctx, obj, size, fd, block.PutOpts{}) if err != nil { return "", err } diff --git a/pkg/gateway/operations/putobject.go b/pkg/gateway/operations/putobject.go index f54f601ead2..28c31ce7652 100644 --- a/pkg/gateway/operations/putobject.go +++ b/pkg/gateway/operations/putobject.go @@ -309,7 +309,7 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) { // write metadata metadata := amzMetaAsMetadata(req) contentType := req.Header.Get("Content-Type") - err = o.finishUpload(req, nil, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType) + err = o.finishUpload(req, &blob.CreationDate, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType) if errors.Is(err, graveler.ErrWriteToProtectedBranch) { _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch)) return diff --git a/pkg/graveler/retention/garbage_collection_manager.go b/pkg/graveler/retention/garbage_collection_manager.go index b61a6253c51..89ad027b4de 100644 --- a/pkg/graveler/retention/garbage_collection_manager.go +++ b/pkg/graveler/retention/garbage_collection_manager.go @@ -81,10 +81,11 @@ func (m *GarbageCollectionManager) SaveGarbageCollectionUncommitted(ctx context. location += "/" } location += filename - return m.blockAdapter.Put(ctx, block.ObjectPointer{ + _, err = m.blockAdapter.Put(ctx, block.ObjectPointer{ Identifier: location, IdentifierType: block.IdentifierTypeFull, }, stat.Size(), fd, block.PutOpts{}) + return err } type RepositoryCommitGetter interface { @@ -150,11 +151,12 @@ func (m *GarbageCollectionManager) SaveRules(ctx context.Context, storageNamespa if err != nil { return err } - return m.blockAdapter.Put(ctx, block.ObjectPointer{ + _, err = m.blockAdapter.Put(ctx, block.ObjectPointer{ StorageNamespace: string(storageNamespace), Identifier: fmt.Sprintf(configFileSuffixTemplate, m.committedBlockStoragePrefix), IdentifierType: block.IdentifierTypeRelative, }, int64(len(rulesBytes)), bytes.NewReader(rulesBytes), block.PutOpts{}) + return err } func (m *GarbageCollectionManager) SaveGarbageCollectionCommits(ctx context.Context, repository *graveler.RepositoryRecord, rules *graveler.GarbageCollectionRules) (string, error) { @@ -203,7 +205,7 @@ func (m *GarbageCollectionManager) SaveGarbageCollectionCommits(ctx context.Cont if err != nil { return "", err } - err = m.blockAdapter.Put(ctx, block.ObjectPointer{ + _, err = m.blockAdapter.Put(ctx, block.ObjectPointer{ Identifier: csvLocation, IdentifierType: block.IdentifierTypeFull, }, int64(len(commitsStr)), strings.NewReader(commitsStr), block.PutOpts{}) diff --git a/pkg/pyramid/tier_fs.go b/pkg/pyramid/tier_fs.go index 1e03b203e3c..3c01d94ba38 100644 --- a/pkg/pyramid/tier_fs.go +++ b/pkg/pyramid/tier_fs.go @@ -160,7 +160,7 @@ func (tfs *TierFS) store(ctx context.Context, namespace, originalPath, nsPath, f return fmt.Errorf("file stat %s: %w", originalPath, err) } - if err := tfs.adapter.Put(ctx, tfs.objPointer(namespace, filename), stat.Size(), f, block.PutOpts{}); err != nil { + if _, err = tfs.adapter.Put(ctx, tfs.objPointer(namespace, filename), stat.Size(), f, block.PutOpts{}); err != nil { return fmt.Errorf("adapter put %s %s: %w", namespace, filename, err) } diff --git a/pkg/samplerepo/samplecontent.go b/pkg/samplerepo/samplecontent.go index ad646bf14a0..b35a36cb21e 100644 --- a/pkg/samplerepo/samplecontent.go +++ b/pkg/samplerepo/samplecontent.go @@ -87,11 +87,10 @@ func PopulateSampleRepo(ctx context.Context, repo *catalog.Repository, cat *cata } // create metadata entry - writeTime := time.Now() entry := catalog.NewDBEntryBuilder(). Path(strings.TrimPrefix(contentPath, sampleRepoFSRootPath+"/")). PhysicalAddress(blob.PhysicalAddress). - CreationDate(writeTime). + CreationDate(blob.CreationDate). Size(blob.Size). Checksum(blob.Checksum). AddressType(catalog.AddressTypeRelative). diff --git a/pkg/testutil/adapter.go b/pkg/testutil/adapter.go index 8280ba43233..80923afa4c9 100644 --- a/pkg/testutil/adapter.go +++ b/pkg/testutil/adapter.go @@ -61,16 +61,16 @@ func (a *MockAdapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectP return "", block.ErrOperationNotSupported } -func (a *MockAdapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, opts block.PutOpts) error { +func (a *MockAdapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, opts block.PutOpts) (*block.PutResponse, error) { data, err := io.ReadAll(reader) if err != nil { - return err + return nil, err } a.TotalSize += int64(len(data)) a.Count++ a.LastBucket = obj.StorageNamespace a.LastStorageClass = opts.StorageClass - return nil + return &block.PutResponse{}, nil } func (a *MockAdapter) Exists(_ context.Context, _ block.ObjectPointer) (bool, error) { diff --git a/pkg/upload/write_blob.go b/pkg/upload/write_blob.go index 50fe9385531..db0b594e3b0 100644 --- a/pkg/upload/write_blob.go +++ b/pkg/upload/write_blob.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "io" + "time" "github.com/treeverse/lakefs/pkg/block" ) @@ -13,12 +14,13 @@ type Blob struct { RelativePath bool Checksum string Size int64 + CreationDate time.Time } func WriteBlob(ctx context.Context, adapter block.Adapter, bucketName, address string, body io.Reader, contentLength int64, opts block.PutOpts) (*Blob, error) { // handle the upload itself hashReader := block.NewHashingReader(body, block.HashFunctionMD5, block.HashFunctionSHA256) - err := adapter.Put(ctx, block.ObjectPointer{ + res, err := adapter.Put(ctx, block.ObjectPointer{ StorageNamespace: bucketName, IdentifierType: block.IdentifierTypeRelative, Identifier: address, @@ -32,5 +34,6 @@ func WriteBlob(ctx context.Context, adapter block.Adapter, bucketName, address s RelativePath: true, Checksum: checksum, Size: hashReader.CopiedSize, + CreationDate: res.GetMtime(), }, nil }