diff --git a/backend/controller/scaling/kube_scaling_integration_test.go b/backend/controller/scaling/kube_scaling_integration_test.go index 69e1b104cc..f11c3bcb13 100644 --- a/backend/controller/scaling/kube_scaling_integration_test.go +++ b/backend/controller/scaling/kube_scaling_integration_test.go @@ -34,7 +34,7 @@ func TestKubeScaling(t *testing.T) { in.Call("echo", "echo", "Bob", func(t testing.TB, response string) { assert.Equal(t, "Hello, Bob!!!", response) }), - in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client *kubernetes.Clientset) { + in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client kubernetes.Clientset) { deps, err := client.AppsV1().Deployments(namespace).List(ctx, v1.ListOptions{}) assert.NoError(t, err) for _, dep := range deps.Items { @@ -76,7 +76,7 @@ func TestKubeScaling(t *testing.T) { err := failure.Load() assert.NoError(t, err) }, - in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client *kubernetes.Clientset) { + in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client kubernetes.Clientset) { deps, err := client.AppsV1().Deployments(namespace).List(ctx, v1.ListOptions{}) assert.NoError(t, err) depCount := 0 diff --git a/internal/integration/actions.go b/internal/integration/actions.go index 50cf02296d..6cbd329d58 100644 --- a/internal/integration/actions.go +++ b/internal/integration/actions.go @@ -227,7 +227,7 @@ func Deploy(module string) Action { if ic.Provisioner != nil { args = append(args, "--use-provisioner", "--provisioner-endpoint=http://localhost:8893") } - if ic.kubeClient != nil { + if ic.kubeClient.Ok() { args = append(args, "--build-env", "GOOS=linux", "--build-env", "GOARCH=amd64", "--build-env", "CGO_ENABLED=0") } args = append(args, module) @@ -395,9 +395,10 @@ func Call[Req any, Resp any](module, verb string, request Req, check func(t test } // VerifyKubeState lets you test the current kube state -func VerifyKubeState(check func(ctx context.Context, t testing.TB, namespace string, client *kubernetes.Clientset)) Action { +func VerifyKubeState(check func(ctx context.Context, t testing.TB, namespace string, client kubernetes.Clientset)) Action { return func(t testing.TB, ic TestContext) { - check(ic.Context, t, ic.kubeNamespace, ic.kubeClient) + clientset := ic.kubeClient.MustGet() + check(ic.Context, t, ic.kubeNamespace, clientset) } } diff --git a/internal/integration/harness.go b/internal/integration/harness.go index 82b49c0ea3..67dd50c1b0 100644 --- a/internal/integration/harness.go +++ b/internal/integration/harness.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "io" "os" "path/filepath" "runtime" @@ -20,6 +21,8 @@ import ( "github.com/alecthomas/assert/v2" "github.com/alecthomas/types/optional" "github.com/otiai10/copy" + kubecore "k8s.io/api/core/v1" + kubemeta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "github.com/TBD54566975/ftl/backend/controller/scaling/k8sscaling" @@ -39,7 +42,7 @@ func (i TestContext) integrationTestTimeout() time.Duration { if err != nil { panic(err) } - if i.kubeClient != nil { + if i.kubeClient.Ok() { // kube can be slow, give it some time return d * 5 } @@ -277,7 +280,7 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { Verbs: verbs, realT: t, language: language, - kubeClient: kubeClient, + kubeClient: optional.Ptr(kubeClient), kubeNamespace: kubeNamespace, } @@ -354,7 +357,7 @@ type TestContext struct { // The Language under test language string // Set if the test is running on kubernetes - kubeClient *kubernetes.Clientset + kubeClient optional.Option[kubernetes.Clientset] kubeNamespace string Controller ftlv1connect.ControllerServiceClient @@ -384,6 +387,34 @@ func (i TestContext) AssertWithRetry(t testing.TB, assertion Action) { } select { case <-waitCtx.Done(): + if client, ok := i.kubeClient.Get(); ok { + Infof("Kube logs:") + list, err := client.CoreV1().Pods(i.kubeNamespace).List(i, kubemeta.ListOptions{}) + if err == nil { + for _, pod := range list.Items { + Infof("\n\n\n========== Pod %s ==========", pod.Name) + for _, container := range pod.Spec.Containers { + Infof("\n\n\n----------- Pod %s Container %s --------", pod.Name, container.Name) + req := client.CoreV1().Pods(i.kubeNamespace).GetLogs(pod.Name, &kubecore.PodLogOptions{Container: container.Name}) + podLogs, err := req.Stream(context.Background()) + if err != nil { + Infof("Error getting logs for pod %s: %v", pod.Name, err) + continue + } + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err != nil { + Infof("Error copying logs for pod %s: %v", pod.Name, err) + continue + } + str := buf.String() + Infof("%s", str) + _ = podLogs.Close() + } + } + } + + } t.Fatalf("Timed out waiting for assertion to pass: %s", err) case <-time.After(time.Millisecond * 200):