Skip to content

Commit

Permalink
fix: promtail parser for azureeventhubs message without time field (#…
Browse files Browse the repository at this point in the history
…14218)

Co-authored-by: Trevor Whitney <[email protected]>
  • Loading branch information
andriikushch and trevorwhitney authored Oct 29, 2024
1 parent baaaa83 commit 2e62abb
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 6 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### Bug Fixes

* **promtail:** fix parser for azureeventhubs message without time field ([#14218](https://github.com/grafana/loki/pull/14218))

## [3.1.1](https://github.com/grafana/loki/compare/v3.1.0...v3.1.1) (2024-08-08)


Expand All @@ -12,7 +18,6 @@

* **deps:** bumped dependencies versions to resolve CVEs ([#13789](https://github.com/grafana/loki/issues/13789)) ([34206cd](https://github.com/grafana/loki/commit/34206cd2d6290566034710ae6c2d08af8804bc91))


## [3.1.0](https://github.com/grafana/loki/compare/v3.0.0...v3.1.0) (2024-07-02)


Expand Down
39 changes: 34 additions & 5 deletions clients/pkg/promtail/targets/azureeventhubs/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/prometheus/prometheus/model/relabel"

"github.com/grafana/loki/v3/clients/pkg/promtail/api"

"github.com/grafana/loki/v3/pkg/logproto"
)

Expand All @@ -33,15 +32,17 @@ func (l azureMonitorResourceLogs) validate() error {
// azureMonitorResourceLog used to unmarshal common schema for Azure resource logs
// https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/resource-logs-schema
type azureMonitorResourceLog struct {
Time string `json:"time"`
Time string `json:"time"`
// Some logs have `time` field, some have `timeStamp` field : https://github.com/grafana/loki/issues/14176
TimeStamp string `json:"timeStamp"`
Category string `json:"category"`
ResourceID string `json:"resourceId"`
OperationName string `json:"operationName"`
}

// validate check if fields marked as required by schema for Azure resource log are not empty
func (l azureMonitorResourceLog) validate() error {
valid := len(l.Time) != 0 &&
valid := l.isTimeOrTimeStampFieldSet() &&
len(l.Category) != 0 &&
len(l.ResourceID) != 0 &&
len(l.OperationName) != 0
Expand All @@ -53,6 +54,34 @@ func (l azureMonitorResourceLog) validate() error {
return nil
}

func (l azureMonitorResourceLog) isTimeOrTimeStampFieldSet() bool {
return len(l.Time) != 0 || len(l.TimeStamp) != 0
}

// getTime returns time from `time` or `timeStamp` field. If both fields are set, `time` is used. If both fields are empty, error is returned.
func (l azureMonitorResourceLog) getTime() (time.Time, error) {
if len(l.Time) == 0 && len(l.TimeStamp) == 0 {
var t time.Time
return t, errors.New("time and timeStamp fields are empty")
}

if len(l.Time) != 0 {
t, err := time.Parse(time.RFC3339, l.Time)
if err != nil {
return t, err
}

return t.UTC(), nil
}

t, err := time.Parse(time.RFC3339, l.TimeStamp)
if err != nil {
return t, err
}

return t.UTC(), nil
}

type messageParser struct {
disallowCustomMessages bool
}
Expand Down Expand Up @@ -153,11 +182,11 @@ func (e *messageParser) parseRecord(record []byte, labelSet model.LabelSet, rela
}

func (e *messageParser) getTime(messageTime time.Time, useIncomingTimestamp bool, logRecord *azureMonitorResourceLog) time.Time {
if !useIncomingTimestamp || logRecord.Time == "" {
if !useIncomingTimestamp || !logRecord.isTimeOrTimeStampFieldSet() {
return messageTime
}

recordTime, err := time.Parse(time.RFC3339, logRecord.Time)
recordTime, err := logRecord.getTime()
if err != nil {
return messageTime
}
Expand Down
34 changes: 34 additions & 0 deletions clients/pkg/promtail/targets/azureeventhubs/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,37 @@ func readFile(t *testing.T, filename string) []byte {
assert.NoError(t, err)
return data
}

func Test_parseMessage_message_without_time_with_time_stamp(t *testing.T) {
messageParser := &messageParser{
disallowCustomMessages: true,
}

message := &sarama.ConsumerMessage{
Value: readFile(t, "testdata/message_without_time_with_time_stamp.json"),
Timestamp: time.Date(2023, time.March, 17, 8, 44, 02, 0, time.UTC),
}

entries, err := messageParser.Parse(message, nil, nil, true)
assert.NoError(t, err)
assert.Len(t, entries, 1)

expectedLine1 := "{\n \"timeStamp\": \"2024-09-18T00:45:09+00:00\",\n \"resourceId\": \"/RESOURCE_ID\",\n \"operationName\": \"ApplicationGatewayAccess\",\n \"category\": \"ApplicationGatewayAccessLog\"\n }"
assert.Equal(t, expectedLine1, entries[0].Line)

assert.Equal(t, time.Date(2024, time.September, 18, 00, 45, 9, 0, time.UTC), entries[0].Timestamp)
}

func Test_parseMessage_message_without_time_and_time_stamp(t *testing.T) {
messageParser := &messageParser{
disallowCustomMessages: true,
}

message := &sarama.ConsumerMessage{
Value: readFile(t, "testdata/message_without_time_and_time_stamp.json"),
Timestamp: time.Date(2023, time.March, 17, 8, 44, 02, 0, time.UTC),
}

_, err := messageParser.Parse(message, nil, nil, true)
assert.EqualError(t, err, "required field or fields is empty")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"records": [
{
"resourceId": "/RESOURCE_ID",
"operationName": "ApplicationGatewayAccess",
"category": "ApplicationGatewayAccessLog"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"records": [
{
"timeStamp": "2024-09-18T00:45:09+00:00",
"resourceId": "/RESOURCE_ID",
"operationName": "ApplicationGatewayAccess",
"category": "ApplicationGatewayAccessLog"
}
]
}

0 comments on commit 2e62abb

Please sign in to comment.