Skip to content

Commit

Permalink
feat: stream bootloader logs (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
abuchanan-airbyte authored Sep 11, 2024
1 parent 0a859d4 commit 3a76ec0
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 0 deletions.
9 changes: 9 additions & 0 deletions internal/cmd/local/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Client interface {
EventsWatch(ctx context.Context, namespace string) (watch.Interface, error)

LogsGet(ctx context.Context, namespace string, name string) (string, error)
StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error)
}

var _ Client = (*DefaultK8sClient)(nil)
Expand Down Expand Up @@ -325,3 +326,11 @@ func (d *DefaultK8sClient) LogsGet(ctx context.Context, namespace string, name s
}
return buf.String(), nil
}

func (d *DefaultK8sClient) StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) {
req := d.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
Follow: true,
SinceTime: &metav1.Time{Time: since},
})
return req.Stream(ctx)
}
10 changes: 10 additions & 0 deletions internal/cmd/local/k8s/k8stest/k8stest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package k8stest

import (
"context"
"io"
"time"

"github.com/airbytehq/abctl/internal/cmd/local/k8s"
v1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -33,6 +35,7 @@ type MockClient struct {
FnServiceGet func(ctx context.Context, namespace, name string) (*corev1.Service, error)
FnEventsWatch func(ctx context.Context, namespace string) (watch.Interface, error)
FnLogsGet func(ctx context.Context, namespace string, name string) (string, error)
FnStreamPodLogs func(ctx context.Context, namespace, podName string, since time.Time) (io.ReadCloser, error)
}

func (m *MockClient) DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error) {
Expand Down Expand Up @@ -166,3 +169,10 @@ func (m *MockClient) LogsGet(ctx context.Context, namespace string, name string)
}
return m.FnLogsGet(ctx, namespace, name)
}

func (m *MockClient) StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) {
if m.FnStreamPodLogs == nil {
panic("FnStreamPodLogs is not configured")
}
return m.FnStreamPodLogs(ctx, namespace, podName, since)
}
1 change: 1 addition & 0 deletions internal/cmd/local/local/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

const (
airbyteBootloaderPodName = "airbyte-abctl-airbyte-bootloader"
airbyteChartName = "airbyte/airbyte"
airbyteChartRelease = "airbyte-abctl"
airbyteIngress = "ingress-abctl"
Expand Down
61 changes: 61 additions & 0 deletions internal/cmd/local/local/install.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package local

import (
"bufio"
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -378,6 +380,63 @@ func (c *Command) watchEvents(ctx context.Context) {
}
}

// 2024-09-10 20:16:24 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [273....
var javaLogRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \x1b\[(?:1;)?\d+m(?P<level>[A-Z]+)\x1b\[m (?P<msg>\S+ - .*)`)

func (c *Command) streamPodLogs(ctx context.Context, namespace, podName, prefix string, since time.Time) error {
r, err := c.k8s.StreamPodLogs(ctx, namespace, podName, since)
if err != nil {
return err
}
defer r.Close()

level := pterm.Debug
scanner := bufio.NewScanner(r)

for scanner.Scan() {

// skip java stacktrace noise
if strings.HasPrefix(scanner.Text(), "\tat ") || strings.HasPrefix(scanner.Text(), "\t... ") {
continue
}

m := javaLogRx.FindSubmatch(scanner.Bytes())
var msg string

if m != nil {
msg = string(m[2])
if string(m[1]) == "ERROR" {
level = pterm.Error
} else {
level = pterm.Debug
}
} else {
msg = scanner.Text()
}

level.Printfln("%s: %s", prefix, msg)
}
return scanner.Err()
}

func (c *Command) watchBootloaderLogs(ctx context.Context) {
pterm.Debug.Printfln("start streaming bootloader logs")
since := time.Now()

for {
// Wait a few seconds on the first iteration, give the bootloaders some time to start.
time.Sleep(5 * time.Second)

err := c.streamPodLogs(ctx, airbyteNamespace, airbyteBootloaderPodName, "airbyte-bootloader", since)
if err == nil {
break
} else {
pterm.Debug.Printfln("error streaming bootloader logs. will retry: %s", err)
}
}
pterm.Debug.Printfln("done streaming bootloader logs")
}

// now is used to filter out kubernetes events that happened in the past.
// Kubernetes wants us to use the ResourceVersion on the event watch request itself, but that approach
// is more complicated as it requires determining which ResourceVersion to initially provide.
Expand All @@ -398,6 +457,8 @@ func (c *Command) handleEvent(ctx context.Context, e *eventsv1.Event) {
case strings.EqualFold(e.Type, "normal"):
if strings.EqualFold(e.Reason, "backoff") {
pterm.Warning.Println(e.Note)
} else if e.Reason == "Started" && e.Regarding.Name == "airbyte-abctl-airbyte-bootloader" {
go c.watchBootloaderLogs(ctx)
} else {
pterm.Debug.Println(e.Note)
}
Expand Down

0 comments on commit 3a76ec0

Please sign in to comment.