From a2746adbc3bbae3575a3c19e542b74d482357093 Mon Sep 17 00:00:00 2001 From: gabriel Date: Thu, 8 Feb 2024 20:08:00 +0100 Subject: [PATCH 1/3] Store msg.MessageId and msg.ReceiptHandle format: msg.MessageId + "-" + *msg.ReceiptHandle In column hash in the logs --- inputs/sqs/msg.go | 7 ++++++- inputs/sqs/pool.go | 1 + lib/msg.go | 1 + outputs/cmd/cmd.go | 1 + outputs/cmd/cmd_test.go | 9 +++++++-- reactorlog/jsonreactorlog/jsonreactorlog.go | 7 +++++++ reactorlog/noopreactorlog/noopreactorlog.go | 1 + reactorlog/reactorlog.go | 1 + 8 files changed, 25 insertions(+), 3 deletions(-) diff --git a/inputs/sqs/msg.go b/inputs/sqs/msg.go index e04ca02..a7465b7 100644 --- a/inputs/sqs/msg.go +++ b/inputs/sqs/msg.go @@ -11,6 +11,7 @@ type Msg struct { M *sqs.DeleteMessageBatchRequestEntry B []byte SentTimestamp int64 + Hash string } // Body will return the bytes of the SQS message @@ -21,4 +22,8 @@ func (m *Msg) Body() []byte { // CreationTimestampMilliseconds will return the creation timestamp for the message func (m *Msg) CreationTimestampMilliseconds() int64 { return m.SentTimestamp -} \ No newline at end of file +} + +func (m *Msg) GetHash() string { + return m.Hash +} diff --git a/inputs/sqs/pool.go b/inputs/sqs/pool.go index 8e6672a..c6ac8d7 100644 --- a/inputs/sqs/pool.go +++ b/inputs/sqs/pool.go @@ -177,6 +177,7 @@ func (p *sqsListen) deliver(msg *sqs.Message) { }, URL: aws.String(p.URL), SentTimestamp: sentTimestamp, + Hash: *msg.MessageId + "-" + *msg.ReceiptHandle, } jsonParsed, err := gabs.ParseJSON(m.B) diff --git a/lib/msg.go b/lib/msg.go index 562a022..bf197de 100644 --- a/lib/msg.go +++ b/lib/msg.go @@ -4,4 +4,5 @@ package lib type Msg interface { Body() []byte CreationTimestampMilliseconds() int64 + GetHash() string } diff --git a/outputs/cmd/cmd.go b/outputs/cmd/cmd.go index bf62ef2..b222158 100644 --- a/outputs/cmd/cmd.go +++ b/outputs/cmd/cmd.go @@ -170,6 +170,7 @@ func (o *Cmd) Run(rl reactorlog.ReactorLog, msg lib.Msg) error { logLabel := o.findReplace(msg, o.r.Label) rl.SetLabel(logLabel) + rl.SetHash(msg.GetHash()) ctx, cancel := context.WithTimeout(context.Background(), o.maximumCmdTimeLive) defer cancel() diff --git a/outputs/cmd/cmd_test.go b/outputs/cmd/cmd_test.go index 4eb0ab4..eae925d 100644 --- a/outputs/cmd/cmd_test.go +++ b/outputs/cmd/cmd_test.go @@ -9,8 +9,9 @@ import ( ) type Msg struct { - B []byte - ts int64 + B []byte + ts int64 + hash string } func (m *Msg) Body() []byte { @@ -21,6 +22,10 @@ func (m *Msg) CreationTimestampMilliseconds() int64 { return m.ts } +func (m *Msg) GetHash() string { + return m.hash +} + func TestJqReplaceActuallyReplacing(t *testing.T) { var r *reactor.Reactor = nil diff --git a/reactorlog/jsonreactorlog/jsonreactorlog.go b/reactorlog/jsonreactorlog/jsonreactorlog.go index d66aced..efb8336 100644 --- a/reactorlog/jsonreactorlog/jsonreactorlog.go +++ b/reactorlog/jsonreactorlog/jsonreactorlog.go @@ -39,12 +39,14 @@ type ReactorLog interface { Write(b []byte) (int, error) Start(pid int, s string) SetLabel(string) + SetHash(string) } // JSONReactorLog lets you log as json lines. type JSONReactorLog struct { Host string `json:",omitempty"` Label string `json:",omitempty"` + Hash string `json:",omitempty"` Pid int `json:",omitempty"` RID uint64 `json:",omitempty"` TID uint64 `json:",omitempty"` @@ -69,6 +71,10 @@ func (rl *JSONReactorLog) SetLabel(value string) { rl.Label = value } +func (rl *JSONReactorLog) SetHash(value string) { + rl.Hash = value +} + // Write will be called by the reactor and this bytes will be sent to the general log channel func (rl *JSONReactorLog) Write(b []byte) (int, error) { rl.Lock() @@ -146,6 +152,7 @@ func (rl *JSONReactorLog) printJSON() { func (rl *JSONReactorLog) reset() { rl.Host = "" rl.Label = "" + rl.Hash = "" rl.Pid = 0 rl.RID = 0 rl.TID = 0 diff --git a/reactorlog/noopreactorlog/noopreactorlog.go b/reactorlog/noopreactorlog/noopreactorlog.go index 74ee38a..5bc8275 100644 --- a/reactorlog/noopreactorlog/noopreactorlog.go +++ b/reactorlog/noopreactorlog/noopreactorlog.go @@ -4,6 +4,7 @@ type NoopReactorLog struct{} func (NoopReactorLog) Start(pid int, s string) {} func (NoopReactorLog) SetLabel(string) {} +func (NoopReactorLog) SetHash(string) {} func (NoopReactorLog) Done(error) {} func (NoopReactorLog) Write(b []byte) (int, error) { return len(b), nil diff --git a/reactorlog/reactorlog.go b/reactorlog/reactorlog.go index 9cd3222..55aa0c8 100644 --- a/reactorlog/reactorlog.go +++ b/reactorlog/reactorlog.go @@ -4,5 +4,6 @@ type ReactorLog interface { Write(b []byte) (int, error) Start(pid int, s string) SetLabel(string) + SetHash(string) Done(error) } From 7ebaaf4a0ba21802dc481dbfd4310bf10c24b817 Mon Sep 17 00:00:00 2001 From: gabriel Date: Thu, 8 Feb 2024 20:27:24 +0100 Subject: [PATCH 2/3] CGO_ENABLED=0 in github workflow --- .github/workflows/release.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 8379286..a21f1b2 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -29,10 +29,10 @@ jobs: run: go test -v ./... - name: Build linux amd64 - run: GOOS=linux GOARCH=amd64 go build -o goreactor_amd64 -v . + run: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o goreactor_amd64 -v . - name: Build linux arm64 - run: GOOS=linux GOARCH=arm64 go build -o goreactor_arm64 -v . + run: CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o goreactor_arm64 -v . - name: Create Release id: create_release From a59e82e27690340472217d5704513991873b6c68 Mon Sep 17 00:00:00 2001 From: gabriel Date: Thu, 8 Feb 2024 20:54:21 +0100 Subject: [PATCH 3/3] Keep MessageId in logs --- inputs/sqs/pool.go | 2 +- reactorlog/jsonreactorlog/jsonreactorlog.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/inputs/sqs/pool.go b/inputs/sqs/pool.go index c6ac8d7..dc30c9a 100644 --- a/inputs/sqs/pool.go +++ b/inputs/sqs/pool.go @@ -177,7 +177,7 @@ func (p *sqsListen) deliver(msg *sqs.Message) { }, URL: aws.String(p.URL), SentTimestamp: sentTimestamp, - Hash: *msg.MessageId + "-" + *msg.ReceiptHandle, + Hash: *msg.MessageId, } jsonParsed, err := gabs.ParseJSON(m.B) diff --git a/reactorlog/jsonreactorlog/jsonreactorlog.go b/reactorlog/jsonreactorlog/jsonreactorlog.go index efb8336..834727e 100644 --- a/reactorlog/jsonreactorlog/jsonreactorlog.go +++ b/reactorlog/jsonreactorlog/jsonreactorlog.go @@ -127,7 +127,7 @@ func (rl *JSONReactorLog) Done(err error) { rl.Error = err.Error() } // https://github.com/golang/go/issues/5491#issuecomment-66079585 - rl.Elapse = time.Now().Sub(rl.st).Seconds() + rl.Elapse = time.Since(rl.st).Seconds() rl.printJSON() rl.reset() }