From a81570ecfc2dac86e22459349beced438bfd1bde Mon Sep 17 00:00:00 2001
From: Michael Dockter
Date: Mon, 1 Jul 2024 14:54:19 -0400
Subject: [PATCH] 179 dockter 2 (#204)

* #179 Restore jsonOutput

* #179 Prepare for versioned release
---
 CHANGELOG.md                            | 6 ++++++
 queues/rabbitmq/managedConsumer.go      | 3 ++-
 queues/rabbitmq/managedConsumer_test.go | 3 ++-
 queues/rabbitmq/managedProducer.go      | 3 ++-
 queues/rabbitmq/managedProducer_test.go | 3 ++-
 queues/sqs/managedConsumer.go           | 4 ++--
 queues/sqs/managedConsumer_test.go      | 7 ++++---
 queues/sqs/managedProducer.go           | 4 ++--
 queues/sqs/managedProducer_test.go      | 3 ++-
 queues/sqs/sqs.go                       | 4 +++-
 queues/sqs/sqs_test.go                  | 8 +++++---
 11 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 21b9901..f900116 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 -
 
+## [0.3.2] - 2024-07-01
+
+### Changed in 0.3.2
+
+- Restore `jsonOutput` parameter
+
 ## [0.3.1] - 2024-06-26
 
 ### Changed in 0.3.1
diff --git a/queues/rabbitmq/managedConsumer.go b/queues/rabbitmq/managedConsumer.go
index 8c74f0f..a303310 100644
--- a/queues/rabbitmq/managedConsumer.go
+++ b/queues/rabbitmq/managedConsumer.go
@@ -78,7 +78,8 @@ func (j *RabbitConsumerJob) OnError(err error) {
 // them to Senzing.
 // - Workers restart when they are killed or die.
 // - respond to standard system signals.
-func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers int, szEngine *sz.SzEngine, withInfo bool, logLevel string) error {
+func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers int, szEngine *sz.SzEngine, withInfo bool, logLevel string, jsonOutput bool) error {
+	_ = jsonOutput
 
 	// default to the max number of OS threads
 	if numberOfWorkers <= 0 {
diff --git a/queues/rabbitmq/managedConsumer_test.go b/queues/rabbitmq/managedConsumer_test.go
index d2919f1..120a5ba 100644
--- a/queues/rabbitmq/managedConsumer_test.go
+++ b/queues/rabbitmq/managedConsumer_test.go
@@ -55,6 +55,7 @@ func TestStartManagedConsumer(test *testing.T) {
 		szEngine        *sz.SzEngine
 		withInfo        bool
 		logLevel        string
+		jsonOutput      bool
 	}
 	tests := []struct {
 		name    string
@@ -65,7 +66,7 @@
 	}
 	for _, tt := range tests {
 		test.Run(tt.name, func(test *testing.T) {
-			if err := StartManagedConsumer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.szEngine, tt.args.withInfo, tt.args.logLevel); (err != nil) != tt.wantErr {
+			if err := StartManagedConsumer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.szEngine, tt.args.withInfo, tt.args.logLevel, tt.args.jsonOutput); (err != nil) != tt.wantErr {
 				test.Errorf("StartManagedConsumer() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
diff --git a/queues/rabbitmq/managedProducer.go b/queues/rabbitmq/managedProducer.go
index 248f1f1..69891a2 100644
--- a/queues/rabbitmq/managedProducer.go
+++ b/queues/rabbitmq/managedProducer.go
@@ -56,7 +56,8 @@ func processRecord(ctx context.Context, record queues.Record, newClientFn func()
 // the given queue.
 // - Workers restart when they are killed or die.
 // - respond to standard system signals.
-func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers int, recordchan <-chan queues.Record, logLevel string) {
+func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers int, recordchan <-chan queues.Record, logLevel string, jsonOutput bool) {
+	_ = jsonOutput
 
 	// default to the max number of OS threads
 	if numberOfWorkers <= 0 {
diff --git a/queues/rabbitmq/managedProducer_test.go b/queues/rabbitmq/managedProducer_test.go
index 757040e..baddd55 100644
--- a/queues/rabbitmq/managedProducer_test.go
+++ b/queues/rabbitmq/managedProducer_test.go
@@ -36,6 +36,7 @@ func TestStartManagedProducer(test *testing.T) {
 		numberOfWorkers int
 		recordchan      <-chan queues.Record
 		logLevel        string
+		jsonOutput      bool
 	}
 	tests := []struct {
 		name string
@@ -46,7 +47,7 @@
 	for _, tt := range tests {
 		test.Run(tt.name, func(test *testing.T) {
 			_ = test
-			StartManagedProducer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.recordchan, tt.args.logLevel)
+			StartManagedProducer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.recordchan, tt.args.logLevel, tt.args.jsonOutput)
 		})
 	}
 }
diff --git a/queues/sqs/managedConsumer.go b/queues/sqs/managedConsumer.go
index dc50101..b338462 100644
--- a/queues/sqs/managedConsumer.go
+++ b/queues/sqs/managedConsumer.go
@@ -112,7 +112,7 @@ func (j *Job) OnError(ctx context.Context, err error) {
 // them to Senzing.
 // - Workers restart when they are killed or die.
 // - respond to standard system signals.
-func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers int, szEngine sz.SzEngine, withInfo bool, visibilitySeconds int32, logLevel string) error {
+func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers int, szEngine sz.SzEngine, withInfo bool, visibilitySeconds int32, logLevel string, jsonOutput bool) error {
 
 	if szEngine == nil {
 		return errors.New("the G2 Engine is not set, unable to start the managed consumer")
@@ -129,7 +129,7 @@
 		log(3003, logLevel, err)
 	}
 
-	client, err := NewClient(ctx, urlString)
+	client, err := NewClient(ctx, urlString, logLevel, jsonOutput)
 	if err != nil {
 		return fmt.Errorf("unable to get a new SQS client, %w", err)
 	}
diff --git a/queues/sqs/managedConsumer_test.go b/queues/sqs/managedConsumer_test.go
index 59f3e03..5ca9a5f 100644
--- a/queues/sqs/managedConsumer_test.go
+++ b/queues/sqs/managedConsumer_test.go
@@ -59,6 +59,7 @@ func TestStartManagedConsumer(test *testing.T) {
 		withInfo          bool
 		visibilitySeconds int32
 		logLevel          string
+		jsonOutput        bool
 	}
 	tests := []struct {
 		name    string
@@ -68,9 +69,9 @@
 		// TODO: Add test cases.
 	}
 	for _, tt := range tests {
-		test.Run(tt.name, func(test *testing.T) {
-			if err := StartManagedConsumer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.szEngine, tt.args.withInfo, tt.args.visibilitySeconds, tt.args.logLevel); (err != nil) != tt.wantErr {
-				test.Errorf("StartManagedConsumer() error = %v, wantErr %v", err, tt.wantErr)
+		test.Run(tt.name, func(t *testing.T) {
+			if err := StartManagedConsumer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.szEngine, tt.args.withInfo, tt.args.visibilitySeconds, tt.args.logLevel, tt.args.jsonOutput); (err != nil) != tt.wantErr {
+				t.Errorf("StartManagedConsumer() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
diff --git a/queues/sqs/managedProducer.go b/queues/sqs/managedProducer.go
index 8005211..332275f 100644
--- a/queues/sqs/managedProducer.go
+++ b/queues/sqs/managedProducer.go
@@ -67,7 +67,7 @@ func processRecordBatch(ctx context.Context, recordchan <-chan queues.Record, ne
 // the given queue.
 // - Workers restart when they are killed or die.
 // - respond to standard system signals.
-func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers int, recordchan <-chan queues.Record, logLevel string) {
+func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers int, recordchan <-chan queues.Record, logLevel string, jsonOutput bool) {
 
 	// default to the max number of OS threads
 	if numberOfWorkers <= 0 {
@@ -81,7 +81,7 @@ func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers
 	}
 
 	clientPool = make(chan *Client, numberOfWorkers)
-	newClientFn := func() (*Client, error) { return NewClient(ctx, urlString) }
+	newClientFn := func() (*Client, error) { return NewClient(ctx, urlString, logLevel, jsonOutput) }
 
 	// populate an initial client pool
 	go func() { _ = createClients(ctx, numberOfWorkers, newClientFn) }()
diff --git a/queues/sqs/managedProducer_test.go b/queues/sqs/managedProducer_test.go
index 7e2deaf..99f7fb1 100644
--- a/queues/sqs/managedProducer_test.go
+++ b/queues/sqs/managedProducer_test.go
@@ -36,6 +36,7 @@ func TestStartManagedProducer(test *testing.T) {
 		numberOfWorkers int
 		recordchan      <-chan queues.Record
 		logLevel        string
+		jsonOutput      bool
 	}
 	tests := []struct {
 		name string
@@ -46,7 +47,7 @@
 	for _, tt := range tests {
 		test.Run(tt.name, func(test *testing.T) {
 			_ = test
-			StartManagedProducer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.recordchan, tt.args.logLevel)
+			StartManagedProducer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.recordchan, tt.args.logLevel, tt.args.jsonOutput)
 		})
 	}
 }
diff --git a/queues/sqs/sqs.go b/queues/sqs/sqs.go
index c93457a..b9e279b 100644
--- a/queues/sqs/sqs.go
+++ b/queues/sqs/sqs.go
@@ -43,7 +43,9 @@ type Client struct {
 // ----------------------------------------------------------------------------
 
 // New creates a single SQS client
-func NewClient(ctx context.Context, urlString string) (*Client, error) {
+func NewClient(ctx context.Context, urlString string, logLevel string, jsonOutput bool) (*Client, error) {
+	_ = logLevel
+	_ = jsonOutput
 	client := Client{
 		MaxDelay:       10 * time.Minute,
 		ReconnectDelay: 2 * time.Second,
diff --git a/queues/sqs/sqs_test.go b/queues/sqs/sqs_test.go
index 6482726..55e3fc2 100644
--- a/queues/sqs/sqs_test.go
+++ b/queues/sqs/sqs_test.go
@@ -13,8 +13,10 @@ import (
 
 func TestNewClient(test *testing.T) {
 	type args struct {
-		ctx       context.Context
-		urlString string
+		ctx        context.Context
+		urlString  string
+		logLevel   string
+		jsonOutput bool
 	}
 	tests := []struct {
 		name    string
@@ -26,7 +28,7 @@
 	}
 	for _, tt := range tests {
 		test.Run(tt.name, func(test *testing.T) {
-			got, err := NewClient(tt.args.ctx, tt.args.urlString)
+			got, err := NewClient(tt.args.ctx, tt.args.urlString, tt.args.logLevel, tt.args.jsonOutput)
 			if (err != nil) != tt.wantErr {
 				test.Errorf("NewClient() error = %v, wantErr %v", err, tt.wantErr)
 				return
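
---

Usage note, not part of the diff above: this patch restores a trailing `jsonOutput` argument on `StartManagedConsumer`, `StartManagedProducer`, and `NewClient`, so every existing caller gains one more argument. Within this commit the value is accepted but discarded (`_ = jsonOutput`) in the RabbitMQ paths, while the SQS paths forward it to `NewClient` together with `logLevel`. A minimal caller sketch against the RabbitMQ producer follows; the module path `github.com/senzing-garage/go-queueing` and the broker URL are assumptions for illustration only, not taken from this patch:

```go
package main

import (
	"context"

	"github.com/senzing-garage/go-queueing/queues"          // assumed module path
	"github.com/senzing-garage/go-queueing/queues/rabbitmq" // assumed module path
)

func main() {
	ctx := context.Background()

	// An empty, closed channel; this sketch only exercises the new signature.
	recordchan := make(chan queues.Record)
	close(recordchan)

	// Before this patch: StartManagedProducer(ctx, urlString, numberOfWorkers, recordchan, logLevel)
	// After this patch, a trailing jsonOutput bool is required again:
	rabbitmq.StartManagedProducer(ctx,
		"amqp://guest:guest@localhost:5672/", // urlString: hypothetical local broker
		0,          // numberOfWorkers: <= 0 falls back to the max number of OS threads
		recordchan, // <-chan queues.Record
		"INFO",     // logLevel
		true,       // jsonOutput: restored by this patch; discarded inside this commit
	)
}
```

The SQS entry points change the same way, with `NewClient(ctx, urlString)` becoming `NewClient(ctx, urlString, logLevel, jsonOutput)`.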