Skip to content

Commit

Permalink
Fix: Get mtime from storage server (#8329)
Browse files Browse the repository at this point in the history
* Fix: Get mtime from storage server

* CR Fixes
  • Loading branch information
N-o-Z authored Oct 31, 2024
1 parent 79b2f4d commit d4f0c19
Show file tree
Hide file tree
Showing 19 changed files with 103 additions and 64 deletions.
11 changes: 5 additions & 6 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2108,7 +2108,7 @@ func (c *Controller) ensureStorageNamespace(ctx context.Context, storageNamespac
return err
}

if err := c.BlockAdapter.Put(ctx, obj, objLen, strings.NewReader(dummyData), block.PutOpts{}); err != nil {
if _, err := c.BlockAdapter.Put(ctx, obj, objLen, strings.NewReader(dummyData), block.PutOpts{}); err != nil {
return err
}

Expand Down Expand Up @@ -3227,11 +3227,10 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi
}
}
// write metadata
writeTime := time.Now()
entryBuilder := catalog.NewDBEntryBuilder().
Path(params.Path).
PhysicalAddress(blob.PhysicalAddress).
CreationDate(writeTime).
CreationDate(blob.CreationDate).
Size(blob.Size).
Checksum(blob.Checksum).
ContentType(contentType)
Expand Down Expand Up @@ -3268,7 +3267,7 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi

