From b9e1514c852811f4028a55fbe517d6b6126df597 Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Wed, 20 Sep 2023 08:21:01 +0200 Subject: [PATCH 01/11] feat(client): expose a requester interface The patch adds a public Requester interface which will allow derived projects to use the Pebble instantiated client to extend the available commands. The interface is designed to also allow completely replacing the default implementation provided by the Pebble client. The changes made in the patch has been done in a way to produce as small as possible diffs (we keep doSync/doAsyc wrappers). The default interface has been implemented in the client.go file to allow reviewers to easily identify which code was added, and which is unchanged. The following changes are made: 1. The ResultInfo type previously returned by doSync and doAsync private function are removed. Although this is a publicly exposed type, the return value as of today has always been discarded, and the struct is currently empty. 2. ResultInfo has been replaced by RequestResponse. 3. The logs client request now uses the same retry logic on GET failure, as any other GET request. This is previously not possible because the retry logic and response unmarshall code was bundled, not allow raw access to the HTTP body. 4. The CloseIdleConnections() call has been removed as the final Daemon termination step (now in line with Snapd daemon termination). The normal use case for this call is to free idle connections associated with the HTTP client transport instance. This is used in cases where connection reuse cannot occur (concurrent requests, or requests for which the body is not freed immediately) and the transport connection pool builds up idle connections over time (this can also be controlled with idle connection timeout settings). In our case, as the final step before process termination of the server, this is something the garbage collector does in any case just fine, so we really do not need this. --- client/client.go | 457 +++++++++++++++++++++++++++------------ client/exec.go | 12 +- client/exec_test.go | 3 +- client/export_test.go | 28 ++- client/logs.go | 11 +- client/services.go | 21 +- internals/cli/cmd_run.go | 3 - 7 files changed, 366 insertions(+), 169 deletions(-) diff --git a/client/client.go b/client/client.go index 0e9cce91a..451f5434e 100644 --- a/client/client.go +++ b/client/client.go @@ -34,6 +34,63 @@ import ( "github.com/canonical/pebble/internals/wsutil" ) +// DecoderFunc allows the client access to the HTTP response. See the SetDecoder +// description for more details. +type DecoderFunc func(ctx context.Context, res *http.Response, opts *RequestOptions, result interface{}) (*RequestResponse, error) + +type Requester interface { + // Do must support the following cases: + // + // 1. Sync request: + // ctx: Supply a context instance + // RequestOptions: Async must be false + // result: Takes a custom JSON struct + // RequestResponse: Unused + // + // 2. Async request: + // ctx: Supply a context instance + // RequestOptions: Async must be true + // result: Takes a custom JSON struct + // RequestResponse: Returns a ChangeID + // + // 3. Websocket creation request + // ctx: Supply a context instance + // RequestOptions: Path takes the complete websocket path + // result: Takes the address of a Websocket interface + // RequestResponse: Unused + // + // 4. Raw HTTP body request + // ctx: Supply a context instance + // RequestOptions: Async unused + // result: Takes the address of a BodyReader interface + // RequestResponse: Unused + // + // See the default implementation for further details. + Do(ctx context.Context, opts *RequestOptions, result interface{}) (*RequestResponse, error) + + // SetDecoder allows for client specific processing to be hooked into + // the sync and async response decoding process. The decoder is also + // responsible for unmarshalling the result, and populating the + // RequestReponse. + SetDecoder(decoder DecoderFunc) +} + +// RequestOptions allows setting up a specific request. +type RequestOptions struct { + Method string + Path string + Query url.Values + Headers map[string]string + Body io.Reader + Async bool +} + +// RequestResponse defines a common response associated with requests. +type RequestResponse struct { + StatusCode int + ChangeID string +} + // SocketNotFoundError is the error type returned when the client fails // to find a unix socket at the specified path. type SocketNotFoundError struct { @@ -106,21 +163,24 @@ type Config struct { // A Client knows how to talk to the Pebble daemon. type Client struct { - baseURL url.URL - doer doer - userAgent string - - maintenance error + Requester Requester + maintenance error warningCount int warningTimestamp time.Time - - getWebsocket getWebsocketFunc } -type getWebsocketFunc func(url string) (clientWebsocket, error) +type getWebsocketFunc func(ctx context.Context, url string) (Websocket, error) + +// BodyReader defines a minimal compliant interface which the requester must +// intepret as a request to stream the body content. +type BodyReader interface { + io.ReadCloser +} -type clientWebsocket interface { +// websocket defines a minimal compliant interface which the requester must +// interpret as a websocket creation request. +type Websocket interface { wsutil.MessageReader wsutil.MessageWriter io.Closer @@ -135,55 +195,61 @@ func New(config *Config) (*Client, error) { if config == nil { config = &Config{} } - - var client *Client - var transport *http.Transport - - if config.BaseURL == "" { - // By default talk over a unix socket. - transport = &http.Transport{Dial: unixDialer(config.Socket), DisableKeepAlives: config.DisableKeepAlive} - baseURL := url.URL{Scheme: "http", Host: "localhost"} - client = &Client{baseURL: baseURL} - } else { - // Otherwise talk regular HTTP-over-TCP. - baseURL, err := url.Parse(config.BaseURL) - if err != nil { - return nil, fmt.Errorf("cannot parse base URL: %v", err) - } - transport = &http.Transport{DisableKeepAlives: config.DisableKeepAlive} - client = &Client{baseURL: *baseURL} - } - - client.doer = &http.Client{Transport: transport} - client.userAgent = config.UserAgent - client.getWebsocket = func(url string) (clientWebsocket, error) { - return getWebsocket(transport, url) + requester, err := NewDefaultRequester(&DefaultRequesterConfig{ + Socket: config.Socket, + BaseURL: config.BaseURL, + DisableKeepAlive: config.DisableKeepAlive, + UserAgent: config.UserAgent, + }) + if err != nil { + return nil, err } + return NewWithRequester(requester) +} +func NewWithRequester(requester Requester) (*Client, error) { + client := &Client{Requester: requester} + client.Requester.SetDecoder(client.decoder) return client, nil } -func (client *Client) getTaskWebsocket(taskID, websocketID string) (clientWebsocket, error) { +func (client *Client) getTaskWebsocket(taskID, websocketID string) (Websocket, error) { url := fmt.Sprintf("ws://localhost/v1/tasks/%s/websocket/%s", taskID, websocketID) - return client.getWebsocket(url) + var ws Websocket + _, err := client.Requester.Do(context.Background(), &RequestOptions{Path: url}, &ws) + if err != nil { + return nil, err + } + return ws, nil } -func getWebsocket(transport *http.Transport, url string) (clientWebsocket, error) { +func getWebsocket(ctx context.Context, transport *http.Transport, url string) (Websocket, error) { dialer := websocket.Dialer{ NetDial: transport.Dial, Proxy: transport.Proxy, TLSClientConfig: transport.TLSClientConfig, HandshakeTimeout: 5 * time.Second, } - conn, _, err := dialer.Dial(url, nil) + conn, _, err := dialer.DialContext(ctx, url, nil) return conn, err } // CloseIdleConnections closes any API connections that are currently unused. func (client *Client) CloseIdleConnections() { - c, ok := client.doer.(*http.Client) - if ok { - c.CloseIdleConnections() + // CloseIdleConnections is a public API we have to honor. We only support + // this in the default requester so existing dependencies on Client keeps on + // working. + // + // See: https://forum.golangbridge.org/t/when-should-i-use-client-closeidleconnections/19254 + // + // Note: This is only needed in cases where the transport connection pool + // builds up idle connections really fast, and we do not want to wait for + // the garbage collector to release them after the idle timeout. + if requester, ok := client.Requester.(*DefaultRequester); ok { + c, ok := requester.doer.(*http.Client) + if ok { + c.CloseIdleConnections() + } } } @@ -219,27 +285,26 @@ func (e ConnectionError) Unwrap() error { return e.error } -// raw performs a request and returns the resulting http.Response and -// error you usually only need to call this directly if you expect the -// response to not be JSON, otherwise you'd call Do(...) instead. -func (client *Client) raw(ctx context.Context, method, urlpath string, query url.Values, headers map[string]string, body io.Reader) (*http.Response, error) { +// raw creates an HTTP request, performs the request and returns an HTTP response +// and error. +func (br *DefaultRequester) raw(ctx context.Context, method, urlpath string, query url.Values, headers map[string]string, body io.Reader) (*http.Response, error) { // fake a url to keep http.Client happy - u := client.baseURL - u.Path = path.Join(client.baseURL.Path, urlpath) + u := br.baseURL + u.Path = path.Join(br.baseURL.Path, urlpath) u.RawQuery = query.Encode() req, err := http.NewRequestWithContext(ctx, method, u.String(), body) if err != nil { return nil, RequestError{err} } - if client.userAgent != "" { - req.Header.Set("User-Agent", client.userAgent) + if br.userAgent != "" { + req.Header.Set("User-Agent", br.userAgent) } for key, value := range headers { req.Header.Set(key, value) } - rsp, err := client.doer.Do(req) + rsp, err := br.doer.Do(req) if err != nil { return nil, ConnectionError{err} } @@ -248,20 +313,20 @@ func (client *Client) raw(ctx context.Context, method, urlpath string, query url } var ( - doRetry = 250 * time.Millisecond - doTimeout = 5 * time.Second + rawRetry = 250 * time.Millisecond + rawTimeout = 5 * time.Second ) // FakeDoRetry fakes the delays used by the do retry loop (intended for // testing). Calling restore will revert the changes. func FakeDoRetry(retry, timeout time.Duration) (restore func()) { - oldRetry := doRetry - oldTimeout := doTimeout - doRetry = retry - doTimeout = timeout + oldRetry := rawRetry + oldTimeout := rawTimeout + rawRetry = retry + rawTimeout = timeout return func() { - doRetry = oldRetry - doTimeout = oldTimeout + rawRetry = oldRetry + rawTimeout = oldTimeout } } @@ -275,20 +340,25 @@ func (h hijacked) Do(req *http.Request) (*http.Response, error) { // Hijack lets the caller take over the raw HTTP request. func (client *Client) Hijack(f func(*http.Request) (*http.Response, error)) { - client.doer = hijacked{f} + // Hijack is a public API we have to honor. We only support this in the + // default requester so existing dependencies on Client keeps on + // working. Anyone supplying their own requester will immediately see + // that this will fail to work. The Requester interface itself provides + // a much better way to customize the doer. + if requester, ok := client.Requester.(*DefaultRequester); ok { + requester.doer = hijacked{f} + } } -// do performs a request and decodes the resulting json into the given -// value. It's low-level, for testing/experimenting only; you should -// usually use a higher level interface that builds on this. -func (client *Client) do(method, path string, query url.Values, headers map[string]string, body io.Reader, v interface{}) error { - retry := time.NewTicker(doRetry) +// rawWithRetry builds in a retry mechanism for GET failures (body-less request) +func (br *DefaultRequester) rawWithRetry(ctx context.Context, method, urlpath string, query url.Values, headers map[string]string, body io.Reader) (*http.Response, error) { + retry := time.NewTicker(rawRetry) defer retry.Stop() - timeout := time.After(doTimeout) + timeout := time.After(rawTimeout) var rsp *http.Response var err error for { - rsp, err = client.raw(context.Background(), method, path, query, headers, body) + rsp, err = br.raw(ctx, method, urlpath, query, headers, body) if err == nil || method != "GET" { break } @@ -300,17 +370,62 @@ func (client *Client) do(method, path string, query url.Values, headers map[stri break } if err != nil { - return err + return nil, err } - defer rsp.Body.Close() + return rsp, nil +} - if v != nil { - if err := decodeInto(rsp.Body, v); err != nil { - return err +// Do implements all the required functionality as defined by the Requester interface. The combination +// of RequestOptions and the result argument type selects the Do behaviour. In case of a successful +// response, the result argument get a request specific result. The RequestResponse return struct will +// include some common attributes, but when it is set is determined by the specific request. +// +// For example see doSync, doAsync, getTaskWebsocket for some examples. +// +// Please see the Requester interface for further details. +func (br *DefaultRequester) Do(ctx context.Context, opts *RequestOptions, result interface{}) (*RequestResponse, error) { + // Is the result expecting a websocket? + if ws, ok := result.(*Websocket); ok { + conn, err := br.getWebsocket(ctx, opts.Path) + if err != nil { + return nil, err } + *ws = conn + return nil, nil } - return nil + httpResp, err := br.rawWithRetry(ctx, opts.Method, opts.Path, opts.Query, opts.Headers, opts.Body) + if err != nil { + return nil, err + } + + // Is the result expecting a caller-managed body reader? + if bodyReader, ok := result.(*BodyReader); ok { + *bodyReader = httpResp.Body + return nil, nil + } + + // If we get here, this is a normal sync or async server request so + // we have to close the body. + defer httpResp.Body.Close() + + // Get the client decoder to extract what it needs before we proceed + reqResp, err := br.decoder(ctx, httpResp, opts, result) + if err != nil { + return nil, err + } + + // Sanity check sync and async requests + if opts.Async == true { + if reqResp.StatusCode != http.StatusAccepted { + return nil, fmt.Errorf("operation not accepted") + } + if reqResp.ChangeID == "" { + return nil, fmt.Errorf("async response without change reference") + } + } + + return reqResp, nil } func decodeInto(reader io.Reader, v interface{}) error { @@ -326,66 +441,27 @@ func decodeInto(reader io.Reader, v interface{}) error { return nil } -// doSync performs a request to the given path using the specified HTTP method. -// It expects a "sync" response from the API and on success decodes the JSON -// response payload into the given value using the "UseNumber" json decoding -// which produces json.Numbers instead of float64 types for numbers. -func (client *Client) doSync(method, path string, query url.Values, headers map[string]string, body io.Reader, v interface{}) (*ResultInfo, error) { - var rsp response - if err := client.do(method, path, query, headers, body, &rsp); err != nil { - return nil, err - } - if err := rsp.err(client); err != nil { - return nil, err - } - if rsp.Type != "sync" { - return nil, fmt.Errorf("expected sync response, got %q", rsp.Type) - } - - if v != nil { - if err := decodeWithNumber(bytes.NewReader(rsp.Result), v); err != nil { - return nil, fmt.Errorf("cannot unmarshal: %w", err) - } - } - - client.warningCount = rsp.WarningCount - client.warningTimestamp = rsp.WarningTimestamp - - return &rsp.ResultInfo, nil +func (client *Client) doSync(method, path string, query url.Values, headers map[string]string, body io.Reader, v interface{}) (*RequestResponse, error) { + return client.Requester.Do(context.Background(), &RequestOptions{ + Method: method, + Path: path, + Query: query, + Headers: headers, + Body: body, + }, v) } -func (client *Client) doAsync(method, path string, query url.Values, headers map[string]string, body io.Reader) (changeID string, err error) { - _, changeID, err = client.doAsyncFull(method, path, query, headers, body) - return -} - -func (client *Client) doAsyncFull(method, path string, query url.Values, headers map[string]string, body io.Reader) (result json.RawMessage, changeID string, err error) { - var rsp response - - if err := client.do(method, path, query, headers, body, &rsp); err != nil { - return nil, "", err - } - if err := rsp.err(client); err != nil { - return nil, "", err - } - if rsp.Type != "async" { - return nil, "", fmt.Errorf("expected async response for %q on %q, got %q", method, path, rsp.Type) - } - if rsp.StatusCode != 202 { - return nil, "", fmt.Errorf("operation not accepted") - } - if rsp.Change == "" { - return nil, "", fmt.Errorf("async response without change reference") - } - - return rsp.Result, rsp.Change, nil +func (client *Client) doAsync(method, path string, query url.Values, headers map[string]string, body io.Reader, v interface{}) (*RequestResponse, error) { + return client.Requester.Do(context.Background(), &RequestOptions{ + Method: method, + Path: path, + Query: query, + Headers: headers, + Body: body, + Async: true, + }, v) } -// ResultInfo is empty for now, but this is the mechanism that conveys -// general information that makes sense to requests at a more general -// level, and might be disconnected from the specific request at hand. -type ResultInfo struct{} - // A response produced by the REST API will usually fit in this // (exceptions are the icons/ endpoints obvs) type response struct { @@ -398,8 +474,6 @@ type response struct { WarningCount int `json:"warning-count"` WarningTimestamp time.Time `json:"warning-timestamp"` - ResultInfo - Maintenance *Error `json:"maintenance"` } @@ -423,16 +497,8 @@ const ( ErrorKindNoDefaultServices = "no-default-services" ) -func (rsp *response) err(cli *Client) error { - if cli != nil { - maintErr := rsp.Maintenance - // avoid setting to (*client.Error)(nil) - if maintErr != nil { - cli.maintenance = maintErr - } else { - cli.maintenance = nil - } - } +// err extract the error in case of an error type response +func (rsp *response) err() error { if rsp.Type != "error" { return nil } @@ -457,7 +523,7 @@ func parseError(r *http.Response) error { return fmt.Errorf("cannot unmarshal error: %w", err) } - err := rsp.err(nil) + err := rsp.err() if err == nil { return fmt.Errorf("server error: %q", r.Status) } @@ -511,3 +577,116 @@ func (client *Client) DebugGet(action string, result interface{}, params map[str _, err := client.doSync("GET", "/v1/debug", urlParams, nil, nil, &result) return err } + +// decoder receives a raw HTTP response and performs internal client +// processing, as well as unmarshalling the custom result. +func (client *Client) decoder(ctx context.Context, rsp *http.Response, opts *RequestOptions, result interface{}) (*RequestResponse, error) { + var serverResp response + if err := decodeInto(rsp.Body, &serverResp); err != nil { + return nil, err + } + + // Update the maintenance error state + if serverResp.Maintenance != nil { + client.maintenance = serverResp.Maintenance + } else { + client.maintenance = nil + } + + // Deal with error type response + if err := serverResp.err(); err != nil { + return nil, err + } + + // At this point only sync and async type requests may exist so lets + // make sure this is the case. + // + // Note: tests depend on the order or checks, so this cannot simply + // be moved. + if opts.Async == false { + if serverResp.Type != "sync" { + return nil, fmt.Errorf("expected sync response, got %q", serverResp.Type) + } + } else { + if serverResp.Type != "async" { + return nil, fmt.Errorf("expected async response for %q on %q, got %q", opts.Method, opts.Path, serverResp.Type) + } + } + + // Warnings are only included if not an error type response + client.warningCount = serverResp.WarningCount + client.warningTimestamp = serverResp.WarningTimestamp + + // Decode the supplied result type + if result != nil { + if err := decodeWithNumber(bytes.NewReader(serverResp.Result), result); err != nil { + return nil, fmt.Errorf("cannot unmarshal: %w", err) + } + } + + // Common response + return &RequestResponse{ + StatusCode: serverResp.StatusCode, + ChangeID: serverResp.Change, + }, nil +} + +type DefaultRequesterConfig struct { + // BaseURL contains the base URL where the Pebble daemon is expected to be. + // It can be empty for a default behavior of talking over a unix socket. + BaseURL string + + // Socket is the path to the unix socket to use. + Socket string + + // DisableKeepAlive indicates that the connections should not be kept + // alive for later reuse (the default is to keep them alive). + DisableKeepAlive bool + + // UserAgent is the User-Agent header sent to the Pebble daemon. + UserAgent string +} + +type DefaultRequester struct { + baseURL url.URL + doer doer + userAgent string + transport *http.Transport + decoder DecoderFunc + getWebsocket getWebsocketFunc +} + +func NewDefaultRequester(opts *DefaultRequesterConfig) (*DefaultRequester, error) { + if opts == nil { + opts = &DefaultRequesterConfig{} + } + + var requester *DefaultRequester + + if opts.BaseURL == "" { + // By default talk over a unix socket. + transport := &http.Transport{Dial: unixDialer(opts.Socket), DisableKeepAlives: opts.DisableKeepAlive} + baseURL := url.URL{Scheme: "http", Host: "localhost"} + requester = &DefaultRequester{baseURL: baseURL, transport: transport} + } else { + // Otherwise talk regular HTTP-over-TCP. + baseURL, err := url.Parse(opts.BaseURL) + if err != nil { + return nil, fmt.Errorf("cannot parse base URL: %v", err) + } + transport := &http.Transport{DisableKeepAlives: opts.DisableKeepAlive} + requester = &DefaultRequester{baseURL: *baseURL, transport: transport} + } + + requester.doer = &http.Client{Transport: requester.transport} + requester.userAgent = opts.UserAgent + requester.getWebsocket = func(ctx context.Context, url string) (Websocket, error) { + return getWebsocket(ctx, requester.transport, url) + } + + return requester, nil +} + +func (br *DefaultRequester) SetDecoder(decoder DecoderFunc) { + br.decoder = decoder +} diff --git a/client/exec.go b/client/exec.go index 88d405bd4..c4de72ffb 100644 --- a/client/exec.go +++ b/client/exec.go @@ -152,14 +152,10 @@ func (client *Client) Exec(opts *ExecOptions) (*ExecProcess, error) { headers := map[string]string{ "Content-Type": "application/json", } - resultBytes, changeID, err := client.doAsyncFull("POST", "/v1/exec", nil, headers, &body) - if err != nil { - return nil, err - } var result execResult - err = json.Unmarshal(resultBytes, &result) + reqResponse, err := client.doAsync("POST", "/v1/exec", nil, headers, &body, &result) if err != nil { - return nil, fmt.Errorf("cannot unmarshal JSON response: %w", err) + return nil, err } // Connect to the "control" websocket. @@ -178,7 +174,7 @@ func (client *Client) Exec(opts *ExecOptions) (*ExecProcess, error) { stdoutDone := wsutil.WebsocketRecvStream(stdout, ioConn) // Handle stderr separately if needed. - var stderrConn clientWebsocket + var stderrConn Websocket var stderrDone chan bool if opts.Stderr != nil { stderrConn, err = client.getTaskWebsocket(taskID, "stderr") @@ -211,7 +207,7 @@ func (client *Client) Exec(opts *ExecOptions) (*ExecProcess, error) { }() process := &ExecProcess{ - changeID: changeID, + changeID: reqResponse.ChangeID, client: client, timeout: opts.Timeout, writesDone: writesDone, diff --git a/client/exec_test.go b/client/exec_test.go index 1bb4f4276..03b8b8d0f 100644 --- a/client/exec_test.go +++ b/client/exec_test.go @@ -16,6 +16,7 @@ package client_test import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -46,7 +47,7 @@ func (s *execSuite) SetUpTest(c *C) { s.stdioWs = &testWebsocket{} s.controlWs = &testWebsocket{} s.stderrWs = &testWebsocket{} - s.cli.SetGetWebsocket(func(url string) (client.ClientWebsocket, error) { + s.cli.SetGetWebsocket(func(ctx context.Context, url string) (client.Websocket, error) { matches := websocketRegexp.FindStringSubmatch(url) if matches == nil { return nil, fmt.Errorf("invalid websocket URL %q", url) diff --git a/client/export_test.go b/client/export_test.go index ec509df21..a6adb2252 100644 --- a/client/export_test.go +++ b/client/export_test.go @@ -15,6 +15,7 @@ package client import ( + "context" "fmt" "io" "net/url" @@ -26,23 +27,38 @@ var ( ) func (client *Client) SetDoer(d doer) { - client.doer = d + client.Requester.(*DefaultRequester).doer = d } func (client *Client) Do(method, path string, query url.Values, body io.Reader, v interface{}) error { - return client.do(method, path, query, nil, body, v) + var r BodyReader + _, err := client.Requester.Do(context.Background(), &RequestOptions{ + Method: method, + Path: path, + Query: query, + Headers: nil, + Body: body, + }, &r) + if err != nil { + return err + } + err = decodeInto(r, v) + if err != nil { + return err + } + return nil } func (client *Client) FakeAsyncRequest() (changeId string, err error) { - changeId, err = client.doAsync("GET", "/v1/async-test", nil, nil, nil) + reqResponse, err := client.doAsync("GET", "/v1/async-test", nil, nil, nil, nil) if err != nil { return "", fmt.Errorf("cannot do async test: %v", err) } - return changeId, nil + return reqResponse.ChangeID, nil } func (client *Client) SetGetWebsocket(f getWebsocketFunc) { - client.getWebsocket = f + client.Requester.(*DefaultRequester).getWebsocket = f } // WaitStdinDone waits for WebsocketSendStream to be finished calling @@ -50,5 +66,3 @@ func (client *Client) SetGetWebsocket(f getWebsocketFunc) { func (p *ExecProcess) WaitStdinDone() { <-p.stdinDone } - -type ClientWebsocket = clientWebsocket diff --git a/client/logs.go b/client/logs.go index 408b3bc4b..f374cf725 100644 --- a/client/logs.go +++ b/client/logs.go @@ -73,13 +73,18 @@ func (client *Client) logs(ctx context.Context, opts *LogsOptions, follow bool) if follow { query.Set("follow", "true") } - res, err := client.raw(ctx, "GET", "/v1/logs", query, nil, nil) + var body BodyReader + _, err := client.Requester.Do(ctx, &RequestOptions{ + Method: "GET", + Path: "/v1/logs", + Query: query, + }, &body) if err != nil { return err } - defer res.Body.Close() + defer body.Close() - reader := bufio.NewReaderSize(res.Body, logReaderSize) + reader := bufio.NewReaderSize(body, logReaderSize) for { err = decodeLog(reader, opts.WriteLog) if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { diff --git a/client/services.go b/client/services.go index 0be19d23e..0ece749d1 100644 --- a/client/services.go +++ b/client/services.go @@ -30,33 +30,33 @@ type ServiceOptions struct { // AutoStart starts the services makes as "startup: enabled". opts.Names must // be empty for this call. func (client *Client) AutoStart(opts *ServiceOptions) (changeID string, err error) { - _, changeID, err = client.doMultiServiceAction("autostart", opts.Names) + changeID, err = client.doMultiServiceAction("autostart", opts.Names) return changeID, err } // Start starts the services named in opts.Names in dependency order. func (client *Client) Start(opts *ServiceOptions) (changeID string, err error) { - _, changeID, err = client.doMultiServiceAction("start", opts.Names) + changeID, err = client.doMultiServiceAction("start", opts.Names) return changeID, err } // Stop stops the services named in opts.Names in dependency order. func (client *Client) Stop(opts *ServiceOptions) (changeID string, err error) { - _, changeID, err = client.doMultiServiceAction("stop", opts.Names) + changeID, err = client.doMultiServiceAction("stop", opts.Names) return changeID, err } // Restart stops and then starts the services named in opts.Names in // dependency order. func (client *Client) Restart(opts *ServiceOptions) (changeID string, err error) { - _, changeID, err = client.doMultiServiceAction("restart", opts.Names) + changeID, err = client.doMultiServiceAction("restart", opts.Names) return changeID, err } // Replan stops and (re)starts the services whose configuration has changed // since they were started. opts.Names must be empty for this call. func (client *Client) Replan(opts *ServiceOptions) (changeID string, err error) { - _, changeID, err = client.doMultiServiceAction("replan", opts.Names) + changeID, err = client.doMultiServiceAction("replan", opts.Names) return changeID, err } @@ -65,19 +65,24 @@ type multiActionData struct { Services []string `json:"services"` } -func (client *Client) doMultiServiceAction(actionName string, services []string) (result json.RawMessage, changeID string, err error) { +func (client *Client) doMultiServiceAction(actionName string, services []string) (changeID string, err error) { action := multiActionData{ Action: actionName, Services: services, } data, err := json.Marshal(&action) if err != nil { - return nil, "", fmt.Errorf("cannot marshal multi-service action: %w", err) + return "", fmt.Errorf("cannot marshal multi-service action: %w", err) } headers := map[string]string{ "Content-Type": "application/json", } - return client.doAsyncFull("POST", "/v1/services", nil, headers, bytes.NewBuffer(data)) + + reqReponse, err := client.doAsync("POST", "/v1/services", nil, headers, bytes.NewBuffer(data), nil) + if err != nil { + return "", err + } + return reqReponse.ChangeID, nil } type ServicesOptions struct { diff --git a/internals/cli/cmd_run.go b/internals/cli/cmd_run.go index e32bb4b36..4d971a71e 100644 --- a/internals/cli/cmd_run.go +++ b/internals/cli/cmd_run.go @@ -239,9 +239,6 @@ out: } } - // Close our own self-connection, otherwise it prevents fast and clean termination. - rcmd.client.CloseIdleConnections() - return d.Stop(ch) } From 0d6e5b1bb7977cf95b4f7a4fc8b7a72be8f013e4 Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Thu, 28 Sep 2023 16:38:06 +0200 Subject: [PATCH 02/11] Code Review Changes 1 --- client/client.go | 179 +++++++++++++++------------------------ client/exec.go | 2 +- client/exec_test.go | 3 +- client/export_test.go | 22 ++--- client/logs.go | 16 ++-- internals/cli/cmd_run.go | 5 ++ 6 files changed, 95 insertions(+), 132 deletions(-) diff --git a/client/client.go b/client/client.go index 451f5434e..5393e37c0 100644 --- a/client/client.go +++ b/client/client.go @@ -39,33 +39,9 @@ import ( type DecoderFunc func(ctx context.Context, res *http.Response, opts *RequestOptions, result interface{}) (*RequestResponse, error) type Requester interface { - // Do must support the following cases: - // - // 1. Sync request: - // ctx: Supply a context instance - // RequestOptions: Async must be false - // result: Takes a custom JSON struct - // RequestResponse: Unused - // - // 2. Async request: - // ctx: Supply a context instance - // RequestOptions: Async must be true - // result: Takes a custom JSON struct - // RequestResponse: Returns a ChangeID - // - // 3. Websocket creation request - // ctx: Supply a context instance - // RequestOptions: Path takes the complete websocket path - // result: Takes the address of a Websocket interface - // RequestResponse: Unused - // - // 4. Raw HTTP body request - // ctx: Supply a context instance - // RequestOptions: Async unused - // result: Takes the address of a BodyReader interface - // RequestResponse: Unused - // - // See the default implementation for further details. + // Allows for sync, async requests as well as direct access to the + // HTTP response body. See the default implementation for further + // details. Do(ctx context.Context, opts *RequestOptions, result interface{}) (*RequestResponse, error) // SetDecoder allows for client specific processing to be hooked into @@ -73,22 +49,27 @@ type Requester interface { // responsible for unmarshalling the result, and populating the // RequestReponse. SetDecoder(decoder DecoderFunc) + + // Provide direct access to transport for specialist operations. + Transport() http.RoundTripper } // RequestOptions allows setting up a specific request. type RequestOptions struct { - Method string - Path string - Query url.Values - Headers map[string]string - Body io.Reader - Async bool + Method string + Path string + Query url.Values + Headers map[string]string + Body io.Reader + Async bool + ReturnBody bool } // RequestResponse defines a common response associated with requests. type RequestResponse struct { StatusCode int ChangeID string + Body io.ReadCloser } // SocketNotFoundError is the error type returned when the client fails @@ -168,19 +149,13 @@ type Client struct { maintenance error warningCount int warningTimestamp time.Time -} - -type getWebsocketFunc func(ctx context.Context, url string) (Websocket, error) -// BodyReader defines a minimal compliant interface which the requester must -// intepret as a request to stream the body content. -type BodyReader interface { - io.ReadCloser + getWebsocket getWebsocketFunc } -// websocket defines a minimal compliant interface which the requester must -// interpret as a websocket creation request. -type Websocket interface { +type getWebsocketFunc func(url string) (clientWebsocket, error) + +type clientWebsocket interface { wsutil.MessageReader wsutil.MessageWriter io.Closer @@ -210,46 +185,44 @@ func New(config *Config) (*Client, error) { func NewWithRequester(requester Requester) (*Client, error) { client := &Client{Requester: requester} client.Requester.SetDecoder(client.decoder) + client.getWebsocket = func(url string) (clientWebsocket, error) { + return getWebsocket(requester.Transport(), url) + } return client, nil } -func (client *Client) getTaskWebsocket(taskID, websocketID string) (Websocket, error) { +func (client *Client) getTaskWebsocket(taskID, websocketID string) (clientWebsocket, error) { url := fmt.Sprintf("ws://localhost/v1/tasks/%s/websocket/%s", taskID, websocketID) - var ws Websocket - _, err := client.Requester.Do(context.Background(), &RequestOptions{Path: url}, &ws) - if err != nil { - return nil, err - } - return ws, nil + return client.getWebsocket(url) } -func getWebsocket(ctx context.Context, transport *http.Transport, url string) (Websocket, error) { +func getWebsocket(transport http.RoundTripper, url string) (clientWebsocket, error) { + httpTransport, ok := transport.(*http.Transport) + if !ok { + return nil, fmt.Errorf("cannot create websocket: transport not compatible") + } + dialer := websocket.Dialer{ - NetDial: transport.Dial, - Proxy: transport.Proxy, - TLSClientConfig: transport.TLSClientConfig, + NetDial: httpTransport.Dial, + Proxy: httpTransport.Proxy, + TLSClientConfig: httpTransport.TLSClientConfig, HandshakeTimeout: 5 * time.Second, } - conn, _, err := dialer.DialContext(ctx, url, nil) + conn, _, err := dialer.Dial(url, nil) return conn, err } // CloseIdleConnections closes any API connections that are currently unused. func (client *Client) CloseIdleConnections() { - // CloseIdleConnections is a public API we have to honor. We only support - // this in the default requester so existing dependencies on Client keeps on - // working. - // - // See: https://forum.golangbridge.org/t/when-should-i-use-client-closeidleconnections/19254 - // - // Note: This is only needed in cases where the transport connection pool - // builds up idle connections really fast, and we do not want to wait for - // the garbage collector to release them after the idle timeout. - if requester, ok := client.Requester.(*DefaultRequester); ok { - c, ok := requester.doer.(*http.Client) - if ok { - c.CloseIdleConnections() - } + transport := client.Requester.Transport() + // The following is taken from net/http/client.go because + // we are directly going to try and close idle connections and + // we must make sure the transport supports this. + type closeIdler interface { + CloseIdleConnections() + } + if tr, ok := transport.(closeIdler); ok { + tr.CloseIdleConnections() } } @@ -375,34 +348,22 @@ func (br *DefaultRequester) rawWithRetry(ctx context.Context, method, urlpath st return rsp, nil } -// Do implements all the required functionality as defined by the Requester interface. The combination -// of RequestOptions and the result argument type selects the Do behaviour. In case of a successful -// response, the result argument get a request specific result. The RequestResponse return struct will -// include some common attributes, but when it is set is determined by the specific request. -// -// For example see doSync, doAsync, getTaskWebsocket for some examples. -// -// Please see the Requester interface for further details. +// Do implements all the required functionality as defined by the Requester interface. RequestOptions +// selects the Do behaviour. In case of a successful response, the result argument get a +// request specific result. The RequestResponse struct will include common attributes +// (not all will be set of all request types). func (br *DefaultRequester) Do(ctx context.Context, opts *RequestOptions, result interface{}) (*RequestResponse, error) { - // Is the result expecting a websocket? - if ws, ok := result.(*Websocket); ok { - conn, err := br.getWebsocket(ctx, opts.Path) - if err != nil { - return nil, err - } - *ws = conn - return nil, nil - } httpResp, err := br.rawWithRetry(ctx, opts.Method, opts.Path, opts.Query, opts.Headers, opts.Body) if err != nil { return nil, err } - // Is the result expecting a caller-managed body reader? - if bodyReader, ok := result.(*BodyReader); ok { - *bodyReader = httpResp.Body - return nil, nil + // Is the result expecting a caller-managed raw body? + if opts.ReturnBody { + return &RequestResponse{ + Body: httpResp.Body, + }, nil } // If we get here, this is a normal sync or async server request so @@ -415,16 +376,6 @@ func (br *DefaultRequester) Do(ctx context.Context, opts *RequestOptions, result return nil, err } - // Sanity check sync and async requests - if opts.Async == true { - if reqResp.StatusCode != http.StatusAccepted { - return nil, fmt.Errorf("operation not accepted") - } - if reqResp.ChangeID == "" { - return nil, fmt.Errorf("async response without change reference") - } - } - return reqResp, nil } @@ -601,8 +552,8 @@ func (client *Client) decoder(ctx context.Context, rsp *http.Response, opts *Req // At this point only sync and async type requests may exist so lets // make sure this is the case. // - // Note: tests depend on the order or checks, so this cannot simply - // be moved. + // Tests depend on the order or checks, so lets keep the order unchanged + // and deal with these before decode. if opts.Async == false { if serverResp.Type != "sync" { return nil, fmt.Errorf("expected sync response, got %q", serverResp.Type) @@ -611,6 +562,12 @@ func (client *Client) decoder(ctx context.Context, rsp *http.Response, opts *Req if serverResp.Type != "async" { return nil, fmt.Errorf("expected async response for %q on %q, got %q", opts.Method, opts.Path, serverResp.Type) } + if serverResp.StatusCode != http.StatusAccepted { + return nil, fmt.Errorf("operation not accepted") + } + if serverResp.Change == "" { + return nil, fmt.Errorf("async response without change reference") + } } // Warnings are only included if not an error type response @@ -648,12 +605,11 @@ type DefaultRequesterConfig struct { } type DefaultRequester struct { - baseURL url.URL - doer doer - userAgent string - transport *http.Transport - decoder DecoderFunc - getWebsocket getWebsocketFunc + baseURL url.URL + doer doer + userAgent string + transport http.RoundTripper + decoder DecoderFunc } func NewDefaultRequester(opts *DefaultRequesterConfig) (*DefaultRequester, error) { @@ -680,9 +636,6 @@ func NewDefaultRequester(opts *DefaultRequesterConfig) (*DefaultRequester, error requester.doer = &http.Client{Transport: requester.transport} requester.userAgent = opts.UserAgent - requester.getWebsocket = func(ctx context.Context, url string) (Websocket, error) { - return getWebsocket(ctx, requester.transport, url) - } return requester, nil } @@ -690,3 +643,7 @@ func NewDefaultRequester(opts *DefaultRequesterConfig) (*DefaultRequester, error func (br *DefaultRequester) SetDecoder(decoder DecoderFunc) { br.decoder = decoder } + +func (br *DefaultRequester) Transport() http.RoundTripper { + return br.transport +} diff --git a/client/exec.go b/client/exec.go index c4de72ffb..8aab7780c 100644 --- a/client/exec.go +++ b/client/exec.go @@ -174,7 +174,7 @@ func (client *Client) Exec(opts *ExecOptions) (*ExecProcess, error) { stdoutDone := wsutil.WebsocketRecvStream(stdout, ioConn) // Handle stderr separately if needed. - var stderrConn Websocket + var stderrConn clientWebsocket var stderrDone chan bool if opts.Stderr != nil { stderrConn, err = client.getTaskWebsocket(taskID, "stderr") diff --git a/client/exec_test.go b/client/exec_test.go index 03b8b8d0f..1bb4f4276 100644 --- a/client/exec_test.go +++ b/client/exec_test.go @@ -16,7 +16,6 @@ package client_test import ( "bytes" - "context" "encoding/json" "fmt" "io" @@ -47,7 +46,7 @@ func (s *execSuite) SetUpTest(c *C) { s.stdioWs = &testWebsocket{} s.controlWs = &testWebsocket{} s.stderrWs = &testWebsocket{} - s.cli.SetGetWebsocket(func(ctx context.Context, url string) (client.Websocket, error) { + s.cli.SetGetWebsocket(func(url string) (client.ClientWebsocket, error) { matches := websocketRegexp.FindStringSubmatch(url) if matches == nil { return nil, fmt.Errorf("invalid websocket URL %q", url) diff --git a/client/export_test.go b/client/export_test.go index a6adb2252..88e1aac45 100644 --- a/client/export_test.go +++ b/client/export_test.go @@ -31,18 +31,18 @@ func (client *Client) SetDoer(d doer) { } func (client *Client) Do(method, path string, query url.Values, body io.Reader, v interface{}) error { - var r BodyReader - _, err := client.Requester.Do(context.Background(), &RequestOptions{ - Method: method, - Path: path, - Query: query, - Headers: nil, - Body: body, - }, &r) + resp, err := client.Requester.Do(context.Background(), &RequestOptions{ + Method: method, + Path: path, + Query: query, + Headers: nil, + Body: body, + ReturnBody: true, + }, nil) if err != nil { return err } - err = decodeInto(r, v) + err = decodeInto(resp.Body, v) if err != nil { return err } @@ -58,7 +58,7 @@ func (client *Client) FakeAsyncRequest() (changeId string, err error) { } func (client *Client) SetGetWebsocket(f getWebsocketFunc) { - client.Requester.(*DefaultRequester).getWebsocket = f + client.getWebsocket = f } // WaitStdinDone waits for WebsocketSendStream to be finished calling @@ -66,3 +66,5 @@ func (client *Client) SetGetWebsocket(f getWebsocketFunc) { func (p *ExecProcess) WaitStdinDone() { <-p.stdinDone } + +type ClientWebsocket = clientWebsocket diff --git a/client/logs.go b/client/logs.go index f374cf725..feaaccd84 100644 --- a/client/logs.go +++ b/client/logs.go @@ -73,18 +73,18 @@ func (client *Client) logs(ctx context.Context, opts *LogsOptions, follow bool) if follow { query.Set("follow", "true") } - var body BodyReader - _, err := client.Requester.Do(ctx, &RequestOptions{ - Method: "GET", - Path: "/v1/logs", - Query: query, - }, &body) + resp, err := client.Requester.Do(ctx, &RequestOptions{ + Method: "GET", + Path: "/v1/logs", + Query: query, + ReturnBody: true, + }, nil) if err != nil { return err } - defer body.Close() + defer resp.Body.Close() - reader := bufio.NewReaderSize(body, logReaderSize) + reader := bufio.NewReaderSize(resp.Body, logReaderSize) for { err = decodeLog(reader, opts.WriteLog) if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { diff --git a/internals/cli/cmd_run.go b/internals/cli/cmd_run.go index 4d971a71e..4ba5774f5 100644 --- a/internals/cli/cmd_run.go +++ b/internals/cli/cmd_run.go @@ -239,6 +239,11 @@ out: } } + // Close the client idle connection to the server (self connection) before we + // start with the HTTP shutdown process. This will speed up the server shutdown, + // and allow the Pebble process to exit faster. + rcmd.client.CloseIdleConnections() + return d.Stop(ch) } From d7ef701ea54fcfd85a2699235c0a66be2b8f07de Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Thu, 28 Sep 2023 17:06:56 +0200 Subject: [PATCH 03/11] Minor tweaks/typos. --- client/client.go | 3 +-- client/services.go | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/client/client.go b/client/client.go index 5393e37c0..687f1753f 100644 --- a/client/client.go +++ b/client/client.go @@ -47,7 +47,7 @@ type Requester interface { // SetDecoder allows for client specific processing to be hooked into // the sync and async response decoding process. The decoder is also // responsible for unmarshalling the result, and populating the - // RequestReponse. + // RequestResponse. SetDecoder(decoder DecoderFunc) // Provide direct access to transport for specialist operations. @@ -353,7 +353,6 @@ func (br *DefaultRequester) rawWithRetry(ctx context.Context, method, urlpath st // request specific result. The RequestResponse struct will include common attributes // (not all will be set of all request types). func (br *DefaultRequester) Do(ctx context.Context, opts *RequestOptions, result interface{}) (*RequestResponse, error) { - httpResp, err := br.rawWithRetry(ctx, opts.Method, opts.Path, opts.Query, opts.Headers, opts.Body) if err != nil { return nil, err diff --git a/client/services.go b/client/services.go index 0ece749d1..42487a8c8 100644 --- a/client/services.go +++ b/client/services.go @@ -78,11 +78,11 @@ func (client *Client) doMultiServiceAction(actionName string, services []string) "Content-Type": "application/json", } - reqReponse, err := client.doAsync("POST", "/v1/services", nil, headers, bytes.NewBuffer(data), nil) + reqResponse, err := client.doAsync("POST", "/v1/services", nil, headers, bytes.NewBuffer(data), nil) if err != nil { return "", err } - return reqReponse.ChangeID, nil + return reqResponse.ChangeID, nil } type ServicesOptions struct { From cc7d45f483151744a0ea79baae6a6b9c3d24fa7d Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Mon, 2 Oct 2023 06:16:34 +0200 Subject: [PATCH 04/11] Code Review: make Requester private in client --- client/client.go | 43 ++++++++++++------------------------------- client/export_test.go | 4 ++-- client/logs.go | 2 +- 3 files changed, 15 insertions(+), 34 deletions(-) diff --git a/client/client.go b/client/client.go index 687f1753f..3783dc60c 100644 --- a/client/client.go +++ b/client/client.go @@ -39,7 +39,7 @@ import ( type DecoderFunc func(ctx context.Context, res *http.Response, opts *RequestOptions, result interface{}) (*RequestResponse, error) type Requester interface { - // Allows for sync, async requests as well as direct access to the + // Do allows for sync, async requests as well as direct access to the // HTTP response body. See the default implementation for further // details. Do(ctx context.Context, opts *RequestOptions, result interface{}) (*RequestResponse, error) @@ -144,7 +144,7 @@ type Config struct { // A Client knows how to talk to the Pebble daemon. type Client struct { - Requester Requester + requester Requester maintenance error warningCount int @@ -179,18 +179,19 @@ func New(config *Config) (*Client, error) { if err != nil { return nil, err } - return NewWithRequester(requester) -} -func NewWithRequester(requester Requester) (*Client, error) { - client := &Client{Requester: requester} - client.Requester.SetDecoder(client.decoder) + client := &Client{requester: requester} + requester.SetDecoder(client.decoder) client.getWebsocket = func(url string) (clientWebsocket, error) { return getWebsocket(requester.Transport(), url) } return client, nil } +func (client *Client) Requester() Requester { + return client.requester +} + func (client *Client) getTaskWebsocket(taskID, websocketID string) (clientWebsocket, error) { url := fmt.Sprintf("ws://localhost/v1/tasks/%s/websocket/%s", taskID, websocketID) return client.getWebsocket(url) @@ -214,7 +215,7 @@ func getWebsocket(transport http.RoundTripper, url string) (clientWebsocket, err // CloseIdleConnections closes any API connections that are currently unused. func (client *Client) CloseIdleConnections() { - transport := client.Requester.Transport() + transport := client.Requester().Transport() // The following is taken from net/http/client.go because // we are directly going to try and close idle connections and // we must make sure the transport supports this. @@ -303,26 +304,6 @@ func FakeDoRetry(retry, timeout time.Duration) (restore func()) { } } -type hijacked struct { - do func(*http.Request) (*http.Response, error) -} - -func (h hijacked) Do(req *http.Request) (*http.Response, error) { - return h.do(req) -} - -// Hijack lets the caller take over the raw HTTP request. -func (client *Client) Hijack(f func(*http.Request) (*http.Response, error)) { - // Hijack is a public API we have to honor. We only support this in the - // default requester so existing dependencies on Client keeps on - // working. Anyone supplying their own requester will immediately see - // that this will fail to work. The Requester interface itself provides - // a much better way to customize the doer. - if requester, ok := client.Requester.(*DefaultRequester); ok { - requester.doer = hijacked{f} - } -} - // rawWithRetry builds in a retry mechanism for GET failures (body-less request) func (br *DefaultRequester) rawWithRetry(ctx context.Context, method, urlpath string, query url.Values, headers map[string]string, body io.Reader) (*http.Response, error) { retry := time.NewTicker(rawRetry) @@ -351,7 +332,7 @@ func (br *DefaultRequester) rawWithRetry(ctx context.Context, method, urlpath st // Do implements all the required functionality as defined by the Requester interface. RequestOptions // selects the Do behaviour. In case of a successful response, the result argument get a // request specific result. The RequestResponse struct will include common attributes -// (not all will be set of all request types). +// (not all will be set for all request types). func (br *DefaultRequester) Do(ctx context.Context, opts *RequestOptions, result interface{}) (*RequestResponse, error) { httpResp, err := br.rawWithRetry(ctx, opts.Method, opts.Path, opts.Query, opts.Headers, opts.Body) if err != nil { @@ -392,7 +373,7 @@ func decodeInto(reader io.Reader, v interface{}) error { } func (client *Client) doSync(method, path string, query url.Values, headers map[string]string, body io.Reader, v interface{}) (*RequestResponse, error) { - return client.Requester.Do(context.Background(), &RequestOptions{ + return client.Requester().Do(context.Background(), &RequestOptions{ Method: method, Path: path, Query: query, @@ -402,7 +383,7 @@ func (client *Client) doSync(method, path string, query url.Values, headers map[ } func (client *Client) doAsync(method, path string, query url.Values, headers map[string]string, body io.Reader, v interface{}) (*RequestResponse, error) { - return client.Requester.Do(context.Background(), &RequestOptions{ + return client.Requester().Do(context.Background(), &RequestOptions{ Method: method, Path: path, Query: query, diff --git a/client/export_test.go b/client/export_test.go index 88e1aac45..f78270dca 100644 --- a/client/export_test.go +++ b/client/export_test.go @@ -27,11 +27,11 @@ var ( ) func (client *Client) SetDoer(d doer) { - client.Requester.(*DefaultRequester).doer = d + client.Requester().(*DefaultRequester).doer = d } func (client *Client) Do(method, path string, query url.Values, body io.Reader, v interface{}) error { - resp, err := client.Requester.Do(context.Background(), &RequestOptions{ + resp, err := client.Requester().Do(context.Background(), &RequestOptions{ Method: method, Path: path, Query: query, diff --git a/client/logs.go b/client/logs.go index feaaccd84..a73a37368 100644 --- a/client/logs.go +++ b/client/logs.go @@ -73,7 +73,7 @@ func (client *Client) logs(ctx context.Context, opts *LogsOptions, follow bool) if follow { query.Set("follow", "true") } - resp, err := client.Requester.Do(ctx, &RequestOptions{ + resp, err := client.Requester().Do(ctx, &RequestOptions{ Method: "GET", Path: "/v1/logs", Query: query, From c7ae8b9cffde3cbf2106635ad1f468439a65953c Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Mon, 2 Oct 2023 06:23:14 +0200 Subject: [PATCH 05/11] Code Review: typo --- client/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index 3783dc60c..0108471c1 100644 --- a/client/client.go +++ b/client/client.go @@ -50,7 +50,8 @@ type Requester interface { // RequestResponse. SetDecoder(decoder DecoderFunc) - // Provide direct access to transport for specialist operations. + // Transport provides direct access to the transport instance for + // specialist operations. Transport() http.RoundTripper } From dec9eb927acf3920341dbec2d8c268135e8cee9c Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Tue, 3 Oct 2023 18:28:15 +0200 Subject: [PATCH 06/11] Code Review: remove RoundTripper We do not want to make the interface only satisfy the RoundTripper interface because that alone cannot make Websockets work. We need to expose an interface that ensures currently Client requirements are met. --- client/client.go | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/client/client.go b/client/client.go index 0108471c1..77568dd60 100644 --- a/client/client.go +++ b/client/client.go @@ -52,7 +52,7 @@ type Requester interface { // Transport provides direct access to the transport instance for // specialist operations. - Transport() http.RoundTripper + Transport() *http.Transport } // RequestOptions allows setting up a specific request. @@ -198,16 +198,11 @@ func (client *Client) getTaskWebsocket(taskID, websocketID string) (clientWebsoc return client.getWebsocket(url) } -func getWebsocket(transport http.RoundTripper, url string) (clientWebsocket, error) { - httpTransport, ok := transport.(*http.Transport) - if !ok { - return nil, fmt.Errorf("cannot create websocket: transport not compatible") - } - +func getWebsocket(transport *http.Transport, url string) (clientWebsocket, error) { dialer := websocket.Dialer{ - NetDial: httpTransport.Dial, - Proxy: httpTransport.Proxy, - TLSClientConfig: httpTransport.TLSClientConfig, + NetDial: transport.Dial, + Proxy: transport.Proxy, + TLSClientConfig: transport.TLSClientConfig, HandshakeTimeout: 5 * time.Second, } conn, _, err := dialer.Dial(url, nil) @@ -216,16 +211,7 @@ func getWebsocket(transport http.RoundTripper, url string) (clientWebsocket, err // CloseIdleConnections closes any API connections that are currently unused. func (client *Client) CloseIdleConnections() { - transport := client.Requester().Transport() - // The following is taken from net/http/client.go because - // we are directly going to try and close idle connections and - // we must make sure the transport supports this. - type closeIdler interface { - CloseIdleConnections() - } - if tr, ok := transport.(closeIdler); ok { - tr.CloseIdleConnections() - } + client.Requester().Transport().CloseIdleConnections() } // Maintenance returns an error reflecting the daemon maintenance status or nil. @@ -589,7 +575,7 @@ type DefaultRequester struct { baseURL url.URL doer doer userAgent string - transport http.RoundTripper + transport *http.Transport decoder DecoderFunc } @@ -625,6 +611,6 @@ func (br *DefaultRequester) SetDecoder(decoder DecoderFunc) { br.decoder = decoder } -func (br *DefaultRequester) Transport() http.RoundTripper { +func (br *DefaultRequester) Transport() *http.Transport { return br.transport } From 25afb29feadf3471143c8bd0ac65ac2c836ad970 Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Fri, 6 Oct 2023 17:25:36 +0200 Subject: [PATCH 07/11] Code Review: Simplification with Gustavo --- client/client.go | 213 +++++++++++++++++++++--------------------- client/export_test.go | 14 +-- client/logs.go | 10 +- 3 files changed, 121 insertions(+), 116 deletions(-) diff --git a/client/client.go b/client/client.go index 77568dd60..093a39ad9 100644 --- a/client/client.go +++ b/client/client.go @@ -34,43 +34,51 @@ import ( "github.com/canonical/pebble/internals/wsutil" ) -// DecoderFunc allows the client access to the HTTP response. See the SetDecoder -// description for more details. -type DecoderFunc func(ctx context.Context, res *http.Response, opts *RequestOptions, result interface{}) (*RequestResponse, error) - type Requester interface { - // Do allows for sync, async requests as well as direct access to the - // HTTP response body. See the default implementation for further - // details. - Do(ctx context.Context, opts *RequestOptions, result interface{}) (*RequestResponse, error) - - // SetDecoder allows for client specific processing to be hooked into - // the sync and async response decoding process. The decoder is also - // responsible for unmarshalling the result, and populating the - // RequestResponse. - SetDecoder(decoder DecoderFunc) - - // Transport provides direct access to the transport instance for - // specialist operations. + // Do performs the HTTP transaction using the provided options. + Do(ctx context.Context, opts *RequestOptions) (*RequestResponse, error) + + // Transport returns the HTTP transport in use by the underlying HTTP client. Transport() *http.Transport } +type RequestType int + +const ( + SyncRequest RequestType = iota + AsyncRequest + RawRequest +) + // RequestOptions allows setting up a specific request. type RequestOptions struct { - Method string - Path string - Query url.Values - Headers map[string]string - Body io.Reader - Async bool - ReturnBody bool + Type RequestType + Method string + Path string + Query url.Values + Headers map[string]string + Body io.Reader } // RequestResponse defines a common response associated with requests. type RequestResponse struct { StatusCode int ChangeID string - Body io.ReadCloser + Result []byte + + // Only set for RawRequest and must be completely read and closed. + Body io.ReadCloser +} + +// DecodeResult can be used to decode a SyncRequest or AsyncRequest. +func (resp *RequestResponse) DecodeResult(result interface{}) error { + if result != nil { + if err := decodeWithNumber(bytes.NewReader(resp.Result), result); err != nil { + return fmt.Errorf("cannot unmarshal: %w", err) + } + } + + return nil } // SocketNotFoundError is the error type returned when the client fails @@ -171,7 +179,9 @@ func New(config *Config) (*Client, error) { if config == nil { config = &Config{} } - requester, err := NewDefaultRequester(&DefaultRequesterConfig{ + + client := &Client{} + requester, err := NewDefaultRequester(client, &DefaultRequesterConfig{ Socket: config.Socket, BaseURL: config.BaseURL, DisableKeepAlive: config.DisableKeepAlive, @@ -181,11 +191,11 @@ func New(config *Config) (*Client, error) { return nil, err } - client := &Client{requester: requester} - requester.SetDecoder(client.decoder) + client.requester = requester client.getWebsocket = func(url string) (clientWebsocket, error) { return getWebsocket(requester.Transport(), url) } + return client, nil } @@ -320,14 +330,14 @@ func (br *DefaultRequester) rawWithRetry(ctx context.Context, method, urlpath st // selects the Do behaviour. In case of a successful response, the result argument get a // request specific result. The RequestResponse struct will include common attributes // (not all will be set for all request types). -func (br *DefaultRequester) Do(ctx context.Context, opts *RequestOptions, result interface{}) (*RequestResponse, error) { +func (br *DefaultRequester) Do(ctx context.Context, opts *RequestOptions) (*RequestResponse, error) { httpResp, err := br.rawWithRetry(ctx, opts.Method, opts.Path, opts.Query, opts.Headers, opts.Body) if err != nil { return nil, err } // Is the result expecting a caller-managed raw body? - if opts.ReturnBody { + if opts.Type == RawRequest { return &RequestResponse{ Body: httpResp.Body, }, nil @@ -337,13 +347,53 @@ func (br *DefaultRequester) Do(ctx context.Context, opts *RequestOptions, result // we have to close the body. defer httpResp.Body.Close() - // Get the client decoder to extract what it needs before we proceed - reqResp, err := br.decoder(ctx, httpResp, opts, result) - if err != nil { + var serverResp response + if err := decodeInto(httpResp.Body, &serverResp); err != nil { + return nil, err + } + + // Update the maintenance error state + if serverResp.Maintenance != nil { + br.client.maintenance = serverResp.Maintenance + } else { + br.client.maintenance = nil + } + + // Deal with error type response + if err := serverResp.err(); err != nil { return nil, err } - return reqResp, nil + // At this point only sync and async type requests may exist so lets + // make sure this is the case. + if opts.Type == SyncRequest { + if serverResp.Type != "sync" { + return nil, fmt.Errorf("expected sync response, got %q", serverResp.Type) + } + } else if opts.Type == AsyncRequest { + if serverResp.Type != "async" { + return nil, fmt.Errorf("expected async response for %q on %q, got %q", opts.Method, opts.Path, serverResp.Type) + } + if serverResp.StatusCode != http.StatusAccepted { + return nil, fmt.Errorf("operation not accepted") + } + if serverResp.Change == "" { + return nil, fmt.Errorf("async response without change reference") + } + } else { + return nil, fmt.Errorf("cannot process unknown request type") + } + + // Warnings are only included if not an error type response + br.client.warningCount = serverResp.WarningCount + br.client.warningTimestamp = serverResp.WarningTimestamp + + // Common response + return &RequestResponse{ + StatusCode: serverResp.StatusCode, + ChangeID: serverResp.Change, + Result: serverResp.Result, + }, nil } func decodeInto(reader io.Reader, v interface{}) error { @@ -360,24 +410,41 @@ func decodeInto(reader io.Reader, v interface{}) error { } func (client *Client) doSync(method, path string, query url.Values, headers map[string]string, body io.Reader, v interface{}) (*RequestResponse, error) { - return client.Requester().Do(context.Background(), &RequestOptions{ + resp, err := client.Requester().Do(context.Background(), &RequestOptions{ + Type: SyncRequest, Method: method, Path: path, Query: query, Headers: headers, Body: body, - }, v) + }) + if err != nil { + return nil, err + } + err = resp.DecodeResult(v) + if err != nil { + return nil, err + } + return resp, nil } func (client *Client) doAsync(method, path string, query url.Values, headers map[string]string, body io.Reader, v interface{}) (*RequestResponse, error) { - return client.Requester().Do(context.Background(), &RequestOptions{ + resp, err := client.Requester().Do(context.Background(), &RequestOptions{ + Type: AsyncRequest, Method: method, Path: path, Query: query, Headers: headers, Body: body, - Async: true, - }, v) + }) + if err != nil { + return nil, err + } + err = resp.DecodeResult(v) + if err != nil { + return nil, err + } + return resp, nil } // A response produced by the REST API will usually fit in this @@ -496,65 +563,6 @@ func (client *Client) DebugGet(action string, result interface{}, params map[str return err } -// decoder receives a raw HTTP response and performs internal client -// processing, as well as unmarshalling the custom result. -func (client *Client) decoder(ctx context.Context, rsp *http.Response, opts *RequestOptions, result interface{}) (*RequestResponse, error) { - var serverResp response - if err := decodeInto(rsp.Body, &serverResp); err != nil { - return nil, err - } - - // Update the maintenance error state - if serverResp.Maintenance != nil { - client.maintenance = serverResp.Maintenance - } else { - client.maintenance = nil - } - - // Deal with error type response - if err := serverResp.err(); err != nil { - return nil, err - } - - // At this point only sync and async type requests may exist so lets - // make sure this is the case. - // - // Tests depend on the order or checks, so lets keep the order unchanged - // and deal with these before decode. - if opts.Async == false { - if serverResp.Type != "sync" { - return nil, fmt.Errorf("expected sync response, got %q", serverResp.Type) - } - } else { - if serverResp.Type != "async" { - return nil, fmt.Errorf("expected async response for %q on %q, got %q", opts.Method, opts.Path, serverResp.Type) - } - if serverResp.StatusCode != http.StatusAccepted { - return nil, fmt.Errorf("operation not accepted") - } - if serverResp.Change == "" { - return nil, fmt.Errorf("async response without change reference") - } - } - - // Warnings are only included if not an error type response - client.warningCount = serverResp.WarningCount - client.warningTimestamp = serverResp.WarningTimestamp - - // Decode the supplied result type - if result != nil { - if err := decodeWithNumber(bytes.NewReader(serverResp.Result), result); err != nil { - return nil, fmt.Errorf("cannot unmarshal: %w", err) - } - } - - // Common response - return &RequestResponse{ - StatusCode: serverResp.StatusCode, - ChangeID: serverResp.Change, - }, nil -} - type DefaultRequesterConfig struct { // BaseURL contains the base URL where the Pebble daemon is expected to be. // It can be empty for a default behavior of talking over a unix socket. @@ -576,10 +584,10 @@ type DefaultRequester struct { doer doer userAgent string transport *http.Transport - decoder DecoderFunc + client *Client } -func NewDefaultRequester(opts *DefaultRequesterConfig) (*DefaultRequester, error) { +func NewDefaultRequester(client *Client, opts *DefaultRequesterConfig) (*DefaultRequester, error) { if opts == nil { opts = &DefaultRequesterConfig{} } @@ -603,14 +611,11 @@ func NewDefaultRequester(opts *DefaultRequesterConfig) (*DefaultRequester, error requester.doer = &http.Client{Transport: requester.transport} requester.userAgent = opts.UserAgent + requester.client = client return requester, nil } -func (br *DefaultRequester) SetDecoder(decoder DecoderFunc) { - br.decoder = decoder -} - func (br *DefaultRequester) Transport() *http.Transport { return br.transport } diff --git a/client/export_test.go b/client/export_test.go index f78270dca..8731153f9 100644 --- a/client/export_test.go +++ b/client/export_test.go @@ -32,13 +32,13 @@ func (client *Client) SetDoer(d doer) { func (client *Client) Do(method, path string, query url.Values, body io.Reader, v interface{}) error { resp, err := client.Requester().Do(context.Background(), &RequestOptions{ - Method: method, - Path: path, - Query: query, - Headers: nil, - Body: body, - ReturnBody: true, - }, nil) + Type: RawRequest, + Method: method, + Path: path, + Query: query, + Headers: nil, + Body: body, + }) if err != nil { return err } diff --git a/client/logs.go b/client/logs.go index a73a37368..21581652d 100644 --- a/client/logs.go +++ b/client/logs.go @@ -74,11 +74,11 @@ func (client *Client) logs(ctx context.Context, opts *LogsOptions, follow bool) query.Set("follow", "true") } resp, err := client.Requester().Do(ctx, &RequestOptions{ - Method: "GET", - Path: "/v1/logs", - Query: query, - ReturnBody: true, - }, nil) + Type: RawRequest, + Method: "GET", + Path: "/v1/logs", + Query: query, + }) if err != nil { return err } From 7f3cdb310de8917934db3e419af865118b41c32c Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Mon, 9 Oct 2023 17:22:43 +0200 Subject: [PATCH 08/11] Code Review changes 1 --- client/client.go | 139 +++++++++++++++++------------------------- client/exec.go | 4 +- client/export_test.go | 9 ++- client/services.go | 4 +- 4 files changed, 65 insertions(+), 91 deletions(-) diff --git a/client/client.go b/client/client.go index 093a39ad9..ba2d3f9b1 100644 --- a/client/client.go +++ b/client/client.go @@ -45,12 +45,11 @@ type Requester interface { type RequestType int const ( - SyncRequest RequestType = iota + RawRequest RequestType = iota + SyncRequest AsyncRequest - RawRequest ) -// RequestOptions allows setting up a specific request. type RequestOptions struct { Type RequestType Method string @@ -60,24 +59,22 @@ type RequestOptions struct { Body io.Reader } -// RequestResponse defines a common response associated with requests. type RequestResponse struct { StatusCode int ChangeID string Result []byte - // Only set for RawRequest and must be completely read and closed. + // Only set for RawRequest. Body io.ReadCloser } -// DecodeResult can be used to decode a SyncRequest or AsyncRequest. +// DecodeResult decodes the endpoint-specific result payload that is included as part of +// sync and async request responses. The decoding is performed with the standard JSON +// package, so the usual field tags should be used to prepare the type for decoding. func (resp *RequestResponse) DecodeResult(result interface{}) error { - if result != nil { - if err := decodeWithNumber(bytes.NewReader(resp.Result), result); err != nil { - return fmt.Errorf("cannot unmarshal: %w", err) - } + if err := decodeWithNumber(bytes.NewReader(resp.Result), result); err != nil { + return fmt.Errorf("cannot unmarshal: %w", err) } - return nil } @@ -181,12 +178,7 @@ func New(config *Config) (*Client, error) { } client := &Client{} - requester, err := NewDefaultRequester(client, &DefaultRequesterConfig{ - Socket: config.Socket, - BaseURL: config.BaseURL, - DisableKeepAlive: config.DisableKeepAlive, - UserAgent: config.UserAgent, - }) + requester, err := newDefaultRequester(client, config) if err != nil { return nil, err } @@ -256,26 +248,24 @@ func (e ConnectionError) Unwrap() error { return e.error } -// raw creates an HTTP request, performs the request and returns an HTTP response -// and error. -func (br *DefaultRequester) raw(ctx context.Context, method, urlpath string, query url.Values, headers map[string]string, body io.Reader) (*http.Response, error) { +func (rq *defaultRequester) dispatch(ctx context.Context, method, urlpath string, query url.Values, headers map[string]string, body io.Reader) (*http.Response, error) { // fake a url to keep http.Client happy - u := br.baseURL - u.Path = path.Join(br.baseURL.Path, urlpath) + u := rq.baseURL + u.Path = path.Join(rq.baseURL.Path, urlpath) u.RawQuery = query.Encode() req, err := http.NewRequestWithContext(ctx, method, u.String(), body) if err != nil { return nil, RequestError{err} } - if br.userAgent != "" { - req.Header.Set("User-Agent", br.userAgent) + if rq.userAgent != "" { + req.Header.Set("User-Agent", rq.userAgent) } for key, value := range headers { req.Header.Set(key, value) } - rsp, err := br.doer.Do(req) + rsp, err := rq.doer.Do(req) if err != nil { return nil, ConnectionError{err} } @@ -284,32 +274,32 @@ func (br *DefaultRequester) raw(ctx context.Context, method, urlpath string, que } var ( - rawRetry = 250 * time.Millisecond - rawTimeout = 5 * time.Second + doRetry = 250 * time.Millisecond + doTimeout = 5 * time.Second ) // FakeDoRetry fakes the delays used by the do retry loop (intended for // testing). Calling restore will revert the changes. func FakeDoRetry(retry, timeout time.Duration) (restore func()) { - oldRetry := rawRetry - oldTimeout := rawTimeout - rawRetry = retry - rawTimeout = timeout + oldRetry := doRetry + oldTimeout := doTimeout + doRetry = retry + doTimeout = timeout return func() { - rawRetry = oldRetry - rawTimeout = oldTimeout + doRetry = oldRetry + doTimeout = oldTimeout } } -// rawWithRetry builds in a retry mechanism for GET failures (body-less request) -func (br *DefaultRequester) rawWithRetry(ctx context.Context, method, urlpath string, query url.Values, headers map[string]string, body io.Reader) (*http.Response, error) { - retry := time.NewTicker(rawRetry) +// retry builds in a retry mechanism for GET failures. +func (rq *defaultRequester) retry(ctx context.Context, method, urlpath string, query url.Values, headers map[string]string, body io.Reader) (*http.Response, error) { + retry := time.NewTicker(doRetry) defer retry.Stop() - timeout := time.After(rawTimeout) + timeout := time.After(doTimeout) var rsp *http.Response var err error for { - rsp, err = br.raw(ctx, method, urlpath, query, headers, body) + rsp, err = rq.dispatch(ctx, method, urlpath, query, headers, body) if err == nil || method != "GET" { break } @@ -326,27 +316,20 @@ func (br *DefaultRequester) rawWithRetry(ctx context.Context, method, urlpath st return rsp, nil } -// Do implements all the required functionality as defined by the Requester interface. RequestOptions -// selects the Do behaviour. In case of a successful response, the result argument get a -// request specific result. The RequestResponse struct will include common attributes -// (not all will be set for all request types). -func (br *DefaultRequester) Do(ctx context.Context, opts *RequestOptions) (*RequestResponse, error) { - httpResp, err := br.rawWithRetry(ctx, opts.Method, opts.Path, opts.Query, opts.Headers, opts.Body) +// Do performs the HTTP request according to the provided options, possibly retrying GET requests +// if appropriate for the status reported by the server. +func (rq *defaultRequester) Do(ctx context.Context, opts *RequestOptions) (*RequestResponse, error) { + httpResp, err := rq.retry(ctx, opts.Method, opts.Path, opts.Query, opts.Headers, opts.Body) if err != nil { return nil, err } // Is the result expecting a caller-managed raw body? if opts.Type == RawRequest { - return &RequestResponse{ - Body: httpResp.Body, - }, nil + return &RequestResponse{Body: httpResp.Body}, nil } - // If we get here, this is a normal sync or async server request so - // we have to close the body. defer httpResp.Body.Close() - var serverResp response if err := decodeInto(httpResp.Body, &serverResp); err != nil { return nil, err @@ -354,9 +337,9 @@ func (br *DefaultRequester) Do(ctx context.Context, opts *RequestOptions) (*Requ // Update the maintenance error state if serverResp.Maintenance != nil { - br.client.maintenance = serverResp.Maintenance + rq.client.maintenance = serverResp.Maintenance } else { - br.client.maintenance = nil + rq.client.maintenance = nil } // Deal with error type response @@ -385,8 +368,8 @@ func (br *DefaultRequester) Do(ctx context.Context, opts *RequestOptions) (*Requ } // Warnings are only included if not an error type response - br.client.warningCount = serverResp.WarningCount - br.client.warningTimestamp = serverResp.WarningTimestamp + rq.client.warningCount = serverResp.WarningCount + rq.client.warningTimestamp = serverResp.WarningTimestamp // Common response return &RequestResponse{ @@ -421,9 +404,11 @@ func (client *Client) doSync(method, path string, query url.Values, headers map[ if err != nil { return nil, err } - err = resp.DecodeResult(v) - if err != nil { - return nil, err + if v != nil { + err = resp.DecodeResult(v) + if err != nil { + return nil, err + } } return resp, nil } @@ -440,9 +425,11 @@ func (client *Client) doAsync(method, path string, query url.Values, headers map if err != nil { return nil, err } - err = resp.DecodeResult(v) - if err != nil { - return nil, err + if v != nil { + err = resp.DecodeResult(v) + if err != nil { + return nil, err + } } return resp, nil } @@ -563,23 +550,7 @@ func (client *Client) DebugGet(action string, result interface{}, params map[str return err } -type DefaultRequesterConfig struct { - // BaseURL contains the base URL where the Pebble daemon is expected to be. - // It can be empty for a default behavior of talking over a unix socket. - BaseURL string - - // Socket is the path to the unix socket to use. - Socket string - - // DisableKeepAlive indicates that the connections should not be kept - // alive for later reuse (the default is to keep them alive). - DisableKeepAlive bool - - // UserAgent is the User-Agent header sent to the Pebble daemon. - UserAgent string -} - -type DefaultRequester struct { +type defaultRequester struct { baseURL url.URL doer doer userAgent string @@ -587,18 +558,18 @@ type DefaultRequester struct { client *Client } -func NewDefaultRequester(client *Client, opts *DefaultRequesterConfig) (*DefaultRequester, error) { +func newDefaultRequester(client *Client, opts *Config) (*defaultRequester, error) { if opts == nil { - opts = &DefaultRequesterConfig{} + opts = &Config{} } - var requester *DefaultRequester + var requester *defaultRequester if opts.BaseURL == "" { // By default talk over a unix socket. transport := &http.Transport{Dial: unixDialer(opts.Socket), DisableKeepAlives: opts.DisableKeepAlive} baseURL := url.URL{Scheme: "http", Host: "localhost"} - requester = &DefaultRequester{baseURL: baseURL, transport: transport} + requester = &defaultRequester{baseURL: baseURL, transport: transport} } else { // Otherwise talk regular HTTP-over-TCP. baseURL, err := url.Parse(opts.BaseURL) @@ -606,7 +577,7 @@ func NewDefaultRequester(client *Client, opts *DefaultRequesterConfig) (*Default return nil, fmt.Errorf("cannot parse base URL: %v", err) } transport := &http.Transport{DisableKeepAlives: opts.DisableKeepAlive} - requester = &DefaultRequester{baseURL: *baseURL, transport: transport} + requester = &defaultRequester{baseURL: *baseURL, transport: transport} } requester.doer = &http.Client{Transport: requester.transport} @@ -616,6 +587,6 @@ func NewDefaultRequester(client *Client, opts *DefaultRequesterConfig) (*Default return requester, nil } -func (br *DefaultRequester) Transport() *http.Transport { - return br.transport +func (rq *defaultRequester) Transport() *http.Transport { + return rq.transport } diff --git a/client/exec.go b/client/exec.go index 8aab7780c..5b9d3d5ae 100644 --- a/client/exec.go +++ b/client/exec.go @@ -153,7 +153,7 @@ func (client *Client) Exec(opts *ExecOptions) (*ExecProcess, error) { "Content-Type": "application/json", } var result execResult - reqResponse, err := client.doAsync("POST", "/v1/exec", nil, headers, &body, &result) + resp, err := client.doAsync("POST", "/v1/exec", nil, headers, &body, &result) if err != nil { return nil, err } @@ -207,7 +207,7 @@ func (client *Client) Exec(opts *ExecOptions) (*ExecProcess, error) { }() process := &ExecProcess{ - changeID: reqResponse.ChangeID, + changeID: resp.ChangeID, client: client, timeout: opts.Timeout, writesDone: writesDone, diff --git a/client/export_test.go b/client/export_test.go index 8731153f9..be4b7a14a 100644 --- a/client/export_test.go +++ b/client/export_test.go @@ -27,9 +27,12 @@ var ( ) func (client *Client) SetDoer(d doer) { - client.Requester().(*DefaultRequester).doer = d + client.Requester().(*defaultRequester).doer = d } +// TODO: Clean up tests to use the new Requester API. Tests do not generate a client.response type +// reply in the body while SyncRequest or AsyncRequest responses assume the JSON body can be +// unmarshalled into client.response. func (client *Client) Do(method, path string, query url.Values, body io.Reader, v interface{}) error { resp, err := client.Requester().Do(context.Background(), &RequestOptions{ Type: RawRequest, @@ -50,11 +53,11 @@ func (client *Client) Do(method, path string, query url.Values, body io.Reader, } func (client *Client) FakeAsyncRequest() (changeId string, err error) { - reqResponse, err := client.doAsync("GET", "/v1/async-test", nil, nil, nil, nil) + resp, err := client.doAsync("GET", "/v1/async-test", nil, nil, nil, nil) if err != nil { return "", fmt.Errorf("cannot do async test: %v", err) } - return reqResponse.ChangeID, nil + return resp.ChangeID, nil } func (client *Client) SetGetWebsocket(f getWebsocketFunc) { diff --git a/client/services.go b/client/services.go index 42487a8c8..0143cf627 100644 --- a/client/services.go +++ b/client/services.go @@ -78,11 +78,11 @@ func (client *Client) doMultiServiceAction(actionName string, services []string) "Content-Type": "application/json", } - reqResponse, err := client.doAsync("POST", "/v1/services", nil, headers, bytes.NewBuffer(data), nil) + resp, err := client.doAsync("POST", "/v1/services", nil, headers, bytes.NewBuffer(data), nil) if err != nil { return "", err } - return reqResponse.ChangeID, nil + return resp.ChangeID, nil } type ServicesOptions struct { From c93917fc60bf548f3933fef13f1f9da809d603dc Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Tue, 10 Oct 2023 09:35:31 +0200 Subject: [PATCH 09/11] Code Review changes 2 --- client/client.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/client/client.go b/client/client.go index ba2d3f9b1..0266d479f 100644 --- a/client/client.go +++ b/client/client.go @@ -61,10 +61,13 @@ type RequestOptions struct { type RequestResponse struct { StatusCode int - ChangeID string - Result []byte - - // Only set for RawRequest. + // ChangeID is typically set when an AsyncRequest type is performed. The + // change id allows for introspection and progress tracking of the request. + ChangeID string + // Result can contain request specific JSON data. The result can be + // unmarshalled into the expected type using the DecodeResult method. + Result []byte + // Body is only set for request type RawRequest. Body io.ReadCloser } @@ -339,6 +342,10 @@ func (rq *defaultRequester) Do(ctx context.Context, opts *RequestOptions) (*Requ if serverResp.Maintenance != nil { rq.client.maintenance = serverResp.Maintenance } else { + // We cannot assign a nil pointer of type *Error to an + // interface here because the interface is only nil if + // both the type and value is nil. + // https://go.dev/doc/faq#nil_error rq.client.maintenance = nil } @@ -349,11 +356,12 @@ func (rq *defaultRequester) Do(ctx context.Context, opts *RequestOptions) (*Requ // At this point only sync and async type requests may exist so lets // make sure this is the case. - if opts.Type == SyncRequest { + switch opts.Type { + case SyncRequest: if serverResp.Type != "sync" { return nil, fmt.Errorf("expected sync response, got %q", serverResp.Type) } - } else if opts.Type == AsyncRequest { + case AsyncRequest: if serverResp.Type != "async" { return nil, fmt.Errorf("expected async response for %q on %q, got %q", opts.Method, opts.Path, serverResp.Type) } @@ -363,7 +371,7 @@ func (rq *defaultRequester) Do(ctx context.Context, opts *RequestOptions) (*Requ if serverResp.Change == "" { return nil, fmt.Errorf("async response without change reference") } - } else { + default: return nil, fmt.Errorf("cannot process unknown request type") } @@ -469,7 +477,7 @@ const ( ErrorKindNoDefaultServices = "no-default-services" ) -// err extract the error in case of an error type response +// err extracts the error in case of an error type response func (rsp *response) err() error { if rsp.Type != "error" { return nil @@ -574,7 +582,7 @@ func newDefaultRequester(client *Client, opts *Config) (*defaultRequester, error // Otherwise talk regular HTTP-over-TCP. baseURL, err := url.Parse(opts.BaseURL) if err != nil { - return nil, fmt.Errorf("cannot parse base URL: %v", err) + return nil, fmt.Errorf("cannot parse base URL: %w", err) } transport := &http.Transport{DisableKeepAlives: opts.DisableKeepAlive} requester = &defaultRequester{baseURL: *baseURL, transport: transport} From 6a72a59ad01da8c4bd4ea327285084e18305d2dc Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Tue, 10 Oct 2023 14:09:57 +0200 Subject: [PATCH 10/11] Code Review changes 3 --- client/client.go | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/client/client.go b/client/client.go index 68c716521..af3505ecd 100644 --- a/client/client.go +++ b/client/client.go @@ -75,9 +75,15 @@ type RequestResponse struct { // sync and async request responses. The decoding is performed with the standard JSON // package, so the usual field tags should be used to prepare the type for decoding. func (resp *RequestResponse) DecodeResult(result interface{}) error { - if err := decodeWithNumber(bytes.NewReader(resp.Result), result); err != nil { + reader := bytes.NewReader(resp.Result) + dec := json.NewDecoder(reader) + dec.UseNumber() + if err := dec.Decode(&result); err != nil { return fmt.Errorf("cannot unmarshal: %w", err) } + if dec.More() { + return fmt.Errorf("cannot unmarshal: cannot parse json value") + } return nil } @@ -102,20 +108,6 @@ func (s SocketNotFoundError) Unwrap() error { return s.Err } -// decodeWithNumber decodes input data using json.Decoder, ensuring numbers are preserved -// via json.Number data type. It errors out on invalid json or any excess input. -func decodeWithNumber(r io.Reader, value interface{}) error { - dec := json.NewDecoder(r) - dec.UseNumber() - if err := dec.Decode(&value); err != nil { - return err - } - if dec.More() { - return fmt.Errorf("cannot parse json value") - } - return nil -} - func unixDialer(socketPath string) func(string, string) (net.Conn, error) { return func(_, _ string) (net.Conn, error) { _, err := os.Stat(socketPath) From c208b42539940f39966968f34b1432ddb77ca5c3 Mon Sep 17 00:00:00 2001 From: Fred Lotter Date: Thu, 12 Oct 2023 12:36:43 +0200 Subject: [PATCH 11/11] Comment Tweak --- client/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index af3505ecd..33084e7bf 100644 --- a/client/client.go +++ b/client/client.go @@ -367,7 +367,8 @@ func (rq *defaultRequester) Do(ctx context.Context, opts *RequestOptions) (*Requ return nil, fmt.Errorf("cannot process unknown request type") } - // Warnings are only included if not an error type response + // Warnings are only included if not an error type response, so we don't + // replace valid local warnings with an empty state that comes from a failure. rq.client.warningCount = serverResp.WarningCount rq.client.warningTimestamp = serverResp.WarningTimestamp