Skip to content

Commit

Permalink
prevent unnecessary operations on write actions to read-only reposito…
Browse files Browse the repository at this point in the history
…ry in s3 gateway (#7844)

* fail fast on write actions to read-only repository in s3 gateway

add checks in put, delete and copy

add tests

make TestS3UploadToReadOnlyRepoError fail if the entire file was read

* simplify test
  • Loading branch information
yonipeleg33 authored Jun 17, 2024
1 parent 7559cd1 commit 2bbc93f
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 4 deletions.
73 changes: 73 additions & 0 deletions esti/s3_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/go-openapi/swag"
"io"
"math/rand"
"net/http"
Expand Down Expand Up @@ -54,6 +55,61 @@ func newMinioClient(t *testing.T, getCredentials GetCredentials) *minio.Client {
return client
}

func TestS3UploadToReadOnlyRepoError(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

readOnlyRepo := createReadOnlyRepositoryByName(ctx, t, "tests3uploadobjectdestreadonly")
defer deleteRepositoryIfAskedTo(ctx, readOnlyRepo)

minioClient := newMinioClient(t, credentials.NewStaticV4)
const tenMibi = 10 * 1024 * 1024
reader := NewZeroReader(tenMibi)

_, err := minioClient.PutObject(ctx, readOnlyRepo, gatewayTestPrefix+"/test", reader, tenMibi, minio.PutObjectOptions{
// this prevents minio from reading the entire file before sending the request
SendContentMd5: false,
})
require.NotNil(t, err)
require.Contains(t, err.Error(), "read-only")

// The read-only check should occur before we read the file.
// To ensure that, we're asserting that the file was not read entirely.
// (The minio client reads at least one chunk of the file before sending the request,
// so `NumBytesRead` is probably not 0, but must be < 10MB.)
require.Less(t, reader.NumBytesRead, tenMibi)
}

func TestS3DeleteFromReadOnlyRepoError(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

readOnlyRepo := createReadOnlyRepositoryByName(ctx, t, "tests3deleteobjectdestreadonly")
defer deleteRepositoryIfAskedTo(ctx, readOnlyRepo)

minioClient := newMinioClient(t, credentials.NewStaticV4)
content := "some random data"
contentReader := strings.NewReader(content)

path := gatewayTestPrefix + "test"
_, uploadErr := client.UploadObjectWithBodyWithResponse(ctx, readOnlyRepo, mainBranch, &apigen.UploadObjectParams{
Path: path,
Force: swag.Bool(true),
}, "application/octet-stream", contentReader)
require.Nil(t, uploadErr)

t.Run("existing object", func(t *testing.T) {
err := minioClient.RemoveObject(ctx, readOnlyRepo, path, minio.RemoveObjectOptions{})
require.NotNil(t, err)
require.Contains(t, err.Error(), "read-only")
})
t.Run("non existing object", func(t *testing.T) {
err := minioClient.RemoveObject(ctx, readOnlyRepo, path+"not-existing", minio.RemoveObjectOptions{})
require.NotNil(t, err)
require.Contains(t, err.Error(), "read-only")
})
}

func TestS3UploadAndDownload(t *testing.T) {
const parallelism = 10

Expand Down Expand Up @@ -574,6 +630,9 @@ func TestS3CopyObjectErrors(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

readOnlyRepo := createReadOnlyRepositoryByName(ctx, t, "tests3copyobjectdestreadonly")
defer deleteRepositoryIfAskedTo(ctx, readOnlyRepo)

requireBlockstoreType(t, block.BlockstoreTypeS3)
destPath := gatewayTestPrefix + "dest-file"

Expand Down Expand Up @@ -627,6 +686,20 @@ func TestS3CopyObjectErrors(t *testing.T) {
require.NotNil(t, err)
require.Contains(t, err.Error(), "NoSuchKey")
})

t.Run("readonly repo from non-existing source", func(t *testing.T) {
_, err := s3lakefsClient.CopyObject(ctx,
minio.CopyDestOptions{
Bucket: readOnlyRepo,
Object: destPath,
},
minio.CopySrcOptions{
Bucket: repo,
Object: "not-a-branch/data/not-found",
})
require.NotNil(t, err)
require.Contains(t, err.Error(), "read-only")
})
}

func TestS3ReadObjectRedirect(t *testing.T) {
Expand Down
12 changes: 10 additions & 2 deletions esti/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,14 @@ func createRepositoryForTest(ctx context.Context, t testing.TB) string {
func createRepositoryByName(ctx context.Context, t testing.TB, name string) string {
storageNamespace := generateUniqueStorageNamespace(name)
name = makeRepositoryName(name)
createRepository(ctx, t, name, storageNamespace)
createRepository(ctx, t, name, storageNamespace, false)
return name
}

func createReadOnlyRepositoryByName(ctx context.Context, t testing.TB, name string) string {
storageNamespace := generateUniqueStorageNamespace(name)
name = makeRepositoryName(name)
createRepository(ctx, t, name, storageNamespace, true)
return name
}

Expand All @@ -114,7 +121,7 @@ func generateUniqueStorageNamespace(repoName string) string {
return ns + xid.New().String() + "/" + repoName
}

func createRepository(ctx context.Context, t testing.TB, name string, repoStorage string) {
func createRepository(ctx context.Context, t testing.TB, name string, repoStorage string, isReadOnly bool) {
logger.WithFields(logging.Fields{
"repository": name,
"storage_namespace": repoStorage,
Expand All @@ -124,6 +131,7 @@ func createRepository(ctx context.Context, t testing.TB, name string, repoStorag
DefaultBranch: apiutil.Ptr(mainBranch),
Name: name,
StorageNamespace: repoStorage,
ReadOnly: &isReadOnly,
})
require.NoErrorf(t, err, "failed to create repository '%s', storage '%s'", name, repoStorage)
require.NoErrorf(t, verifyResponse(resp.HTTPResponse, resp.Body),
Expand Down
31 changes: 31 additions & 0 deletions esti/zero_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package esti

import "io"

// ZeroReader reads only zeros into the provided buffer, until `Amount` is reached.
type ZeroReader struct {
Amount int
NumBytesRead int
}

func NewZeroReader(amount int) *ZeroReader {
return &ZeroReader{
Amount: amount,
NumBytesRead: 0,
}
}

func (zr *ZeroReader) Read(p []byte) (n int, err error) {
if zr.NumBytesRead >= zr.Amount {
return 0, io.EOF
}
n = len(p)
if zr.NumBytesRead+n > zr.Amount {
n = zr.Amount - zr.NumBytesRead
}
for i := 0; i < n; i++ {
p[i] = 0
}
zr.NumBytesRead += n
return n, nil
}
4 changes: 4 additions & 0 deletions pkg/gateway/operations/deleteobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (controller *DeleteObject) Handle(w http.ResponseWriter, req *http.Request,
if o.HandleUnsupported(w, req, "tagging", "acl", "torrent") {
return
}
if o.Repository.ReadOnly {
_ = o.EncodeError(w, req, nil, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrReadOnlyRepository))
return
}
query := req.URL.Query()
if query.Has(QueryParamUploadID) {
controller.HandleAbortMultipartUpload(w, req, o)
Expand Down
6 changes: 5 additions & 1 deletion pkg/gateway/operations/deleteobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ func (controller *DeleteObjects) Handle(w http.ResponseWriter, req *http.Request
_ = o.EncodeError(w, req, nil, gerrors.ERRLakeFSNotSupported.ToAPIErr())
return
}

o.Incr("delete_objects", o.Principal, o.Repository.Name, "")
if o.Repository.ReadOnly {
_ = o.EncodeError(w, req, nil, gerrors.Codes.ToAPIErr(gerrors.ErrReadOnlyRepository))
return
}

decodedXML := &serde.Delete{}
err := DecodeXMLBody(req.Body, decodedXML)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/gateway/operations/postobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ func (controller *PostObject) Handle(w http.ResponseWriter, req *http.Request, o
if o.HandleUnsupported(w, req, "select", "restore") {
return
}

if o.Repository.ReadOnly {
_ = o.EncodeError(w, req, nil, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrReadOnlyRepository))
return
}
// POST is only supported for CreateMultipartUpload/CompleteMultipartUpload
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
Expand Down
4 changes: 4 additions & 0 deletions pkg/gateway/operations/putobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ func (controller *PutObject) Handle(w http.ResponseWriter, req *http.Request, o
if o.HandleUnsupported(w, req, "torrent", "acl") {
return
}
if o.Repository.ReadOnly {
_ = o.EncodeError(w, req, nil, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrReadOnlyRepository))
return
}

// verify branch before we upload data - fail early
branchExists, err := o.Catalog.BranchExists(req.Context(), o.Repository.Name, o.Reference)
Expand Down

0 comments on commit 2bbc93f

Please sign in to comment.