Merge branch 'master' of https://github.com/treeverse/lakeFS into 7574-aws-remote-auth-login
idanovo committed Mar 31, 2024
2 parents fec0a34 + 0a916a4 commit 4628cd0
Showing 11 changed files with 153 additions and 30 deletions.
1 change: 1 addition & 0 deletions .github/workflows/esti.yaml
@@ -956,6 +956,7 @@ jobs:
LAKEFS_BLOCKSTORE_TYPE: azure
ESTI_BLOCKSTORE_TYPE: azure
ESTI_STORAGE_NAMESPACE: https://esti4hns.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
ESTI_LARGE_OBJECT_PATH: https://esti4hns.blob.core.windows.net/esti-system-testing-data/copy-test-data/data.6mib
ESTI_AZURE_STORAGE_ACCOUNT: esti4hns
ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }}

3 changes: 1 addition & 2 deletions clients/python-wrapper/lakefs/import_manager.py
@@ -122,6 +122,7 @@ def start(self) -> str:

async def _wait_for_completion(self, poll_interval: timedelta) -> lakefs_sdk.ImportStatus:
while True:
await asyncio.sleep(poll_interval.total_seconds())
with api_exception_handler():
resp = self._client.sdk_client.import_api.import_status(repository=self._repo_id,
branch=self._branch_id,
@@ -131,8 +132,6 @@ async def _wait_for_completion(self, poll_interval: timedelta) -> lakefs_sdk.ImportStatus:
if resp.error is not None:
raise ImportManagerException(f"Import Error: {resp.error.message}")

await asyncio.sleep(poll_interval.total_seconds())

def wait(self, poll_interval: Optional[timedelta] = timedelta(seconds=2)) -> ImportStatus:
"""
Poll a started import task ID, blocking until completion
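The import_manager.py change above moves the `await asyncio.sleep(...)` from the bottom of the polling loop to the top, so the client waits one full interval before its first status query instead of hitting the API immediately after the import starts. A minimal sketch of the same sleep-then-poll shape in Go; `checkStatus` and its completion on the third poll are hypothetical stand-ins for the import-status call:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// checkStatus is a hypothetical stand-in for the import-status API call;
// here it pretends the import completes on the third poll.
func checkStatus(ctx context.Context, poll int) (done bool, err error) {
	return poll >= 3, nil
}

func waitForCompletion(ctx context.Context, pollInterval time.Duration) error {
	for poll := 1; ; poll++ {
		// Sleep first: give the server one interval to register the task
		// before the initial status query, mirroring the Python change.
		select {
		case <-time.After(pollInterval):
		case <-ctx.Done():
			return ctx.Err()
		}
		done, err := checkStatus(ctx, poll)
		if err != nil {
			return fmt.Errorf("import status: %w", err)
		}
		if done {
			return nil
		}
	}
}

func main() {
	err := waitForCompletion(context.Background(), 10*time.Millisecond)
	fmt.Println("import finished, err:", err)
}
```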
41 changes: 41 additions & 0 deletions cmd/lakectl/cmd/fs_presign.go
@@ -0,0 +1,41 @@
package cmd

import (
"fmt"
"net/http"

"github.com/go-openapi/swag"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api/apigen"
)

var fsPresignCmd = &cobra.Command{
Use: "presign <path URI>",
Short: "return a pre-signed URL for reading the specified object",
Args: cobra.ExactArgs(1),
ValidArgsFunction: ValidArgsRepository,
Run: func(cmd *cobra.Command, args []string) {
pathURI := MustParsePathURI("path URI", args[0])
client := getClient()
preSignMode := getServerPreSignMode(cmd.Context(), client)
if !preSignMode.Enabled {
Die("Pre-signed URL support is currently disabled for this lakeFS server", 1)
}

resp, err := client.StatObjectWithResponse(cmd.Context(), pathURI.Repository, pathURI.Ref, &apigen.StatObjectParams{
Path: *pathURI.Path,
Presign: swag.Bool(preSignMode.Enabled),
UserMetadata: swag.Bool(true),
})
DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK)
if resp.JSON200 == nil {
Die("Bad response from server", 1)
}
fmt.Printf("%s\n", resp.JSON200.PhysicalAddress)
},
}

//nolint:gochecknoinits
func init() {
fsCmd.AddCommand(fsPresignCmd)
}
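
The new `lakectl fs presign` command stats the object with `Presign` enabled and prints the returned physical address, which in presign mode is a time-limited URL. A sketch of consuming such a URL from Go, assuming only that a pre-signed read URL carries its authorization in the query string and answers a plain HTTP GET:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

// Reads an object through a pre-signed URL passed on the command line.
// The signature lives in the URL's query string, so no credentials or
// extra headers are needed for the GET.
func main() {
	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage: getpresigned <url>")
		os.Exit(1)
	}
	resp, err := http.Get(os.Args[1])
	if err != nil {
		fmt.Fprintln(os.Stderr, "GET:", err)
		os.Exit(1)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		fmt.Fprintln(os.Stderr, "unexpected status:", resp.Status)
		os.Exit(1)
	}
	n, err := io.Copy(os.Stdout, resp.Body)
	if err != nil {
		fmt.Fprintln(os.Stderr, "read:", err)
		os.Exit(1)
	}
	fmt.Fprintf(os.Stderr, "read %d bytes\n", n)
}
```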
27 changes: 14 additions & 13 deletions cmd/lakectl/cmd/root.go
@@ -166,30 +166,31 @@ type PresignMode struct {
Multipart bool
}

func getServerPreSignMode(ctx context.Context, client *apigen.ClientWithResponses) PresignMode {
resp, err := client.GetConfigWithResponse(ctx)
DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK)
if resp.JSON200 == nil {
Die("Bad response from server", 1)
}
storageConfig := resp.JSON200.StorageConfig
return PresignMode{
Enabled: storageConfig.PreSignSupport,
Multipart: swag.BoolValue(storageConfig.PreSignMultipartUpload),
}
}

func getPresignMode(cmd *cobra.Command, client *apigen.ClientWithResponses) PresignMode {
// use flags if set
presignFlag := cmd.Flags().Lookup(presignFlagName)

var presignMode PresignMode
if presignFlag.Changed {
presignMode.Enabled = Must(cmd.Flags().GetBool(presignFlagName))
}

// fetch server config if needed
// if presign flag is not set, use server config
// if presign flag is set, check if server supports multipart upload
var storageConfig *apigen.StorageConfig
if !presignFlag.Changed || presignMode.Enabled {
resp, err := client.GetConfigWithResponse(cmd.Context())
DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK)
if resp.JSON200 == nil {
Die("Bad response from server", 1)
}
storageConfig = resp.JSON200.StorageConfig
if !presignFlag.Changed {
presignMode.Enabled = storageConfig.PreSignSupport
}
presignMode.Multipart = swag.BoolValue(storageConfig.PreSignMultipartUpload)
presignMode = getServerPreSignMode(cmd.Context(), client)
}
return presignMode
}
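The root.go refactor extracts the server-config fetch into `getServerPreSignMode`, which the new presign command reuses, while `getPresignMode` keeps its precedence rule: an explicit `--presign` flag wins, otherwise the server's `PreSignSupport` decides, and multipart support always comes from the server. A small self-contained sketch (names simplified, not lakectl's actual wiring) of the pflag `Changed` mechanism that makes the rule possible:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	cmd := &cobra.Command{
		Use: "demo",
		Run: func(cmd *cobra.Command, args []string) {
			// pflag records whether a flag was passed explicitly in
			// Flag.Changed, distinguishing "--presign=false" from
			// "flag absent, fall back to server config".
			f := cmd.Flags().Lookup("presign")
			if f.Changed {
				fmt.Println("explicit flag value:", f.Value.String())
				return
			}
			fmt.Println("flag not set; would use the server's PreSignSupport")
		},
	}
	cmd.Flags().Bool("presign", false, "use pre-signed URLs for object access")
	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}
```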
17 changes: 17 additions & 0 deletions docs/reference/cli.md
@@ -2201,6 +2201,23 @@ lakectl fs ls <path URI> [flags]



### lakectl fs presign

return a pre-signed URL for reading the specified object

```
lakectl fs presign <path URI> [flags]
```

#### Options
{:.no_toc}

```
-h, --help help for presign
```



### lakectl fs rm

Delete object
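A hypothetical invocation of the documented command, with the repository and object path invented for illustration; the output is a single pre-signed URL line, which the golden file further down represents as `<PRE_SIGN_URL>`:

```
lakectl fs presign lakefs://example-repo/main/datasets/part-0001.parquet
```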
2 changes: 1 addition & 1 deletion docs/reference/s3.md
@@ -39,7 +39,7 @@ lakeFS supports the following API operations:
1. [AbortMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html){:target="_blank"}
1. [CompleteMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html){:target="_blank"}
1. [CreateMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html){:target="_blank"}
1. [ListParts](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html){:target="_blank"}
1. [~~ListParts~~](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html){:target="_blank"} **Currently unsupported.** [Link to tracked issue](https://github.com/treeverse/lakeFS/issues/7600){:target="_blank"}
1. [Upload Part](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html){:target="_blank"}
1. [UploadPartCopy](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html){:target="_blank"}

1 change: 1 addition & 0 deletions esti/golden/lakectl_fs_presign.golden
@@ -0,0 +1 @@
<PRE_SIGN_URL>
31 changes: 31 additions & 0 deletions esti/lakectl_test.go
@@ -602,6 +602,37 @@ func getStorageConfig(t *testing.T) *apigen.StorageConfig {
return storageResp.JSON200
}

func TestLakectlFsPresign(t *testing.T) {
config := getStorageConfig(t)
if !config.PreSignSupport {
t.Skip()
}
repoName := generateUniqueRepositoryName()
storage := generateUniqueStorageNamespace(repoName)
vars := map[string]string{
"REPO": repoName,
"STORAGE": storage,
"BRANCH": mainBranch,
}
RunCmdAndVerifySuccessWithFile(t, Lakectl()+" repo create lakefs://"+repoName+" "+storage, false, "lakectl_repo_create", vars)

// upload some data
const totalObjects = 2
for i := 0; i < totalObjects; i++ {
vars["FILE_PATH"] = fmt.Sprintf("data/ro/ro_1k.%d", i)
RunCmdAndVerifySuccessWithFile(t, Lakectl()+" fs upload -s files/ro_1k lakefs://"+repoName+"/"+mainBranch+"/"+vars["FILE_PATH"], false, "lakectl_fs_upload", vars)
}

goldenFile := "lakectl_fs_presign"
RunCmdAndVerifySuccessWithFile(t, Lakectl()+" fs presign lakefs://"+repoName+"/"+mainBranch+"/data/ro/ro_1k.0", false, goldenFile, map[string]string{
"REPO": repoName,
"STORAGE": storage,
"BRANCH": mainBranch,
"PATH": "data/ro",
"FILE": "ro_1k.0",
})
}

func TestLakectlFsStat(t *testing.T) {
repoName := generateUniqueRepositoryName()
storage := generateUniqueStorageNamespace(repoName)
1 change: 1 addition & 0 deletions esti/ops/docker-compose-external-db.yaml
@@ -50,6 +50,7 @@ services:
- ESTI_LAKECTL_DIR=/app
- ESTI_GOTEST_FLAGS
- ESTI_FLAGS
- ESTI_LARGE_OBJECT_PATH
- ESTI_FORCE_PATH_STYLE
- ESTI_AZURE_STORAGE_ACCOUNT
- ESTI_AZURE_STORAGE_ACCESS_KEY
58 changes: 44 additions & 14 deletions esti/s3_gateway_test.go
@@ -2,6 +2,8 @@ package esti

import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"net/http"
@@ -28,7 +30,8 @@ type GetCredentials = func(id, secret, token string) *credentials.Credentials
const (
numUploads = 100
randomDataPathLength = 1020
gatewayTestPrefix = "main/data/"
branch = "main"
gatewayTestPrefix = branch + "/data/"
)

func newMinioClient(t *testing.T, getCredentials GetCredentials) *minio.Client {
@@ -329,6 +332,40 @@ func TestS3HeadBucket(t *testing.T) {
})
}

// getOrCreatePathToLargeObject returns a configured existing large
// (largeDataContentLength, 6MiB) object, or creates a new one under
// testPrefix.
func getOrCreatePathToLargeObject(t *testing.T, ctx context.Context, s3lakefsClient *minio.Client, repo, branch string) (string, int64) {
t.Helper()

path := "source-file"
s3Path := fmt.Sprintf("%s/source-file", branch)

if physicalAddress := viper.GetString("large_object_path"); physicalAddress != "" {
res, err := client.LinkPhysicalAddressWithResponse(ctx, repo, branch, &apigen.LinkPhysicalAddressParams{Path: path}, apigen.LinkPhysicalAddressJSONRequestBody{
Checksum: "dont-care",
// TODO(ariels): Check actual length of object!
SizeBytes: largeDataContentLength,
Staging: apigen.StagingLocation{
PhysicalAddress: &physicalAddress,
},
})
require.NoError(t, err)
require.NotNil(t, res.JSON200)
return s3Path, largeDataContentLength
}

// content
r := rand.New(rand.NewSource(17))
objContent := testutil.NewRandomReader(r, largeDataContentLength)

// upload data
_, err := s3lakefsClient.PutObject(ctx, repo, s3Path, objContent, largeDataContentLength,
minio.PutObjectOptions{})
require.NoError(t, err)
return s3Path, largeDataContentLength
}

func TestS3CopyObjectMultipart(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)
@@ -338,17 +375,10 @@ func TestS3CopyObjectMultipart(t *testing.T) {
destRepo := createRepositoryByName(ctx, t, destRepoName)
defer deleteRepositoryIfAskedTo(ctx, destRepoName)

// content
r := rand.New(rand.NewSource(17))
objContent := testutil.NewRandomReader(r, largeDataContentLength)
srcPath := gatewayTestPrefix + "source-file"
destPath := gatewayTestPrefix + "dest-file"

// upload data
s3lakefsClient := newMinioClient(t, credentials.NewStaticV4)
_, err := s3lakefsClient.PutObject(ctx, repo, srcPath, objContent, largeDataContentLength,
minio.PutObjectOptions{})
require.NoError(t, err)

srcPath, objectLength := getOrCreatePathToLargeObject(t, ctx, s3lakefsClient, repo, branch)
destPath := gatewayTestPrefix + "dest-file"

dest := minio.CopyDestOptions{
Bucket: destRepo,
@@ -367,7 +397,7 @@ func TestS3CopyObjectMultipart(t *testing.T) {
Object: srcPath,
MatchRange: true,
Start: minDataContentLengthForMultipart,
End: largeDataContentLength - 1,
End: objectLength - 1,
},
}

@@ -376,8 +406,8 @@ func TestS3CopyObjectMultipart(t *testing.T) {
t.Fatalf("minio.Client.ComposeObject from(%+v) to(%+v): %s", srcs, dest, err)
}

if ui.Size != largeDataContentLength {
t.Errorf("Copied %d bytes when expecting %d", ui.Size, largeDataContentLength)
if ui.Size != objectLength {
t.Errorf("Copied %d bytes when expecting %d", ui.Size, objectLength)
}

// Comparing 2 readers is too much work. Instead just hash them.
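The `getOrCreatePathToLargeObject` helper above is what the new `ESTI_LARGE_OBJECT_PATH` plumbing feeds: when a pre-staged ~6MiB object is configured, the test links it into the repository with `LinkPhysicalAddress` instead of uploading fresh random data on every run, saving an upload per test on remote blockstores. As the TODO notes, the linked object's real length is assumed to be `largeDataContentLength` rather than verified.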
1 change: 1 addition & 0 deletions pkg/testutil/setup.go
@@ -59,6 +59,7 @@ func SetupTestingEnv(params *SetupTestingEnvParams) (logging.Logger, apigen.Clie
viper.SetDefault("lakectl_dir", filepath.Join(currDir, ".."))
viper.SetDefault("azure_storage_account", "")
viper.SetDefault("azure_storage_access_key", "")
viper.SetDefault("large_object_path", "")
err = viper.ReadInConfig()
if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) {
logger.WithError(err).Fatal("Failed to read configuration")
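The `large_object_path` default added above is how the test suite picks up the `ESTI_LARGE_OBJECT_PATH` variable exported in the workflow and compose changes earlier in this commit. A sketch of the standard viper wiring for that kind of mapping; the `esti` env prefix is an assumption here, not confirmed by the diff:

```go
package main

import (
	"fmt"
	"os"

	"github.com/spf13/viper"
)

func main() {
	// Simulate the CI environment from the workflow change above.
	os.Setenv("ESTI_LARGE_OBJECT_PATH", "https://example.blob.core.windows.net/data/data.6mib")

	// Assumption: esti maps ESTI_* variables to viper keys roughly like
	// this. SetEnvPrefix plus AutomaticEnv resolves "large_object_path"
	// from ESTI_LARGE_OBJECT_PATH at read time.
	viper.SetEnvPrefix("esti")
	viper.AutomaticEnv()
	viper.SetDefault("large_object_path", "")

	fmt.Println(viper.GetString("large_object_path"))
}
```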