response := apigen.ObjectStats{
Checksum: blob.Checksum,
Mtime: writeTime.Unix(),
Mtime: blob.CreationDate.Unix(),
Path: params.Path,
PathType: entryTypeObject,
PhysicalAddress: qk.Format(),
Expand Down Expand Up @@ -3825,7 +3824,7 @@ func (c *Controller) DumpRefs(w http.ResponseWriter, r *http.Request, repository
writeError(w, r, http.StatusInternalServerError, err)
return
}
err = c.BlockAdapter.Put(ctx, block.ObjectPointer{
_, err = c.BlockAdapter.Put(ctx, block.ObjectPointer{
StorageNamespace: repo.StorageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: fmt.Sprintf("%s/refs_manifest.json", c.Config.Committed.BlockStoragePrefix),
Expand Down Expand Up @@ -4165,7 +4164,7 @@ func (c *Controller) CreateSymlinkFile(w http.ResponseWriter, r *http.Request, r
func writeSymlink(ctx context.Context, repo *catalog.Repository, branch, path string, addresses []string, adapter block.Adapter) error {
address := fmt.Sprintf("%s/%s/%s/%s/symlink.txt", lakeFSPrefix, repo.Name, branch, path)
data := strings.Join(addresses, "\n")
err := adapter.Put(ctx, block.ObjectPointer{
_, err := adapter.Put(ctx, block.ObjectPointer{
StorageNamespace: repo.StorageNamespace,
IdentifierType: block.IdentifierTypeRelative,
Identifier: address,
Expand Down
13 changes: 12 additions & 1 deletion pkg/block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,22 @@ type BlockstoreMetadata struct {
Region *string
}

type PutResponse struct {
ModTime *time.Time
}

func (r *PutResponse) GetMtime() time.Time {
if r != nil && r.ModTime != nil {
return *r.ModTime
}
return time.Now()
}

// Adapter abstract Storage Adapter for persistence of version controlled data. The methods generally map to S3 API methods
// - Generally some type of Object Storage
// - Can also be block storage or even in-memory
type Adapter interface {
Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) error
Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) (*PutResponse, error)
Get(ctx context.Context, obj ObjectPointer) (io.ReadCloser, error)

// GetWalker is never called on the server side.
Expand Down
15 changes: 9 additions & 6 deletions pkg/block/azure/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,20 +194,23 @@ func (a *Adapter) log(ctx context.Context) logging.Logger {
return logging.FromContext(ctx)
}

func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) error {
func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) (*block.PutResponse, error) {
var err error
defer reportMetrics("Put", time.Now(), &sizeBytes, &err)
qualifiedKey, err := resolveBlobURLInfo(obj)
if err != nil {
return err
return nil, err
}
o := a.translatePutOpts(ctx, opts)
containerClient, err := a.clientCache.NewContainerClient(qualifiedKey.StorageAccountName, qualifiedKey.ContainerName)
if err != nil {
return err
return nil, err
}
_, err = containerClient.NewBlockBlobClient(qualifiedKey.BlobURL).UploadStream(ctx, reader, &o)
return err
res, err := containerClient.NewBlockBlobClient(qualifiedKey.BlobURL).UploadStream(ctx, reader, &o)
if err != nil {
return nil, err
}
return &block.PutResponse{ModTime: res.LastModified}, nil
}

func (a *Adapter) Get(ctx context.Context, obj block.ObjectPointer) (io.ReadCloser, error) {
Expand Down Expand Up @@ -583,7 +586,7 @@ func (a *Adapter) BlockstoreType() string {
return block.BlockstoreTypeAzure
}

func (a *Adapter) BlockstoreMetadata(ctx context.Context) (*block.BlockstoreMetadata, error) {
func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetadata, error) {
return nil, block.ErrOperationNotSupported
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/block/blocktest/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func testAdapterGetRange(t *testing.T, adapter block.Adapter, storageNamespace s
ctx := context.Background()
part1 := "this is the first part "
part2 := "this is the last part"
err := adapter.Put(ctx, block.ObjectPointer{
_, err := adapter.Put(ctx, block.ObjectPointer{
StorageNamespace: storageNamespace,
Identifier: "test_file",
IdentifierType: block.IdentifierTypeRelative,
Expand Down Expand Up @@ -88,7 +88,7 @@ func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace str

for i := 0; i < filesAndFolders; i++ {
for j := 0; j < filesAndFolders; j++ {
err := adapter.Put(ctx, block.ObjectPointer{
_, err := adapter.Put(ctx, block.ObjectPointer{
StorageNamespace: storageNamespace,
Identifier: fmt.Sprintf("%s/folder_%d/test_file_%d", testPrefix, filesAndFolders-i-1, filesAndFolders-j-1),
IdentifierType: block.IdentifierTypeRelative,
Expand All @@ -97,7 +97,7 @@ func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace str
}
}

err := adapter.Put(ctx, block.ObjectPointer{
_, err := adapter.Put(ctx, block.ObjectPointer{
StorageNamespace: storageNamespace,
Identifier: fmt.Sprintf("%s/folder_0.txt", testPrefix),
IdentifierType: block.IdentifierTypeRelative,
Expand Down
13 changes: 7 additions & 6 deletions pkg/block/blocktest/basic_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func testAdapterPutGet(t *testing.T, adapter block.Adapter, storageNamespace, ex
IdentifierType: c.identifierType,
}

err := adapter.Put(ctx, obj, size, strings.NewReader(contents), block.PutOpts{})
_, err := adapter.Put(ctx, obj, size, strings.NewReader(contents), block.PutOpts{})
require.NoError(t, err)

reader, err := adapter.Get(ctx, obj)
Expand Down Expand Up @@ -73,8 +73,8 @@ func testAdapterCopy(t *testing.T, adapter block.Adapter, storageNamespace strin
Identifier: "export/to/dst",
IdentifierType: block.IdentifierTypeRelative,
}

require.NoError(t, adapter.Put(ctx, src, int64(len(contents)), strings.NewReader(contents), block.PutOpts{}))
_, err := adapter.Put(ctx, src, int64(len(contents)), strings.NewReader(contents), block.PutOpts{})
require.NoError(t, err)

require.NoError(t, adapter.Copy(ctx, src, dst))
reader, err := adapter.Get(ctx, dst)
Expand Down Expand Up @@ -134,7 +134,8 @@ func testAdapterRemove(t *testing.T, adapter block.Adapter, storageNamespace str
Identifier: tt.name + "/" + p,
IdentifierType: block.IdentifierTypeRelative,
}
require.NoError(t, adapter.Put(ctx, obj, int64(len(content)), strings.NewReader(content), block.PutOpts{}))
_, err := adapter.Put(ctx, obj, int64(len(content)), strings.NewReader(content), block.PutOpts{})
require.NoError(t, err)
}

// test Remove
Expand Down Expand Up @@ -163,14 +164,14 @@ func testAdapterExists(t *testing.T, adapter block.Adapter, storageNamespace str
// TODO (niro): Test abs paths
const contents = "exists"
ctx := context.Background()
err := adapter.Put(ctx, block.ObjectPointer{
_, err := adapter.Put(ctx, block.ObjectPointer{
StorageNamespace: storageNamespace,
Identifier: contents,
IdentifierType: block.IdentifierTypeRelative,
}, int64(len(contents)), strings.NewReader(contents), block.PutOpts{})
require.NoError(t, err)

err = adapter.Put(ctx, block.ObjectPointer{
_, err = adapter.Put(ctx, block.ObjectPointer{
StorageNamespace: storageNamespace,
Identifier: "nested/and/" + contents,
IdentifierType: block.IdentifierTypeRelative,
Expand Down
12 changes: 6 additions & 6 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,24 @@ func (o *storageObjectHandle) newComposer(a *Adapter, srcs ...*storage.ObjectHan
return c
}

func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, _ block.PutOpts) error {
func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, _ block.PutOpts) (*block.PutResponse, error) {
var err error
defer reportMetrics("Put", time.Now(), &sizeBytes, &err)
bucket, key, err := a.extractParamsFromObj(obj)
if err != nil {
return err
return nil, err
}
h := storageObjectHandle{a.client.Bucket(bucket).Object(key)}
w := h.withWriteHandle(a).newWriter(ctx, a)
_, err = io.Copy(w, reader)
if err != nil {
return fmt.Errorf("io.Copy: %w", err)
return nil, fmt.Errorf("io.Copy: %w", err)
}
err = w.Close()
if err != nil {
return fmt.Errorf("writer.Close: %w", err)
return nil, fmt.Errorf("writer.Close: %w", err)
}
return nil
return &block.PutResponse{}, nil
}

func (a *Adapter) Get(ctx context.Context, obj block.ObjectPointer) (io.ReadCloser, error) {
Expand Down Expand Up @@ -654,7 +654,7 @@ func (a *Adapter) BlockstoreType() string {
return block.BlockstoreTypeGS
}

func (a *Adapter) BlockstoreMetadata(ctx context.Context) (*block.BlockstoreMetadata, error) {
func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetadata, error) {
return nil, block.ErrOperationNotSupported
}

Expand Down
19 changes: 11 additions & 8 deletions pkg/block/local/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,24 @@ func (l *Adapter) Path() string {
return l.path
}

func (l *Adapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, _ block.PutOpts) error {
func (l *Adapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, _ block.PutOpts) (*block.PutResponse, error) {
p, err := l.extractParamsFromObj(obj)
if err != nil {
return err
return nil, err
}
p = filepath.Clean(p)
f, err := l.maybeMkdir(p, os.Create)
if err != nil {
return err
return nil, err
}
defer func() {
_ = f.Close()
}()
_, err = io.Copy(f, reader)
return err
if err != nil {
return nil, err
}
return &block.PutResponse{}, nil
}

func (l *Adapter) Remove(_ context.Context, obj block.ObjectPointer) error {
Expand Down Expand Up @@ -243,7 +246,7 @@ func (l *Adapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj
}
md5Read := block.NewHashingReader(r, block.HashFunctionMD5)
fName := uploadID + fmt.Sprintf("-%05d", partNumber)
err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{})
_, err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{})
if err != nil {
return nil, fmt.Errorf("copy put: %w", err)
}
Expand All @@ -263,7 +266,7 @@ func (l *Adapter) UploadCopyPartRange(ctx context.Context, sourceObj, destinatio
}
md5Read := block.NewHashingReader(r, block.HashFunctionMD5)
fName := uploadID + fmt.Sprintf("-%05d", partNumber)
err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{})
_, err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{})
if err != nil {
return nil, fmt.Errorf("copy range put: %w", err)
}
Expand Down Expand Up @@ -395,7 +398,7 @@ func (l *Adapter) UploadPart(ctx context.Context, obj block.ObjectPointer, _ int
}
md5Read := block.NewHashingReader(reader, block.HashFunctionMD5)
fName := uploadID + fmt.Sprintf("-%05d", partNumber)
err := l.Put(ctx, block.ObjectPointer{StorageNamespace: obj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{})
_, err := l.Put(ctx, block.ObjectPointer{StorageNamespace: obj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{})
etag := hex.EncodeToString(md5Read.Md5.Sum(nil))
return &block.UploadPartResponse{
ETag: etag,
Expand Down Expand Up @@ -522,7 +525,7 @@ func (l *Adapter) BlockstoreType() string {
return block.BlockstoreTypeLocal
}

func (l *Adapter) BlockstoreMetadata(ctx context.Context) (*block.BlockstoreMetadata, error) {
func (l *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetadata, error) {
return nil, block.ErrOperationNotSupported
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/block/mem/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,20 @@ func getKey(obj block.ObjectPointer) string {
return fmt.Sprintf("%s:%s", obj.StorageNamespace, obj.Identifier)
}

func (a *Adapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, opts block.PutOpts) error {
func (a *Adapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, opts block.PutOpts) (*block.PutResponse, error) {
if err := verifyObjectPointer(obj); err != nil {
return err
return nil, err
}
a.mutex.Lock()
defer a.mutex.Unlock()
data, err := io.ReadAll(reader)
if err != nil {
return err
return nil, err
}
key := getKey(obj)
a.data[key] = data
a.properties[key] = block.Properties(opts)
return nil
return &block.PutResponse{}, nil
}

func (a *Adapter) Get(_ context.Context, obj block.ObjectPointer) (io.ReadCloser, error) {
Expand Down Expand Up @@ -339,7 +339,7 @@ func (a *Adapter) BlockstoreType() string {
return block.BlockstoreTypeMem
}

func (a *Adapter) BlockstoreMetadata(ctx context.Context) (*block.BlockstoreMetadata, error) {
func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetadata, error) {
return nil, block.ErrOperationNotSupported
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/block/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (m *MetricsAdapter) InnerAdapter() Adapter {
return m.adapter
}

func (m *MetricsAdapter) Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) error {
func (m *MetricsAdapter) Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) (*PutResponse, error) {
ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType())
return m.adapter.Put(ctx, obj, sizeBytes, reader, opts)
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/block/s3/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/aws/aws-sdk-go-v2/aws"
awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
Expand Down Expand Up @@ -217,19 +218,31 @@ func (a *Adapter) log(ctx context.Context) logging.Logger {
return logging.FromContext(ctx)
}

func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) error {
func getServerTimeFromResponseMetadata(metadata middleware.Metadata) time.Time {
value, ok := awsmiddleware.GetServerTime(metadata)
if ok {
return value
}

return time.Now()
}

func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) (*block.PutResponse, error) {
var err error
defer reportMetrics("Put", time.Now(), &sizeBytes, &err)

// for unknown size, we assume we like to stream content, will use s3manager to perform the request.
// we assume the caller may not have 1:1 request to s3 put object in this case as it may perform multipart upload
if sizeBytes == -1 {
return a.managerUpload(ctx, obj, reader, opts)
if err = a.managerUpload(ctx, obj, reader, opts); err != nil {
return nil, err
}
return &block.PutResponse{}, nil
}

bucket, key, _, err := a.extractParamsFromObj(obj)
if err != nil {
return err
return nil, err
}

putObject := s3.PutObjectInput{
Expand Down Expand Up @@ -258,13 +271,14 @@ func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes in
a.registerCaptureServerMiddleware(),
)
if err != nil {
return err
return nil, err
}
etag := aws.ToString(resp.ETag)
if etag == "" {
return ErrMissingETag
return nil, ErrMissingETag
}
return nil
mtime := getServerTimeFromResponseMetadata(resp.ResultMetadata)
return &block.PutResponse{ModTime: &mtime}, nil
}

// retryMaxAttemptsByReader return s3 options function
Expand Down
Loading

0 comments on commit d4f0c19

Please sign in to comment.