From 22a493b5549f6f157499c3962939c469b05d2ae4 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Fri, 4 Oct 2024 09:08:01 +1000 Subject: [PATCH] feat: dump kube pod logs --- .github/workflows/ci.yml | 6 ++ .../scaling/kube_scaling_integration_test.go | 4 +- internal/integration/actions.go | 6 +- internal/integration/harness.go | 66 ++++++++++++++++++- 4 files changed, 74 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8134829f7e..ab47696b74 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -348,6 +348,12 @@ jobs: set -euo pipefail # shellcheck disable=SC2046 go test -v -race -tags infrastructure -run '^${{ matrix.test }}$' $(git grep -l '^//go:build infrastructure' | xargs grep -l '^func ${{ matrix.test }}' | xargs -I {} dirname ./{}) + - name: Archive Report + uses: actions/upload-artifact@v4 + if: always() # Always upload the report even on failure + with: + name: kube-report-${{ matrix.test }} + path: /tmp/ftl-kube-report/ integration-success: name: Integration Success needs: [integration-run] diff --git a/backend/controller/scaling/kube_scaling_integration_test.go b/backend/controller/scaling/kube_scaling_integration_test.go index 69e1b104cc..f11c3bcb13 100644 --- a/backend/controller/scaling/kube_scaling_integration_test.go +++ b/backend/controller/scaling/kube_scaling_integration_test.go @@ -34,7 +34,7 @@ func TestKubeScaling(t *testing.T) { in.Call("echo", "echo", "Bob", func(t testing.TB, response string) { assert.Equal(t, "Hello, Bob!!!", response) }), - in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client *kubernetes.Clientset) { + in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client kubernetes.Clientset) { deps, err := client.AppsV1().Deployments(namespace).List(ctx, v1.ListOptions{}) assert.NoError(t, err) for _, dep := range deps.Items { @@ -76,7 +76,7 @@ func TestKubeScaling(t *testing.T) { err := failure.Load() assert.NoError(t, err) }, - in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client *kubernetes.Clientset) { + in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client kubernetes.Clientset) { deps, err := client.AppsV1().Deployments(namespace).List(ctx, v1.ListOptions{}) assert.NoError(t, err) depCount := 0 diff --git a/internal/integration/actions.go b/internal/integration/actions.go index 13b3ba4e5a..3f93b7b045 100644 --- a/internal/integration/actions.go +++ b/internal/integration/actions.go @@ -227,7 +227,7 @@ func Deploy(module string) Action { if ic.Provisioner != nil { args = append(args, "--use-provisioner", "--provisioner-endpoint=http://localhost:8893") } - if ic.kubeClient != nil { + if ic.kubeClient.Ok() { args = append(args, "--build-env", "GOOS=linux", "--build-env", "GOARCH=amd64", "--build-env", "CGO_ENABLED=0") } args = append(args, module) @@ -395,9 +395,9 @@ func Call[Req any, Resp any](module, verb string, request Req, check func(t test } // VerifyKubeState lets you test the current kube state -func VerifyKubeState(check func(ctx context.Context, t testing.TB, namespace string, client *kubernetes.Clientset)) Action { +func VerifyKubeState(check func(ctx context.Context, t testing.TB, namespace string, client kubernetes.Clientset)) Action { return func(t testing.TB, ic TestContext) { - check(ic.Context, t, ic.kubeNamespace, ic.kubeClient) + check(ic.Context, t, ic.kubeNamespace, ic.kubeClient.MustGet()) } } diff --git a/internal/integration/harness.go b/internal/integration/harness.go index 209965afad..1c450fa4cb 100644 --- a/internal/integration/harness.go +++ b/internal/integration/harness.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "io" "os" "path/filepath" "runtime" @@ -22,6 +23,11 @@ import ( "github.com/otiai10/copy" "k8s.io/client-go/kubernetes" + kubecore "k8s.io/api/core/v1" + kubemeta "k8s.io/apimachinery/pkg/apis/meta/v1" + + "sigs.k8s.io/yaml" + "github.com/TBD54566975/ftl/backend/controller/scaling/k8sscaling" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console/pbconsoleconnect" @@ -33,13 +39,15 @@ import ( "github.com/TBD54566975/ftl/internal/rpc" ) +const dumpPath = "/tmp/ftl-kube-report" + func (i TestContext) integrationTestTimeout() time.Duration { timeout := optional.Zero(os.Getenv("FTL_INTEGRATION_TEST_TIMEOUT")).Default("5s") d, err := time.ParseDuration(timeout) if err != nil { panic(err) } - if i.kubeClient != nil { + if i.kubeClient.Ok() { // kube can be slow, give it some time return d * 5 } @@ -277,9 +285,10 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { Verbs: verbs, realT: t, language: language, - kubeClient: kubeClient, kubeNamespace: kubeNamespace, + kubeClient: optional.Ptr(kubeClient), } + defer ic.dumpKubePods() if opts.startController || opts.kube { ic.Controller = controller @@ -354,7 +363,7 @@ type TestContext struct { // The Language under test language string // Set if the test is running on kubernetes - kubeClient *kubernetes.Clientset + kubeClient optional.Option[kubernetes.Clientset] kubeNamespace string Controller ftlv1connect.ControllerServiceClient @@ -465,3 +474,54 @@ func startProcess(ctx context.Context, t testing.TB, args ...string) context.Con }) return ctx } + +func (i TestContext) dumpKubePods() { + if client, ok := i.kubeClient.Get(); ok { + _ = os.RemoveAll(dumpPath) // #nosec + list, err := client.CoreV1().Pods(i.kubeNamespace).List(i, kubemeta.ListOptions{}) + if err == nil { + for _, pod := range list.Items { + Infof("Dumping logs for pod %s", pod.Name) + podPath := filepath.Join(dumpPath, pod.Name) + err := os.MkdirAll(podPath, 0755) // #nosec + if err != nil { + Infof("Error creating directory %s: %v", podPath, err) + continue + } + podYaml, err := yaml.Marshal(pod) + if err != nil { + Infof("Error marshalling pod %s: %v", pod.Name, err) + continue + } + err = os.WriteFile(filepath.Join(podPath, "pod.yaml"), podYaml, 0644) // #nosec + if err != nil { + Infof("Error writing pod %s: %v", pod.Name, err) + continue + } + for _, container := range pod.Spec.Containers { + path := filepath.Join(dumpPath, pod.Name, container.Name+".log") + req := client.CoreV1().Pods(i.kubeNamespace).GetLogs(pod.Name, &kubecore.PodLogOptions{Container: container.Name}) + podLogs, err := req.Stream(context.Background()) + defer func() { + _ = podLogs.Close() + }() + if err != nil { + Infof("Error getting logs for pod %s: %v", pod.Name, err) + continue + } + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err != nil { + Infof("Error copying logs for pod %s: %v", pod.Name, err) + continue + } + str := buf.String() + err = os.WriteFile(path, []byte(str), 0644) // #nosec + if err != nil { + Infof("Error writing logs for pod %s: %v", pod.Name, err) + } + } + } + } + } +}