diff --git a/mem/example_test.go b/mem/example_test.go index 3e20514..747b45a 100644 --- a/mem/example_test.go +++ b/mem/example_test.go @@ -28,7 +28,7 @@ func Example_primitives() { for _, row := range lines { for _, col := range row { - w := t2.NewWriter() + w := t2.NewWriter(ctx) w.Write([]byte(col)) w.Close() } @@ -45,7 +45,7 @@ func Example_primitives() { return err } - w := t3.NewWriter() + w := t3.NewWriter(ctx) w.Attributes().Set("Length", fmt.Sprintf("%d", len(body))) w.Attributes().Set("StartsWith", string(body[0:1])) @@ -67,7 +67,7 @@ func Example_primitives() { } for _, m := range messages { - w := t1.NewWriter() + w := t1.NewWriter(context.Background()) w.Write(m) w.Close() } diff --git a/mem/server_test.go b/mem/server_test.go index 0ce6ade..754614f 100644 --- a/mem/server_test.go +++ b/mem/server_test.go @@ -29,21 +29,6 @@ func (r *ConcurrentReceiver) Receive(ctx context.Context, m *msg.Message) error return nil } -// PanicReceiver panics upon consumption of a message. -// It is safe to utilize by concurrent goroutines. -type PanicReceiver struct { - t *testing.T -} - -func (r *PanicReceiver) Receive(ctx context.Context, m *msg.Message) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - panic("AHHH") - } -} - // RetryReceiver returns an error upon consumption of a Message. Once it // has been called a certain number of times, it writes to an channel and // returns nil. diff --git a/mem/topic.go b/mem/topic.go index c4f31cf..a56a7f2 100644 --- a/mem/topic.go +++ b/mem/topic.go @@ -2,6 +2,7 @@ package mem import ( "bytes" + "context" "sync" "github.com/zerofox-oss/go-msg" @@ -17,7 +18,7 @@ var _ msg.Topic = &Topic{} // NewWriter returns a MessageWriter. // The MessageWriter may be used to write messages to a channel. -func (t *Topic) NewWriter() msg.MessageWriter { +func (t *Topic) NewWriter(context.Context) msg.MessageWriter { return &MessageWriter{ c: t.C, diff --git a/mem/topic_test.go b/mem/topic_test.go index f583e25..0971b5f 100644 --- a/mem/topic_test.go +++ b/mem/topic_test.go @@ -1,6 +1,7 @@ package mem_test import ( + "context" "testing" "github.com/zerofox-oss/go-msg" @@ -10,7 +11,7 @@ import ( func TestMessageWriter_Attributes(t *testing.T) { testTopic := &mem.Topic{} - w := testTopic.NewWriter() + w := testTopic.NewWriter(context.Background()) attrs := w.Attributes() attrs.Set("test", "value") @@ -26,7 +27,7 @@ func TestMessageWriter_WriteAndClose(t *testing.T) { } go func() { - w := testTopic.NewWriter() + w := testTopic.NewWriter(context.Background()) w.Write([]byte("Don't ")) w.Write([]byte("call me ")) w.Write([]byte("junior!")) @@ -53,7 +54,7 @@ func TestMesageWriter_SingleUse(t *testing.T) { C: channel, } - w := testTopic.NewWriter() + w := testTopic.NewWriter(context.Background()) text := [][]byte{ []byte("I have a bad feeling about this..."), diff --git a/msg.go b/msg.go index 5bace86..7b3d7a9 100644 --- a/msg.go +++ b/msg.go @@ -162,14 +162,17 @@ type MessageWriter interface { // Multiple goroutines may invoke method on a Topic simultaneously. type Topic interface { // NewWriter returns a new MessageWriter - NewWriter() MessageWriter + NewWriter(context.Context) MessageWriter } // The TopicFunc is an adapter to allow the use of ordinary functions // as a Topic. TopicFunc(f) is a Topic that calls f. -type TopicFunc func() MessageWriter +type TopicFunc func(context.Context) MessageWriter + +// Ensure TopicFunc implements Topic +var _ Topic = TopicFunc(nil) // NewWriter calls f(ctx,m) -func (t TopicFunc) NewWriter() MessageWriter { - return t() +func (t TopicFunc) NewWriter(ctx context.Context) MessageWriter { + return t(ctx) }