Skip to content

Commit

Permalink
migrate uploader to aws-sdk-go-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
rudream committed Aug 1, 2024
1 parent 4fd411a commit 4f9fe11
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 373 deletions.
6 changes: 3 additions & 3 deletions lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ type Streamer interface {
// StreamPart represents uploaded stream part
type StreamPart struct {
// Number is a part number
Number int64
Number int32
// ETag is a part e-tag
ETag string
}
Expand Down Expand Up @@ -914,9 +914,9 @@ type MultipartUploader interface {
CompleteUpload(ctx context.Context, upload StreamUpload, parts []StreamPart) error
// ReserveUploadPart reserves an upload part. Reserve is used to identify
// upload errors beforehand.
ReserveUploadPart(ctx context.Context, upload StreamUpload, partNumber int64) error
ReserveUploadPart(ctx context.Context, upload StreamUpload, partNumber int32) error
// UploadPart uploads part and returns the part
UploadPart(ctx context.Context, upload StreamUpload, partNumber int64, partBody io.ReadSeeker) (*StreamPart, error)
UploadPart(ctx context.Context, upload StreamUpload, partNumber int32, partBody io.ReadSeeker) (*StreamPart, error)
// ListParts returns all uploaded parts for the completed upload in sorted order
ListParts(ctx context.Context, upload StreamUpload) ([]StreamPart, error)
// ListUploads lists uploads that have been initiated but not completed with
Expand Down
10 changes: 5 additions & 5 deletions lib/events/azsessions/azsessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func partPrefix(upload events.StreamUpload) string {
}

// partName returns the name of the blob for a specific part in an upload.
func partName(upload events.StreamUpload, partNumber int64) string {
func partName(upload events.StreamUpload, partNumber int32) string {
return fmt.Sprintf("%v%v", partPrefix(upload), partNumber)
}

Expand Down Expand Up @@ -252,7 +252,7 @@ func (h *Handler) uploadMarkerBlob(upload events.StreamUpload) *blockblob.Client

// partBlob returns a BlockBlobClient for the blob of the part of the specified
// upload, with the given part number.
func (h *Handler) partBlob(upload events.StreamUpload, partNumber int64) *blockblob.Client {
func (h *Handler) partBlob(upload events.StreamUpload, partNumber int32) *blockblob.Client {
return h.inprogress.NewBlockBlobClient(partName(upload, partNumber))
}

Expand Down Expand Up @@ -440,12 +440,12 @@ func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload
}

// ReserveUploadPart implements [events.MultipartUploader].
func (*Handler) ReserveUploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64) error {
func (*Handler) ReserveUploadPart(ctx context.Context, upload events.StreamUpload, partNumber int32) error {
return nil
}

// UploadPart implements [events.MultipartUploader].
func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64, partBody io.ReadSeeker) (*events.StreamPart, error) {
func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int32, partBody io.ReadSeeker) (*events.StreamPart, error) {
partBlob := h.partBlob(upload, partNumber)

// our parts are just over 5 MiB (events.MinUploadPartSizeBytes) so we can
Expand Down Expand Up @@ -493,7 +493,7 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([]
continue
}

parts = append(parts, events.StreamPart{Number: partNumber})
parts = append(parts, events.StreamPart{Number: int32(partNumber)})
}
}

Expand Down
16 changes: 8 additions & 8 deletions lib/events/eventstest/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type MemoryUpload struct {
// id is the upload ID
id string
// parts is the upload parts
parts map[int64][]byte
parts map[int32][]byte
// sessionID is the session ID associated with the upload
sessionID session.ID
//completed specifies upload as completed
Expand Down Expand Up @@ -105,7 +105,7 @@ func (m *MemoryUploader) CreateUpload(ctx context.Context, sessionID session.ID)
m.uploads[upload.ID] = &MemoryUpload{
id: upload.ID,
sessionID: sessionID,
parts: make(map[int64][]byte),
parts: make(map[int32][]byte),
Initiated: upload.Initiated,
}
return upload, nil
Expand All @@ -124,7 +124,7 @@ func (m *MemoryUploader) CompleteUpload(ctx context.Context, upload events.Strea
}
// verify that all parts have been uploaded
var result []byte
partsSet := make(map[int64]bool, len(parts))
partsSet := make(map[int32]bool, len(parts))
for _, part := range parts {
partsSet[part.Number] = true
data, ok := up.parts[part.Number]
Expand All @@ -146,7 +146,7 @@ func (m *MemoryUploader) CompleteUpload(ctx context.Context, upload events.Strea
}

// UploadPart uploads part and returns the part
func (m *MemoryUploader) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64, partBody io.ReadSeeker) (*events.StreamPart, error) {
func (m *MemoryUploader) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int32, partBody io.ReadSeeker) (*events.StreamPart, error) {
data, err := io.ReadAll(partBody)
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -190,7 +190,7 @@ func (m *MemoryUploader) GetParts(uploadID string) ([][]byte, error) {
return nil, trace.NotFound("upload %q is not found", uploadID)
}

partNumbers := make([]int64, 0, len(up.parts))
partNumbers := make([]int32, 0, len(up.parts))
sortedParts := make([][]byte, 0, len(up.parts))
for partNumber := range up.parts {
partNumbers = append(partNumbers, partNumber)
Expand Down Expand Up @@ -222,7 +222,7 @@ func (m *MemoryUploader) ListParts(ctx context.Context, upload events.StreamUplo
return nil, trace.NotFound("upload %v is not found", upload.ID)
}

partNumbers := make([]int64, 0, len(up.parts))
partNumbers := make([]int32, 0, len(up.parts))
sortedParts := make([]events.StreamPart, 0, len(up.parts))
for partNumber := range up.parts {
partNumbers = append(partNumbers, partNumber)
Expand Down Expand Up @@ -278,7 +278,7 @@ func (m *MemoryUploader) GetUploadMetadata(sid session.ID) events.UploadMetadata
}

// ReserveUploadPart reserves an upload part.
func (m *MemoryUploader) ReserveUploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64) error {
func (m *MemoryUploader) ReserveUploadPart(ctx context.Context, upload events.StreamUpload, partNumber int32) error {
return nil
}

Expand Down Expand Up @@ -307,7 +307,7 @@ func (m *MockUploader) CreateUpload(ctx context.Context, sessionID session.ID) (
}, nil
}

func (m *MockUploader) ReserveUploadPart(_ context.Context, _ events.StreamUpload, _ int64) error {
func (m *MockUploader) ReserveUploadPart(_ context.Context, _ events.StreamUpload, _ int32) error {
return m.ReserveUploadPartError
}

Expand Down
18 changes: 9 additions & 9 deletions lib/events/filesessions/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (h *Handler) CreateUpload(ctx context.Context, sessionID session.ID) (*even
}

// UploadPart uploads part
func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64, partBody io.ReadSeeker) (*events.StreamPart, error) {
func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int32, partBody io.ReadSeeker) (*events.StreamPart, error) {
if err := checkUpload(upload); err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -339,7 +339,7 @@ func (h *Handler) GetUploadMetadata(s session.ID) events.UploadMetadata {
}

// ReserveUploadPart reserves an upload part.
func (h *Handler) ReserveUploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64) error {
func (h *Handler) ReserveUploadPart(ctx context.Context, upload events.StreamUpload, partNumber int32) error {
file, partPath, err := h.openReservationPart(upload, partNumber)
if err != nil {
return trace.ConvertSystemError(err)
Expand All @@ -361,7 +361,7 @@ func (h *Handler) ReserveUploadPart(ctx context.Context, upload events.StreamUpl
}

// openReservationPart opens a reservation upload part file.
func (h *Handler) openReservationPart(upload events.StreamUpload, partNumber int64) (*os.File, string, error) {
func (h *Handler) openReservationPart(upload events.StreamUpload, partNumber int32) (*os.File, string, error) {
partPath := h.reservationPath(upload, partNumber)
file, err := GetOpenFileFunc()(partPath, os.O_RDWR|os.O_CREATE, 0o600)
if err != nil {
Expand All @@ -383,23 +383,23 @@ func (h *Handler) uploadPath(upload events.StreamUpload) string {
return filepath.Join(h.uploadRootPath(upload), string(upload.SessionID))
}

func (h *Handler) partPath(upload events.StreamUpload, partNumber int64) string {
func (h *Handler) partPath(upload events.StreamUpload, partNumber int32) string {
return filepath.Join(h.uploadPath(upload), partFileName(partNumber))
}

func (h *Handler) reservationPath(upload events.StreamUpload, partNumber int64) string {
func (h *Handler) reservationPath(upload events.StreamUpload, partNumber int32) string {
return filepath.Join(h.uploadPath(upload), reservationFileName(partNumber))
}

func partFileName(partNumber int64) string {
func partFileName(partNumber int32) string {
return fmt.Sprintf("%v%v", partNumber, partExt)
}

func reservationFileName(partNumber int64) string {
func reservationFileName(partNumber int32) string {
return fmt.Sprintf("%v%v", partNumber, reservationExt)
}

func partFromFileName(fileName string) (int64, error) {
func partFromFileName(fileName string) (int32, error) {
base := filepath.Base(fileName)
if filepath.Ext(base) != partExt {
return -1, trace.BadParameter("expected extension %v, got %v", partExt, base)
Expand All @@ -409,7 +409,7 @@ func partFromFileName(fileName string) (int64, error) {
if err != nil {
return -1, trace.Wrap(err)
}
return partNumber, nil
return int32(partNumber), nil
}

// checkUpload checks that upload IDs are valid
Expand Down
24 changes: 12 additions & 12 deletions lib/events/filesessions/filestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func TestReserveUploadPart(t *testing.T) {
ctx := context.Background()
partNumber := int64(1)
partNumber := int32(1)
dir := t.TempDir()

handler, err := NewHandler(Config{
Expand All @@ -54,7 +54,7 @@ func TestReserveUploadPart(t *testing.T) {

func TestUploadPart(t *testing.T) {
ctx := context.Background()
partNumber := int64(1)
partNumber := int32(1)
dir := t.TempDir()
expectedContent := []byte("upload part contents")

Expand Down Expand Up @@ -89,7 +89,7 @@ func TestCompleteUpload(t *testing.T) {
ctx := context.Background()

// Create some upload parts using reserve + write.
createPart := func(t *testing.T, handler *Handler, upload *events.StreamUpload, partNumber int64, content []byte) events.StreamPart {
createPart := func(t *testing.T, handler *Handler, upload *events.StreamUpload, partNumber int32, content []byte) events.StreamPart {
err := handler.ReserveUploadPart(ctx, *upload, partNumber)
require.NoError(t, err)

Expand All @@ -111,27 +111,27 @@ func TestCompleteUpload(t *testing.T) {
desc: "PartsWithContent",
expectedContent: []byte("helloworld"),
partsFunc: func(t *testing.T, handler *Handler, upload *events.StreamUpload) {
createPart(t, handler, upload, int64(1), []byte("hello"))
createPart(t, handler, upload, int64(2), []byte("world"))
createPart(t, handler, upload, int32(1), []byte("hello"))
createPart(t, handler, upload, int32(2), []byte("world"))
},
},
{
desc: "ReservationParts",
expectedContent: []byte("helloworldwithreservation"),
partsFunc: func(t *testing.T, handler *Handler, upload *events.StreamUpload) {
createPart(t, handler, upload, int64(1), []byte{})
createPart(t, handler, upload, int64(2), []byte("hello"))
createPart(t, handler, upload, int64(3), []byte("world"))
createPart(t, handler, upload, int64(4), []byte{})
createPart(t, handler, upload, int64(5), []byte("withreservation"))
createPart(t, handler, upload, int32(1), []byte{})
createPart(t, handler, upload, int32(2), []byte("hello"))
createPart(t, handler, upload, int32(3), []byte("world"))
createPart(t, handler, upload, int32(4), []byte{})
createPart(t, handler, upload, int32(5), []byte("withreservation"))
},
},
{
desc: "OnlyReservation",
expectedContent: []byte{},
partsFunc: func(t *testing.T, handler *Handler, upload *events.StreamUpload) {
createPart(t, handler, upload, int64(1), []byte{})
createPart(t, handler, upload, int64(2), []byte{})
createPart(t, handler, upload, int32(1), []byte{})
createPart(t, handler, upload, int32(2), []byte{})
},
},
} {
Expand Down
8 changes: 4 additions & 4 deletions lib/events/gcssessions/gcsstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (h *Handler) CreateUpload(ctx context.Context, sessionID session.ID) (*even
}

// UploadPart uploads part
func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64, partBody io.ReadSeeker) (*events.StreamPart, error) {
func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int32, partBody io.ReadSeeker) (*events.StreamPart, error) {
if err := upload.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -292,7 +292,7 @@ func (h *Handler) GetUploadMetadata(s session.ID) events.UploadMetadata {
}

// ReserveUploadPart reserves an upload part.
func (h *Handler) ReserveUploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64) error {
func (h *Handler) ReserveUploadPart(ctx context.Context, upload events.StreamUpload, partNumber int32) error {
return nil
}

Expand Down Expand Up @@ -341,7 +341,7 @@ func (h *Handler) partsPrefix(upload events.StreamUpload) string {
}

// partPath is "path/parts/<upload-id>/<part-number>.part"
func (h *Handler) partPath(upload events.StreamUpload, partNumber int64) string {
func (h *Handler) partPath(upload events.StreamUpload, partNumber int32) string {
return path.Join(h.partsPrefix(upload), fmt.Sprintf("%v%v", partNumber, partExt))
}

Expand Down Expand Up @@ -398,5 +398,5 @@ func partFromPath(uploadPath string) (*events.StreamPart, error) {
if err != nil {
return nil, trace.Wrap(err)
}
return &events.StreamPart{Number: partNumber}, nil
return &events.StreamPart{Number: int32(partNumber)}, nil
}
Loading

0 comments on commit 4f9fe11

Please sign in to comment.