diff --git a/cli/color.go b/cli/color.go new file mode 100644 index 0000000..54243a4 --- /dev/null +++ b/cli/color.go @@ -0,0 +1,11 @@ +package cli + +import "github.com/charmbracelet/lipgloss" + +var ( + grayColor = lipgloss.Color("#7D7D7D") + whiteColor = lipgloss.Color("#FFFFFF") + redColor = lipgloss.Color("#FF0000") + greenColor = lipgloss.Color("#00FF00") + yellowColor = lipgloss.Color("#FFAA00") +) diff --git a/cli/log.go b/cli/log.go new file mode 100644 index 0000000..069b01c --- /dev/null +++ b/cli/log.go @@ -0,0 +1,108 @@ +package cli + +import ( + "bytes" + "context" + "io" + "log/slog" + "slices" + "sync" + + "github.com/charmbracelet/lipgloss" +) + +var ( + logTimeStyle = lipgloss.NewStyle().Foreground(grayColor) + logAttrStyle = lipgloss.NewStyle().Foreground(grayColor) + + logDebugStyle = lipgloss.NewStyle().Foreground(whiteColor) + logInfoStyle = lipgloss.NewStyle().Foreground(whiteColor) + logWarnStyle = lipgloss.NewStyle().Foreground(yellowColor) + logErrorStyle = lipgloss.NewStyle().Foreground(redColor) +) + +type slogHandler struct { + mu sync.Mutex + stream io.Writer + level slog.Level + + parent *slogHandler + attrs []slog.Attr +} + +func (h *slogHandler) Enabled(ctx context.Context, level slog.Level) bool { + return level >= h.level +} + +func (h *slogHandler) Handle(ctx context.Context, record slog.Record) error { + h.mu.Lock() + defer h.mu.Unlock() + + var b bytes.Buffer + b.WriteString(logTimeStyle.Render(record.Time.Format("2006-01-02 15:04:05.000"))) + b.WriteByte(' ') + b.WriteString(levelString(record.Level)) + b.WriteByte(' ') + b.WriteString(record.Message) + record.Attrs(func(attr slog.Attr) bool { + b.WriteByte(' ') + b.WriteString(logAttrStyle.Render(attr.String())) + return true + }) + for _, attr := range h.attrs { + b.WriteByte(' ') + b.WriteString(logAttrStyle.Render(attr.String())) + } + b.WriteByte('\n') + + _, err := h.stream.Write(b.Bytes()) + return err +} + +func levelString(level slog.Level) string { + switch level { + case slog.LevelDebug: + return logDebugStyle.Render(level.String()) + case slog.LevelInfo: + return logInfoStyle.Render(level.String()) + case slog.LevelWarn: + return logWarnStyle.Render(level.String()) + case slog.LevelError: + return logErrorStyle.Render(level.String()) + default: + return level.String() + } +} + +func (h *slogHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + parent := h + if parent.parent != nil { + parent = parent.parent + } + return &slogHandler{ + stream: h.stream, + level: h.level, + parent: parent, + attrs: append(slices.Clip(parent.attrs), attrs...), + } +} + +func (h *slogHandler) WithGroup(group string) slog.Handler { + panic("not implemented") +} + +type prefixLogWriter struct { + stream io.Writer + prefix []byte +} + +func (p *prefixLogWriter) Write(b []byte) (int, error) { + var buffer bytes.Buffer + if _, err := buffer.Write(p.prefix); err != nil { + return 0, err + } + if _, err := buffer.Write(b); err != nil { + return 0, err + } + return p.stream.Write(buffer.Bytes()) +} diff --git a/cli/run.go b/cli/run.go index 74df0c4..76fbff0 100644 --- a/cli/run.go +++ b/cli/run.go @@ -4,9 +4,11 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" "io" "log/slog" + "net" "net/http" "os" "os/exec" @@ -20,11 +22,11 @@ import ( "syscall" "time" + sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1" + tea "github.com/charmbracelet/bubbletea" "github.com/google/uuid" "github.com/spf13/cobra" "google.golang.org/protobuf/proto" - - sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1" ) var ( @@ -81,16 +83,34 @@ previous run.`, defaultEndpoint), prefixWidth := max(len("dispatch"), len(arg0)) - if Verbose { + if checkEndpoint(LocalEndpoint, time.Second) { + return fmt.Errorf("cannot start local application on address that's already in use: %v", LocalEndpoint) + } + + // Enable the TUI if this is an interactive session and + // stdout/stderr aren't redirected. + var tui *TUI + var logWriter io.Writer = os.Stderr + if isTerminal(os.Stdin) && isTerminal(os.Stdout) && isTerminal(os.Stderr) { + tui = &TUI{} + logWriter = tui + } + + if Verbose || tui != nil { + level := slog.LevelInfo + if Verbose { + level = slog.LevelDebug + } prefix := []byte(pad("dispatch", prefixWidth) + " | ") if Color { prefix = []byte("\033[32m" + pad("dispatch", prefixWidth) + " \033[90m|\033[0m ") } - // Print Dispatch logs with a prefix in verbose mode. - slog.SetDefault(slog.New(&prefixHandler{ - Handler: slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}), - stream: os.Stderr, - prefix: prefix, + slog.SetDefault(slog.New(&slogHandler{ + stream: &prefixLogWriter{ + stream: logWriter, + prefix: prefix, + }, + level: level, })) } @@ -98,7 +118,7 @@ previous run.`, defaultEndpoint), BridgeSession = uuid.New().String() } - if Verbose { + if Verbose || tui != nil { slog.Info("starting session", "session_id", BridgeSession) } else { dialog(`Starting Dispatch session: %v @@ -117,7 +137,7 @@ Run 'dispatch help run' to learn about Dispatch sessions.`, BridgeSession) // to disambiguate Dispatch logs from the local application's logs. var stdout io.ReadCloser var stderr io.ReadCloser - if Verbose { + if Verbose || tui != nil { var err error stdout, err = cmd.StdoutPipe() if err != nil { @@ -131,8 +151,8 @@ Run 'dispatch help run' to learn about Dispatch sessions.`, BridgeSession) } defer stderr.Close() } else { - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr + cmd.Stdout = logWriter + cmd.Stderr = logWriter } // Pass on environment variables to the local application. @@ -148,6 +168,10 @@ Run 'dispatch help run' to learn about Dispatch sessions.`, BridgeSession) "DISPATCH_ENDPOINT_ADDR="+LocalEndpoint, ) + // Set OS-specific process attributes. + cmd.SysProcAttr = &syscall.SysProcAttr{} + setSysProcAttr(cmd.SysProcAttr) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -176,6 +200,27 @@ Run 'dispatch help run' to learn about Dispatch sessions.`, BridgeSession) } }() + // Initialize the TUI. + if tui != nil { + p := tea.NewProgram(tui, + tea.WithContext(ctx), + tea.WithoutSignalHandler(), + tea.WithoutCatchPanics()) + wg.Add(1) + go func() { + defer wg.Done() + + if _, err := p.Run(); err != nil && !errors.Is(err, tea.ErrProgramKilled) { + panic(err) + } + // Quitting the TUI sends an implicit interrupt. + select { + case signals <- syscall.SIGINT: + default: + } + }() + } + bridgeSessionURL := fmt.Sprintf("%s/sessions/%s", DispatchBridgeUrl, BridgeSession) // Poll for work in the background. @@ -212,7 +257,7 @@ Run 'dispatch help run' to learn about Dispatch sessions.`, BridgeSession) go func() { defer wg.Done() - err := invoke(ctx, httpClient, bridgeSessionURL, requestID, res) + err := invoke(ctx, httpClient, bridgeSessionURL, requestID, res, tui) res.Body.Close() if err != nil { if ctx.Err() == nil { @@ -237,14 +282,14 @@ Run 'dispatch help run' to learn about Dispatch sessions.`, BridgeSession) return fmt.Errorf("failed to start %s: %v", strings.Join(args, " "), err) } - if Verbose { + if Verbose || tui != nil { prefix := []byte(pad(arg0, prefixWidth) + " | ") suffix := []byte("\n") if Color { prefix = []byte("\033[35m" + pad(arg0, prefixWidth) + " \033[90m|\033[0m ") } - go printPrefixedLines(os.Stderr, stdout, prefix, suffix) - go printPrefixedLines(os.Stderr, stderr, prefix, suffix) + go printPrefixedLines(logWriter, stdout, prefix, suffix) + go printPrefixedLines(logWriter, stderr, prefix, suffix) } err = cmd.Wait() @@ -260,12 +305,18 @@ Run 'dispatch help run' to learn about Dispatch sessions.`, BridgeSession) err = nil if atomic.LoadInt64(&successfulPolls) > 0 && !Verbose { - dialog("To resume this Dispatch session:\n\n\tdispatch run --session %s -- %s", - BridgeSession, strings.Join(args, " ")) + dispatchArg0 := os.Args[0] + dialog("To resume this Dispatch session:\n\n\t%s run --session %s -- %s", + dispatchArg0, BridgeSession, strings.Join(args, " ")) } } if err != nil { + if r, ok := logWriter.(io.Reader); ok { + // Dump any buffered logs to stderr in this case. + time.Sleep(100 * time.Millisecond) + _, _ = io.Copy(os.Stderr, r) + } return fmt.Errorf("failed to invoke command '%s': %v", strings.Join(args, " "), err) } return nil @@ -317,8 +368,34 @@ func poll(ctx context.Context, client *http.Client, url string) (string, *http.R return requestID, res, nil } -func invoke(ctx context.Context, client *http.Client, url, requestID string, bridgeGetRes *http.Response) error { - slog.Debug("sending request to local application", "endpoint", LocalEndpoint, "request_id", requestID) +// FunctionCallObserver observes function call requests and responses. +// +// The observer may be invoked concurrently from many goroutines. +type FunctionCallObserver interface { + // ObserveRequest observes a RunRequest as it passes from the API through + // the CLI to the local application. + ObserveRequest(*sdkv1.RunRequest) + + // ObserveResponse observes a response to the RunRequest. + // + // If the RunResponse is nil, it means the local application did not return + // a valid response. If the http.Response is not nil, it means an HTTP + // response was generated, but it wasn't a valid RunResponse. The error may + // be present if there was either an error making the HTTP request, or parsing + // the response. + // + // ObserveResponse always comes after a call to ObserveRequest for any given + // RunRequest. + ObserveResponse(*sdkv1.RunRequest, error, *http.Response, *sdkv1.RunResponse) +} + +func invoke(ctx context.Context, client *http.Client, url, requestID string, bridgeGetRes *http.Response, observer FunctionCallObserver) error { + logger := slog.Default() + if Verbose { + logger = slog.With("request_id", requestID) + } + + logger.Debug("sending request to local application", "endpoint", LocalEndpoint) // Extract the nested request header/body. endpointReq, err := http.ReadRequest(bufio.NewReader(bridgeGetRes.Body)) @@ -348,11 +425,15 @@ func invoke(ctx context.Context, client *http.Client, url, requestID string, bri if err := proto.Unmarshal(endpointReqBody.Bytes(), &runRequest); err != nil { return fmt.Errorf("invalid response from Dispatch API: %v", err) } - switch d := runRequest.Directive.(type) { + logger.Debug("parsed request", "function", runRequest.Function, "dispatch_id", runRequest.DispatchId) + switch runRequest.Directive.(type) { case *sdkv1.RunRequest_Input: - slog.Debug("calling function", "function", runRequest.Function, "request_id", requestID) + logger.Info("calling function", "function", runRequest.Function) case *sdkv1.RunRequest_PollResult: - slog.Debug("resuming function", "function", runRequest.Function, "call_results", len(d.PollResult.Results), "request_id", requestID) + logger.Info("resuming function", "function", runRequest.Function) + } + if observer != nil { + observer.ObserveRequest(&runRequest) } // The RequestURI field must be cleared for client.Do() to @@ -365,6 +446,9 @@ func invoke(ctx context.Context, client *http.Client, url, requestID string, bri endpointReq.URL.Host = LocalEndpoint endpointRes, err := client.Do(endpointReq) if err != nil { + if observer != nil { + observer.ObserveResponse(&runRequest, err, nil, nil) + } return fmt.Errorf("failed to contact local application endpoint (%s): %v. Please check that -e,--endpoint is correct.", LocalEndpoint, err) } @@ -376,6 +460,9 @@ func invoke(ctx context.Context, client *http.Client, url, requestID string, bri _, err = io.Copy(endpointResBody, endpointRes.Body) endpointRes.Body.Close() if err != nil { + if observer != nil { + observer.ObserveResponse(&runRequest, err, endpointRes, nil) + } return fmt.Errorf("failed to read response from local application endpoint (%s): %v", LocalEndpoint, err) } endpointRes.Body = io.NopCloser(endpointResBody) @@ -385,6 +472,9 @@ func invoke(ctx context.Context, client *http.Client, url, requestID string, bri if endpointRes.StatusCode == http.StatusOK && endpointRes.Header.Get("Content-Type") == "application/proto" { var runResponse sdkv1.RunResponse if err := proto.Unmarshal(endpointResBody.Bytes(), &runResponse); err != nil { + if observer != nil { + observer.ObserveResponse(&runRequest, err, endpointRes, nil) + } return fmt.Errorf("invalid response from local application endpoint (%s): %v", LocalEndpoint, err) } switch runResponse.Status { @@ -392,19 +482,25 @@ func invoke(ctx context.Context, client *http.Client, url, requestID string, bri switch d := runResponse.Directive.(type) { case *sdkv1.RunResponse_Exit: if d.Exit.TailCall != nil { - slog.Debug("function tail-called", "function", runRequest.Function, "tail_call", d.Exit.TailCall.Function, "request_id", requestID) + logger.Info("function tail-called", "function", runRequest.Function, "tail_call", d.Exit.TailCall.Function) } else { - slog.Debug("function call succeeded", "function", runRequest.Function, "request_id", requestID) + logger.Info("function call succeeded", "function", runRequest.Function) } case *sdkv1.RunResponse_Poll: - slog.Debug("function yielded", "function", runRequest.Function, "calls", len(d.Poll.Calls), "request_id", requestID) + logger.Info("function yielded", "function", runRequest.Function, "calls", len(d.Poll.Calls)) } default: - slog.Debug("function call failed", "function", runRequest.Function, "status", runResponse.Status, "request_id", requestID) + logger.Warn("function call failed", "function", runRequest.Function, "status", statusString(runResponse.Status)) + } + if observer != nil { + observer.ObserveResponse(&runRequest, nil, endpointRes, &runResponse) } } else { // The response might indicate some other issue, e.g. it could be a 404 if the function can't be found - slog.Debug("function call failed", "function", runRequest.Function, "http_status", endpointRes.StatusCode, "request_id", requestID) + logger.Warn("function call failed", "function", runRequest.Function, "http_status", endpointRes.StatusCode) + if observer != nil { + observer.ObserveResponse(&runRequest, nil, endpointRes, nil) + } } // Use io.Pipe to convert the response writer into an io.Reader. @@ -414,7 +510,7 @@ func invoke(ctx context.Context, client *http.Client, url, requestID string, bri pw.CloseWithError(err) }() - slog.Debug("sending response to Dispatch", "request_id", requestID) + logger.Debug("sending response to Dispatch") // Send the response back to the API. bridgePostReq, err := http.NewRequestWithContext(ctx, "POST", url, bufio.NewReader(pr)) @@ -436,7 +532,7 @@ func invoke(ctx context.Context, client *http.Client, url, requestID string, bri case http.StatusNotFound: // A 404 is expected if there's a timeout upstream that's hit // before the response can be sent. - slog.Debug("request is no longer available", "request_id", requestID, "method", "post") + logger.Debug("request is no longer available", "method", "post") return nil default: return fmt.Errorf("failed to contact Dispatch API to send response: response code %d", bridgePostRes.StatusCode) @@ -473,6 +569,18 @@ func cleanup(ctx context.Context, client *http.Client, url, requestID string) er } } +func checkEndpoint(addr string, timeout time.Duration) bool { + slog.Debug("checking endpoint", "addr", addr) + conn, err := net.DialTimeout("tcp", addr, timeout) + if err != nil { + slog.Debug("endpoint could not be contacted", "addr", addr, "err", err) + return false + } + slog.Debug("endpoint contacted successfully", "addr", addr) + conn.Close() + return true +} + func withoutEnv(env []string, prefixes ...string) []string { return slices.DeleteFunc(env, func(v string) bool { for _, prefix := range prefixes { @@ -484,24 +592,6 @@ func withoutEnv(env []string, prefixes ...string) []string { }) } -type prefixHandler struct { - slog.Handler - stream io.Writer - prefix []byte - suffix []byte -} - -func (h *prefixHandler) Handle(ctx context.Context, r slog.Record) error { - if _, err := h.stream.Write(h.prefix); err != nil { - return err - } - if err := h.Handler.Handle(ctx, r); err != nil { - return err - } - _, err := h.stream.Write(h.suffix) - return err -} - func printPrefixedLines(w io.Writer, r io.Reader, prefix, suffix []byte) { scanner := bufio.NewScanner(r) buffer := bytes.NewBuffer(nil) diff --git a/cli/run_default.go b/cli/run_default.go new file mode 100644 index 0000000..dd174de --- /dev/null +++ b/cli/run_default.go @@ -0,0 +1,7 @@ +//go:build !linux + +package cli + +import "syscall" + +func setSysProcAttr(attr *syscall.SysProcAttr) {} diff --git a/cli/run_linux.go b/cli/run_linux.go new file mode 100644 index 0000000..6df928e --- /dev/null +++ b/cli/run_linux.go @@ -0,0 +1,7 @@ +package cli + +import "syscall" + +func setSysProcAttr(attr *syscall.SysProcAttr) { + attr.Pdeathsig = syscall.SIGTERM +} diff --git a/cli/tui.go b/cli/tui.go new file mode 100644 index 0000000..a4e141a --- /dev/null +++ b/cli/tui.go @@ -0,0 +1,646 @@ +package cli + +import ( + "bytes" + "errors" + "fmt" + "net/http" + "strconv" + "strings" + "sync" + "time" + + sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1" + "github.com/charmbracelet/bubbles/help" + "github.com/charmbracelet/bubbles/key" + "github.com/charmbracelet/bubbles/spinner" + "github.com/charmbracelet/bubbles/viewport" + tea "github.com/charmbracelet/bubbletea" + "github.com/charmbracelet/lipgloss" + "github.com/muesli/reflow/ansi" +) + +const refreshInterval = time.Second / 2 + +var ( + // Style for the viewport that contains everything. + viewportStyle = lipgloss.NewStyle().Margin(1, 2) + + // Styles for the dispatch_ ASCII logo. + logoStyle = lipgloss.NewStyle().Foreground(whiteColor) + logoUnderscoreStyle = lipgloss.NewStyle().Foreground(greenColor) + + // Style for the line under the logo as the CLI is initializing/waiting. + statusStyle = lipgloss.NewStyle().Foreground(grayColor) + + // Style for the table of function calls. + tableHeaderStyle = lipgloss.NewStyle().Foreground(whiteColor).Bold(true) + + // Styles for function names and statuses in the table. + pendingStyle = lipgloss.NewStyle().Foreground(grayColor) + retryStyle = lipgloss.NewStyle().Foreground(yellowColor) + errorStyle = lipgloss.NewStyle().Foreground(redColor) + okStyle = lipgloss.NewStyle().Foreground(greenColor) + + // Styles for other components inside the table. + spinnerStyle = lipgloss.NewStyle().Foreground(grayColor) + treeStyle = lipgloss.NewStyle().Foreground(grayColor) +) + +type DispatchID string + +type TUI struct { + mu sync.Mutex + + // Storage for the function call hierarchies. Each function call + // has a "root" node, and nodes can have zero or more children. + // + // FIXME: we never clean up items from these maps + roots map[DispatchID]struct{} + orderedRoots []DispatchID + nodes map[DispatchID]node + + // Storage for logs. + logs bytes.Buffer + + // TUI models / options / flags, used to display the information + // above. + spinner spinner.Model + viewport viewport.Model + help help.Model + ready bool + keys []key.Binding + activeTab tab + tail bool + windowHeight int +} + +type tab int + +const ( + functionsTab tab = iota + logsTab +) + +const tabCount = 2 + +type node struct { + function string + + failures int + responses int + + status sdkv1.Status + error error + + running bool + done bool + + creationTime time.Time + expirationTime time.Time + doneTime time.Time + + children map[DispatchID]struct{} + orderedChildren []DispatchID +} + +type tickMsg struct{} + +func tick() tea.Cmd { + // The TUI isn't in the driver's seat. Instead, we have the layer + // up coordinating the interactions between the Dispatch API and + // the local application. The layer up notifies the TUI of changes + // via the FunctionCallObserver interface. + // + // To keep the TUI up to date, we have a ticker that sends messages + // at a fixed interval. + return tea.Tick(refreshInterval, func(time.Time) tea.Msg { + return tickMsg{} + }) +} + +func (t *TUI) Init() tea.Cmd { + t.spinner = spinner.New(spinner.WithSpinner(spinner.Dot)) + t.help = help.New() + // Note that t.viewport is initialized on the first tea.WindowSizeMsg. + + t.keys = []key.Binding{ + key.NewBinding( + key.WithKeys("tab"), + key.WithHelp("tab", "switch tabs"), + ), + key.NewBinding( + key.WithKeys("t"), + key.WithHelp("t", "tail"), + ), + key.NewBinding( + key.WithKeys("q", "ctrl+c", "esc"), + key.WithHelp("q", "quit"), + ), + } + + t.tail = true + t.activeTab = functionsTab + + return tea.Batch(t.spinner.Tick, tick()) +} + +func (t *TUI) Update(msg tea.Msg) (tea.Model, tea.Cmd) { + // Here we handle "messages" such as key presses, window size changes, + // refresh ticks, etc. Note that the TUI view is updated after messages + // have been processed. + var cmd tea.Cmd + var cmds []tea.Cmd + switch msg := msg.(type) { + case tickMsg: + cmds = append(cmds, tick()) + case spinner.TickMsg: + // Forward this tick to the spinner model so that it updates. + t.spinner, cmd = t.spinner.Update(msg) + cmds = append(cmds, cmd) + case tea.WindowSizeMsg: + // Initialize or resize the viewport. + t.windowHeight = msg.Height + height := msg.Height - 1 // reserve space for help + width := msg.Width + if !t.ready { + t.viewport = viewport.New(width, height) + t.viewport.Style = viewportStyle + t.ready = true + } else { + t.viewport.Width = width + t.viewport.Height = height + } + case tea.KeyMsg: + switch msg.String() { + case "ctrl+c", "q", "esc": + return t, tea.Quit + case "t": + t.tail = true + case "tab": + t.activeTab = (t.activeTab + 1) % tabCount + case "up", "down", "left", "right", "pgup", "pgdown", "ctrl+u", "ctrl+d": + t.tail = false + } + } + // Forward messages to the viewport, e.g. for scroll-back support. + t.viewport, cmd = t.viewport.Update(msg) + cmds = append(cmds, cmd) + return t, tea.Batch(cmds...) +} + +// https://patorjk.com/software/taag/ (Ogre) +var dispatchAscii = []string{ + logoStyle.Render(` _ _ _ _`), + logoStyle.Render(` __| (_)___ _ __ __ _| |_ ___| |__`), + logoStyle.Render(` / _' | / __| '_ \ / _' | __/ __| '_ \`), + logoStyle.Render(`| (_| | \__ \ |_) | (_| | || (__| | | |`) + logoUnderscoreStyle.Render(" _____"), + logoStyle.Render(` \__,_|_|___/ .__/ \__,_|\__\___|_| |_|`) + logoUnderscoreStyle.Render("|_____|"), + logoStyle.Render(` |_|`), + "", +} + +var minWindowHeight = len(dispatchAscii) + 3 + +func (t *TUI) View() string { + if !t.ready { + return statusStyle.Render(strings.Join(append(dispatchAscii, "Initializing...\n"), "\n")) + } + + // Render the correct tab. + switch t.activeTab { + case functionsTab: + t.viewport.SetContent(t.functionCallsView(time.Now())) + case logsTab: + t.viewport.SetContent(t.logs.String()) + } + + // Tail the output, unless the user has tried + // to scroll back (e.g. with arrow keys). + if t.tail { + t.viewport.GotoBottom() + } + + // Shrink the viewport so it contains the content and help line only. + t.viewport.Height = max(minWindowHeight, min(t.viewport.TotalLineCount()+1, t.windowHeight-1)) + + return t.viewport.View() + "\n" + t.help.ShortHelpView(t.keys) +} + +func (t *TUI) ObserveRequest(req *sdkv1.RunRequest) { + // ObserveRequest is part of the FunctionCallObserver interface. + // It's called after a request has been received from the Dispatch API, + // and before the request has been sent to the local application. + + t.mu.Lock() + defer t.mu.Unlock() + + if t.roots == nil { + t.roots = map[DispatchID]struct{}{} + } + if t.nodes == nil { + t.nodes = map[DispatchID]node{} + } + + rootID := t.parseID(req.RootDispatchId) + parentID := t.parseID(req.ParentDispatchId) + id := t.parseID(req.DispatchId) + + // Upsert the root. + if _, ok := t.roots[rootID]; !ok { + t.roots[rootID] = struct{}{} + t.orderedRoots = append(t.orderedRoots, rootID) + } + root, ok := t.nodes[rootID] + if !ok { + root = node{} + } + t.nodes[rootID] = root + + // Upsert the node. + n, ok := t.nodes[id] + if !ok { + n = node{} + } + n.function = req.Function + n.running = true + if req.CreationTime != nil { + n.creationTime = req.CreationTime.AsTime() + } + if n.creationTime.IsZero() { + n.creationTime = time.Now() + } + if req.ExpirationTime != nil { + n.expirationTime = req.ExpirationTime.AsTime() + } + t.nodes[id] = n + + // Upsert the parent and link its child, if applicable. + if parentID != "" { + parent, ok := t.nodes[parentID] + if !ok { + parent = node{} + if parentID != rootID { + panic("not implemented") + } + } + if parent.children == nil { + parent.children = map[DispatchID]struct{}{} + } + if _, ok := parent.children[id]; !ok { + parent.children[id] = struct{}{} + parent.orderedChildren = append(parent.orderedChildren, id) + } + t.nodes[parentID] = parent + } +} + +func (t *TUI) ObserveResponse(req *sdkv1.RunRequest, err error, httpRes *http.Response, res *sdkv1.RunResponse) { + // ObserveResponse is part of the FunctionCallObserver interface. + // It's called after a response has been received from the local + // application, and before the response has been sent to Dispatch. + + t.mu.Lock() + defer t.mu.Unlock() + + id := t.parseID(req.DispatchId) + n := t.nodes[id] + + n.responses++ + n.error = nil + n.status = 0 + n.running = false + + if res != nil { + switch res.Status { + case sdkv1.Status_STATUS_OK: + // noop + case sdkv1.Status_STATUS_INCOMPATIBLE_STATE: + n = node{function: n.function} // reset + default: + n.failures++ + } + + switch d := res.Directive.(type) { + case *sdkv1.RunResponse_Exit: + n.status = res.Status + n.done = terminalStatus(res.Status) + if d.Exit.TailCall != nil { + n = node{function: d.Exit.TailCall.Function} // reset + } + case *sdkv1.RunResponse_Poll: + // noop + } + } else if httpRes != nil { + n.failures++ + n.error = fmt.Errorf("unexpected HTTP status code %d", httpRes.StatusCode) + n.done = terminalHTTPStatusCode(httpRes.StatusCode) + } else if err != nil { + n.failures++ + n.error = err + } + + if n.done && n.doneTime.IsZero() { + n.doneTime = time.Now() + } + + t.nodes[id] = n +} + +func (t *TUI) Write(b []byte) (int, error) { + t.mu.Lock() + defer t.mu.Unlock() + + return t.logs.Write(b) +} + +func (t *TUI) Read(b []byte) (int, error) { + t.mu.Lock() + defer t.mu.Unlock() + + return t.logs.Read(b) +} + +func (t *TUI) parseID(id string) DispatchID { + return DispatchID(id) +} + +func whitespace(width int) string { + return strings.Repeat(" ", width) +} + +func padding(width int, s string) int { + return width - ansi.PrintableRuneWidth(s) +} + +func truncate(width int, s string) string { + var truncated bool + for ansi.PrintableRuneWidth(s) > width { + s = s[:len(s)-1] + truncated = true + } + if truncated { + s = s + "\033[0m" + } + return s +} + +func right(width int, s string) string { + if ansi.PrintableRuneWidth(s) > width { + return truncate(width-3, s) + "..." + } + return whitespace(padding(width, s)) + s +} + +func left(width int, s string) string { + if ansi.PrintableRuneWidth(s) > width { + return truncate(width-3, s) + "..." + } + return s + whitespace(padding(width, s)) +} + +func (t *TUI) functionCallsView(now time.Time) string { + t.mu.Lock() + defer t.mu.Unlock() + + if len(t.roots) == 0 { + return statusStyle.Render(strings.Join(append(dispatchAscii, "Waiting for function calls...\n"), "\n")) + } + + // Render function calls in a hybrid table/tree view. + var b strings.Builder + var rows rowBuffer + for i, rootID := range t.orderedRoots { + if i > 0 { + b.WriteByte('\n') + } + + // Buffer rows in memory. + t.buildRows(now, rootID, nil, &rows) + + // Dynamically size the function call tree column. + maxFunctionWidth := 0 + for i := range rows.rows { + maxFunctionWidth = max(maxFunctionWidth, ansi.PrintableRuneWidth(rows.rows[i].function)) + } + functionColumnWidth := max(9, min(50, maxFunctionWidth)) + + // Render the table. + b.WriteString(tableHeaderView(functionColumnWidth)) + for i := range rows.rows { + b.WriteString(tableRowView(&rows.rows[i], functionColumnWidth)) + } + + rows.reset() + } + + return b.String() +} + +type row struct { + spinner string + attempts int + elapsed time.Duration + function string + status string +} + +type rowBuffer struct { + rows []row +} + +func (b *rowBuffer) add(r row) { + b.rows = append(b.rows, r) +} + +func (b *rowBuffer) reset() { + b.rows = b.rows[:0] +} + +func tableHeaderView(functionColumnWidth int) string { + return whitespace(2) + + left(functionColumnWidth, tableHeaderStyle.Render("Function")) + " " + + right(8, tableHeaderStyle.Render("Attempts")) + " " + + right(10, tableHeaderStyle.Render("Duration")) + " " + + left(30, tableHeaderStyle.Render("Status")) + + "\n" +} + +func tableRowView(r *row, functionColumnWidth int) string { + attemptsStr := strconv.Itoa(r.attempts) + + var elapsedStr string + if r.elapsed > 0 { + elapsedStr = r.elapsed.String() + } else { + elapsedStr = "?" + } + + return left(2, r.spinner) + + left(functionColumnWidth, r.function) + " " + + right(8, attemptsStr) + " " + + right(10, elapsedStr) + " " + + left(30, r.status) + + "\n" +} + +func (t *TUI) buildRows(now time.Time, id DispatchID, isLast []bool, rows *rowBuffer) { + // t.mu must be locked! + + n := t.nodes[id] + + // Render the tree prefix. + var function strings.Builder + for i, last := range isLast { + var s string + if i == len(isLast)-1 { + if last { + s = "└─" + } else { + s = "├─" + } + } else { + if last { + s = " " + } else { + s = "│ " + } + } + function.WriteString(treeStyle.Render(s)) + function.WriteByte(' ') + } + + // Determine what to print, based on the status of the function call. + var style lipgloss.Style + pending := false + if n.done { + if n.status == sdkv1.Status_STATUS_OK { + style = okStyle + } else { + style = errorStyle + } + } else if !n.expirationTime.IsZero() && n.expirationTime.Before(now) { + n.error = errors.New("Expired") + style = errorStyle + n.done = true + n.doneTime = n.expirationTime + } else { + style = pendingStyle + if n.failures > 0 { + style = retryStyle + } + pending = true + } + + // Render the function name. + if n.function != "" { + function.WriteString(style.Render(n.function)) + } else { + function.WriteString(style.Render("(?)")) + } + + // Render the status. + var status string + if n.running { + status = "Running" + style = pendingStyle + } else if n.error != nil { + status = n.error.Error() + } else if n.status != sdkv1.Status_STATUS_UNSPECIFIED { + status = statusString(n.status) + } else if pending && n.responses > 0 { + status = "Suspended" + style = pendingStyle + } else { + status = "Pending" + } + status = style.Render(status) + + var spinner string + if pending { + spinner = spinnerStyle.Render(t.spinner.View()) + } + + attempts := n.failures + 1 + + var elapsed time.Duration + if !n.creationTime.IsZero() { + var tail time.Time + if !n.done { + tail = now + } else { + tail = n.doneTime + } + elapsed = tail.Sub(n.creationTime).Truncate(time.Millisecond) + } + + rows.add(row{spinner, attempts, elapsed, function.String(), status}) + + // Recursively render children. + for i, id := range n.orderedChildren { + last := i == len(n.orderedChildren)-1 + t.buildRows(now, id, append(isLast[:len(isLast):len(isLast)], last), rows) + } +} + +func statusString(status sdkv1.Status) string { + switch status { + case sdkv1.Status_STATUS_OK: + return "OK" + case sdkv1.Status_STATUS_TIMEOUT: + return "Timeout" + case sdkv1.Status_STATUS_THROTTLED: + return "Throttled" + case sdkv1.Status_STATUS_INVALID_ARGUMENT: + return "Invalid response" + case sdkv1.Status_STATUS_TEMPORARY_ERROR: + return "Temporary error" + case sdkv1.Status_STATUS_PERMANENT_ERROR: + return "Permanent error" + case sdkv1.Status_STATUS_INCOMPATIBLE_STATE: + return "Incompatible state" + case sdkv1.Status_STATUS_DNS_ERROR: + return "DNS error" + case sdkv1.Status_STATUS_TCP_ERROR: + return "TCP error" + case sdkv1.Status_STATUS_TLS_ERROR: + return "TLS error" + case sdkv1.Status_STATUS_HTTP_ERROR: + return "HTTP error" + case sdkv1.Status_STATUS_UNAUTHENTICATED: + return "Unauthenticated" + case sdkv1.Status_STATUS_PERMISSION_DENIED: + return "Permission denied" + case sdkv1.Status_STATUS_NOT_FOUND: + return "Not found" + default: + return status.String() + } +} + +func terminalStatus(status sdkv1.Status) bool { + switch status { + case sdkv1.Status_STATUS_TIMEOUT, + sdkv1.Status_STATUS_THROTTLED, + sdkv1.Status_STATUS_TEMPORARY_ERROR, + sdkv1.Status_STATUS_INCOMPATIBLE_STATE, + sdkv1.Status_STATUS_DNS_ERROR, + sdkv1.Status_STATUS_TCP_ERROR, + sdkv1.Status_STATUS_TLS_ERROR, + sdkv1.Status_STATUS_HTTP_ERROR: + return false + default: + return true + } +} + +func terminalHTTPStatusCode(code int) bool { + switch code / 100 { + case 4: + return code != http.StatusRequestTimeout && code != http.StatusTooManyRequests + case 5: + return code == http.StatusNotImplemented + default: + return true + } +} diff --git a/go.mod b/go.mod index c013bf7..6f177bb 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/stealthrocket/dispatch go 1.22.0 require ( - buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240417235922-27bc9b1d8ccb.1 + buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240429010127-639d52c5db75.1 github.com/charmbracelet/bubbles v0.18.0 github.com/charmbracelet/bubbletea v0.25.0 github.com/charmbracelet/lipgloss v0.9.1 diff --git a/go.sum b/go.sum index 093dacf..2cde6a5 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20231115204500-e097f827e652.1 h1:SbKXkoZduR4jQ+aCrX71I+dAcHKZPk1SiriRs/XYX9s= buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20231115204500-e097f827e652.1/go.mod h1:Tgn5bgL220vkFOI0KPStlcClPeOJzAv4uT+V8JXGUnw= -buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240417235922-27bc9b1d8ccb.1 h1:fo9VjlitQzAnyrzhYMFyxV2iNmNMN2HF4jtQFutblO4= -buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240417235922-27bc9b1d8ccb.1/go.mod h1:yJei/TBJwZBJ8ZUWCKVKceUHk/8gniSGs812SZA9TEE= +buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240429010127-639d52c5db75.1 h1:C8qLXgA5Y2iK8VkXsNHi8E4qU3aUsZQ0mFqwgxIH7KI= +buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go v1.33.0-20240429010127-639d52c5db75.1/go.mod h1:yJei/TBJwZBJ8ZUWCKVKceUHk/8gniSGs812SZA9TEE= github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= github.com/charmbracelet/bubbles v0.18.0 h1:PYv1A036luoBGroX6VWjQIE9Syf2Wby2oOl/39KLfy0= diff --git a/proto/buf.lock b/proto/buf.lock index 95fc641..96a9278 100644 --- a/proto/buf.lock +++ b/proto/buf.lock @@ -9,5 +9,5 @@ deps: - remote: buf.build owner: stealthrocket repository: dispatch-proto - commit: 27bc9b1d8ccb4ab2a37063596845524b - digest: shake256:deb2dca49a7ed8943f1839c0b85fd55f060d2a436e40f33f736f6e979236bb5fab2b5ed8b8dfd1b4883cecf03f541c19a65c26f938c504b285d9a4b8c8d18820 + commit: 639d52c5db754187a9461d96d783c093 + digest: shake256:0fc989737c9db14c41feab7f2dc847b9cf7949cdb1852e3a5337dbb99d909a38acd9211b60501274cac55924ea18c2b18889a7087183ab5d850e79e3b4c3eaed