Skip to content

Commit

Permalink
client-go/rest: contextual logging of request/response
Browse files Browse the repository at this point in the history
Logging in rest.Request.Body cannot be made context-aware without an API
change. Such a change is complicated if done in a backwards-compatible
fashion (must change lots of callers in Kubernetes) and prohibitive if not (all
callers of Body would have to pass a context).

Instead, logging of the request body gets moved into the functions which send
the request. This is a change of behavior, but it is limited to log levels >= 8
and thus should have no impact in production.

A request which gets sent multiple times will also log the body multiple
times. This might even be a good thing because it serves as reminder what is
being sent when it is being sent.

While at it, stack backtracing gets enhanced so that the caller of the REST API
is logged and tests for the new behavior get added.

Kubernetes-commit: 57f9b7c7a2412865e7817dbf7638881b00ac9721
  • Loading branch information
pohly authored and k8s-publishing-bot committed Aug 29, 2024
1 parent c5e16f8 commit 3d02d42
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 22 deletions.
58 changes: 40 additions & 18 deletions rest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,11 +450,9 @@ func (r *Request) Body(obj interface{}) *Request {
r.err = err
return r
}
glogBody("Request Body", data)
r.body = nil
r.bodyBytes = data
case []byte:
glogBody("Request Body", t)
r.body = nil
r.bodyBytes = t
case io.Reader:
Expand All @@ -475,7 +473,6 @@ func (r *Request) Body(obj interface{}) *Request {
r.err = err
return r
}
glogBody("Request Body", data)
r.body = nil
r.bodyBytes = data
r.SetHeader("Content-Type", r.c.content.ContentType)
Expand Down Expand Up @@ -704,6 +701,10 @@ func (b *throttledLogger) Infof(message string, args ...interface{}) {
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}

// We specifically don't want to rate limit watches, so we
// don't use r.rateLimiter here.
if r.err != nil {
Expand Down Expand Up @@ -752,7 +753,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
// the server must have sent us an error in 'err'
return true, nil
}
result := r.transformResponse(resp, req)
result := r.transformResponse(ctx, resp, req)
if err := result.Error(); err != nil {
return true, err
}
Expand Down Expand Up @@ -845,6 +846,10 @@ func (r WatchListResult) Into(obj runtime.Object) error {
// Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
// to see what parameters are currently required.
func (r *Request) WatchList(ctx context.Context) WatchListResult {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}

if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)}
}
Expand Down Expand Up @@ -969,6 +974,10 @@ func sanitize(req *Request, resp *http.Response, err error) (string, string) {
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}

if r.err != nil {
return nil, r.err
}
Expand Down Expand Up @@ -1012,7 +1021,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
return false, nil
}
result := r.transformResponse(resp, req)
result := r.transformResponse(ctx, resp, req)
if err := result.Error(); err != nil {
return true, err
}
Expand Down Expand Up @@ -1199,9 +1208,13 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
// - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// - http.Client.Do errors are returned directly.
func (r *Request) Do(ctx context.Context) Result {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}

var result Result
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
result = r.transformResponse(ctx, resp, req)
})
if err != nil {
return Result{err: err}
Expand All @@ -1214,10 +1227,14 @@ func (r *Request) Do(ctx context.Context) Result {

// DoRaw executes the request but does not process the response body.
func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}

var result Result
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result.body, result.err = io.ReadAll(resp.Body)
glogBody("Response Body", result.body)
logBody(ctx, 2, "Response Body", result.body)
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
result.err = r.transformUnstructuredResponseError(resp, req, result.body)
}
Expand All @@ -1232,7 +1249,7 @@ func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
}

// transformResponse converts an API response into a structured API object
func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
func (r *Request) transformResponse(ctx context.Context, resp *http.Response, req *http.Request) Result {
var body []byte
if resp.Body != nil {
data, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -1261,7 +1278,8 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
}
}

glogBody("Response Body", body)
// Call depth is tricky. This one is okay for Do and DoRaw.
logBody(ctx, 7, "Response Body", body)

// verify the content type is accurate
var decoder runtime.Decoder
Expand Down Expand Up @@ -1321,14 +1339,14 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
}

