Skip to content

Commit

Permalink
initialize registry and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanj-square committed Sep 20, 2024
1 parent 290a2de commit 5362561
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 14 deletions.
13 changes: 7 additions & 6 deletions backend/controller/artefacts/dal_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
)

type ArtefactRow struct {
ID int64
Digest []byte
}

Expand All @@ -20,19 +21,19 @@ type DAL interface {
}

type DALRegistry struct {
dal DAL
db DAL
}

func NewDALRegistry(dal DAL) *DALRegistry {
return &DALRegistry{dal: dal}
func NewDALRegistry(db DAL) *DALRegistry {
return &DALRegistry{db: db}
}

func MakeKey(id int64, digest []byte) ArtefactKey {
return ArtefactKey{id: id, Digest: sha256.FromBytes(digest)}
}

func (r *DALRegistry) GetMissingDigests(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error) {
have, err := r.dal.GetArtefactDigests(ctx, sha256esToBytes(digests))
have, err := r.db.GetArtefactDigests(ctx, sha256esToBytes(digests))
if err != nil {
return nil, libdal.TranslatePGError(err)
}
Expand All @@ -44,12 +45,12 @@ func (r *DALRegistry) GetMissingDigests(ctx context.Context, digests []sha256.SH

func (r *DALRegistry) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
sha256digest := sha256.Sum(artefact.Content)
_, err := r.dal.CreateArtefact(ctx, sha256digest[:], artefact.Content)
_, err := r.db.CreateArtefact(ctx, sha256digest[:], artefact.Content)
return sha256digest, libdal.TranslatePGError(err)
}

func (r *DALRegistry) Download(_ context.Context, key ArtefactKey) io.ReadCloser {
return &dalArtefactStream{dal: r.dal, id: key.id}
return &dalArtefactStream{dal: r.db, id: key.id}
}

type dalArtefactStream struct {
Expand Down
38 changes: 35 additions & 3 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,18 @@ type Reservation interface {
Rollback(ctx context.Context) error
}

type artefactDAL struct {
db dalsql.Querier
}

func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service) *DAL {
var d *DAL
db := dalsql.New(conn)
d = &DAL{
leaseDAL: leasedal.New(conn),
db: dalsql.New(conn),
db: db,
encryption: encryption,
Registry: artefacts.NewDALRegistry(&artefactDAL{db: db}),
Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL {
return &DAL{
Handle: h,
Expand All @@ -179,13 +185,39 @@ func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Ser
return d
}

func (d *artefactDAL) GetArtefactDigests(ctx context.Context, digests [][]byte) ([]artefacts.ArtefactRow, error) {
results, err := d.db.GetArtefactDigests(ctx, digests)
if err != nil {
return nil, fmt.Errorf("could not retrieve artefact digests: %w", err)
}
return slices.Map(results, func(in dalsql.GetArtefactDigestsRow) artefacts.ArtefactRow {
return artefacts.ArtefactRow{ID: in.ID, Digest: in.Digest}
}), nil
}

func (d *artefactDAL) CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) {
id, err := d.db.CreateArtefact(ctx, digest, content)
if err != nil {
return id, fmt.Errorf("could not create artefact: %w", err)
}
return id, nil
}

func (d *artefactDAL) GetArtefactContentRange(ctx context.Context, start int32, count int32, iD int64) ([]byte, error) {
bytes, err := d.db.GetArtefactContentRange(ctx, start, count, iD)
if err != nil {
return nil, fmt.Errorf("could not retrieve artefact content range: %w", err)
}
return bytes, nil
}

type DAL struct {
*libdal.Handle[DAL]
db dalsql.Querier

leaseDAL *leasedal.DAL
encryption *encryption.Service
registry artefacts.Registry
Registry artefacts.Registry

// DeploymentChanges is a Topic that receives changes to the deployments table.
DeploymentChanges *pubsub.Topic[DeploymentNotification]
Expand Down Expand Up @@ -784,7 +816,7 @@ func (d *DAL) loadDeployment(ctx context.Context, deployment dalsql.GetDeploymen
return &model.Artefact{
Path: row.Path,
Executable: row.Executable,
Content: d.registry.Download(ctx, key),
Content: d.Registry.Download(ctx, key),
Digest: key.Digest,
}
})
Expand Down
8 changes: 5 additions & 3 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dal
import (
"bytes"
"context"
"github.com/TBD54566975/ftl/backend/controller/artefacts"
"io"
"sync"
"testing"
Expand All @@ -25,6 +26,7 @@ func TestDAL(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())

assert.NoError(t, err)

dal := New(ctx, conn, encryption)
Expand All @@ -50,7 +52,7 @@ func TestDAL(t *testing.T) {
var testSha sha256.SHA256

t.Run("CreateArtefact", func(t *testing.T) {
testSha, err = dal.CreateArtefact(ctx, testContent)
testSha, err = dal.Registry.Upload(ctx, artefacts.Artefact{Content: testContent})
assert.NoError(t, err)
})

Expand Down Expand Up @@ -94,7 +96,7 @@ func TestDAL(t *testing.T) {

t.Run("GetMissingArtefacts", func(t *testing.T) {
misshingSHA := sha256.MustParseSHA256("fae7e4cbdca7167bbea4098c05d596f50bbb18062b61c1dfca3705b4a6c2888c")
missing, err := dal.GetMissingArtefacts(ctx, []sha256.SHA256{testSHA, misshingSHA})
missing, err := dal.Registry.GetMissingDigests(ctx, []sha256.SHA256{testSHA, misshingSHA})
assert.NoError(t, err)
assert.Equal(t, []sha256.SHA256{misshingSHA}, missing)
})
Expand Down Expand Up @@ -195,7 +197,7 @@ func TestCreateArtefactConflict(t *testing.T) {
defer wg.Done()
tx1, err := dal.Begin(ctx)
assert.NoError(t, err)
digest, err := tx1.CreateArtefact(ctx, []byte("content"))
digest, err := dal.Registry.Upload(ctx, artefacts.Artefact{Content: []byte("content")})
assert.NoError(t, err)
time.Sleep(time.Second * 2)
err = tx1.Commit(ctx)
Expand Down
5 changes: 3 additions & 2 deletions backend/controller/timeline/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dal
import (
"bytes"
"context"
"github.com/TBD54566975/ftl/backend/controller/artefacts"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -38,7 +39,7 @@ func TestTimelineDAL(t *testing.T) {
var testSha sha256.SHA256

t.Run("CreateArtefact", func(t *testing.T) {
testSha, err = controllerDAL.CreateArtefact(ctx, testContent)
testSha, err = controllerDAL.Registry.Upload(ctx, artefacts.Artefact{Content: testContent})
assert.NoError(t, err)
})

Expand Down Expand Up @@ -166,7 +167,7 @@ func TestDeleteOldEvents(t *testing.T) {
var testSha sha256.SHA256

t.Run("CreateArtefact", func(t *testing.T) {
testSha, err = controllerDAL.CreateArtefact(ctx, testContent)
testSha, err = controllerDAL.Registry.Upload(ctx, artefacts.Artefact{Content: testContent})
assert.NoError(t, err)
})

Expand Down

0 comments on commit 5362561

Please sign in to comment.