Skip to content

Commit

Permalink
Merge pull request #21 from stealthrocket/log-calls-and-results
Browse files Browse the repository at this point in the history
Log calls and results
  • Loading branch information
chriso authored Apr 19, 2024
2 parents 2f34ae5 + bacb1c2 commit f7ca0bb
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 24 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ image:

push: image
$(DOCKER) push $(IMAGE)

update:
buf mod update ./proto
for ref in $$(yq -r '.deps[] | .remote + "/gen/go/" + .owner + "/" + .repository + "/protocolbuffers/go@" + .commit' proto/buf.lock); do go get $$ref; done
go mod tidy
119 changes: 95 additions & 24 deletions cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"log/slog"
"net/http"
"os"
Expand All @@ -20,11 +21,15 @@ import (

"github.com/google/uuid"
"github.com/spf13/cobra"
"google.golang.org/protobuf/proto"

sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1"
)

var (
BridgeSession string
LocalEndpoint string
Verbose bool
)

const defaultEndpoint = "127.0.0.1:8000"
Expand All @@ -45,15 +50,15 @@ func runCommand() *cobra.Command {
Short: "Run a Dispatch application",
Long: fmt.Sprintf(`Run a Dispatch application.
The command to start the local application should be specified
after the run command and its options:
The command to start the local application endpoint should be
specified after the run command and its options:
dispatch run [options] -- <command>
Dispatch spawns the local application and then dispatches function
calls to it continuously.
Dispatch spawns the local application endpoint and then dispatches
function calls to it continuously.
Dispatch connects to the local application on http://%s.
Dispatch connects to the local application endpoint on http://%s.
If the local application is listening on a different host or port,
please set the --endpoint option appropriately. The value passed to
this option will be exported as the DISPATCH_ENDPOINT_ADDR environment
Expand All @@ -71,6 +76,10 @@ previous run.`, defaultEndpoint),
return runConfigFlow()
},
RunE: func(c *cobra.Command, args []string) error {
if Verbose {
slog.SetLogLoggerLevel(slog.LevelDebug)
}

if BridgeSession == "" {
BridgeSession = uuid.New().String()
}
Expand Down Expand Up @@ -165,6 +174,7 @@ Run 'dispatch help run' to learn about Dispatch sessions.`, BridgeSession)
defer wg.Done()

err := invoke(ctx, httpClient, bridgeSessionURL, requestID, res)
res.Body.Close()
if err != nil {
if ctx.Err() == nil {
slog.Warn(err.Error())
Expand Down Expand Up @@ -209,13 +219,14 @@ Run 'dispatch help run' to learn about Dispatch sessions.`, BridgeSession)
}

cmd.Flags().StringVarP(&BridgeSession, "session", "s", "", "Optional session to resume")
cmd.Flags().StringVarP(&LocalEndpoint, "endpoint", "e", defaultEndpoint, "Host:port that the local application is listening on")
cmd.Flags().StringVarP(&LocalEndpoint, "endpoint", "e", defaultEndpoint, "Host:port that the local application endpoint is listening on")
cmd.Flags().BoolVarP(&Verbose, "verbose", "", false, "Enable verbose logging")

return cmd
}

