diff --git a/Makefile b/Makefile index 3503131..2595d84 100644 --- a/Makefile +++ b/Makefile @@ -31,3 +31,8 @@ image: push: image $(DOCKER) push $(IMAGE) + +update: + buf mod update ./proto + for ref in $$(yq -r '.deps[] | .remote + "/gen/go/" + .owner + "/" + .repository + "/protocolbuffers/go@" + .commit' proto/buf.lock); do go get $$ref; done + go mod tidy diff --git a/cli/run.go b/cli/run.go index c081a78..9be6d2e 100644 --- a/cli/run.go +++ b/cli/run.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "fmt" + "io" "log/slog" "net/http" "os" @@ -20,11 +21,15 @@ import ( "github.com/google/uuid" "github.com/spf13/cobra" + "google.golang.org/protobuf/proto" + + sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1" ) var ( BridgeSession string LocalEndpoint string + Verbose bool ) const defaultEndpoint = "127.0.0.1:8000" @@ -45,15 +50,15 @@ func runCommand() *cobra.Command { Short: "Run a Dispatch application", Long: fmt.Sprintf(`Run a Dispatch application. -The command to start the local application should be specified -after the run command and its options: +The command to start the local application endpoint should be +specified after the run command and its options: dispatch run [options] -- -Dispatch spawns the local application and then dispatches function -calls to it continuously. +Dispatch spawns the local application endpoint and then dispatches +function calls to it continuously. -Dispatch connects to the local application on http://%s. +Dispatch connects to the local application endpoint on http://%s. If the local application is listening on a different host or port, please set the --endpoint option appropriately. The value passed to this option will be exported as the DISPATCH_ENDPOINT_ADDR environment @@ -71,6 +76,10 @@ previous run.`, defaultEndpoint), return runConfigFlow() }, RunE: func(c *cobra.Command, args []string) error { + if Verbose { + slog.SetLogLoggerLevel(slog.LevelDebug) + } + if BridgeSession == "" { BridgeSession = uuid.New().String() } @@ -165,6 +174,7 @@ Run 'dispatch help run' to learn about Dispatch sessions.`, BridgeSession) defer wg.Done() err := invoke(ctx, httpClient, bridgeSessionURL, requestID, res) + res.Body.Close() if err != nil { if ctx.Err() == nil { slog.Warn(err.Error()) @@ -209,13 +219,14 @@ Run 'dispatch help run' to learn about Dispatch sessions.`, BridgeSession) } cmd.Flags().StringVarP(&BridgeSession, "session", "s", "", "Optional session to resume") - cmd.Flags().StringVarP(&LocalEndpoint, "endpoint", "e", defaultEndpoint, "Host:port that the local application is listening on") + cmd.Flags().StringVarP(&LocalEndpoint, "endpoint", "e", defaultEndpoint, "Host:port that the local application endpoint is listening on") + cmd.Flags().BoolVarP(&Verbose, "verbose", "", false, "Enable verbose logging") return cmd } func poll(ctx context.Context, client *http.Client, url string) (string, *http.Response, error) { - slog.Debug("getting request from API", "url", url) + slog.Debug("getting request from Dispatch", "url", url) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { @@ -253,45 +264,105 @@ func poll(ctx context.Context, client *http.Client, url string) (string, *http.R } func invoke(ctx context.Context, client *http.Client, url, requestID string, bridgeGetRes *http.Response) error { - defer bridgeGetRes.Body.Close() - - slog.Debug("sending request from Dispatch API to local application", "endpoint", LocalEndpoint, "request_id", requestID) + slog.Debug("sending request to local application", "endpoint", LocalEndpoint, "request_id", requestID) - // Extract the nested header/body. + // Extract the nested request header/body. endpointReq, err := http.ReadRequest(bufio.NewReader(bridgeGetRes.Body)) if err != nil { return fmt.Errorf("invalid response from Dispatch API: %v", err) } endpointReq = endpointReq.WithContext(ctx) + // Buffer the request body in memory. + endpointReqBody := &bytes.Buffer{} + if endpointReq.ContentLength > 0 { + endpointReqBody.Grow(int(endpointReq.ContentLength)) + } + _, err = io.Copy(endpointReqBody, endpointReq.Body) + bridgeGetRes.Body.Close() + endpointReq.Body.Close() + if err != nil { + return fmt.Errorf("failed to read response from Dispatch API: %v", err) + } + endpointReq.GetBody = func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(endpointReqBody.Bytes())), nil + } + endpointReq.Body, _ = endpointReq.GetBody() + + // Parse the request body from the API. + var runRequest sdkv1.RunRequest + if err := proto.Unmarshal(endpointReqBody.Bytes(), &runRequest); err != nil { + return fmt.Errorf("invalid response from Dispatch API: %v", err) + } + switch d := runRequest.Directive.(type) { + case *sdkv1.RunRequest_Input: + slog.Debug("calling function", "function", runRequest.Function, "request_id", requestID) + case *sdkv1.RunRequest_PollResult: + slog.Debug("resuming function", "function", runRequest.Function, "call_results", len(d.PollResult.Results), "request_id", requestID) + } + // The RequestURI field must be cleared for client.Do() to // accept the request below. endpointReq.RequestURI = "" - // Forward the request to the local application. + // Forward the request to the local application endpoint. endpointReq.Host = LocalEndpoint endpointReq.URL.Scheme = "http" endpointReq.URL.Host = LocalEndpoint endpointRes, err := client.Do(endpointReq) if err != nil { - return fmt.Errorf("failed to contact local application (%s): %v. Please check that -e,--endpoint is correct.", LocalEndpoint, err) + return fmt.Errorf("failed to contact local application endpoint (%s): %v. Please check that -e,--endpoint is correct.", LocalEndpoint, err) } - defer endpointRes.Body.Close() - bridgeGetRes.Body.Close() - - // Buffer the response from the endpoint. - // TODO: pipe it into the request below - var bufferedEndpointRes bytes.Buffer - if err := endpointRes.Write(&bufferedEndpointRes); err != nil { - return fmt.Errorf("failed to read response from local application (%s): %v", LocalEndpoint, err) + // Buffer the response body in memory. + endpointResBody := &bytes.Buffer{} + if endpointRes.ContentLength > 0 { + endpointResBody.Grow(int(endpointRes.ContentLength)) } + _, err = io.Copy(endpointResBody, endpointRes.Body) endpointRes.Body.Close() + if err != nil { + return fmt.Errorf("failed to read response from local application endpoint (%s): %v", LocalEndpoint, err) + } + endpointRes.Body = io.NopCloser(endpointResBody) - slog.Debug("sending local application's response to Dispatch API", "request_id", requestID) + // Parse the response body from the API. + if endpointRes.StatusCode == http.StatusOK && endpointRes.Header.Get("Content-Type") == "application/proto" { + var runResponse sdkv1.RunResponse + if err := proto.Unmarshal(endpointResBody.Bytes(), &runResponse); err != nil { + return fmt.Errorf("invalid response from local application endpoint (%s): %v", LocalEndpoint, err) + } + switch runResponse.Status { + case sdkv1.Status_STATUS_OK: + switch d := runResponse.Directive.(type) { + case *sdkv1.RunResponse_Exit: + if d.Exit.TailCall != nil { + slog.Debug("function tail-called", "function", runRequest.Function, "tail_call", d.Exit.TailCall.Function, "request_id", requestID) + } else { + slog.Debug("function call succeeded", "function", runRequest.Function, "request_id", requestID) + } + case *sdkv1.RunResponse_Poll: + slog.Debug("function yielded", "function", runRequest.Function, "calls", len(d.Poll.Calls), "request_id", requestID) + } + default: + slog.Debug("function call failed", "function", runRequest.Function, "status", runResponse.Status, "request_id", requestID) + } + } else { + // The response might indicate some other issue, e.g. it could be a 404 if the function can't be found + slog.Debug("function call failed", "function", runRequest.Function, "http_status", endpointRes.StatusCode, "request_id", requestID) + } + + // Use io.Pipe to convert the response writer into an io.Reader. + pr, pw := io.Pipe() + go func() { + err := endpointRes.Write(pw) + pw.CloseWithError(err) + }() + + slog.Debug("sending response to Dispatch", "request_id", requestID) // Send the response back to the API. - bridgePostReq, err := http.NewRequestWithContext(ctx, "POST", url, bufio.NewReader(&bufferedEndpointRes)) + bridgePostReq, err := http.NewRequestWithContext(ctx, "POST", url, bufio.NewReader(pr)) if err != nil { panic(err) } @@ -302,7 +373,7 @@ func invoke(ctx context.Context, client *http.Client, url, requestID string, bri } bridgePostRes, err := client.Do(bridgePostReq) if err != nil { - return fmt.Errorf("failed to contact Dispatch API: %v", err) + return fmt.Errorf("failed to contact Dispatch API or write response: %v", err) } if bridgePostRes.StatusCode != http.StatusAccepted { return fmt.Errorf("failed to contact Dispatch API: response code %d", bridgePostRes.StatusCode) diff --git a/go.mod b/go.mod index c8d91cc..f61d117 100644 --- a/go.mod +++ b/go.mod @@ -3,15 +3,18 @@ module github.com/stealthrocket/dispatch go 1.22.0 require ( + buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240417235922-27bc9b1d8ccb.1 github.com/charmbracelet/bubbles v0.18.0 github.com/charmbracelet/bubbletea v0.25.0 github.com/charmbracelet/lipgloss v0.9.1 github.com/google/uuid v1.6.0 github.com/pelletier/go-toml/v2 v2.2.0 github.com/spf13/cobra v1.8.0 + google.golang.org/protobuf v1.33.0 ) require ( + buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20231115204500-e097f827e652.1 // indirect github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/go.sum b/go.sum index b336693..000f190 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20231115204500-e097f827e652.1 h1:SbKXkoZduR4jQ+aCrX71I+dAcHKZPk1SiriRs/XYX9s= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20231115204500-e097f827e652.1/go.mod h1:Tgn5bgL220vkFOI0KPStlcClPeOJzAv4uT+V8JXGUnw= +buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240417235922-27bc9b1d8ccb.1 h1:fo9VjlitQzAnyrzhYMFyxV2iNmNMN2HF4jtQFutblO4= +buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240417235922-27bc9b1d8ccb.1/go.mod h1:yJei/TBJwZBJ8ZUWCKVKceUHk/8gniSGs812SZA9TEE= github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= github.com/charmbracelet/bubbles v0.18.0 h1:PYv1A036luoBGroX6VWjQIE9Syf2Wby2oOl/39KLfy0= @@ -12,6 +16,9 @@ github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -65,6 +72,11 @@ golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/proto/buf.lock b/proto/buf.lock new file mode 100644 index 0000000..95fc641 --- /dev/null +++ b/proto/buf.lock @@ -0,0 +1,13 @@ +# Generated by buf. DO NOT EDIT. +version: v1 +deps: + - remote: buf.build + owner: bufbuild + repository: protovalidate + commit: e097f827e65240ac9fd4b1158849a8fc + digest: shake256:f19252436fd9ded945631e2ffaaed28247a92c9015ccf55ae99db9fb3d9600c4fdb00fd2d3bd7701026ec2fd4715c5129e6ae517c25a59ba690020cfe80bf8ad + - remote: buf.build + owner: stealthrocket + repository: dispatch-proto + commit: 27bc9b1d8ccb4ab2a37063596845524b + digest: shake256:deb2dca49a7ed8943f1839c0b85fd55f060d2a436e40f33f736f6e979236bb5fab2b5ed8b8dfd1b4883cecf03f541c19a65c26f938c504b285d9a4b8c8d18820 diff --git a/proto/buf.yaml b/proto/buf.yaml new file mode 100644 index 0000000..834a4ac --- /dev/null +++ b/proto/buf.yaml @@ -0,0 +1,9 @@ +version: v1 +deps: +- buf.build/stealthrocket/dispatch-proto +breaking: + use: + - FILE +lint: + use: + - DEFAULT