Skip to content

Commit

Permalink
Merge pull request #126999 from pohly/log-client-go-rest-body
Browse files Browse the repository at this point in the history
client-go/rest: contextual logging of request/response

Kubernetes-commit: 9e597655855771bd241508fab4501ac099e29f16
  • Loading branch information
k8s-publishing-bot committed Sep 12, 2024
2 parents c5e16f8 + 3d02d42 commit 53c7e93
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 22 deletions.
58 changes: 40 additions & 18 deletions rest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,11 +450,9 @@ func (r *Request) Body(obj interface{}) *Request {
r.err = err
return r
}
glogBody("Request Body", data)
r.body = nil
r.bodyBytes = data
case []byte:
glogBody("Request Body", t)
r.body = nil
r.bodyBytes = t
case io.Reader:
Expand All @@ -475,7 +473,6 @@ func (r *Request) Body(obj interface{}) *Request {
r.err = err
return r
}
glogBody("Request Body", data)
r.body = nil
r.bodyBytes = data
r.SetHeader("Content-Type", r.c.content.ContentType)
Expand Down Expand Up @@ -704,6 +701,10 @@ func (b *throttledLogger) Infof(message string, args ...interface{}) {
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}

// We specifically don't want to rate limit watches, so we
// don't use r.rateLimiter here.
if r.err != nil {
Expand Down Expand Up @@ -752,7 +753,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
// the server must have sent us an error in 'err'
return true, nil
}
result := r.transformResponse(resp, req)
result := r.transformResponse(ctx, resp, req)
if err := result.Error(); err != nil {
return true, err
}
Expand Down Expand Up @@ -845,6 +846,10 @@ func (r WatchListResult) Into(obj runtime.Object) error {
// Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
// to see what parameters are currently required.
func (r *Request) WatchList(ctx context.Context) WatchListResult {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}

if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)}
}
Expand Down Expand Up @@ -969,6 +974,10 @@ func sanitize(req *Request, resp *http.Response, err error) (string, string) {
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}

if r.err != nil {
return nil, r.err
}
Expand Down Expand Up @@ -1012,7 +1021,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
return false, nil
}
result := r.transformResponse(resp, req)
result := r.transformResponse(ctx, resp, req)
if err := result.Error(); err != nil {
return true, err
}
Expand Down Expand Up @@ -1199,9 +1208,13 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
// - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// - http.Client.Do errors are returned directly.
func (r *Request) Do(ctx context.Context) Result {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}

var result Result
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
result = r.transformResponse(ctx, resp, req)
})
if err != nil {
return Result{err: err}
Expand All @@ -1214,10 +1227,14 @@ func (r *Request) Do(ctx context.Context) Result {

// DoRaw executes the request but does not process the response body.
func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
if r.body == nil {
logBody(ctx, 2, "Request Body", r.bodyBytes)
}

var result Result
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result.body, result.err = io.ReadAll(resp.Body)
glogBody("Response Body", result.body)
logBody(ctx, 2, "Response Body", result.body)
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
result.err = r.transformUnstructuredResponseError(resp, req, result.body)
}
Expand All @@ -1232,7 +1249,7 @@ func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
}

// transformResponse converts an API response into a structured API object
func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
func (r *Request) transformResponse(ctx context.Context, resp *http.Response, req *http.Request) Result {
var body []byte
if resp.Body != nil {
data, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -1261,7 +1278,8 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
}
}

glogBody("Response Body", body)
// Call depth is tricky. This one is okay for Do and DoRaw.
logBody(ctx, 7, "Response Body", body)

// verify the content type is accurate
var decoder runtime.Decoder
Expand Down Expand Up @@ -1321,14 +1339,14 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
}