func poll(ctx context.Context, client *http.Client, url string) (string, *http.Response, error) {
slog.Debug("getting request from API", "url", url)
slog.Debug("getting request from Dispatch", "url", url)

req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
Expand Down Expand Up @@ -253,45 +264,105 @@ func poll(ctx context.Context, client *http.Client, url string) (string, *http.R
}

func invoke(ctx context.Context, client *http.Client, url, requestID string, bridgeGetRes *http.Response) error {
defer bridgeGetRes.Body.Close()

slog.Debug("sending request from Dispatch API to local application", "endpoint", LocalEndpoint, "request_id", requestID)
slog.Debug("sending request to local application", "endpoint", LocalEndpoint, "request_id", requestID)

// Extract the nested header/body.
// Extract the nested request header/body.
endpointReq, err := http.ReadRequest(bufio.NewReader(bridgeGetRes.Body))
if err != nil {
return fmt.Errorf("invalid response from Dispatch API: %v", err)
}
endpointReq = endpointReq.WithContext(ctx)

// Buffer the request body in memory.
endpointReqBody := &bytes.Buffer{}
if endpointReq.ContentLength > 0 {
endpointReqBody.Grow(int(endpointReq.ContentLength))
}
_, err = io.Copy(endpointReqBody, endpointReq.Body)
bridgeGetRes.Body.Close()
endpointReq.Body.Close()
if err != nil {
return fmt.Errorf("failed to read response from Dispatch API: %v", err)
}
endpointReq.GetBody = func() (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader(endpointReqBody.Bytes())), nil
}
endpointReq.Body, _ = endpointReq.GetBody()

// Parse the request body from the API.
var runRequest sdkv1.RunRequest
if err := proto.Unmarshal(endpointReqBody.Bytes(), &runRequest); err != nil {
return fmt.Errorf("invalid response from Dispatch API: %v", err)
}
switch d := runRequest.Directive.(type) {
case *sdkv1.RunRequest_Input:
slog.Debug("calling function", "function", runRequest.Function, "request_id", requestID)
case *sdkv1.RunRequest_PollResult:
slog.Debug("resuming function", "function", runRequest.Function, "call_results", len(d.PollResult.Results), "request_id", requestID)
}

// The RequestURI field must be cleared for client.Do() to
// accept the request below.
endpointReq.RequestURI = ""

// Forward the request to the local application.
// Forward the request to the local application endpoint.
endpointReq.Host = LocalEndpoint
endpointReq.URL.Scheme = "http"
endpointReq.URL.Host = LocalEndpoint
endpointRes, err := client.Do(endpointReq)
if err != nil {
return fmt.Errorf("failed to contact local application (%s): %v. Please check that -e,--endpoint is correct.", LocalEndpoint, err)
return fmt.Errorf("failed to contact local application endpoint (%s): %v. Please check that -e,--endpoint is correct.", LocalEndpoint, err)
}
defer endpointRes.Body.Close()

bridgeGetRes.Body.Close()

// Buffer the response from the endpoint.
// TODO: pipe it into the request below
var bufferedEndpointRes bytes.Buffer
if err := endpointRes.Write(&bufferedEndpointRes); err != nil {
return fmt.Errorf("failed to read response from local application (%s): %v", LocalEndpoint, err)
// Buffer the response body in memory.
endpointResBody := &bytes.Buffer{}
if endpointRes.ContentLength > 0 {
endpointResBody.Grow(int(endpointRes.ContentLength))
}
_, err = io.Copy(endpointResBody, endpointRes.Body)
endpointRes.Body.Close()
if err != nil {
return fmt.Errorf("failed to read response from local application endpoint (%s): %v", LocalEndpoint, err)
}
endpointRes.Body = io.NopCloser(endpointResBody)

slog.Debug("sending local application's response to Dispatch API", "request_id", requestID)
// Parse the response body from the API.
if endpointRes.StatusCode == http.StatusOK && endpointRes.Header.Get("Content-Type") == "application/proto" {
var runResponse sdkv1.RunResponse
if err := proto.Unmarshal(endpointResBody.Bytes(), &runResponse); err != nil {
return fmt.Errorf("invalid response from local application endpoint (%s): %v", LocalEndpoint, err)
}
switch runResponse.Status {
case sdkv1.Status_STATUS_OK:
switch d := runResponse.Directive.(type) {
case *sdkv1.RunResponse_Exit:
if d.Exit.TailCall != nil {
slog.Debug("function tail-called", "function", runRequest.Function, "tail_call", d.Exit.TailCall.Function, "request_id", requestID)
} else {
slog.Debug("function call succeeded", "function", runRequest.Function, "request_id", requestID)
}
case *sdkv1.RunResponse_Poll:
slog.Debug("function yielded", "function", runRequest.Function, "calls", len(d.Poll.Calls), "request_id", requestID)
}
default:
slog.Debug("function call failed", "function", runRequest.Function, "status", runResponse.Status, "request_id", requestID)
}
} else {
// The response might indicate some other issue, e.g. it could be a 404 if the function can't be found
slog.Debug("function call failed", "function", runRequest.Function, "http_status", endpointRes.StatusCode, "request_id", requestID)
}

// Use io.Pipe to convert the response writer into an io.Reader.
pr, pw := io.Pipe()
go func() {
err := endpointRes.Write(pw)
pw.CloseWithError(err)
}()

slog.Debug("sending response to Dispatch", "request_id", requestID)

// Send the response back to the API.
bridgePostReq, err := http.NewRequestWithContext(ctx, "POST", url, bufio.NewReader(&bufferedEndpointRes))
bridgePostReq, err := http.NewRequestWithContext(ctx, "POST", url, bufio.NewReader(pr))
if err != nil {
panic(err)
}
Expand All @@ -302,7 +373,7 @@ func invoke(ctx context.Context, client *http.Client, url, requestID string, bri
}
bridgePostRes, err := client.Do(bridgePostReq)
if err != nil {
return fmt.Errorf("failed to contact Dispatch API: %v", err)
return fmt.Errorf("failed to contact Dispatch API or write response: %v", err)
}
if bridgePostRes.StatusCode != http.StatusAccepted {
return fmt.Errorf("failed to contact Dispatch API: response code %d", bridgePostRes.StatusCode)
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ module github.com/stealthrocket/dispatch
go 1.22.0

require (
buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240417235922-27bc9b1d8ccb.1
github.com/charmbracelet/bubbles v0.18.0
github.com/charmbracelet/bubbletea v0.25.0
github.com/charmbracelet/lipgloss v0.9.1
github.com/google/uuid v1.6.0
github.com/pelletier/go-toml/v2 v2.2.0
github.com/spf13/cobra v1.8.0
google.golang.org/protobuf v1.33.0
)

require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20231115204500-e097f827e652.1 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20231115204500-e097f827e652.1 h1:SbKXkoZduR4jQ+aCrX71I+dAcHKZPk1SiriRs/XYX9s=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20231115204500-e097f827e652.1/go.mod h1:Tgn5bgL220vkFOI0KPStlcClPeOJzAv4uT+V8JXGUnw=
buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240417235922-27bc9b1d8ccb.1 h1:fo9VjlitQzAnyrzhYMFyxV2iNmNMN2HF4jtQFutblO4=
buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240417235922-27bc9b1d8ccb.1/go.mod h1:yJei/TBJwZBJ8ZUWCKVKceUHk/8gniSGs812SZA9TEE=
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/charmbracelet/bubbles v0.18.0 h1:PYv1A036luoBGroX6VWjQIE9Syf2Wby2oOl/39KLfy0=
Expand All @@ -12,6 +16,9 @@ github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
Expand Down Expand Up @@ -65,6 +72,11 @@ golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
13 changes: 13 additions & 0 deletions proto/buf.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Generated by buf. DO NOT EDIT.
version: v1
deps:
- remote: buf.build
owner: bufbuild
repository: protovalidate
commit: e097f827e65240ac9fd4b1158849a8fc
digest: shake256:f19252436fd9ded945631e2ffaaed28247a92c9015ccf55ae99db9fb3d9600c4fdb00fd2d3bd7701026ec2fd4715c5129e6ae517c25a59ba690020cfe80bf8ad
- remote: buf.build
owner: stealthrocket
repository: dispatch-proto
commit: 27bc9b1d8ccb4ab2a37063596845524b
digest: shake256:deb2dca49a7ed8943f1839c0b85fd55f060d2a436e40f33f736f6e979236bb5fab2b5ed8b8dfd1b4883cecf03f541c19a65c26f938c504b285d9a4b8c8d18820
9 changes: 9 additions & 0 deletions proto/buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: v1
deps:
- buf.build/stealthrocket/dispatch-proto
breaking:
use:
- FILE
lint:
use:
- DEFAULT

0 comments on commit f7ca0bb

Please sign in to comment.