-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscriber_test.go
155 lines (127 loc) · 3.94 KB
/
subscriber_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package sirkeji
import (
"errors"
"sync"
"testing"
"time"
)
// MockSubscriber is a mock implementation of the Subscriber interface for testing.
type MockSubscriber struct {
uid string
processed []Event
subscribed bool
unsubscribed bool
sync.Mutex
}
func NewMockSubscriber(uid string) *MockSubscriber {
return &MockSubscriber{
uid: uid,
processed: []Event{},
}
}
func (ms *MockSubscriber) Uid() string {
return ms.uid
}
func (ms *MockSubscriber) Process(event Event) {
ms.Lock()
defer ms.Unlock()
ms.processed = append(ms.processed, event)
}
func (ms *MockSubscriber) Subscribed() {
ms.Lock()
defer ms.Unlock()
ms.subscribed = true
}
func (ms *MockSubscriber) Unsubscribed() {
ms.Lock()
defer ms.Unlock()
ms.unsubscribed = true
}
func (ms *MockSubscriber) GetProcessedEvents() []Event {
ms.Lock()
defer ms.Unlock()
return ms.processed
}
// TestNewSubscriptionManager verifies the behavior of NewSubscriptionManager.
func TestNewSubscriptionManager(t *testing.T) {
streamer := NewStreamer()
subscriber := NewMockSubscriber("test-subscriber")
t.Run("Valid Initialization", func(t *testing.T) {
manager, err := NewSubscriptionManager(streamer, subscriber)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if manager == nil {
t.Fatal("expected non-nil SubscriptionManager")
}
})
t.Run("Nil Streamer", func(t *testing.T) {
manager, err := NewSubscriptionManager(nil, subscriber)
if !errors.Is(err, ErrStreamerShouldNotBeNil) {
t.Fatalf("expected error: %v, got: %v", ErrStreamerShouldNotBeNil, err)
}
if manager != nil {
t.Fatal("expected nil SubscriptionManager")
}
})
t.Run("Nil Subscriber", func(t *testing.T) {
manager, err := NewSubscriptionManager(streamer, nil)
if !errors.Is(err, ErrSubscriberShouldNotBeNil) {
t.Fatalf("expected error: %v, got: %v", ErrSubscriberShouldNotBeNil, err)
}
if manager != nil {
t.Fatal("expected nil SubscriptionManager")
}
})
}
// TestSubscriptionManagerSubscribe verifies the behavior of the Subscribe method.
func TestSubscriptionManagerSubscribe(t *testing.T) {
streamer := NewStreamer()
subscriber := NewMockSubscriber("test-subscriber")
manager, err := NewSubscriptionManager(streamer, subscriber)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
err = manager.Subscribe()
if err != nil {
t.Fatalf("unexpected error during subscription: %v", err)
}
// Verify that the subscriber's Subscribed method was called
if !subscriber.subscribed {
t.Errorf("expected Subscribed to be called, but it wasn't")
}
// Verify that the subscriber receives published events
event := Event{Publisher: "system", Type: Info, Meta: "Test Event"}
streamer.Publish(event)
time.Sleep(100 * time.Millisecond) // Allow time for the event to propagate
processedEvents := subscriber.GetProcessedEvents()
if len(processedEvents) != 1 {
t.Fatalf("expected 1 processed event, got %d", len(processedEvents))
}
if processedEvents[0] != event {
t.Errorf("expected event %+v, got %+v", event, processedEvents[0])
}
}
// TestSubscriptionManagerUnsubscribe verifies the behavior of the Unsubscribe method.
func TestSubscriptionManagerUnsubscribe(t *testing.T) {
streamer := NewStreamer()
subscriber := NewMockSubscriber("test-subscriber")
manager, err := NewSubscriptionManager(streamer, subscriber)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
_ = manager.Subscribe()
manager.Unsubscribe()
// Verify that the subscriber's Unsubscribed method was called
if !subscriber.unsubscribed {
t.Errorf("expected Unsubscribed to be called, but it wasn't")
}
// Verify that the subscriber no longer receives events
event := Event{Publisher: "system", Type: Info, Meta: "Test Event"}
streamer.Publish(event)
time.Sleep(100 * time.Millisecond) // Allow time for events to propagate
processedEvents := subscriber.GetProcessedEvents()
if len(processedEvents) != 0 {
t.Fatalf("expected 0 processed events, got %d", len(processedEvents))
}
}