Skip to content

Commit

Permalink
logs endpoint to show logs
Browse files Browse the repository at this point in the history
  • Loading branch information
wardviaene committed Sep 16, 2024
1 parent 91888cf commit e6c5b61
Show file tree
Hide file tree
Showing 12 changed files with 406 additions and 32 deletions.
4 changes: 2 additions & 2 deletions pkg/observability/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ func (o *Observability) WriteBufferToStorage(n int64) error {
if err != nil && err != io.EOF {
return fmt.Errorf("write error from buffer to temporary buffer: %s", err)
}

file, err := o.Storage.OpenFileForWriting("data-" + time.Now().Format("2003-01-02T15:04:05") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10))
now := time.Now()
file, err := o.Storage.OpenFileForWriting(now.Format("2006/01/02") + "/data-" + now.Format("150405") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10))
if err != nil {
return fmt.Errorf("open file for writing error: %s", err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/observability/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestIngestion(t *testing.T) {
if err != nil {
t.Fatalf("read file error: %s", err)
}
decodedMessages := decodeMessage(messages)
decodedMessages := decodeMessages(messages)
totalMessages += len(decodedMessages)
}
if len(dirlist) == 0 {
Expand All @@ -71,6 +71,7 @@ func TestIngestion(t *testing.T) {
}

func TestIngestionMoreMessages(t *testing.T) {
t.Skip() // we can skip this for general unit testing
totalMessagesToGenerate := 10000000 // 10,000,000
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(MAX_BUFFER_SIZE)
Expand Down Expand Up @@ -116,7 +117,7 @@ func TestIngestionMoreMessages(t *testing.T) {
if err != nil {
t.Fatalf("read file error: %s", err)
}
decodedMessages := decodeMessage(messages)
decodedMessages := decodeMessages(messages)
totalMessages += len(decodedMessages)
}
if len(dirlist) == 0 {
Expand Down
74 changes: 51 additions & 23 deletions pkg/observability/decoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,63 @@ func Decode(r io.Reader) ([]FluentBitMessage, error) {
return result, nil
}

func decodeMessage(msgs []byte) []FluentBitMessage {
func decodeMessages(msgs []byte) []FluentBitMessage {
res := []FluentBitMessage{}
recordOffset := 0
for k := 0; k < len(msgs); k++ {
if k > recordOffset+8 && msgs[k] == 0xff && msgs[k-1] == 0xff {
bits := binary.LittleEndian.Uint64(msgs[recordOffset : recordOffset+8])
msg := FluentBitMessage{
Date: math.Float64frombits(bits),
Data: map[string]string{},
}
isKey := false
key := ""
start := 8 + recordOffset
for kk := start; kk < k; kk++ {
if msgs[kk] == 0xff {
if isKey {
isKey = false
msg.Data[key] = string(msgs[start+1 : kk])
start = kk + 1
} else {
isKey = true
key = string(msgs[start:kk])
start = kk
}
}
}
res = append(res, msg)
res = append(res, decodeMessage(msgs[recordOffset:k]))
recordOffset = k + 1
}
}
return res
}
func decodeMessage(data []byte) FluentBitMessage {
bits := binary.LittleEndian.Uint64(data[0:8])
msg := FluentBitMessage{
Date: math.Float64frombits(bits),
Data: map[string]string{},
}
isKey := false
key := ""
start := 8
for kk := start; kk < len(data); kk++ {
if data[kk] == 0xff {
if isKey {
isKey = false
msg.Data[key] = string(data[start+1 : kk])
start = kk + 1
} else {
isKey = true
key = string(data[start:kk])
start = kk
}
}
}
// if last record didn't end with the terminator
if data[len(data)-1] != 0xff {
msg.Data[key] = string(data[start+1:])
}
return msg
}

func scanMessage(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
for i := 0; i < len(data); i++ {
if data[i] == 0xff && data[i-1] == 0xff {
return i + 1, data[0 : i-1], nil
}
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
if len(data) > 1 && data[len(data)-1] == 0xff && data[len(data)-2] == 0xff {
return len(data[0 : len(data)-2]), data, nil
} else {
return len(data), data, nil
}
}
// Request more data.
return 0, nil, nil
}
105 changes: 103 additions & 2 deletions pkg/observability/decoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestDecoding(t *testing.T) {
}
}

func TestDecodeMsg(t *testing.T) {
func TestDecodeMessages(t *testing.T) {
msgs := []FluentBitMessage{
{
Date: 1720613813.197045,
Expand All @@ -54,7 +54,7 @@ func TestDecodeMsg(t *testing.T) {
},
}
encoded := encodeMessage(msgs)
decoded := decodeMessage(encoded)
decoded := decodeMessages(encoded)

if len(msgs) != len(decoded) {
t.Fatalf("length doesn't match")
Expand All @@ -73,3 +73,104 @@ func TestDecodeMsg(t *testing.T) {
}
}
}

func TestDecodeMessage(t *testing.T) {
msgs := []FluentBitMessage{
{
Date: 1720613813.197099,
Data: map[string]string{
"second data set": "my value",
},
},
}
encoded := encodeMessage(msgs)
message := decodeMessage(encoded)

if message.Date != message.Date {
t.Fatalf("date doesn't match")
}
if len(msgs[0].Data) != len(message.Data) {
t.Fatalf("length of data doesn't match")
}
for kk := range message.Data {
if msgs[0].Data[kk] != message.Data[kk] {
t.Fatalf("key/value pair in data doesn't match: key: %s. Data: %s vs %s", kk, message.Data[kk], message.Data[kk])
}
}
}
func TestDecodeMessageWithoutTerminator(t *testing.T) {
msgs := []FluentBitMessage{
{
Date: 1720613813.197099,
Data: map[string]string{
"second data set": "my value",
},
},
}
encoded := encodeMessage(msgs)
message := decodeMessage(bytes.TrimSuffix(encoded, []byte{0xff, 0xff}))

if message.Date != message.Date {
t.Fatalf("date doesn't match")
}
if len(msgs[0].Data) != len(message.Data) {
t.Fatalf("length of data doesn't match: got: '%s', expected '%s'", message.Data, msgs[0].Data)
}
for kk := range message.Data {
if msgs[0].Data[kk] != message.Data[kk] {
t.Fatalf("key/value pair in data doesn't match: key: %s. Data: %s vs %s", kk, message.Data[kk], msgs[0].Data[kk])
}
}
}

func TestScanMessage(t *testing.T) {
msgs := []FluentBitMessage{
{
Date: 1720613813.197045,
Data: map[string]string{
"mykey": "this is myvalue",
"second key": "this is my second value",
"third key": "this is my third value",
},
},
{
Date: 1720613813.197099,
Data: map[string]string{
"second data set": "my value",
},
},
}
encoded := encodeMessage(msgs)
// first record
advance, record1, err := scanMessage(encoded, false)
if err != nil {
t.Fatalf("scan lines error: %s", err)
}
firstRecord := decodeMessages(append(record1, []byte{0xff, 0xff}...))
if len(firstRecord) == 0 {
t.Fatalf("first record is empty")
}
if firstRecord[0].Data["mykey"] != "this is myvalue" {
t.Fatalf("wrong data returned")
}
// second record
advance2, record2, err := scanMessage(encoded[advance:], false)
if err != nil {
t.Fatalf("scan lines error: %s", err)
}
secondRecord := decodeMessages(append(record2, []byte{0xff, 0xff}...))
if len(secondRecord) == 0 {
t.Fatalf("first record is empty")
}
if secondRecord[0].Data["second data set"] != "my value" {
t.Fatalf("wrong data returned in second record")
}
// third call
advance3, record3, err := scanMessage(encoded[advance+advance2:], false)
if err != nil {
t.Fatalf("scan lines error: %s", err)
}
if advance3 != 0 {
t.Fatalf("third record should be empty. Got: %+v", record3)
}
}
62 changes: 62 additions & 0 deletions pkg/observability/handlers.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package observability

import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
)

func (o *Observability) observabilityHandler(w http.ResponseWriter, r *http.Request) {
Expand All @@ -21,3 +24,62 @@ func (o *Observability) ingestionHandler(w http.ResponseWriter, r *http.Request)
}
w.WriteHeader(http.StatusOK)
}

func (o *Observability) logsHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusBadRequest)
return
}
if r.FormValue("fromDate") == "" {
o.returnError(w, fmt.Errorf("no from date supplied"), http.StatusBadRequest)
return
}
fromDate, err := time.Parse("2006-01-02", r.FormValue("fromDate"))
if err != nil {
o.returnError(w, fmt.Errorf("invalid date: %s", err), http.StatusBadRequest)
return
}
if r.FormValue("endDate") == "" {
o.returnError(w, fmt.Errorf("no end date supplied"), http.StatusBadRequest)
return
}
endDate, err := time.Parse("2006-01-02", r.FormValue("endDate"))
if err != nil {
o.returnError(w, fmt.Errorf("invalid date: %s", err), http.StatusBadRequest)
return
}
offset := 0
if r.FormValue("offset") != "" {
i, err := strconv.Atoi(r.PathValue("offset"))
if err == nil {
offset = i
}
}
maxLines := 0
if r.FormValue("maxLines") != "" {
i, err := strconv.Atoi(r.PathValue("maxLines"))
if err == nil {
maxLines = i
}
}
pos := int64(0)
if r.FormValue("pos") != "" {
i, err := strconv.ParseInt(r.PathValue("pos"), 10, 64)
if err == nil {
pos = i
}
}
out, err := o.getLogs(fromDate, endDate, pos, maxLines, offset)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Printf("get logs error: %s", err)
return
}
outBytes, err := json.Marshal(out)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Printf("marshal error: %s", err)
return
}
w.Write(outBytes)
}
2 changes: 1 addition & 1 deletion pkg/observability/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestIngestionHandler(t *testing.T) {
if err != nil {
t.Fatalf("read file error: %s", err)
}
decodedMessages := decodeMessage(messages)
decodedMessages := decodeMessages(messages)
if decodedMessages[0].Date != 1720613813.197045 {
t.Fatalf("unexpected date. Got %f, expected: %f", decodedMessages[0].Date, 1720613813.197045)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/observability/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package observability

import (
"fmt"
"net/http"
"strings"
)

func (o *Observability) returnError(w http.ResponseWriter, err error, statusCode int) {
fmt.Println("========= ERROR =========")
fmt.Printf("Error: %s\n", err)
fmt.Println("=========================")
w.WriteHeader(statusCode)
w.Write([]byte(`{"error": "` + strings.Replace(err.Error(), `"`, `\"`, -1) + `"}`))
}
Loading

0 comments on commit e6c5b61

Please sign in to comment.