Skip to content

Commit

Permalink
Add physical address to the stat response (#1204)
Browse files Browse the repository at this point in the history
  • Loading branch information
itaiad200 authored Jan 18, 2021
1 parent 674d318 commit a4ab2eb
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 41 deletions.
60 changes: 44 additions & 16 deletions api/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,18 +896,28 @@ func (c *Controller) ObjectsStatObjectHandler() objects.StatObjectHandler {
if errors.Is(err, db.ErrNotFound) {
return objects.NewStatObjectNotFound().WithPayload(responseError("resource not found"))
}
if err != nil {
return objects.NewStatObjectDefault(http.StatusInternalServerError).WithPayload(responseErrorFrom(err))
}

repo, err := cataloger.GetRepository(c.Context(), params.Repository)
if err != nil {
return objects.NewStatObjectDefault(http.StatusInternalServerError).WithPayload(responseErrorFrom(err))
}

qk, err := block.ResolveNamespace(repo.StorageNamespace, entry.PhysicalAddress)
if err != nil {
return objects.NewStatObjectDefault(http.StatusInternalServerError).WithPayload(responseErrorFrom(err))
}

// serialize entry
obj := &models.ObjectStats{
Checksum: entry.Checksum,
Mtime: entry.CreationDate.Unix(),
Path: params.Path,
PathType: models.ObjectStatsPathTypeObject,
SizeBytes: entry.Size,
Checksum: entry.Checksum,
Mtime: entry.CreationDate.Unix(),
Path: params.Path,
PhysicalAddress: qk.Format(),
PathType: models.ObjectStatsPathTypeObject,
SizeBytes: entry.Size,
}

if entry.Expired {
Expand Down Expand Up @@ -1138,9 +1148,19 @@ func (c *Controller) ObjectsListObjectsHandler() objects.ListObjectsHandler {
WithPayload(responseError("error while listing objects: %s", err))
}

repo, err := cataloger.GetRepository(c.Context(), params.Repository)
if err != nil {
return objects.NewStatObjectDefault(http.StatusInternalServerError).WithPayload(responseErrorFrom(err))
}

objList := make([]*models.ObjectStats, len(res))
var lastID string
for i, entry := range res {
qk, err := block.ResolveNamespace(repo.StorageNamespace, entry.PhysicalAddress)
if err != nil {
return objects.NewStatObjectDefault(http.StatusInternalServerError).WithPayload(responseErrorFrom(err))
}

if entry.CommonLevel {
objList[i] = &models.ObjectStats{
Path: entry.Path,
Expand All @@ -1152,11 +1172,12 @@ func (c *Controller) ObjectsListObjectsHandler() objects.ListObjectsHandler {
mtime = entry.CreationDate.Unix()
}
objList[i] = &models.ObjectStats{
Checksum: entry.Checksum,
Mtime: mtime,
Path: entry.Path,
PathType: models.ObjectStatsPathTypeObject,
SizeBytes: entry.Size,
Checksum: entry.Checksum,
Mtime: mtime,
Path: entry.Path,
PhysicalAddress: qk.Format(),
PathType: models.ObjectStatsPathTypeObject,
SizeBytes: entry.Size,
}
}
lastID = entry.Path
Expand Down Expand Up @@ -1213,7 +1234,7 @@ func (c *Controller) ObjectsUploadObjectHandler() objects.UploadObjectHandler {
}
byteSize := file.Header.Size

// read the content
// write the content
blob, err := upload.WriteBlob(deps.BlockAdapter, repo.StorageNamespace, params.Content, byteSize, block.PutOpts{StorageClass: params.StorageClass})
if err != nil {
return objects.NewUploadObjectDefault(http.StatusInternalServerError).WithPayload(responseErrorFrom(err))
Expand Down Expand Up @@ -1241,12 +1262,19 @@ func (c *Controller) ObjectsUploadObjectHandler() objects.UploadObjectHandler {
if err != nil {
return objects.NewUploadObjectDefault(http.StatusInternalServerError).WithPayload(responseErrorFrom(err))
}

qk, err := block.ResolveNamespace(repo.StorageNamespace, blob.PhysicalAddress)
if err != nil {
return objects.NewUploadObjectDefault(http.StatusInternalServerError).WithPayload(responseErrorFrom(err))
}

return objects.NewUploadObjectCreated().WithPayload(&models.ObjectStats{
Checksum: blob.Checksum,
Mtime: writeTime.Unix(),
Path: params.Path,
PathType: models.ObjectStatsPathTypeObject,
SizeBytes: blob.Size,
Checksum: blob.Checksum,
Mtime: writeTime.Unix(),
Path: params.Path,
PhysicalAddress: qk.Format(),
PathType: models.ObjectStatsPathTypeObject,
SizeBytes: blob.Size,
})
})
}
Expand Down
59 changes: 34 additions & 25 deletions api/api_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,21 +751,22 @@ func TestHandler_ObjectsStatObjectHandler(t *testing.T) {
clt.SetTransport(&handlerTransport{Handler: handler})

ctx := context.Background()
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "s3://some-bucket", "master")
if err != nil {
t.Fatal(err)
}

t.Run("get object stats", func(t *testing.T) {
entry := catalog.Entry{
Path: "foo/bar",
PhysicalAddress: "this_is_bars_address",
CreationDate: time.Now(),
Size: 666,
Checksum: "this_is_a_checksum",
Metadata: nil,
}
testutil.Must(t,
deps.cataloger.CreateEntry(ctx, "repo1", "master", catalog.Entry{
Path: "foo/bar",
PhysicalAddress: "this_is_bars_address",
CreationDate: time.Now(),
Size: 666,
Checksum: "this_is_a_checksum",
Metadata: nil,
}, catalog.CreateEntryParams{}))
deps.cataloger.CreateEntry(ctx, "repo1", "master", entry, catalog.CreateEntryParams{}))
if err != nil {
t.Fatal(err)
}
Expand All @@ -778,12 +779,15 @@ func TestHandler_ObjectsStatObjectHandler(t *testing.T) {
if err != nil {
t.Fatalf("did not expect error for stat, got %s", err)
}
if resp.Payload.Path != "foo/bar" {
if resp.Payload.Path != entry.Path {
t.Fatalf("expected to get back our path, got %s", resp.Payload.Path)
}
if resp.Payload.SizeBytes != 666 {
if resp.Payload.SizeBytes != entry.Size {
t.Fatalf("expected correct size, got %d", resp.Payload.SizeBytes)
}
if resp.Payload.PhysicalAddress != "s3://some-bucket/"+entry.PhysicalAddress {
t.Fatalf("expected correct PhysicalAddress, got %s", resp.Payload.PhysicalAddress)
}

_, err = clt.Objects.StatObject(&objects.StatObjectParams{
Ref: "master:HEAD",
Expand All @@ -797,16 +801,17 @@ func TestHandler_ObjectsStatObjectHandler(t *testing.T) {
})

t.Run("get expired object stats", func(t *testing.T) {
entry := catalog.Entry{
Path: "foo/expired",
PhysicalAddress: "this_address_is_expired",
CreationDate: time.Now(),
Size: 999999,
Checksum: "eeee",
Metadata: nil,
Expired: true,
}
testutil.Must(t,
deps.cataloger.CreateEntry(ctx, "repo1", "master", catalog.Entry{
Path: "foo/expired",
PhysicalAddress: "this_address_is_expired",
CreationDate: time.Now(),
Size: 999999,
Checksum: "eeee",
Metadata: nil,
Expired: true,
}, catalog.CreateEntryParams{}))
deps.cataloger.CreateEntry(ctx, "repo1", "master", entry, catalog.CreateEntryParams{}))
if err != nil {
t.Fatal(err)
}
Expand All @@ -820,12 +825,16 @@ func TestHandler_ObjectsStatObjectHandler(t *testing.T) {
if !ok {
t.Fatalf("expected StatObjectGone error but got %#v (response %v)", err, resp)
}
if gone.Payload.Path != "foo/expired" {
if gone.Payload.Path != entry.Path {
t.Fatalf("expected to get back our path, got %s", gone.Payload.Path)
}
if gone.Payload.SizeBytes != 999999 {
if gone.Payload.SizeBytes != entry.Size {
t.Fatalf("expected correct size, got %d", gone.Payload.SizeBytes)
}
if gone.Payload.PhysicalAddress != "s3://some-bucket/"+entry.PhysicalAddress {
t.Fatalf("expected correct PhysicalAddress, got %s", gone.Payload.PhysicalAddress)
}

})
}

Expand All @@ -840,7 +849,7 @@ func TestHandler_ObjectsListObjectsHandler(t *testing.T) {
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
ctx := context.Background()
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "gs://bucket/prefix", "master")
testutil.Must(t, err)
testutil.Must(t,
deps.cataloger.CreateEntry(ctx, "repo1", "master", catalog.Entry{
Expand Down Expand Up @@ -1045,7 +1054,7 @@ func TestHandler_ObjectsUploadObjectHandler(t *testing.T) {
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
ctx := context.Background()
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "gs://bucket/prefix", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1145,7 +1154,7 @@ func TestHandler_ObjectsDeleteObjectHandler(t *testing.T) {
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
ctx := context.Background()
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "s3://some-bucket/prefix", "master")
if err != nil {
t.Fatal(err)
}
Expand Down
19 changes: 19 additions & 0 deletions block/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"net/url"
"path"
"strings"
)

Expand Down Expand Up @@ -32,6 +33,24 @@ type QualifiedPrefix struct {
Prefix string
}

func (qk QualifiedKey) Format() string {
scheme := ""
switch qk.StorageType {
case StorageTypeMem:
scheme = "mem"
case StorageTypeLocal:
scheme = "local"
case StorageTypeGS:
scheme = "gs"
case StorageTypeS3:
scheme = "s3"
default:
panic("unknown storage type")
}

return fmt.Sprintf("%s://%s", scheme, path.Join(qk.StorageNamespace, qk.Key))
}

func GetStorageType(namespaceURL *url.URL) (StorageType, error) {
var st StorageType
switch namespaceURL.Scheme {
Expand Down
45 changes: 45 additions & 0 deletions block/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,48 @@ func TestResolveNamespace(t *testing.T) {
})
}
}

func TestFormatQualifiedKey(t *testing.T) {
cases := []struct {
Name string
QualifiedKey block.QualifiedKey
Expected string
}{
{
Name: "simple_path",
QualifiedKey: block.QualifiedKey{
StorageType: block.StorageTypeGS,
StorageNamespace: "some-bucket",
Key: "path",
},
Expected: "gs://some-bucket/path",
},
{
Name: "path_with_prefix",
QualifiedKey: block.QualifiedKey{
StorageType: block.StorageTypeS3,
StorageNamespace: "some-bucket/",
Key: "/path/to/file",
},
Expected: "s3://some-bucket/path/to/file",
},
{
Name: "bucket_with_prefix",
QualifiedKey: block.QualifiedKey{
StorageType: block.StorageTypeS3,
StorageNamespace: "some-bucket/prefix/",
Key: "/path/to/file",
},
Expected: "s3://some-bucket/prefix/path/to/file",
},
}

for _, cas := range cases {
t.Run(cas.Name, func(t *testing.T) {
formatted := cas.QualifiedKey.Format()
if formatted != cas.Expected {
t.Fatalf("expected %v got %v", cas.Expected, formatted)
}
})
}
}
2 changes: 2 additions & 0 deletions docs/assets/js/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ definitions:
properties:
path:
type: string
physical_address:
type: string
checksum:
type: string
mtime:
Expand Down
2 changes: 2 additions & 0 deletions swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ definitions:
properties:
path:
type: string
physical_address:
type: string
checksum:
type: string
mtime:
Expand Down

0 comments on commit a4ab2eb

Please sign in to comment.