From 2bbc93f3d7dac539e911da521aeebcb799c7bb08 Mon Sep 17 00:00:00 2001 From: yonipeleg33 <51454184+yonipeleg33@users.noreply.github.com> Date: Mon, 17 Jun 2024 18:19:47 +0300 Subject: [PATCH] prevent unnecessary operations on write actions to read-only repository in s3 gateway (#7844) * fail fast on write actions to read-only repository in s3 gateway add checks in put, delete and copy add tests make TestS3UploadToReadOnlyRepoError fail if the entire file was read * simplify test --- esti/s3_gateway_test.go | 73 +++++++++++++++++++++++++ esti/system_test.go | 12 +++- esti/zero_reader.go | 31 +++++++++++ pkg/gateway/operations/deleteobject.go | 4 ++ pkg/gateway/operations/deleteobjects.go | 6 +- pkg/gateway/operations/postobject.go | 5 +- pkg/gateway/operations/putobject.go | 4 ++ 7 files changed, 131 insertions(+), 4 deletions(-) create mode 100644 esti/zero_reader.go diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 1a150c1b12d..1941b7417f0 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/go-openapi/swag" "io" "math/rand" "net/http" @@ -54,6 +55,61 @@ func newMinioClient(t *testing.T, getCredentials GetCredentials) *minio.Client { return client } +func TestS3UploadToReadOnlyRepoError(t *testing.T) { + ctx, _, repo := setupTest(t) + defer tearDownTest(repo) + + readOnlyRepo := createReadOnlyRepositoryByName(ctx, t, "tests3uploadobjectdestreadonly") + defer deleteRepositoryIfAskedTo(ctx, readOnlyRepo) + + minioClient := newMinioClient(t, credentials.NewStaticV4) + const tenMibi = 10 * 1024 * 1024 + reader := NewZeroReader(tenMibi) + + _, err := minioClient.PutObject(ctx, readOnlyRepo, gatewayTestPrefix+"/test", reader, tenMibi, minio.PutObjectOptions{ + // this prevents minio from reading the entire file before sending the request + SendContentMd5: false, + }) + require.NotNil(t, err) + require.Contains(t, err.Error(), "read-only") + + // The read-only check should occur before we read the file. + // To ensure that, we're asserting that the file was not read entirely. + // (The minio client reads at least one chunk of the file before sending the request, + // so `NumBytesRead` is probably not 0, but must be < 10MB.) + require.Less(t, reader.NumBytesRead, tenMibi) +} + +func TestS3DeleteFromReadOnlyRepoError(t *testing.T) { + ctx, _, repo := setupTest(t) + defer tearDownTest(repo) + + readOnlyRepo := createReadOnlyRepositoryByName(ctx, t, "tests3deleteobjectdestreadonly") + defer deleteRepositoryIfAskedTo(ctx, readOnlyRepo) + + minioClient := newMinioClient(t, credentials.NewStaticV4) + content := "some random data" + contentReader := strings.NewReader(content) + + path := gatewayTestPrefix + "test" + _, uploadErr := client.UploadObjectWithBodyWithResponse(ctx, readOnlyRepo, mainBranch, &apigen.UploadObjectParams{ + Path: path, + Force: swag.Bool(true), + }, "application/octet-stream", contentReader) + require.Nil(t, uploadErr) + + t.Run("existing object", func(t *testing.T) { + err := minioClient.RemoveObject(ctx, readOnlyRepo, path, minio.RemoveObjectOptions{}) + require.NotNil(t, err) + require.Contains(t, err.Error(), "read-only") + }) + t.Run("non existing object", func(t *testing.T) { + err := minioClient.RemoveObject(ctx, readOnlyRepo, path+"not-existing", minio.RemoveObjectOptions{}) + require.NotNil(t, err) + require.Contains(t, err.Error(), "read-only") + }) +} + func TestS3UploadAndDownload(t *testing.T) { const parallelism = 10 @@ -574,6 +630,9 @@ func TestS3CopyObjectErrors(t *testing.T) { ctx, _, repo := setupTest(t) defer tearDownTest(repo) + readOnlyRepo := createReadOnlyRepositoryByName(ctx, t, "tests3copyobjectdestreadonly") + defer deleteRepositoryIfAskedTo(ctx, readOnlyRepo) + requireBlockstoreType(t, block.BlockstoreTypeS3) destPath := gatewayTestPrefix + "dest-file" @@ -627,6 +686,20 @@ func TestS3CopyObjectErrors(t *testing.T) { require.NotNil(t, err) require.Contains(t, err.Error(), "NoSuchKey") }) + + t.Run("readonly repo from non-existing source", func(t *testing.T) { + _, err := s3lakefsClient.CopyObject(ctx, + minio.CopyDestOptions{ + Bucket: readOnlyRepo, + Object: destPath, + }, + minio.CopySrcOptions{ + Bucket: repo, + Object: "not-a-branch/data/not-found", + }) + require.NotNil(t, err) + require.Contains(t, err.Error(), "read-only") + }) } func TestS3ReadObjectRedirect(t *testing.T) { diff --git a/esti/system_test.go b/esti/system_test.go index 2d64255ab7d..1233460d833 100644 --- a/esti/system_test.go +++ b/esti/system_test.go @@ -93,7 +93,14 @@ func createRepositoryForTest(ctx context.Context, t testing.TB) string { func createRepositoryByName(ctx context.Context, t testing.TB, name string) string { storageNamespace := generateUniqueStorageNamespace(name) name = makeRepositoryName(name) - createRepository(ctx, t, name, storageNamespace) + createRepository(ctx, t, name, storageNamespace, false) + return name +} + +func createReadOnlyRepositoryByName(ctx context.Context, t testing.TB, name string) string { + storageNamespace := generateUniqueStorageNamespace(name) + name = makeRepositoryName(name) + createRepository(ctx, t, name, storageNamespace, true) return name } @@ -114,7 +121,7 @@ func generateUniqueStorageNamespace(repoName string) string { return ns + xid.New().String() + "/" + repoName } -func createRepository(ctx context.Context, t testing.TB, name string, repoStorage string) { +func createRepository(ctx context.Context, t testing.TB, name string, repoStorage string, isReadOnly bool) { logger.WithFields(logging.Fields{ "repository": name, "storage_namespace": repoStorage, @@ -124,6 +131,7 @@ func createRepository(ctx context.Context, t testing.TB, name string, repoStorag DefaultBranch: apiutil.Ptr(mainBranch), Name: name, StorageNamespace: repoStorage, + ReadOnly: &isReadOnly, }) require.NoErrorf(t, err, "failed to create repository '%s', storage '%s'", name, repoStorage) require.NoErrorf(t, verifyResponse(resp.HTTPResponse, resp.Body), diff --git a/esti/zero_reader.go b/esti/zero_reader.go new file mode 100644 index 00000000000..11dea85145e --- /dev/null +++ b/esti/zero_reader.go @@ -0,0 +1,31 @@ +package esti + +import "io" + +// ZeroReader reads only zeros into the provided buffer, until `Amount` is reached. +type ZeroReader struct { + Amount int + NumBytesRead int +} + +func NewZeroReader(amount int) *ZeroReader { + return &ZeroReader{ + Amount: amount, + NumBytesRead: 0, + } +} + +func (zr *ZeroReader) Read(p []byte) (n int, err error) { + if zr.NumBytesRead >= zr.Amount { + return 0, io.EOF + } + n = len(p) + if zr.NumBytesRead+n > zr.Amount { + n = zr.Amount - zr.NumBytesRead + } + for i := 0; i < n; i++ { + p[i] = 0 + } + zr.NumBytesRead += n + return n, nil +} diff --git a/pkg/gateway/operations/deleteobject.go b/pkg/gateway/operations/deleteobject.go index 095e5ca3721..c76a1519fc8 100644 --- a/pkg/gateway/operations/deleteobject.go +++ b/pkg/gateway/operations/deleteobject.go @@ -63,6 +63,10 @@ func (controller *DeleteObject) Handle(w http.ResponseWriter, req *http.Request, if o.HandleUnsupported(w, req, "tagging", "acl", "torrent") { return } + if o.Repository.ReadOnly { + _ = o.EncodeError(w, req, nil, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrReadOnlyRepository)) + return + } query := req.URL.Query() if query.Has(QueryParamUploadID) { controller.HandleAbortMultipartUpload(w, req, o) diff --git a/pkg/gateway/operations/deleteobjects.go b/pkg/gateway/operations/deleteobjects.go index fb9e20ba2f1..6a709de75c4 100644 --- a/pkg/gateway/operations/deleteobjects.go +++ b/pkg/gateway/operations/deleteobjects.go @@ -33,8 +33,12 @@ func (controller *DeleteObjects) Handle(w http.ResponseWriter, req *http.Request _ = o.EncodeError(w, req, nil, gerrors.ERRLakeFSNotSupported.ToAPIErr()) return } - o.Incr("delete_objects", o.Principal, o.Repository.Name, "") + if o.Repository.ReadOnly { + _ = o.EncodeError(w, req, nil, gerrors.Codes.ToAPIErr(gerrors.ErrReadOnlyRepository)) + return + } + decodedXML := &serde.Delete{} err := DecodeXMLBody(req.Body, decodedXML) if err != nil { diff --git a/pkg/gateway/operations/postobject.go b/pkg/gateway/operations/postobject.go index 4da68fcaac2..3cdda78258a 100644 --- a/pkg/gateway/operations/postobject.go +++ b/pkg/gateway/operations/postobject.go @@ -170,7 +170,10 @@ func (controller *PostObject) Handle(w http.ResponseWriter, req *http.Request, o if o.HandleUnsupported(w, req, "select", "restore") { return } - + if o.Repository.ReadOnly { + _ = o.EncodeError(w, req, nil, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrReadOnlyRepository)) + return + } // POST is only supported for CreateMultipartUpload/CompleteMultipartUpload // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html diff --git a/pkg/gateway/operations/putobject.go b/pkg/gateway/operations/putobject.go index 9fede072d69..06f87d0c80d 100644 --- a/pkg/gateway/operations/putobject.go +++ b/pkg/gateway/operations/putobject.go @@ -241,6 +241,10 @@ func (controller *PutObject) Handle(w http.ResponseWriter, req *http.Request, o if o.HandleUnsupported(w, req, "torrent", "acl") { return } + if o.Repository.ReadOnly { + _ = o.EncodeError(w, req, nil, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrReadOnlyRepository)) + return + } // verify branch before we upload data - fail early branchExists, err := o.Catalog.BranchExists(req.Context(), o.Repository.Name, o.Reference)