Skip to content

Commit

Permalink
logging UI
Browse files Browse the repository at this point in the history
  • Loading branch information
wardviaene committed Sep 23, 2024
1 parent f5a8bfc commit d084dd6
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 49 deletions.
15 changes: 15 additions & 0 deletions pkg/observability/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (o *Observability) Ingest(data io.ReadCloser) error {
if err != nil {
return fmt.Errorf("decode error: %s", err)
}
logging.DebugLog(fmt.Errorf("messages ingested: %d", len(msgs)))
_, err = o.Buffer.Write(encodeMessage(msgs))
if err != nil {
return fmt.Errorf("write error: %s", err)
Expand All @@ -84,6 +85,20 @@ func (o *Observability) Ingest(data io.ReadCloser) error {
return nil
}

func (o *Observability) Flush() error {
// wait until all data is flushed
o.ActiveBufferWriters.Wait()

// flush remaining data that hasn't been flushed
if n := o.Buffer.Len(); n >= 0 {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
return fmt.Errorf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}
return nil
}

func (c *ConcurrentRWBuffer) Write(p []byte) (n int, err error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
12 changes: 3 additions & 9 deletions pkg/observability/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,9 @@ func TestIngestionMoreMessages(t *testing.T) {
}
}

// wait until all data is flushed
o.ActiveBufferWriters.Wait()

// flush remaining data that hasn't been flushed
if n := o.Buffer.Len(); n >= 0 {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
err = o.Flush()
if err != nil {
t.Fatalf("flush error: %s", err)
}

dirlist, err := storage.ReadDir("")
Expand Down
46 changes: 24 additions & 22 deletions pkg/observability/decoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,33 @@ func Decode(r io.Reader) ([]FluentBitMessage, error) {
if len(m1) == 0 {
return result, fmt.Errorf("empty array")
}
switch m2 := m1[0].(type) {
case map[string]interface{}:
var fluentBitMessage FluentBitMessage
fluentBitMessage.Data = make(map[string]string)
val, ok := m2["date"]
if ok {
fluentBitMessage.Date = val.(float64)
}
for key, value := range m2 {
if key != "date" {
switch valueTyped := value.(type) {
case string:
fluentBitMessage.Data[key] = valueTyped
case float64:
fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64)
case []byte:
fluentBitMessage.Data[key] = string(valueTyped)
default:
fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped))
for _, m1Element := range m1 {
switch m2 := m1Element.(type) {
case map[string]interface{}:
var fluentBitMessage FluentBitMessage
fluentBitMessage.Data = make(map[string]string)
val, ok := m2["date"]
if ok {
fluentBitMessage.Date = val.(float64)
}
for key, value := range m2 {
if key != "date" {
switch valueTyped := value.(type) {
case string:
fluentBitMessage.Data[key] = valueTyped
case float64:
fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64)
case []byte:
fluentBitMessage.Data[key] = string(valueTyped)
default:
fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped))
}
}
}
result = append(result, fluentBitMessage)
default:
return result, fmt.Errorf("invalid type: no map found in array")
}
result = append(result, fluentBitMessage)
default:
return result, fmt.Errorf("invalid type: no map found in array")
}
default:
return result, fmt.Errorf("invalid type: no array found")
Expand Down
44 changes: 44 additions & 0 deletions pkg/observability/decoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,50 @@ func TestDecoding(t *testing.T) {
}
}

func TestDecodingMultiMessage(t *testing.T) {
payload := IncomingData{
{
"date": 1727119152.0,
"container_name": "/fluentbit-nginx-1",
"source": "stdout",
"log": "/docker-entrypoint.sh: /docker-entrypoint.d/ is not empty, will attempt to perform configuration",
"container_id": "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186",
},
{
"date": 1727119152.0,
"source": "stdout",
"log": "/docker-entrypoint.sh: Looking for shell scripts in /docker-entrypoint.d/",
"container_id": "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186",
"container_name": "/fluentbit-nginx-1",
},
{
"date": 1727119152.0,
"container_id": "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186",
"container_name": "/fluentbit-nginx-1",
"source": "stdout",
"log": "/docker-entrypoint.sh: Launching /docker-entrypoint.d/10-listen-on-ipv6-by-default.sh",
},
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
t.Fatalf("json marshal error: %s", err)
}
messages, err := Decode(bytes.NewBuffer(payloadBytes))
if err != nil {
t.Fatalf("error: %s", err)
}
if len(messages) != len(payload) {
t.Fatalf("incorrect messages returned. Got %d, expected %d", len(messages), len(payload))
}
val, ok := messages[2].Data["container_id"]
if !ok {
t.Fatalf("container_id key not found")
}
if string(val) != "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186" {
t.Fatalf("wrong data returned: %s", val)
}
}

func TestDecodeMessages(t *testing.T) {
msgs := []FluentBitMessage{
{
Expand Down
9 changes: 5 additions & 4 deletions pkg/observability/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,27 @@ func (o *Observability) logsHandler(w http.ResponseWriter, r *http.Request) {
}
offset := 0
if r.FormValue("offset") != "" {
i, err := strconv.Atoi(r.PathValue("offset"))
i, err := strconv.Atoi(r.FormValue("offset"))
if err == nil {
offset = i
}
}
maxLines := 0
if r.FormValue("maxLines") != "" {
i, err := strconv.Atoi(r.PathValue("maxLines"))
i, err := strconv.Atoi(r.FormValue("maxLines"))
if err == nil {
maxLines = i
}
}
pos := int64(0)
if r.FormValue("pos") != "" {
i, err := strconv.ParseInt(r.PathValue("pos"), 10, 64)
i, err := strconv.ParseInt(r.FormValue("pos"), 10, 64)
if err == nil {
pos = i
}
}
out, err := o.getLogs(fromDate, endDate, pos, maxLines, offset)
search := r.FormValue("search")
out, err := o.getLogs(fromDate, endDate, pos, maxLines, offset, search)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Printf("get logs error: %s", err)
Expand Down
30 changes: 23 additions & 7 deletions pkg/observability/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"bufio"
"fmt"
"math"
"strings"
"time"
)

func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset, maxLogLines int) (LogEntryResponse, error) {
func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, maxLogLines, offset int, search string) (LogEntryResponse, error) {
logEntryResponse := LogEntryResponse{
Enabled: true,
Environments: []string{"dev", "qa", "prod"},
LogEntries: []LogEntry{},
Keys: make(map[KeyValue]int),
}

logFiles := []string{}
Expand All @@ -22,7 +25,8 @@ func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset,
for d := fromDate; d.Before(endDate) || d.Equal(endDate); d = d.AddDate(0, 0, 1) {
fileList, err := o.Storage.ReadDir(d.Format("2006/01/02"))
if err != nil {
return logEntryResponse, fmt.Errorf("can't read log directly: %s", err)
logEntryResponse.NextPos = -1
return logEntryResponse, nil // can't read directory, return empty response
}
for _, filename := range fileList {
logFiles = append(logFiles, d.Format("2006/01/02")+"/"+filename)
Expand Down Expand Up @@ -50,20 +54,32 @@ func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset,
for scanner.Scan() && len(logEntryResponse.LogEntries) < maxLogLines { // read multiple lines
// decode, store as logentry
logMessage := decodeMessage(scanner.Bytes())
val, ok := logMessage.Data["log"]
logline, ok := logMessage.Data["log"]
if ok {
timestamp := floatToDate(logMessage.Date).Add(time.Duration(offset) * time.Minute)
logEntry := LogEntry{
Timestamp: timestamp.Format(TIMESTAMP_FORMAT),
Data: val,
if search == "" || strings.Contains(logline, search) {
logEntry := LogEntry{
Timestamp: timestamp.Format(TIMESTAMP_FORMAT),
Data: logline,
}
logEntryResponse.LogEntries = append(logEntryResponse.LogEntries, logEntry)
for k, v := range logMessage.Data {
if k != "log" {
logEntryResponse.Keys[KeyValue{Key: k, Value: v}] += 1
}
}
}
logEntryResponse.LogEntries = append(logEntryResponse.LogEntries, logEntry)
}
}
if err := scanner.Err(); err != nil {
return logEntryResponse, fmt.Errorf("log file read (scanner) error: %s", err)
}
}
if len(logEntryResponse.LogEntries) < maxLogLines {
logEntryResponse.NextPos = -1 // no more records
} else {
logEntryResponse.NextPos = pos
}

return logEntryResponse, nil
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/observability/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"io"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -48,8 +49,9 @@ func TestGetLogs(t *testing.T) {

now := time.Now()
maxLogLines := 100
search := ""

logEntryResponse, err := o.getLogs(now, now, 0, 0, maxLogLines)
logEntryResponse, err := o.getLogs(now, now, 0, 0, maxLogLines, search)
if err != nil {
t.Fatalf("get logs error: %s", err)
}
Expand All @@ -69,3 +71,18 @@ func TestFloatToDate(t *testing.T) {
t.Fatalf("times are not equal. Got: %s, expected: %s", floatToDate, now)
}
}

func TestKeyValue(t *testing.T) {
logEntryResponse := LogEntryResponse{
Keys: map[KeyValue]int{
{Key: "k", Value: "v"}: 4,
},
}
out, err := json.Marshal(logEntryResponse)
if err != nil {
t.Fatalf("error: %s", err)
}
if !strings.Contains(string(out), `"keys":[{"key":"k","value":"v","total":4}]`) {
t.Fatalf("wrong output: %s", out)
}
}
27 changes: 24 additions & 3 deletions pkg/observability/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package observability

import (
"bytes"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -32,12 +34,31 @@ type ConcurrentRWBuffer struct {
}

type LogEntryResponse struct {
Enabled bool `json:"enabled"`
LogEntries []LogEntry `json:"logEntries"`
Environments []string `json:"environments"`
Enabled bool `json:"enabled"`
LogEntries []LogEntry `json:"logEntries"`
Environments []string `json:"environments"`
Keys KeyValueInt `json:"keys"`
NextPos int64 `json:"nextPos"`
}

type LogEntry struct {
Timestamp string `json:"timestamp"`
Data string `json:"data"`
}

type KeyValueInt map[KeyValue]int

type KeyValue struct {
Key string
Value string
}

func (kv KeyValueInt) MarshalJSON() ([]byte, error) {
res := "["
for k, v := range kv {
res += `{ "key" : "` + k.Key + `", "value": "` + k.Value + `", "total": ` + strconv.Itoa(v) + ` },`
}
res = strings.TrimRight(res, ",")
res += "]"
return []byte(res), nil
}
6 changes: 3 additions & 3 deletions webapp/src/NavBar/NavBar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ export function NavBar({serverType}: Props) {
const observabilityLinks = {
"admin": [
{ link: '/', label: 'Status', icon: TbBellRinging },
{ link: '/logs', label: 'Logs', icon: FaStream },
{ link: '/users', label: 'Users', icon: TbUser },
{ link: '/setup', label: 'VPN Setup', icon: TbSettings },
{ link: '/setup', label: 'Setup', icon: TbSettings },
{ link: '/auth-setup', label: 'Authentication & Provisioning', icon: TbCloudDataConnection },
{ link: '/logs', label: 'Logs', icon: FaStream },
{ link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook },
],
"user": [
Expand Down Expand Up @@ -99,7 +99,7 @@ export function NavBar({serverType}: Props) {
<nav className={classes.navbar}>
<div className={classes.navbarMain}>
<Group className={classes.header} justify="space-between">
VPN Server
{serverType === "vpn" ? "VPN Server" : "Observability Server"}
<Code fw={700}><Version /></Code>
</Group>
{links}
Expand Down

0 comments on commit d084dd6

Please sign in to comment.