Skip to content

Commit

Permalink
refactor: pull module-specific endpoints out into a separate service
Browse files Browse the repository at this point in the history
ModuleService is implemented by the controller and contains all of the
endpoints that a module uses interact with FTL features - leases, FSM,
etc.

Note that the JVM ftl-runtime builds and passes tests, but does log:

```
2024-08-24 22:48:15,943 ERROR [xyz.blo.ftl.run.FTLController] (grpc-default-executor-1) GRPC connection error: io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method not found: xyz.block.ftl.v1.ModuleService/GetModuleContext
	at io.grpc.Status.asRuntimeException(Status.java:533)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
	at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
```
  • Loading branch information
alecthomas committed Aug 24, 2024
1 parent ed98645 commit c67c16f
Show file tree
Hide file tree
Showing 16 changed files with 564 additions and 504 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ jobs:
run: sqlc vet
ensure-frozen-migrations:
name: Ensure Frozen Migrations
if: ${{ github.event_name == 'pull_request' && !contains(github.event.pull_request.labels.*.name, 'skip-ensure-frozen-migrations') }}
runs-on: ubuntu-latest
steps:
- name: Checkout code
Expand Down Expand Up @@ -106,6 +107,7 @@ jobs:
run: just lint-scripts
proto-breaking:
name: Proto Breaking Change Check
if: ${{ github.event_name == 'pull_request' && !contains(github.event.pull_request.labels.*.name, 'skip-proto-breaking') }}
runs-on: ubuntu-latest
steps:
- name: Checkout code
Expand Down
1 change: 1 addition & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
g.Go(func() error {
return rpc.Serve(ctx, config.Bind,
rpc.GRPC(ftlv1connect.NewVerbServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewModuleServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewControllerServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewAdminServiceHandler, admin),
rpc.GRPC(pbconsoleconnect.NewConsoleServiceHandler, console),
Expand Down
542 changes: 275 additions & 267 deletions backend/protos/xyz/block/ftl/v1/ftl.pb.go

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions backend/protos/xyz/block/ftl/v1/ftl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ service VerbService {
option idempotency_level = NO_SIDE_EFFECTS;
}

// Issue a synchronous call to a Verb.
rpc Call(CallRequest) returns (CallResponse);
}

// ModuleService is the service that modules use to interact with the Controller.
service ModuleService {
// Ping service for readiness.
rpc Ping(PingRequest) returns (PingResponse) {
option idempotency_level = NO_SIDE_EFFECTS;
}

// Get configuration state for the module
rpc GetModuleContext(ModuleContextRequest) returns (stream ModuleContextResponse);

Expand All @@ -120,9 +131,6 @@ service VerbService {

// Publish an event to a topic.
rpc PublishEvent(PublishEventRequest) returns (PublishEventResponse);

// Issue a synchronous call to a Verb.
rpc Call(CallRequest) returns (CallResponse);
}

enum DeploymentChangeType {
Expand Down
290 changes: 182 additions & 108 deletions backend/protos/xyz/block/ftl/v1/ftlv1connect/ftl.connect.go

Large diffs are not rendered by default.

20 changes: 0 additions & 20 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,26 +229,6 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest], resp *connect.ServerStream[ftlv1.ModuleContextResponse]) error {
return connect.NewError(connect.CodeUnimplemented, errors.New("module context must be acquired from the controller"))
}

func (s *Service) AcquireLease(context.Context, *connect.BidiStream[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse]) error {
return connect.NewError(connect.CodeUnimplemented, errors.New("leases must be acquired from the controller"))
}

func (s *Service) SendFSMEvent(context.Context, *connect.Request[ftlv1.SendFSMEventRequest]) (*connect.Response[ftlv1.SendFSMEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, errors.New("FSM events must be sent to the controller"))
}

func (s *Service) SetNextFSMEvent(context.Context, *connect.Request[ftlv1.SendFSMEventRequest]) (*connect.Response[ftlv1.SendFSMEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, errors.New("FSM events must be set by the controller"))
}

func (s *Service) PublishEvent(context.Context, *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, errors.New("topic events must be sent to the controller"))
}

func (s *Service) Deploy(ctx context.Context, req *connect.Request[ftlv1.DeployRequest]) (response *connect.Response[ftlv1.DeployResponse], err error) {
if err, ok := s.registrationFailure.Load().Get(); ok {
observability.Deployment.Failure(ctx, optional.None[string]())
Expand Down
55 changes: 39 additions & 16 deletions frontend/src/protos/xyz/block/ftl/v1/ftl_connect.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go-runtime/ftl/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ type leaseClient struct {
var _ modulecontext.LeaseClient = &leaseClient{}

func (c *leaseClient) Acquire(ctx context.Context, module string, key []string, ttl time.Duration) error {
c.stream = rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx).AcquireLease(ctx)
c.stream = rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx).AcquireLease(ctx)
req := &ftlv1.AcquireLeaseRequest{Key: key, Module: module, Ttl: durationpb.New(ttl)}
if err := c.stream.Send(req); err != nil {
if connect.CodeOf(err) == connect.CodeResourceExhausted {
Expand Down
6 changes: 3 additions & 3 deletions go-runtime/internal/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (r *RealFTL) GetSecret(_ context.Context, name string, dest any) error {
}

func (r *RealFTL) FSMSend(ctx context.Context, fsm, instance string, event any) error {
client := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx)
client := rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx)
body, err := encoding.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
Expand All @@ -69,7 +69,7 @@ func (r *RealFTL) FSMSend(ctx context.Context, fsm, instance string, event any)
}

func (r *RealFTL) FSMNext(ctx context.Context, fsm, instance string, event any) error {
client := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx)
client := rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx)
body, err := encoding.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
Expand All @@ -91,7 +91,7 @@ func (r *RealFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any
if topic.Module != caller.Module {
return fmt.Errorf("can not publish to another module's topic: %s", topic)
}
client := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx)
client := rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx)
body, err := encoding.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
Expand Down
24 changes: 3 additions & 21 deletions go-runtime/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ type UserVerbConfig struct {
// This function is intended to be used by the code generator.
func NewUserVerbServer(projectName string, moduleName string, handlers ...Handler) plugin.Constructor[ftlv1connect.VerbServiceHandler, UserVerbConfig] {
return func(ctx context.Context, uc UserVerbConfig) (context.Context, ftlv1connect.VerbServiceHandler, error) {
moduleServiceClient := rpc.Dial(ftlv1connect.NewModuleServiceClient, uc.FTLEndpoint.String(), log.Error)
ctx = rpc.ContextWithClient(ctx, moduleServiceClient)
verbServiceClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, uc.FTLEndpoint.String(), log.Error)
ctx = rpc.ContextWithClient(ctx, verbServiceClient)

moduleContextSupplier := modulecontext.NewModuleContextSupplier(verbServiceClient)
moduleContextSupplier := modulecontext.NewModuleContextSupplier(moduleServiceClient)
dynamicCtx, err := modulecontext.NewDynamicContext(ctx, moduleContextSupplier, moduleName)
if err != nil {
return nil, nil, fmt.Errorf("could not get config: %w", err)
Expand Down Expand Up @@ -169,26 +171,6 @@ func (m *moduleServer) Call(ctx context.Context, req *connect.Request[ftlv1.Call
}), nil
}

func (m *moduleServer) GetModuleContext(_ context.Context, _ *connect.Request[ftlv1.ModuleContextRequest], _ *connect.ServerStream[ftlv1.ModuleContextResponse]) error {
return connect.NewError(connect.CodeUnimplemented, fmt.Errorf("GetModuleContext not implemented"))
}

func (m *moduleServer) AcquireLease(context.Context, *connect.BidiStream[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse]) error {
return connect.NewError(connect.CodeUnimplemented, fmt.Errorf("AcquireLease not implemented"))
}

func (m *moduleServer) Ping(_ context.Context, _ *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

func (m *moduleServer) SendFSMEvent(context.Context, *connect.Request[ftlv1.SendFSMEventRequest]) (*connect.Response[ftlv1.SendFSMEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("SendFSMEvent not implemented"))
}

func (m *moduleServer) SetNextFSMEvent(ctx context.Context, req *connect.Request[ftlv1.SendFSMEventRequest]) (*connect.Response[ftlv1.SendFSMEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("SetNextFSMEvent not implemented"))
}

func (m *moduleServer) PublishEvent(context.Context, *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("PublishEvent not implemented"))
}
4 changes: 2 additions & 2 deletions internal/modulecontext/module_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ type ModuleContextSupplier interface {
}

type grpcModuleContextSupplier struct {
client ftlv1connect.VerbServiceClient
client ftlv1connect.ModuleServiceClient
}

func NewModuleContextSupplier(client ftlv1connect.VerbServiceClient) ModuleContextSupplier {
func NewModuleContextSupplier(client ftlv1connect.ModuleServiceClient) ModuleContextSupplier {
return ModuleContextSupplier(grpcModuleContextSupplier{client})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,7 @@
import xyz.block.ftl.LeaseClient;
import xyz.block.ftl.LeaseFailedException;
import xyz.block.ftl.LeaseHandle;
import xyz.block.ftl.v1.AcquireLeaseRequest;
import xyz.block.ftl.v1.AcquireLeaseResponse;
import xyz.block.ftl.v1.CallRequest;
import xyz.block.ftl.v1.CallResponse;
import xyz.block.ftl.v1.ModuleContextRequest;
import xyz.block.ftl.v1.ModuleContextResponse;
import xyz.block.ftl.v1.PublishEventRequest;
import xyz.block.ftl.v1.PublishEventResponse;
import xyz.block.ftl.v1.VerbServiceGrpc;
import xyz.block.ftl.v1.*;
import xyz.block.ftl.v1.schema.Ref;

public class FTLController implements LeaseClient {
Expand All @@ -39,6 +31,7 @@ public class FTLController implements LeaseClient {
private boolean waiters = false;

final VerbServiceGrpc.VerbServiceStub verbService;
final ModuleServiceGrpc.ModuleServiceStub moduleService;
final StreamObserver<ModuleContextResponse> moduleObserver = new ModuleObserver();

private static volatile FTLController controller;
Expand Down Expand Up @@ -73,8 +66,9 @@ public static FTLController instance() {
channelBuilder.usePlaintext();
}
var channel = channelBuilder.build();
moduleService = ModuleServiceGrpc.newStub(channel);
moduleService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
verbService = VerbServiceGrpc.newStub(channel);
verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
}

public byte[] getSecret(String secretName) {
Expand Down Expand Up @@ -138,7 +132,7 @@ public void onCompleted() {

public void publishEvent(String topic, String callingVerbName, byte[] event) {
CompletableFuture<?> cf = new CompletableFuture<>();
verbService.publishEvent(PublishEventRequest.newBuilder()
moduleService.publishEvent(PublishEventRequest.newBuilder()
.setCaller(callingVerbName).setBody(ByteString.copyFrom(event))
.setTopic(Ref.newBuilder().setModule(moduleName).setName(topic).build()).build(),
new StreamObserver<PublishEventResponse>() {
Expand Down Expand Up @@ -166,7 +160,7 @@ public void onCompleted() {

public LeaseHandle acquireLease(Duration duration, String... keys) throws LeaseFailedException {
CompletableFuture<?> cf = new CompletableFuture<>();
var client = verbService.acquireLease(new StreamObserver<AcquireLeaseResponse>() {
var client = moduleService.acquireLease(new StreamObserver<AcquireLeaseResponse>() {
@Override
public void onNext(AcquireLeaseResponse value) {
cf.complete(null);
Expand Down Expand Up @@ -255,7 +249,7 @@ public void onError(Throwable throwable) {
}
}
if (failCount.incrementAndGet() < 5) {
verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
moduleService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,6 @@ public void call(CallRequest request, StreamObserver<CallResponse> responseObser
}
}

@Override
public void publishEvent(PublishEventRequest request, StreamObserver<PublishEventResponse> responseObserver) {
super.publishEvent(request, responseObserver);
}

@Override
public void sendFSMEvent(SendFSMEventRequest request, StreamObserver<SendFSMEventResponse> responseObserver) {
super.sendFSMEvent(request, responseObserver);
}

@Override
public StreamObserver<AcquireLeaseRequest> acquireLease(StreamObserver<AcquireLeaseResponse> responseObserver) {
return super.acquireLease(responseObserver);
}

@Override
public void getModuleContext(ModuleContextRequest request, StreamObserver<ModuleContextResponse> responseObserver) {
super.getModuleContext(request, responseObserver);
}

@Override
public void ping(PingRequest request, StreamObserver<PingResponse> responseObserver) {
responseObserver.onNext(PingResponse.newBuilder().build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public void start() {
var addr = new InetSocketAddress("127.0.0.1", 0);
grpcServer = NettyServerBuilder.forAddress(addr)
.addService(new TestVerbServer())
.addService(new TestModuleServer())
.build();
try {
grpcServer.start();
Expand Down
Loading

0 comments on commit c67c16f

Please sign in to comment.