Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: pull module-specific endpoints out into a separate service #2495

Merged
merged 1 commit into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ jobs:
run: sqlc vet
ensure-frozen-migrations:
name: Ensure Frozen Migrations
if: ${{ github.event_name == 'pull_request' && !contains(github.event.pull_request.labels.*.name, 'skip-ensure-frozen-migrations') }}
runs-on: ubuntu-latest
steps:
- name: Checkout code
Expand Down Expand Up @@ -106,6 +107,7 @@ jobs:
run: just lint-scripts
proto-breaking:
name: Proto Breaking Change Check
if: ${{ github.event_name == 'pull_request' && !contains(github.event.pull_request.labels.*.name, 'skip-proto-breaking') }}
runs-on: ubuntu-latest
steps:
- name: Checkout code
Expand Down
1 change: 1 addition & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
g.Go(func() error {
return rpc.Serve(ctx, config.Bind,
rpc.GRPC(ftlv1connect.NewVerbServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewModuleServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewControllerServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewAdminServiceHandler, admin),
rpc.GRPC(pbconsoleconnect.NewConsoleServiceHandler, console),
Expand Down
542 changes: 275 additions & 267 deletions backend/protos/xyz/block/ftl/v1/ftl.pb.go

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions backend/protos/xyz/block/ftl/v1/ftl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ service VerbService {
option idempotency_level = NO_SIDE_EFFECTS;
}

// Issue a synchronous call to a Verb.
rpc Call(CallRequest) returns (CallResponse);
}

// ModuleService is the service that modules use to interact with the Controller.
service ModuleService {
// Ping service for readiness.
rpc Ping(PingRequest) returns (PingResponse) {
option idempotency_level = NO_SIDE_EFFECTS;
}

// Get configuration state for the module
rpc GetModuleContext(ModuleContextRequest) returns (stream ModuleContextResponse);

Expand All @@ -120,9 +131,6 @@ service VerbService {

// Publish an event to a topic.
rpc PublishEvent(PublishEventRequest) returns (PublishEventResponse);

// Issue a synchronous call to a Verb.
rpc Call(CallRequest) returns (CallResponse);
}

enum DeploymentChangeType {
Expand Down
290 changes: 182 additions & 108 deletions backend/protos/xyz/block/ftl/v1/ftlv1connect/ftl.connect.go

Large diffs are not rendered by default.

20 changes: 0 additions & 20 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,26 +229,6 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest], resp *connect.ServerStream[ftlv1.ModuleContextResponse]) error {
return connect.NewError(connect.CodeUnimplemented, errors.New("module context must be acquired from the controller"))
}

func (s *Service) AcquireLease(context.Context, *connect.BidiStream[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse]) error {
return connect.NewError(connect.CodeUnimplemented, errors.New("leases must be acquired from the controller"))
}

func (s *Service) SendFSMEvent(context.Context, *connect.Request[ftlv1.SendFSMEventRequest]) (*connect.Response[ftlv1.SendFSMEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, errors.New("FSM events must be sent to the controller"))
}

func (s *Service) SetNextFSMEvent(context.Context, *connect.Request[ftlv1.SendFSMEventRequest]) (*connect.Response[ftlv1.SendFSMEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, errors.New("FSM events must be set by the controller"))
}

func (s *Service) PublishEvent(context.Context, *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, errors.New("topic events must be sent to the controller"))
}

func (s *Service) Deploy(ctx context.Context, req *connect.Request[ftlv1.DeployRequest]) (response *connect.Response[ftlv1.DeployResponse], err error) {
if err, ok := s.registrationFailure.Load().Get(); ok {
observability.Deployment.Failure(ctx, optional.None[string]())
Expand Down
55 changes: 39 additions & 16 deletions frontend/src/protos/xyz/block/ftl/v1/ftl_connect.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go-runtime/ftl/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ type leaseClient struct {
var _ modulecontext.LeaseClient = &leaseClient{}

func (c *leaseClient) Acquire(ctx context.Context, module string, key []string, ttl time.Duration) error {
c.stream = rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx).AcquireLease(ctx)
c.stream = rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx).AcquireLease(ctx)
req := &ftlv1.AcquireLeaseRequest{Key: key, Module: module, Ttl: durationpb.New(ttl)}
if err := c.stream.Send(req); err != nil {
if connect.CodeOf(err) == connect.CodeResourceExhausted {
Expand Down
6 changes: 3 additions & 3 deletions go-runtime/internal/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (r *RealFTL) GetSecret(_ context.Context, name string, dest any) error {
}

func (r *RealFTL) FSMSend(ctx context.Context, fsm, instance string, event any) error {
client := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx)
client := rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx)
body, err := encoding.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
Expand All @@ -69,7 +69,7 @@ func (r *RealFTL) FSMSend(ctx context.Context, fsm, instance string, event any)
}

func (r *RealFTL) FSMNext(ctx context.Context, fsm, instance string, event any) error {
client := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx)
client := rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx)
body, err := encoding.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
Expand All @@ -91,7 +91,7 @@ func (r *RealFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any
if topic.Module != caller.Module {
return fmt.Errorf("can not publish to another module's topic: %s", topic)
}
client := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx)
client := rpc.ClientFromContext[ftlv1connect.ModuleServiceClient](ctx)
body, err := encoding.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
Expand Down
24 changes: 3 additions & 21 deletions go-runtime/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ type UserVerbConfig struct {
// This function is intended to be used by the code generator.
func NewUserVerbServer(projectName string, moduleName string, handlers ...Handler) plugin.Constructor[ftlv1connect.VerbServiceHandler, UserVerbConfig] {
return func(ctx context.Context, uc UserVerbConfig) (context.Context, ftlv1connect.VerbServiceHandler, error) {
moduleServiceClient := rpc.Dial(ftlv1connect.NewModuleServiceClient, uc.FTLEndpoint.String(), log.Error)
ctx = rpc.ContextWithClient(ctx, moduleServiceClient)
verbServiceClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, uc.FTLEndpoint.String(), log.Error)
ctx = rpc.ContextWithClient(ctx, verbServiceClient)

moduleContextSupplier := modulecontext.NewModuleContextSupplier(verbServiceClient)
moduleContextSupplier := modulecontext.NewModuleContextSupplier(moduleServiceClient)
dynamicCtx, err := modulecontext.NewDynamicContext(ctx, moduleContextSupplier, moduleName)
if err != nil {
return nil, nil, fmt.Errorf("could not get config: %w", err)
Expand Down Expand Up @@ -169,26 +171,6 @@ func (m *moduleServer) Call(ctx context.Context, req *connect.Request[ftlv1.Call
}), nil
}

func (m *moduleServer) GetModuleContext(_ context.Context, _ *connect.Request[ftlv1.ModuleContextRequest], _ *connect.ServerStream[ftlv1.ModuleContextResponse]) error {
return connect.NewError(connect.CodeUnimplemented, fmt.Errorf("GetModuleContext not implemented"))
}

func (m *moduleServer) AcquireLease(context.Context, *connect.BidiStream[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse]) error {
return connect.NewError(connect.CodeUnimplemented, fmt.Errorf("AcquireLease not implemented"))
}

func (m *moduleServer) Ping(_ context.Context, _ *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

func (m *moduleServer) SendFSMEvent(context.Context, *connect.Request[ftlv1.SendFSMEventRequest]) (*connect.Response[ftlv1.SendFSMEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("SendFSMEvent not implemented"))
}

func (m *moduleServer) SetNextFSMEvent(ctx context.Context, req *connect.Request[ftlv1.SendFSMEventRequest]) (*connect.Response[ftlv1.SendFSMEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("SetNextFSMEvent not implemented"))
}

func (m *moduleServer) PublishEvent(context.Context, *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("PublishEvent not implemented"))
}
4 changes: 2 additions & 2 deletions internal/modulecontext/module_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ type ModuleContextSupplier interface {
}

type grpcModuleContextSupplier struct {
client ftlv1connect.VerbServiceClient
client ftlv1connect.ModuleServiceClient
}

func NewModuleContextSupplier(client ftlv1connect.VerbServiceClient) ModuleContextSupplier {
func NewModuleContextSupplier(client ftlv1connect.ModuleServiceClient) ModuleContextSupplier {
return ModuleContextSupplier(grpcModuleContextSupplier{client})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,7 @@
import xyz.block.ftl.LeaseClient;
import xyz.block.ftl.LeaseFailedException;
import xyz.block.ftl.LeaseHandle;
import xyz.block.ftl.v1.AcquireLeaseRequest;
import xyz.block.ftl.v1.AcquireLeaseResponse;
import xyz.block.ftl.v1.CallRequest;
import xyz.block.ftl.v1.CallResponse;
import xyz.block.ftl.v1.ModuleContextRequest;
import xyz.block.ftl.v1.ModuleContextResponse;
import xyz.block.ftl.v1.PublishEventRequest;
import xyz.block.ftl.v1.PublishEventResponse;
import xyz.block.ftl.v1.VerbServiceGrpc;
import xyz.block.ftl.v1.*;
import xyz.block.ftl.v1.schema.Ref;

public class FTLController implements LeaseClient {
Expand All @@ -39,6 +31,7 @@ public class FTLController implements LeaseClient {
private boolean waiters = false;

final VerbServiceGrpc.VerbServiceStub verbService;
final ModuleServiceGrpc.ModuleServiceStub moduleService;
final StreamObserver<ModuleContextResponse> moduleObserver = new ModuleObserver();

private static volatile FTLController controller;
Expand Down Expand Up @@ -73,8 +66,9 @@ public static FTLController instance() {
channelBuilder.usePlaintext();
}
var channel = channelBuilder.build();
moduleService = ModuleServiceGrpc.newStub(channel);
moduleService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
verbService = VerbServiceGrpc.newStub(channel);
verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
}

public byte[] getSecret(String secretName) {
Expand Down Expand Up @@ -138,7 +132,7 @@ public void onCompleted() {

public void publishEvent(String topic, String callingVerbName, byte[] event) {
CompletableFuture<?> cf = new CompletableFuture<>();
verbService.publishEvent(PublishEventRequest.newBuilder()
moduleService.publishEvent(PublishEventRequest.newBuilder()
.setCaller(callingVerbName).setBody(ByteString.copyFrom(event))
.setTopic(Ref.newBuilder().setModule(moduleName).setName(topic).build()).build(),
new StreamObserver<PublishEventResponse>() {
Expand Down Expand Up @@ -166,7 +160,7 @@ public void onCompleted() {

public LeaseHandle acquireLease(Duration duration, String... keys) throws LeaseFailedException {
CompletableFuture<?> cf = new CompletableFuture<>();
var client = verbService.acquireLease(new StreamObserver<AcquireLeaseResponse>() {
var client = moduleService.acquireLease(new StreamObserver<AcquireLeaseResponse>() {
@Override
public void onNext(AcquireLeaseResponse value) {
cf.complete(null);
Expand Down Expand Up @@ -255,7 +249,7 @@ public void onError(Throwable throwable) {
}
}
if (failCount.incrementAndGet() < 5) {
verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
moduleService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,6 @@ public void call(CallRequest request, StreamObserver<CallResponse> responseObser
}
}

@Override
public void publishEvent(PublishEventRequest request, StreamObserver<PublishEventResponse> responseObserver) {
super.publishEvent(request, responseObserver);
}

@Override
public void sendFSMEvent(SendFSMEventRequest request, StreamObserver<SendFSMEventResponse> responseObserver) {
super.sendFSMEvent(request, responseObserver);
}

@Override
public StreamObserver<AcquireLeaseRequest> acquireLease(StreamObserver<AcquireLeaseResponse> responseObserver) {
return super.acquireLease(responseObserver);
}

@Override
public void getModuleContext(ModuleContextRequest request, StreamObserver<ModuleContextResponse> responseObserver) {
super.getModuleContext(request, responseObserver);
}

@Override
public void ping(PingRequest request, StreamObserver<PingResponse> responseObserver) {
responseObserver.onNext(PingResponse.newBuilder().build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public void start() {
var addr = new InetSocketAddress("127.0.0.1", 0);
grpcServer = NettyServerBuilder.forAddress(addr)
.addService(new TestVerbServer())
.addService(new TestModuleServer())
.build();
try {
grpcServer.start();
Expand Down
Loading
Loading