Skip to content

Commit

Permalink
works
Browse files Browse the repository at this point in the history
  • Loading branch information
ItamarYuran committed Dec 16, 2024
1 parent 90e491f commit d92b29d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
7 changes: 6 additions & 1 deletion pkg/gateway/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ const (

// S3 extended errors.
ErrContentSHA256Mismatch
ErrObjectExists

// Lakefs errors
ERRLakeFSNotSupported
Expand Down Expand Up @@ -756,7 +757,11 @@ var Codes = errorCodeMap{
Description: "Invalid version found in the request",
HTTPStatusCode: http.StatusNotFound,
},

ErrObjectExists: {
Code: "ErrObjectExists",
Description: "Object path already exists in DB",
HTTPStatusCode: http.StatusNotModified,
},
// LakeFS errors
ERRLakeFSNotSupported: {
Code: "ERRLakeFSNotSupported",
Expand Down
31 changes: 30 additions & 1 deletion pkg/gateway/operations/putobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package operations

import (
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
Expand All @@ -20,6 +21,7 @@ import (
)

const (
IfNoneMatchHeader = "If-None-Match"
CopySourceHeader = "x-amz-copy-source"
CopySourceRangeHeader = "x-amz-copy-source-range"
QueryParamUploadID = "uploadId"
Expand All @@ -30,7 +32,8 @@ type PutObject struct{}

func (controller *PutObject) RequiredPermissions(req *http.Request, repoID, _, destPath string) (permissions.Node, error) {
copySource := req.Header.Get(CopySourceHeader)

noneMatch := req.Header.Get(CopySourceHeader)
fmt.Println(copySource, noneMatch)
if len(copySource) == 0 {
return permissions.Node{
Permission: permissions.Permission{
Expand Down Expand Up @@ -298,6 +301,11 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) {
o.Incr("put_object", o.Principal, o.Repository.Name, o.Reference)
storageClass := StorageClassFromHeader(req.Header)
opts := block.PutOpts{StorageClass: storageClass}
err := o.checkIfAbsent(req)
if err != nil {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrObjectExists))
return
}
address := o.PathProvider.NewPath()
blob, err := upload.WriteBlob(req.Context(), o.BlockStore, o.Repository.StorageNamespace, address, req.Body, req.ContentLength, opts)
if err != nil {
Expand Down Expand Up @@ -325,3 +333,24 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) {
o.SetHeader(w, "ETag", httputil.ETag(blob.Checksum))
w.WriteHeader(http.StatusOK)
}

func (o *PathOperation) checkIfAbsent(req *http.Request) error {
Header := req.Header.Get(IfNoneMatchHeader)
fmt.Println("o.path", o.Path)
fmt.Println("Header ", Header)
switch Header {
case "":
return nil
case "*":
_, err := o.Catalog.GetEntry(req.Context(), o.Repository.Name, o.Reference, o.Path, catalog.GetEntryParams{})
if err == nil {
return errors.New("path exists in storage")
}
default:
_, err := o.Catalog.GetEntry(req.Context(), o.Repository.Name, o.Reference, Header, catalog.GetEntryParams{})
if err == nil {
return errors.New("path exists in storage")
}
}
return nil
}

0 comments on commit d92b29d

Please sign in to comment.