// truncateBody decides if the body should be truncated, based on the glog Verbosity.
func truncateBody(body string) string {
func truncateBody(logger klog.Logger, body string) string {
max := 0
switch {
case bool(klog.V(10).Enabled()):
case bool(logger.V(10).Enabled()):
return body
case bool(klog.V(9).Enabled()):
case bool(logger.V(9).Enabled()):
max = 10240
case bool(klog.V(8).Enabled()):
case bool(logger.V(8).Enabled()):
max = 1024
}

Expand All @@ -1339,17 +1357,21 @@ func truncateBody(body string) string {
return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
}

// glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
// logBody logs a body output that could be either JSON or protobuf. It explicitly guards against
// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
// whether the body is printable.
func glogBody(prefix string, body []byte) {
if klogV := klog.V(8); klogV.Enabled() {
//
// It needs to be called by all functions which send or receive the data.
func logBody(ctx context.Context, callDepth int, prefix string, body []byte) {
logger := klog.FromContext(ctx)
if loggerV := logger.V(8); loggerV.Enabled() {
loggerV := loggerV.WithCallDepth(callDepth)
if bytes.IndexFunc(body, func(r rune) bool {
return r < 0x0a
}) != -1 {
klogV.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
loggerV.Info(prefix, "body", truncateBody(logger, hex.Dump(body)))
} else {
klogV.Infof("%s: %s", prefix, truncateBody(string(body)))
loggerV.Info(prefix, "body", truncateBody(logger, string(body)))
}
}
}
Expand Down
93 changes: 89 additions & 4 deletions rest/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,20 @@ import (
"net/http/httptest"
"net/url"
"os"
"path"
"reflect"
"regexp"
goruntime "runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
Expand All @@ -54,6 +60,7 @@ import (
"k8s.io/client-go/util/flowcontrol"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
testingclock "k8s.io/utils/clock/testing"
)

Expand Down Expand Up @@ -553,6 +560,7 @@ func TestURLTemplate(t *testing.T) {
}

func TestTransformResponse(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
invalid := []byte("aaaaa")
uri, _ := url.Parse("http://localhost")
testCases := []struct {
Expand Down Expand Up @@ -601,7 +609,7 @@ func TestTransformResponse(t *testing.T) {
if test.Response.Body == nil {
test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
}
result := r.transformResponse(test.Response, &http.Request{})
result := r.transformResponse(ctx, test.Response, &http.Request{})
response, created, err := result.body, result.statusCode == http.StatusCreated, result.err
hasErr := err != nil
if hasErr != test.Error {
Expand Down Expand Up @@ -652,6 +660,7 @@ func (r *renegotiator) StreamDecoder(contentType string, params map[string]strin
}

func TestTransformResponseNegotiate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
invalid := []byte("aaaaa")
uri, _ := url.Parse("http://localhost")
testCases := []struct {
Expand Down Expand Up @@ -765,7 +774,7 @@ func TestTransformResponseNegotiate(t *testing.T) {
if test.Response.Body == nil {
test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
}
result := r.transformResponse(test.Response, &http.Request{})
result := r.transformResponse(ctx, test.Response, &http.Request{})
_, err := result.body, result.err
hasErr := err != nil
if hasErr != test.Error {
Expand Down Expand Up @@ -890,14 +899,15 @@ func TestTransformUnstructuredError(t *testing.T) {

for _, testCase := range testCases {
t.Run("", func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
r := &Request{
c: &RESTClient{
content: defaultContentConfig(),
},
resourceName: testCase.Name,
resource: testCase.Resource,
}
result := r.transformResponse(testCase.Res, testCase.Req)
result := r.transformResponse(ctx, testCase.Res, testCase.Req)
err := result.err
if !testCase.ErrFn(err) {
t.Fatalf("unexpected error: %v", err)
Expand Down Expand Up @@ -2331,7 +2341,7 @@ func TestTruncateBody(t *testing.T) {
l := flag.Lookup("v").Value.(flag.Getter).Get().(klog.Level)
for _, test := range tests {
flag.Set("v", test.level)
got := truncateBody(test.body)
got := truncateBody(klog.Background(), test.body)
if got != test.want {
t.Errorf("truncateBody(%v) = %v, want %v", test.body, got, test.want)
}
Expand Down Expand Up @@ -4051,3 +4061,78 @@ func TestRequestConcurrencyWithRetry(t *testing.T) {
t.Errorf("Expected attempts: %d, but got: %d", expected, attempts)
}
}

func TestRequestLogging(t *testing.T) {
testcases := map[string]struct {
v int
body any
expectedOutput string
}{
"no-output": {
v: 7,
body: []byte("ping"),
},
"output": {
v: 8,
body: []byte("ping"),
expectedOutput: `<location>] "Request Body" logger="TestLogger" body="ping"
<location>] "Response Body" logger="TestLogger" body="pong"
`,
},
"io-reader": {
v: 8,
body: strings.NewReader("ping"),
// Cannot log the request body!
expectedOutput: `<location>] "Response Body" logger="TestLogger" body="pong"
`,
},
"truncate": {
v: 8,
body: []byte(strings.Repeat("a", 2000)),
expectedOutput: fmt.Sprintf(`<location>] "Request Body" logger="TestLogger" body="%s [truncated 976 chars]"
<location>] "Response Body" logger="TestLogger" body="pong"
`, strings.Repeat("a", 1024)),
},
}

for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
state := klog.CaptureState()
defer state.Restore()

var buffer bytes.Buffer
klog.SetOutput(&buffer)
klog.LogToStderr(false)
var fs flag.FlagSet
klog.InitFlags(&fs)
require.NoError(t, fs.Set("v", fmt.Sprintf("%d", tc.v)), "set verbosity")

client := clientForFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("pong")),
}, nil
})

req := NewRequestWithClient(nil, "", defaultContentConfig(), client).
Body(tc.body)

logger := klog.Background()
logger = klog.LoggerWithName(logger, "TestLogger")
ctx := klog.NewContext(context.Background(), logger)

_, file, line, _ := goruntime.Caller(0)
result := req.Do(ctx)
require.NoError(t, result.Error(), "request.Do")

// Compare log output:
// - strip date/time/pid from each line (fixed length header)
// - replace <location> with the actual call location
state.Restore()
expectedOutput := strings.ReplaceAll(tc.expectedOutput, "<location>", fmt.Sprintf("%s:%d", path.Base(file), line+1))
actualOutput := buffer.String()
actualOutput = regexp.MustCompile(`(?m)^.{30}`).ReplaceAllString(actualOutput, "")
assert.Equal(t, expectedOutput, actualOutput)
})
}
}

0 comments on commit 53c7e93

Please sign in to comment.