Skip to content

Commit

Permalink
feat: dump kube pod logs
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Oct 3, 2024
1 parent 91d1822 commit 22a493b
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 8 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@ jobs:
set -euo pipefail
# shellcheck disable=SC2046
go test -v -race -tags infrastructure -run '^${{ matrix.test }}$' $(git grep -l '^//go:build infrastructure' | xargs grep -l '^func ${{ matrix.test }}' | xargs -I {} dirname ./{})
- name: Archive Report
uses: actions/upload-artifact@v4
if: always() # Always upload the report even on failure
with:
name: kube-report-${{ matrix.test }}
path: /tmp/ftl-kube-report/
integration-success:
name: Integration Success
needs: [integration-run]
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/scaling/kube_scaling_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestKubeScaling(t *testing.T) {
in.Call("echo", "echo", "Bob", func(t testing.TB, response string) {
assert.Equal(t, "Hello, Bob!!!", response)
}),
in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client *kubernetes.Clientset) {
in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client kubernetes.Clientset) {
deps, err := client.AppsV1().Deployments(namespace).List(ctx, v1.ListOptions{})
assert.NoError(t, err)
for _, dep := range deps.Items {
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestKubeScaling(t *testing.T) {
err := failure.Load()
assert.NoError(t, err)
},
in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client *kubernetes.Clientset) {
in.VerifyKubeState(func(ctx context.Context, t testing.TB, namespace string, client kubernetes.Clientset) {
deps, err := client.AppsV1().Deployments(namespace).List(ctx, v1.ListOptions{})
assert.NoError(t, err)
depCount := 0
Expand Down
6 changes: 3 additions & 3 deletions internal/integration/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func Deploy(module string) Action {
if ic.Provisioner != nil {
args = append(args, "--use-provisioner", "--provisioner-endpoint=http://localhost:8893")
}
if ic.kubeClient != nil {
if ic.kubeClient.Ok() {
args = append(args, "--build-env", "GOOS=linux", "--build-env", "GOARCH=amd64", "--build-env", "CGO_ENABLED=0")
}
args = append(args, module)
Expand Down Expand Up @@ -395,9 +395,9 @@ func Call[Req any, Resp any](module, verb string, request Req, check func(t test
}

// VerifyKubeState lets you test the current kube state
func VerifyKubeState(check func(ctx context.Context, t testing.TB, namespace string, client *kubernetes.Clientset)) Action {
func VerifyKubeState(check func(ctx context.Context, t testing.TB, namespace string, client kubernetes.Clientset)) Action {
return func(t testing.TB, ic TestContext) {
check(ic.Context, t, ic.kubeNamespace, ic.kubeClient)
check(ic.Context, t, ic.kubeNamespace, ic.kubeClient.MustGet())
}
}

Expand Down
66 changes: 63 additions & 3 deletions internal/integration/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
Expand All @@ -22,6 +23,11 @@ import (
"github.com/otiai10/copy"
"k8s.io/client-go/kubernetes"

kubecore "k8s.io/api/core/v1"
kubemeta "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/yaml"

"github.com/TBD54566975/ftl/backend/controller/scaling/k8sscaling"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console/pbconsoleconnect"
Expand All @@ -33,13 +39,15 @@ import (
"github.com/TBD54566975/ftl/internal/rpc"
)

const dumpPath = "/tmp/ftl-kube-report"

func (i TestContext) integrationTestTimeout() time.Duration {
timeout := optional.Zero(os.Getenv("FTL_INTEGRATION_TEST_TIMEOUT")).Default("5s")
d, err := time.ParseDuration(timeout)
if err != nil {
panic(err)
}
if i.kubeClient != nil {
if i.kubeClient.Ok() {
// kube can be slow, give it some time
return d * 5
}
Expand Down Expand Up @@ -277,9 +285,10 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) {
Verbs: verbs,
realT: t,
language: language,
kubeClient: kubeClient,
kubeNamespace: kubeNamespace,
kubeClient: optional.Ptr(kubeClient),
}
defer ic.dumpKubePods()

if opts.startController || opts.kube {
ic.Controller = controller
Expand Down Expand Up @@ -354,7 +363,7 @@ type TestContext struct {
// The Language under test
language string
// Set if the test is running on kubernetes
kubeClient *kubernetes.Clientset
kubeClient optional.Option[kubernetes.Clientset]
kubeNamespace string

Controller ftlv1connect.ControllerServiceClient
Expand Down Expand Up @@ -465,3 +474,54 @@ func startProcess(ctx context.Context, t testing.TB, args ...string) context.Con
})
return ctx
}

func (i TestContext) dumpKubePods() {
if client, ok := i.kubeClient.Get(); ok {
_ = os.RemoveAll(dumpPath) // #nosec
list, err := client.CoreV1().Pods(i.kubeNamespace).List(i, kubemeta.ListOptions{})
if err == nil {
for _, pod := range list.Items {
Infof("Dumping logs for pod %s", pod.Name)
podPath := filepath.Join(dumpPath, pod.Name)
err := os.MkdirAll(podPath, 0755) // #nosec
if err != nil {
Infof("Error creating directory %s: %v", podPath, err)
continue
}
podYaml, err := yaml.Marshal(pod)
if err != nil {
Infof("Error marshalling pod %s: %v", pod.Name, err)
continue
}
err = os.WriteFile(filepath.Join(podPath, "pod.yaml"), podYaml, 0644) // #nosec
if err != nil {
Infof("Error writing pod %s: %v", pod.Name, err)
continue
}
for _, container := range pod.Spec.Containers {
path := filepath.Join(dumpPath, pod.Name, container.Name+".log")
req := client.CoreV1().Pods(i.kubeNamespace).GetLogs(pod.Name, &kubecore.PodLogOptions{Container: container.Name})
podLogs, err := req.Stream(context.Background())
defer func() {
_ = podLogs.Close()
}()
if err != nil {
Infof("Error getting logs for pod %s: %v", pod.Name, err)
continue
}
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
Infof("Error copying logs for pod %s: %v", pod.Name, err)
continue
}
str := buf.String()
err = os.WriteFile(path, []byte(str), 0644) // #nosec
if err != nil {
Infof("Error writing logs for pod %s: %v", pod.Name, err)
}
}
}
}
}
}

0 comments on commit 22a493b

Please sign in to comment.