// truncateBody decides if the body should be truncated, based on the glog Verbosity.
func truncateBody(body string) string {
func truncateBody(logger klog.Logger, body string) string {
max := 0
switch {
case bool(klog.V(10).Enabled()):
case bool(logger.V(10).Enabled()):
return body
case bool(klog.V(9).Enabled()):
case bool(logger.V(9).Enabled()):
max = 10240
case bool(klog.V(8).Enabled()):
case bool(logger.V(8).Enabled()):
max = 1024
}

Expand All @@ -1339,17 +1357,21 @@ func truncateBody(body string) string {
return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
}

// glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
// logBody logs a body output that could be either JSON or protobuf. It explicitly guards against
// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
// whether the body is printable.
func glogBody(prefix string, body []byte) {
if klogV := klog.V(8); klogV.Enabled() {
//
// It needs to be called by all functions which send or receive the data.
func logBody(ctx context.Context, callDepth int, prefix string, body []byte) {
logger := klog.FromContext(ctx)
if loggerV := logger.V(8); loggerV.Enabled() {
loggerV := loggerV.WithCallDepth(callDepth)
if bytes.IndexFunc(body, func(r rune) bool {
return r < 0x0a
}) != -1 {
klogV.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
loggerV.Info(prefix, "body", truncateBody(logger, hex.Dump(body)))
} else {
klogV.Infof("%s: %s", prefix, truncateBody(string(body)))
loggerV.Info(prefix, "body", truncateBody(logger, string(body)))
}
}
}
Expand Down
93 changes: 89 additions & 4 deletions rest/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,20 @@ import (
"net/http/httptest"
"net/url"
"os"
"path"
"reflect"
"regexp"
goruntime "runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
Expand All @@ -54,6 +60,7 @@ import (
"k8s.io/client-go/util/flowcontrol"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
testingclock "k8s.io/utils/clock/testing"
)

Expand Down Expand Up @@ -553,6 +560,7 @@ func TestURLTemplate(t *testing.T) {
}

func TestTransformResponse(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
invalid := []byte("aaaaa")
uri, _ := url.Parse("http://localhost")
testCases := []struct {
Expand Down Expand Up @@ -601,7 +609,7 @@ func TestTransformResponse(t *testing.T) {
if test.Response.Body == nil {
test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
}
result := r.transformResponse(test.Response, &http.Request{})
result := r.transformResponse(ctx, test.Response, &http.Request{})
response, created, err := result.body, result.statusCode == http.StatusCreated, result.err
hasErr := err != nil
if hasErr != test.Error {
Expand Down Expand Up @@ -652,6 +660,7 @@ func (r *renegotiator) StreamDecoder(contentType string, params map[string]strin
}

func TestTransformResponseNegotiate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
invalid := []byte("aaaaa")
uri, _ := url.Parse("http://localhost")
testCases := []struct {
Expand Down Expand Up @@ -765,7 +774,7 @@ func TestTransformResponseNegotiate(t *testing.T) {
if test.Response.Body == nil {
test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
}
result := r.transformResponse(test.Response, &http.Request{})
result := r.transformResponse(ctx, test.Response, &http.Request{})
_, err := result.body, result.err
hasErr := err != nil
if hasErr != test.Error {
Expand Down Expand Up @@ -890,14 +899,15 @@ func TestTransformUnstructuredError(t *testing.T) {

for _, testCase := range testCases {
t.Run("", func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
r := &Request{
c: &RESTClient{
content: defaultContentConfig(),
},
resourceName: testCase.Name,
resource: testCase.Resource,
}
result := r.transformResponse(testCase.Res, testCase.Req)
result := r.transformResponse(ctx, testCase.Res, testCase.Req)
err := result.err
if !testCase.ErrFn(err) {
t.Fatalf("unexpected error: %v", err)
Expand Down Expand Up @@ -2331,7 +2341,7 @@ func TestTruncateBody(t *testing.T) {
l := flag.Lookup("v").Value.(flag.Getter).Get().(klog.Level)
for _, test := range tests {
flag.Set("v", test.level)
got := truncateBody(test.body)
got := truncateBody(klog.Background(), test.body)
if got != test.want {
t.Errorf("truncateBody(%v) = %v, want %v", test.body, got, test.want)
}
Expand Down Expand Up @@ -4051,3 +4061,78 @@ func TestRequestConcurrencyWithRetry(t *testing.T) {
t.Errorf("Expected attempts: %d, but got: %d", expected, attempts)
}
}

func TestRequestLogging(t *testing.T) {
testcases := map[string]struct {
v int
body any
expectedOutput string
}{
"no-output": {
v: 7,
body: []byte("ping"),
},
"output": {
v: 8,
body: []byte("ping"),
expectedOutput: `<location>] "Request Body" logger="TestLogger" body="ping"
<location>] "Response Body" logger="TestLogger" body="pong"
`,
},
"io-reader": {
v: 8,
body: strings.NewReader("ping"),
// Cannot log the request body!
expectedOutput: `<location>] "Response Body" logger="TestLogger" body="pong"
`,
},
"truncate": {
v: 8,
body: []byte(strings.Repeat("a", 2000)),
expectedOutput: fmt.Sprintf(`<location>] "Request Body" logger="TestLogger" body="%s [truncated 976 chars]"
<location>] "Response Body" logger="TestLogger" body="pong"
`, strings.Repeat("a", 1024)),
},
}

for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
state := klog.CaptureState()
defer state.Restore()

var buffer bytes.Buffer
klog.SetOutput(&buffer)
klog.LogToStderr(false)
var fs flag.FlagSet
klog.InitFlags(&fs)
require.NoError(t, fs.Set("v", fmt.Sprintf("%d", tc.v)), "set verbosity")

client := clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("pong")),
}, nil
})

req := NewRequestWithClient(nil, "", defaultContentConfig(), client).
Body(tc.body)

logger := klog.Background()
logger = klog.LoggerWithName(logger, "TestLogger")
ctx := klog.NewContext(context.Background(), logger)

_, file, line, _ := goruntime.Caller(0)
result := req.Do(ctx)
require.NoError(t, result.Error(), "request.Do")

// Compare log output:
// - strip date/time/pid from each line (fixed length header)
// - replace <location> with the actual call location
state.Restore()
expectedOutput := strings.ReplaceAll(tc.expectedOutput, "<location>", fmt.Sprintf("%s:%d", path.Base(file), line+1))
actualOutput := buffer.String()
actualOutput = regexp.MustCompile(`(?m)^.{30}`).ReplaceAllString(actualOutput, "")
assert.Equal(t, expectedOutput, actualOutput)
})
}
}

0 comments on commit 3d02d42

Please sign in to comment.