Skip to content

Commit

Permalink
feat: expose leases to user code (#1406)
Browse files Browse the repository at this point in the history
[Leases](https://hackmd.io/@ftl/Sym_GKEb0) provide exclusive access to a
resource identified by a unique key with a timeout. If the timeout
expires due to the lease holder terminating, the lease will be released.

Usage is straightforward:

```go
lease, err := ftl.Lease(ctx, 10*time.Second, "update", "user", "123")
if err != nil {
        return err
}
defer lease.Release()

// ...
```

Note that leases are currently not tracked in the schema.
  • Loading branch information
alecthomas authored May 4, 2024
1 parent acece7b commit 98db787
Show file tree
Hide file tree
Showing 46 changed files with 1,492 additions and 798 deletions.
5 changes: 3 additions & 2 deletions Justfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
set positional-arguments
set shell := ["bash", "-c"]

WATCHEXEC_ARGS := "-e proto -e go -e sql -f sqlc.yaml"
RELEASE := "build/release"
VERSION := `git describe --tags --always --dirty | sed -e 's/^v//'`
KT_RUNTIME_OUT := "kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT.jar"
Expand Down Expand Up @@ -30,11 +31,11 @@ clean:

# Live rebuild the ftl binary whenever source changes.
live-rebuild:
watchexec -e go -e sql -f sqlc.yaml -- "just build-sqlc && just build ftl"
watchexec {{WATCHEXEC_ARGS}} -- "just build-sqlc && just build ftl"

# Run "ftl dev" with live-reloading whenever source changes.
dev *args:
watchexec -r -e go -e sql -f sqlc.yaml -- "just build-sqlc && ftl dev {{args}}"
watchexec -r {{WATCHEXEC_ARGS}} -- "just build-sqlc && ftl dev {{args}}"

# Build everything
build-all: build-frontend build-generate build-protos build-sqlc build-zips
Expand Down
31 changes: 31 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/cronjobs"
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/controller/scaling/localscaling"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
Expand Down Expand Up @@ -651,6 +652,36 @@ func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftl
return connect.NewResponse(response), nil
}

// AcquireLease acquires a lease on behalf of a module.
//
// This is a bidirectional stream where each request from the client must be
// responded to with an empty response.
func (s *Service) AcquireLease(ctx context.Context, stream *connect.BidiStream[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse]) error {
var lease leases.Lease
for {
msg, err := stream.Receive()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not receive lease request: %w", err))
}
if lease == nil {
lease, err = s.dal.AcquireLease(ctx, leases.ModuleKey(msg.Module, msg.Key...), msg.Ttl.AsDuration())
if err != nil {
if errors.Is(err, leases.ErrConflict) {
return connect.NewError(connect.CodeResourceExhausted, fmt.Errorf("lease is held: %w", err))
}
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not acquire lease: %w", err))
}
defer lease.Release() //nolint:errcheck
}
if err = stream.Send(&ftlv1.AcquireLeaseResponse{}); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send lease response: %w", err))
}
}
}

func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
return s.callWithRequest(ctx, req, optional.None[model.RequestKey](), "")
}
Expand Down
Loading

0 comments on commit 98db787

Please sign in to comment.