diff --git a/.github/workflows/esti.yaml b/.github/workflows/esti.yaml
index 8f02997da8d..c965900b0d2 100644
--- a/.github/workflows/esti.yaml
+++ b/.github/workflows/esti.yaml
@@ -956,6 +956,7 @@ jobs:
           LAKEFS_BLOCKSTORE_TYPE: azure
           ESTI_BLOCKSTORE_TYPE: azure
           ESTI_STORAGE_NAMESPACE: https://esti4hns.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
+          ESTI_LARGE_OBJECT_PATH: https://esti4hns.blob.core.windows.net/esti-system-testing-data/copy-test-data/data.6mib
           ESTI_AZURE_STORAGE_ACCOUNT: esti4hns
           ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }}
 
diff --git a/clients/python-wrapper/lakefs/import_manager.py b/clients/python-wrapper/lakefs/import_manager.py
index 540ec0be347..aa1dd115190 100644
--- a/clients/python-wrapper/lakefs/import_manager.py
+++ b/clients/python-wrapper/lakefs/import_manager.py
@@ -122,6 +122,7 @@ def start(self) -> str:
 
     async def _wait_for_completion(self, poll_interval: timedelta) -> lakefs_sdk.ImportStatus:
         while True:
+            await asyncio.sleep(poll_interval.total_seconds())
             with api_exception_handler():
                 resp = self._client.sdk_client.import_api.import_status(repository=self._repo_id,
                                                                         branch=self._branch_id,
@@ -131,8 +132,6 @@ async def _wait_for_completion(self, poll_interval: timedelta) -> lakefs_sdk.Imp
             if resp.error is not None:
                 raise ImportManagerException(f"Import Error: {resp.error.message}")
 
-            await asyncio.sleep(poll_interval.total_seconds())
-
     def wait(self, poll_interval: Optional[timedelta] = timedelta(seconds=2)) -> ImportStatus:
         """
        Poll a started import task ID, blocking until completion
diff --git a/cmd/lakectl/cmd/fs_presign.go b/cmd/lakectl/cmd/fs_presign.go
new file mode 100644
index 00000000000..4283464cfcd
--- /dev/null
+++ b/cmd/lakectl/cmd/fs_presign.go
@@ -0,0 +1,41 @@
+package cmd
+
+import (
+    "fmt"
+    "net/http"
+
+    "github.com/go-openapi/swag"
+    "github.com/spf13/cobra"
+    "github.com/treeverse/lakefs/pkg/api/apigen"
+)
+
+var fsPresignCmd = &cobra.Command{
+    Use:               "presign <path uri>",
+    Short:             "return a pre-signed URL for reading the specified object",
+    Args:              cobra.ExactArgs(1),
+    ValidArgsFunction: ValidArgsRepository,
+    Run: func(cmd *cobra.Command, args []string) {
+        pathURI := MustParsePathURI("path URI", args[0])
+        client := getClient()
+        preSignMode := getServerPreSignMode(cmd.Context(), client)
+        if !preSignMode.Enabled {
+            Die("Pre-signed URL support is currently disabled for this lakeFS server", 1)
+        }
+
+        resp, err := client.StatObjectWithResponse(cmd.Context(), pathURI.Repository, pathURI.Ref, &apigen.StatObjectParams{
+            Path:         *pathURI.Path,
+            Presign:      swag.Bool(preSignMode.Enabled),
+            UserMetadata: swag.Bool(true),
+        })
+        DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK)
+        if resp.JSON200 == nil {
+            Die("Bad response from server", 1)
+        }
+        fmt.Printf("%s\n", resp.JSON200.PhysicalAddress)
+    },
+}
+
+//nolint:gochecknoinits
+func init() {
+    fsCmd.AddCommand(fsPresignCmd)
+}
diff --git a/cmd/lakectl/cmd/root.go b/cmd/lakectl/cmd/root.go
index 0608801f814..7520af43761 100644
--- a/cmd/lakectl/cmd/root.go
+++ b/cmd/lakectl/cmd/root.go
@@ -166,30 +166,31 @@ type PresignMode struct {
     Multipart bool
 }
 
+func getServerPreSignMode(ctx context.Context, client *apigen.ClientWithResponses) PresignMode {
+    resp, err := client.GetConfigWithResponse(ctx)
+    DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK)
+    if resp.JSON200 == nil {
+        Die("Bad response from server", 1)
+    }
+    storageConfig := resp.JSON200.StorageConfig
+    return PresignMode{
+        Enabled:   storageConfig.PreSignSupport,
+        Multipart: swag.BoolValue(storageConfig.PreSignMultipartUpload),
+    }
+}
+
 func getPresignMode(cmd *cobra.Command, client *apigen.ClientWithResponses) PresignMode {
     // use flags if set
     presignFlag := cmd.Flags().Lookup(presignFlagName)
-
     var presignMode PresignMode
     if presignFlag.Changed {
         presignMode.Enabled = Must(cmd.Flags().GetBool(presignFlagName))
     }
-
     // fetch server config if needed
     // if presign flag is not set, use server config
     // if presign flag is set, check if server supports multipart upload
-    var storageConfig *apigen.StorageConfig
     if !presignFlag.Changed || presignMode.Enabled {
-        resp, err := client.GetConfigWithResponse(cmd.Context())
-        DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK)
-        if resp.JSON200 == nil {
-            Die("Bad response from server", 1)
-        }
-        storageConfig = resp.JSON200.StorageConfig
-        if !presignFlag.Changed {
-            presignMode.Enabled = storageConfig.PreSignSupport
-        }
-        presignMode.Multipart = swag.BoolValue(storageConfig.PreSignMultipartUpload)
+        presignMode = getServerPreSignMode(cmd.Context(), client)
     }
     return presignMode
 }
diff --git a/docs/reference/cli.md b/docs/reference/cli.md
index 484252adc31..9b4a4d5d25b 100644
--- a/docs/reference/cli.md
+++ b/docs/reference/cli.md
@@ -2201,6 +2201,23 @@ lakectl fs ls [flags]
 
 
 
+### lakectl fs presign
+
+return a pre-signed URL for reading the specified object
+
+```
+lakectl fs presign <path uri> [flags]
+```
+
+#### Options
+{:.no_toc}
+
+```
+  -h, --help   help for presign
+```
+
+
+
 ### lakectl fs rm
 
 Delete object
diff --git a/docs/reference/s3.md b/docs/reference/s3.md
index 6370c2c8e2e..b6ca813b18f 100644
--- a/docs/reference/s3.md
+++ b/docs/reference/s3.md
@@ -39,7 +39,7 @@ lakeFS supports the following API operations:
 1. [AbortMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html){:target="_blank"}
 1. [CompleteMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html){:target="_blank"}
 1. [CreateMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html){:target="_blank"}
-1. [ListParts](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html){:target="_blank"}
+1. [~~ListParts~~](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html){:target="_blank"} **Currently unsupported.** [Link to tracked issue](https://github.com/treeverse/lakeFS/issues/7600){:target="_blank"}
 1. [Upload Part](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html){:target="_blank"}
 1. [UploadPartCopy](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html){:target="_blank"}
 
diff --git a/esti/golden/lakectl_fs_presign.golden b/esti/golden/lakectl_fs_presign.golden
new file mode 100644
index 00000000000..e3acea5fe56
--- /dev/null
+++ b/esti/golden/lakectl_fs_presign.golden
@@ -0,0 +1 @@
+
diff --git a/esti/lakectl_test.go b/esti/lakectl_test.go
index ece5cd72cd0..1a4592134b2 100644
--- a/esti/lakectl_test.go
+++ b/esti/lakectl_test.go
@@ -602,6 +602,37 @@ func getStorageConfig(t *testing.T) *apigen.StorageConfig {
     return storageResp.JSON200
 }
 
+func TestLakectlFsPresign(t *testing.T) {
+    config := getStorageConfig(t)
+    if !config.PreSignSupport {
+        t.Skip()
+    }
+    repoName := generateUniqueRepositoryName()
+    storage := generateUniqueStorageNamespace(repoName)
+    vars := map[string]string{
+        "REPO":    repoName,
+        "STORAGE": storage,
+        "BRANCH":  mainBranch,
+    }
+    RunCmdAndVerifySuccessWithFile(t, Lakectl()+" repo create lakefs://"+repoName+" "+storage, false, "lakectl_repo_create", vars)
+
+    // upload some data
+    const totalObjects = 2
+    for i := 0; i < totalObjects; i++ {
+        vars["FILE_PATH"] = fmt.Sprintf("data/ro/ro_1k.%d", i)
+        RunCmdAndVerifySuccessWithFile(t, Lakectl()+" fs upload -s files/ro_1k lakefs://"+repoName+"/"+mainBranch+"/"+vars["FILE_PATH"], false, "lakectl_fs_upload", vars)
+    }
+
+    goldenFile := "lakectl_fs_presign"
+    RunCmdAndVerifySuccessWithFile(t, Lakectl()+" fs presign lakefs://"+repoName+"/"+mainBranch+"/data/ro/ro_1k.0", false, goldenFile, map[string]string{
+        "REPO":    repoName,
+        "STORAGE": storage,
+        "BRANCH":  mainBranch,
+        "PATH":    "data/ro",
+        "FILE":    "ro_1k.0",
+    })
+}
+
 func TestLakectlFsStat(t *testing.T) {
     repoName := generateUniqueRepositoryName()
     storage := generateUniqueStorageNamespace(repoName)
diff --git a/esti/ops/docker-compose-external-db.yaml b/esti/ops/docker-compose-external-db.yaml
index 62dddeb0639..7142df73e1b 100644
--- a/esti/ops/docker-compose-external-db.yaml
+++ b/esti/ops/docker-compose-external-db.yaml
@@ -50,6 +50,7 @@ services:
       - ESTI_LAKECTL_DIR=/app
       - ESTI_GOTEST_FLAGS
       - ESTI_FLAGS
+      - ESTI_LARGE_OBJECT_PATH
      - ESTI_FORCE_PATH_STYLE
       - ESTI_AZURE_STORAGE_ACCOUNT
       - ESTI_AZURE_STORAGE_ACCESS_KEY
diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go
index c7031407822..db1ab98d475 100644
--- a/esti/s3_gateway_test.go
+++ b/esti/s3_gateway_test.go
@@ -2,6 +2,8 @@ package esti
 
 import (
     "bytes"
+    "context"
+    "fmt"
     "io"
     "math/rand"
     "net/http"
@@ -28,7 +30,8 @@ type GetCredentials = func(id, secret, token string) *credentials.Credentials
 const (
     numUploads           = 100
     randomDataPathLength = 1020
-    gatewayTestPrefix    = "main/data/"
+    branch               = "main"
+    gatewayTestPrefix    = branch + "/data/"
 )
 
 func newMinioClient(t *testing.T, getCredentials GetCredentials) *minio.Client {
@@ -329,6 +332,40 @@ func TestS3HeadBucket(t *testing.T) {
     })
 }
 
+// getOrCreatePathToLargeObject returns a configured existing large
+// (largeDataContentLength, 6MiB) object, or creates a new one under
+// testPrefix.
+func getOrCreatePathToLargeObject(t *testing.T, ctx context.Context, s3lakefsClient *minio.Client, repo, branch string) (string, int64) {
+    t.Helper()
+
+    path := "source-file"
+    s3Path := fmt.Sprintf("%s/source-file", branch)
+
+    if physicalAddress := viper.GetString("large_object_path"); physicalAddress != "" {
+        res, err := client.LinkPhysicalAddressWithResponse(ctx, repo, branch, &apigen.LinkPhysicalAddressParams{Path: path}, apigen.LinkPhysicalAddressJSONRequestBody{
+            Checksum: "dont-care",
+            // TODO(ariels): Check actual length of object!
+            SizeBytes: largeDataContentLength,
+            Staging: apigen.StagingLocation{
+                PhysicalAddress: &physicalAddress,
+            },
+        })
+        require.NoError(t, err)
+        require.NotNil(t, res.JSON200)
+        return s3Path, largeDataContentLength
+    }
+
+    // content
+    r := rand.New(rand.NewSource(17))
+    objContent := testutil.NewRandomReader(r, largeDataContentLength)
+
+    // upload data
+    _, err := s3lakefsClient.PutObject(ctx, repo, s3Path, objContent, largeDataContentLength,
+        minio.PutObjectOptions{})
+    require.NoError(t, err)
+    return s3Path, largeDataContentLength
+}
+
 func TestS3CopyObjectMultipart(t *testing.T) {
     ctx, _, repo := setupTest(t)
     defer tearDownTest(repo)
@@ -338,17 +375,10 @@ func TestS3CopyObjectMultipart(t *testing.T) {
     destRepo := createRepositoryByName(ctx, t, destRepoName)
     defer deleteRepositoryIfAskedTo(ctx, destRepoName)
 
-    // content
-    r := rand.New(rand.NewSource(17))
-    objContent := testutil.NewRandomReader(r, largeDataContentLength)
-    srcPath := gatewayTestPrefix + "source-file"
-    destPath := gatewayTestPrefix + "dest-file"
-
-    // upload data
     s3lakefsClient := newMinioClient(t, credentials.NewStaticV4)
-    _, err := s3lakefsClient.PutObject(ctx, repo, srcPath, objContent, largeDataContentLength,
-        minio.PutObjectOptions{})
-    require.NoError(t, err)
+
+    srcPath, objectLength := getOrCreatePathToLargeObject(t, ctx, s3lakefsClient, repo, branch)
+    destPath := gatewayTestPrefix + "dest-file"
 
     dest := minio.CopyDestOptions{
         Bucket: destRepo,
@@ -367,7 +397,7 @@ func TestS3CopyObjectMultipart(t *testing.T) {
             Object:     srcPath,
             MatchRange: true,
             Start:      minDataContentLengthForMultipart,
-            End:        largeDataContentLength - 1,
+            End:        objectLength - 1,
         },
     }
 
@@ -376,8 +406,8 @@ func TestS3CopyObjectMultipart(t *testing.T) {
         t.Fatalf("minio.Client.ComposeObject from(%+v) to(%+v): %s", srcs, dest, err)
     }
 
-    if ui.Size != largeDataContentLength {
-        t.Errorf("Copied %d bytes when expecting %d", ui.Size, largeDataContentLength)
+    if ui.Size != objectLength {
+        t.Errorf("Copied %d bytes when expecting %d", ui.Size, objectLength)
     }
 
     // Comparing 2 readers is too much work. Instead just hash them.
diff --git a/pkg/testutil/setup.go b/pkg/testutil/setup.go
index a1f50a75004..5e3897de9fc 100644
--- a/pkg/testutil/setup.go
+++ b/pkg/testutil/setup.go
@@ -59,6 +59,7 @@ func SetupTestingEnv(params *SetupTestingEnvParams) (logging.Logger, apigen.Clie
     viper.SetDefault("lakectl_dir", filepath.Join(currDir, ".."))
     viper.SetDefault("azure_storage_account", "")
     viper.SetDefault("azure_storage_access_key", "")
+    viper.SetDefault("large_object_path", "")
     err = viper.ReadInConfig()
     if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) {
         logger.WithError(err).Fatal("Failed to read configuration")
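
Note on the import_manager.py hunk above: moving the asyncio.sleep to the top of the loop means the first import-status request is sent one poll interval after the import starts, rather than immediately. The same loop ordering, sketched here in Go (to keep these notes in a single language) with a hypothetical pollStatus callback; this is an illustration of the change, not lakeFS code.

```go
package example

import "time"

// waitForCompletion mirrors the new ordering in ImportManager._wait_for_completion:
// sleep first, then poll, so the first status check happens one interval after start.
func waitForCompletion(pollStatus func() (completed bool, err error), interval time.Duration) error {
	for {
		time.Sleep(interval)
		completed, err := pollStatus()
		if err != nil {
			return err
		}
		if completed {
			return nil
		}
	}
}
```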
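Note on the new pre-sign read path (a minimal sketch, not lakeFS source): lakectl fs presign above stats the object with presign enabled and prints the returned physical address, a time-limited URL that can be read with a plain HTTP GET. The helper below illustrates the same flow against the generated apigen client; the name presignGet and the example package are illustrative, and it assumes a *apigen.ClientWithResponses configured elsewhere.

```go
package example

import (
	"context"
	"fmt"
	"io"
	"net/http"

	"github.com/go-openapi/swag"
	"github.com/treeverse/lakefs/pkg/api/apigen"
)

// presignGet asks lakeFS for a pre-signed read URL of repo/ref/path (the same
// StatObject call that lakectl fs presign issues) and then reads the object
// bytes directly from the backing object store.
func presignGet(ctx context.Context, client *apigen.ClientWithResponses, repo, ref, path string) ([]byte, error) {
	resp, err := client.StatObjectWithResponse(ctx, repo, ref, &apigen.StatObjectParams{
		Path:    path,
		Presign: swag.Bool(true),
	})
	if err != nil {
		return nil, err
	}
	if resp.JSON200 == nil {
		return nil, fmt.Errorf("stat %s/%s/%s: unexpected response", repo, ref, path)
	}
	// With presign enabled, PhysicalAddress is a time-limited HTTP(S) URL.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, resp.JSON200.PhysicalAddress, nil)
	if err != nil {
		return nil, err
	}
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	return io.ReadAll(res.Body)
}
```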