179 dockter 2 (#204)
* #179  Restore jsonOutput

* #179 Prepare for versioned release
docktermj authored Jul 1, 2024
1 parent 452a7a0 commit a81570e
Showing 11 changed files with 32 additions and 16 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

-

## [0.3.2] - 2024-07-01

### Changed in 0.3.2

- Restore `jsonOutput` parameter

## [0.3.1] - 2024-06-26

### Changed in 0.3.1
3 changes: 2 additions & 1 deletion queues/rabbitmq/managedConsumer.go
@@ -78,7 +78,8 @@ func (j *RabbitConsumerJob) OnError(err error) {
// them to Senzing.
// - Workers restart when they are killed or die.
// - respond to standard system signals.
func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers int, szEngine *sz.SzEngine, withInfo bool, logLevel string) error {
func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers int, szEngine *sz.SzEngine, withInfo bool, logLevel string, jsonOutput bool) error {
_ = jsonOutput

// default to the max number of OS threads
if numberOfWorkers <= 0 {
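
For context, a caller of the widened RabbitMQ consumer now passes jsonOutput as the final argument. The sketch below is a hypothetical helper, not code from this commit: the module path github.com/senzing-garage/go-queueing, the location of the sz package, the broker URL, and the log level are all assumptions, and the diff above shows that jsonOutput is currently accepted but discarded (_ = jsonOutput).

package example

import (
	"context"

	"github.com/senzing-garage/go-queueing/queues/rabbitmq" // assumed module path
	"github.com/senzing-garage/sz-sdk-go/sz"                // assumed location of the sz package
)

// runRabbitConsumer wires the restored signature: jsonOutput is the new final parameter.
func runRabbitConsumer(ctx context.Context, szEngine *sz.SzEngine) error {
	return rabbitmq.StartManagedConsumer(
		ctx,
		"amqp://guest:guest@localhost:5672/", // placeholder RabbitMQ URL
		0,        // <= 0 falls back to the max number of OS threads (see function body)
		szEngine, // Senzing engine handle created by the caller
		false,    // withInfo
		"INFO",   // logLevel (placeholder)
		true,     // jsonOutput, restored by this commit; currently unused inside the consumer
	)
}
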
3 changes: 2 additions & 1 deletion queues/rabbitmq/managedConsumer_test.go
@@ -55,6 +55,7 @@ func TestStartManagedConsumer(test *testing.T) {
szEngine *sz.SzEngine
withInfo bool
logLevel string
jsonOutput bool
}
tests := []struct {
name string
@@ -65,7 +66,7 @@
}
for _, tt := range tests {
test.Run(tt.name, func(test *testing.T) {
if err := StartManagedConsumer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.szEngine, tt.args.withInfo, tt.args.logLevel); (err != nil) != tt.wantErr {
if err := StartManagedConsumer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.szEngine, tt.args.withInfo, tt.args.logLevel, tt.args.jsonOutput); (err != nil) != tt.wantErr {
test.Errorf("StartManagedConsumer() error = %v, wantErr %v", err, tt.wantErr)
}
})
3 changes: 2 additions & 1 deletion queues/rabbitmq/managedProducer.go
@@ -56,7 +56,8 @@ func processRecord(ctx context.Context, record queues.Record, newClientFn func()
// the given queue.
// - Workers restart when they are killed or die.
// - respond to standard system signals.
func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers int, recordchan <-chan queues.Record, logLevel string) {
func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers int, recordchan <-chan queues.Record, logLevel string, jsonOutput bool) {
_ = jsonOutput

// default to the max number of OS threads
if numberOfWorkers <= 0 {
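
The producer side gains the same trailing parameter, which this commit also discards inside the function. A minimal caller sketch follows, under the same assumed module path; the broker URL and log level are placeholders, and the queues.Record values would come from whatever input the caller is reading.

package example

import (
	"context"

	"github.com/senzing-garage/go-queueing/queues"          // assumed module path
	"github.com/senzing-garage/go-queueing/queues/rabbitmq" // assumed module path
)

// publishRecords drains recordchan into the RabbitMQ queue using the restored signature.
func publishRecords(ctx context.Context, recordchan <-chan queues.Record) {
	rabbitmq.StartManagedProducer(
		ctx,
		"amqp://guest:guest@localhost:5672/", // placeholder broker URL
		0,          // <= 0 falls back to the max number of OS threads
		recordchan, // records to publish
		"INFO",     // logLevel (placeholder)
		true,       // jsonOutput, restored by this commit; currently discarded by the producer
	)
}
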
3 changes: 2 additions & 1 deletion queues/rabbitmq/managedProducer_test.go
@@ -36,6 +36,7 @@ func TestStartManagedProducer(test *testing.T) {
numberOfWorkers int
recordchan <-chan queues.Record
logLevel string
jsonOutput bool
}
tests := []struct {
name string
@@ -46,7 +47,7 @@
for _, tt := range tests {
test.Run(tt.name, func(test *testing.T) {
_ = test
StartManagedProducer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.recordchan, tt.args.logLevel)
StartManagedProducer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.recordchan, tt.args.logLevel, tt.args.jsonOutput)
})
}
}
4 changes: 2 additions & 2 deletions queues/sqs/managedConsumer.go
@@ -112,7 +112,7 @@ func (j *Job) OnError(ctx context.Context, err error) {
// them to Senzing.
// - Workers restart when they are killed or die.
// - respond to standard system signals.
func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers int, szEngine sz.SzEngine, withInfo bool, visibilitySeconds int32, logLevel string) error {
func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers int, szEngine sz.SzEngine, withInfo bool, visibilitySeconds int32, logLevel string, jsonOutput bool) error {

if szEngine == nil {
return errors.New("the G2 Engine is not set, unable to start the managed consumer")
@@ -129,7 +129,7 @@ func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers
log(3003, logLevel, err)
}

client, err := NewClient(ctx, urlString)
client, err := NewClient(ctx, urlString, logLevel, jsonOutput)
if err != nil {
return fmt.Errorf("unable to get a new SQS client, %w", err)
}
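
The SQS consumer takes the same new trailing parameter, and here it is actually forwarded to NewClient (see the call in the diff above). The sketch below is a hypothetical caller; the module path, the sz package location, the queue URL, the visibility timeout, and the log level are assumptions, not values from this commit.

package example

import (
	"context"

	"github.com/senzing-garage/go-queueing/queues/sqs" // assumed module path
	"github.com/senzing-garage/sz-sdk-go/sz"           // assumed location of the sz package
)

// runSqsConsumer reads records from an SQS queue and feeds them to Senzing.
func runSqsConsumer(ctx context.Context, szEngine sz.SzEngine) error {
	return sqs.StartManagedConsumer(
		ctx,
		"https://sqs.us-east-1.amazonaws.com/000000000000/example-queue", // placeholder queue URL
		0,        // <= 0 falls back to the max number of OS threads
		szEngine, // must be non-nil, or the function returns an error (see diff above)
		false,    // withInfo
		30,       // visibilitySeconds (placeholder)
		"INFO",   // logLevel (placeholder)
		false,    // jsonOutput, forwarded to NewClient in this commit
	)
}
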
7 changes: 4 additions & 3 deletions queues/sqs/managedConsumer_test.go
@@ -59,6 +59,7 @@ func TestStartManagedConsumer(test *testing.T) {
withInfo bool
visibilitySeconds int32
logLevel string
jsonOutput bool
}
tests := []struct {
name string
@@ -68,9 +69,9 @@
// TODO: Add test cases.
}
for _, tt := range tests {
test.Run(tt.name, func(test *testing.T) {
if err := StartManagedConsumer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.szEngine, tt.args.withInfo, tt.args.visibilitySeconds, tt.args.logLevel); (err != nil) != tt.wantErr {
test.Errorf("StartManagedConsumer() error = %v, wantErr %v", err, tt.wantErr)
test.Run(tt.name, func(t *testing.T) {
if err := StartManagedConsumer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.szEngine, tt.args.withInfo, tt.args.visibilitySeconds, tt.args.logLevel, tt.args.jsonOutput); (err != nil) != tt.wantErr {
t.Errorf("StartManagedConsumer() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
4 changes: 2 additions & 2 deletions queues/sqs/managedProducer.go
@@ -67,7 +67,7 @@ func processRecordBatch(ctx context.Context, recordchan <-chan queues.Record, ne
// the given queue.
// - Workers restart when they are killed or die.
// - respond to standard system signals.
func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers int, recordchan <-chan queues.Record, logLevel string) {
func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers int, recordchan <-chan queues.Record, logLevel string, jsonOutput bool) {

// default to the max number of OS threads
if numberOfWorkers <= 0 {
@@ -81,7 +81,7 @@ func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers
}

clientPool = make(chan *Client, numberOfWorkers)
newClientFn := func() (*Client, error) { return NewClient(ctx, urlString) }
newClientFn := func() (*Client, error) { return NewClient(ctx, urlString, logLevel, jsonOutput) }

// populate an initial client pool
go func() { _ = createClients(ctx, numberOfWorkers, newClientFn) }()
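
The SQS producer mirrors the RabbitMQ one: jsonOutput now rides along and is handed to each pooled NewClient call via newClientFn. A hypothetical caller, with the same assumed import paths and a placeholder queue URL:

package example

import (
	"context"

	"github.com/senzing-garage/go-queueing/queues"     // assumed module path
	"github.com/senzing-garage/go-queueing/queues/sqs" // assumed module path
)

// publishToSqs pushes records from recordchan onto the SQS queue.
func publishToSqs(ctx context.Context, recordchan <-chan queues.Record) {
	sqs.StartManagedProducer(
		ctx,
		"https://sqs.us-east-1.amazonaws.com/000000000000/example-queue", // placeholder queue URL
		0,          // <= 0 falls back to the max number of OS threads
		recordchan, // records to publish
		"INFO",     // logLevel (placeholder)
		false,      // jsonOutput, passed through to NewClient in this commit
	)
}
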
3 changes: 2 additions & 1 deletion queues/sqs/managedProducer_test.go
@@ -36,6 +36,7 @@ func TestStartManagedProducer(test *testing.T) {
numberOfWorkers int
recordchan <-chan queues.Record
logLevel string
jsonOutput bool
}
tests := []struct {
name string
@@ -46,7 +47,7 @@
for _, tt := range tests {
test.Run(tt.name, func(test *testing.T) {
_ = test
StartManagedProducer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.recordchan, tt.args.logLevel)
StartManagedProducer(tt.args.ctx, tt.args.urlString, tt.args.numberOfWorkers, tt.args.recordchan, tt.args.logLevel, tt.args.jsonOutput)
})
}
}
4 changes: 3 additions & 1 deletion queues/sqs/sqs.go
@@ -43,7 +43,9 @@ type Client struct {
// ----------------------------------------------------------------------------

// New creates a single SQS client
func NewClient(ctx context.Context, urlString string) (*Client, error) {
func NewClient(ctx context.Context, urlString string, logLevel string, jsonOutput bool) (*Client, error) {
_ = logLevel
_ = jsonOutput
client := Client{
MaxDelay: 10 * time.Minute,
ReconnectDelay: 2 * time.Second,
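
NewClient itself is where the widened constructor bottoms out; per the diff, both logLevel and jsonOutput are accepted but not yet used inside it. A direct-construction sketch, again with an assumed module path and a placeholder queue URL:

package example

import (
	"context"
	"fmt"

	"github.com/senzing-garage/go-queueing/queues/sqs" // assumed module path
)

// newSqsClient builds a single SQS client with the four-argument constructor.
func newSqsClient(ctx context.Context) (*sqs.Client, error) {
	client, err := sqs.NewClient(
		ctx,
		"https://sqs.us-east-1.amazonaws.com/000000000000/example-queue", // placeholder queue URL
		"INFO", // logLevel, currently unused inside NewClient per this commit
		false,  // jsonOutput, currently unused inside NewClient per this commit
	)
	if err != nil {
		return nil, fmt.Errorf("unable to create SQS client: %w", err)
	}
	return client, nil
}
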
8 changes: 5 additions & 3 deletions queues/sqs/sqs_test.go
@@ -13,8 +13,10 @@ import (

func TestNewClient(test *testing.T) {
type args struct {
ctx context.Context
urlString string
ctx context.Context
urlString string
logLevel string
jsonOutput bool
}
tests := []struct {
name string
@@ -26,7 +28,7 @@
}
for _, tt := range tests {
test.Run(tt.name, func(test *testing.T) {
got, err := NewClient(tt.args.ctx, tt.args.urlString)
got, err := NewClient(tt.args.ctx, tt.args.urlString, tt.args.logLevel, tt.args.jsonOutput)
if (err != nil) != tt.wantErr {
test.Errorf("NewClient() error = %v, wantErr %v", err, tt.wantErr)
return
