Skip to content

Commit

Permalink
resolving merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanj-square committed Sep 30, 2024
1 parent a50ecc8 commit 5aea06b
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 59 deletions.
8 changes: 6 additions & 2 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/TBD54566975/ftl/backend/controller/artefacts"
"hash"
"io"
"net/http"
Expand Down Expand Up @@ -221,6 +222,7 @@ type Service struct {
tasks *scheduledtask.Scheduler
cronJobs *cronjobs.Service
pubSub *pubsub.Service
registry *artefacts.Service
timeline *timeline.Service
controllerListListeners []ControllerListListener

Expand Down Expand Up @@ -275,6 +277,8 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
pubSub := pubsub.New(conn, encryption, svc.tasks, optional.Some[pubsub.AsyncCallListener](svc))
svc.pubSub = pubSub

svc.registry = artefacts.New(ctx, conn)

svc.dal = dal.New(ctx, conn, encryption, pubSub)

timelineSvc := timeline.New(ctx, conn, encryption)
Expand Down Expand Up @@ -1094,7 +1098,7 @@ func (s *Service) GetArtefactDiffs(ctx context.Context, req *connect.Request[ftl
if err != nil {
return nil, err
}
need, err := s.dal.GetMissingArtefacts(ctx, byteDigests)
_, need, err := s.registry.GetDigestsKeys(ctx, byteDigests)
if err != nil {
return nil, err
}
Expand All @@ -1105,7 +1109,7 @@ func (s *Service) GetArtefactDiffs(ctx context.Context, req *connect.Request[ftl

func (s *Service) UploadArtefact(ctx context.Context, req *connect.Request[ftlv1.UploadArtefactRequest]) (*connect.Response[ftlv1.UploadArtefactResponse], error) {
logger := log.FromContext(ctx)
digest, err := s.dal.CreateArtefact(ctx, req.Msg.Content)
digest, err := s.registry.Upload(ctx, artefacts.Artefact{Content: req.Msg.Content})
if err != nil {
return nil, err
}
Expand Down
86 changes: 29 additions & 57 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"github.com/TBD54566975/ftl/backend/controller/artefacts"
"time"

"github.com/alecthomas/types/optional"
inprocesspubsub "github.com/alecthomas/types/pubsub"
sets "github.com/deckarep/golang-set/v2"
xmaps "golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -55,17 +53,20 @@ type Reservation interface {

func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service, pubsub *pubsub.Service) *DAL {
var d *DAL
db := dalsql.New(conn)
d = &DAL{
leaser: dbleaser.NewDatabaseLeaser(conn),
db: dalsql.New(conn),
db: db,
encryption: encryption,
registry: artefacts.New(ctx, conn),
Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL {
return &DAL{
Handle: h,
db: dalsql.New(h.Connection),
leaser: dbleaser.NewDatabaseLeaser(h.Connection),
pubsub: pubsub,
encryption: d.encryption,
registry: artefacts.New(ctx, h.Connection),
DeploymentChanges: d.DeploymentChanges,
}
}),
Expand All @@ -82,6 +83,7 @@ type DAL struct {
leaser *dbleaser.DatabaseLeaser
pubsub *pubsub.Service
encryption *encryption.Service
registry *artefacts.Service

// DeploymentChanges is a Topic that receives changes to the deployments table.
DeploymentChanges *inprocesspubsub.Topic[DeploymentNotification]
Expand Down Expand Up @@ -194,25 +196,6 @@ func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err erro
return libdal.TranslatePGError(err)
}

// GetMissingArtefacts returns the digests of the artefacts that are missing from the database.
func (d *DAL) GetMissingArtefacts(ctx context.Context, digests []sha256.SHA256) ([]sha256.SHA256, error) {
have, err := d.db.GetArtefactDigests(ctx, sha256esToBytes(digests))
if err != nil {
return nil, libdal.TranslatePGError(err)
}
haveStr := slices.Map(have, func(in dalsql.GetArtefactDigestsRow) sha256.SHA256 {
return sha256.FromBytes(in.Digest)
})
return sets.NewSet(digests...).Difference(sets.NewSet(haveStr...)).ToSlice(), nil
}

// CreateArtefact inserts a new artefact into the database and returns its ID.
func (d *DAL) CreateArtefact(ctx context.Context, content []byte) (digest sha256.SHA256, err error) {
sha256digest := sha256.Sum(content)
_, err = d.db.CreateArtefact(ctx, sha256digest[:], content)
return sha256digest, libdal.TranslatePGError(err)
}

type IngressRoutingEntry struct {
Verb string
Method string
Expand Down Expand Up @@ -281,19 +264,21 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
return model.DeploymentKey{}, fmt.Errorf("failed to create deployment: %w", libdal.TranslatePGError(err))
}

uploadedDigests := slices.Map(artefacts, func(in dalmodel.DeploymentArtefact) []byte { return in.Digest[:] })
artefactDigests, err := tx.db.GetArtefactDigests(ctx, uploadedDigests)
uploadedDigests := slices.Map(artefacts, func(in dalmodel.DeploymentArtefact) sha256.SHA256 { return sha256.FromBytes(in.Digest[:]) })
keys, missing, err := tx.registry.GetDigestsKeys(ctx, uploadedDigests)
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("failed to get artefact digests: %w", err)
}
if len(artefactDigests) != len(artefacts) {
missingDigests := strings.Join(slices.Map(artefacts, func(in dalmodel.DeploymentArtefact) string { return in.Digest.String() }), ", ")
return model.DeploymentKey{}, fmt.Errorf("missing %d artefacts: %s", len(artefacts)-len(artefactDigests), missingDigests)
if len(missing) > 0 {
m := slices.Reduce(missing, "", func(join string, in sha256.SHA256) string {
return fmt.Sprintf("%s, %s", join, in.String())
})
return model.DeploymentKey{}, fmt.Errorf("missing digests %s", m)
}

// Associate the artefacts with the deployment
for _, row := range artefactDigests {
artefact := artefactsByDigest[sha256.FromBytes(row.Digest)]
for _, row := range keys {
artefact := artefactsByDigest[row.Digest]
err = tx.db.AssociateArtefactWithDeployment(ctx, dalsql.AssociateArtefactWithDeploymentParams{
Key: deploymentKey,
ArtefactID: row.ID,
Expand Down Expand Up @@ -701,18 +686,27 @@ func (d *DAL) loadDeployment(ctx context.Context, deployment dalsql.GetDeploymen
Key: deployment.Deployment.Key,
Schema: deployment.Deployment.Schema,
}
artefacts, err := d.db.GetDeploymentArtefacts(ctx, deployment.Deployment.ID)
darts, err := d.db.GetDeploymentArtefacts(ctx, deployment.Deployment.ID)

if err != nil {
return nil, libdal.TranslatePGError(err)
}
out.Artefacts = slices.Map(artefacts, func(row dalsql.GetDeploymentArtefactsRow) *model.Artefact {
out.Artefacts, err = slices.MapErr(darts, func(row dalsql.GetDeploymentArtefactsRow) (*model.Artefact, error) {
digest := sha256.FromBytes(row.Digest)
content, err := d.registry.Download(ctx, digest)
if err != nil {
return nil, fmt.Errorf("artefact download failed: %w", libdal.TranslatePGError(err))
}
return &model.Artefact{
Path: row.Path,
Executable: row.Executable,
Content: &artefactReader{id: row.ID, db: d.db},
Digest: sha256.FromBytes(row.Digest),
}
Content: content,
Digest: digest,
}, nil
})
if err != nil {
return nil, fmt.Errorf("an artefact download failed: %w", err)
}
return out, nil
}

Expand Down Expand Up @@ -779,25 +773,3 @@ func (*DAL) checkForExistingDeployments(ctx context.Context, tx *DAL, moduleSche
func sha256esToBytes(digests []sha256.SHA256) [][]byte {
return slices.Map(digests, func(digest sha256.SHA256) []byte { return digest[:] })
}

type artefactReader struct {
id int64
db dalsql.Querier
offset int32
}

func (r *artefactReader) Close() error { return nil }

func (r *artefactReader) Read(p []byte) (n int, err error) {
content, err := r.db.GetArtefactContentRange(context.Background(), r.offset+1, int32(len(p)), r.id)
if err != nil {
return 0, libdal.TranslatePGError(err)
}
copy(p, content)
clen := len(content)
r.offset += int32(clen)
if clen == 0 {
err = io.EOF
}
return clen, err
}
14 changes: 14 additions & 0 deletions internal/buildengine/testdata/alpha/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@ require (
github.com/XSAM/otelsql v0.34.0 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/kong v1.2.1 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/alecthomas/types v0.16.0 // indirect
github.com/alessio/shellescape v1.4.2 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/danieljoos/wincred v1.2.0 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/hashicorp/cronexpr v1.1.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
Expand All @@ -34,15 +38,25 @@ require (
github.com/swaggest/refl v1.3.0 // indirect
github.com/zalando/go-keyring v0.2.5 // indirect
go.opentelemetry.io/otel v1.30.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.30.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.30.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.30.0 // indirect
go.opentelemetry.io/otel/metric v1.30.0 // indirect
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
go.opentelemetry.io/otel/trace v1.30.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.66.2 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)

Expand Down
26 changes: 26 additions & 0 deletions internal/buildengine/testdata/alpha/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5aea06b

Please sign in to comment.