diff --git a/README.md b/README.md index 5e83042..db751d3 100644 --- a/README.md +++ b/README.md @@ -26,163 +26,46 @@ go get github.com/thisiscetin/sirkeji ## Getting Started -To demonstrate Sirkeji, let's build a system where one component randomly publishes numbers, and others react to following events. +*Please head to the examples folder for a working example.* -Start by subscribing a built-in logger to visualize events, and a blocking function waiting for SIGTERM. -```go -package main - -import "github.com/thisiscetin/sirkeji" - -var ( - gStreamer = sirkeji.NewStreamer() -) - -func main() { - sirkeji.Subscribe(gStreamer, sirkeji.NewLogger()) - - sirkeji.WaitForTermination(gStreamer) -} -``` +Sirkeji needs components to implement `sirkeji.Subscriber` interface to connect them to the streamer. -Register an event and define your first component. +- a `Uid() string` function which returns a `string` unique id of the component +- a `Process(event sirkeji.Event)` function, which is called in a dedicated goroutine for processing events +- a `Subscribed()` function to perform boot-up operations like initializing a ticker in a separate goroutine +- a `Unsubscribed()` function to perform clean-up operations and handling graceful shutdowns ```go -var Number sirkeji.EventType = "Number" -func init() { - sirkeji.RegisterEventType(Number) -} +type Publisher struct {} -type NumberPublisher struct { - uid string - publish func(e sirkeji.Event) -} - -func NewNumberPublisher(publish func(e sirkeji.Event)) NumberPublisher { - return NumberPublisher{ - uid: fmt.Sprintf("number-publisher-%d", time.Now().UnixMilli()), - publish: publish, - } -} - -func (n NumberPublisher) Uid() string { - return n.uid -} - -func (n NumberPublisher) Process(event sirkeji.Event) {} - -func (n NumberPublisher) OnSubscribed() { - go func() { - for { - time.Sleep(1 * time.Second) - randomNumber := rand.Intn(100) - - n.publish(sirkeji.Event{ - Publisher: n.uid, - Type: Number, - Meta: strconv.Itoa(randomNumber), - Payload: randomNumber, - }) - } - }() -} - -func (n NumberPublisher) OnUnsubscribed() {} -``` - -Attach your component to the streamer. - -```go -func main() { - sirkeji.Subscribe(gStreamer, sirkeji.NewLogger()) - sirkeji.Subscribe(gStreamer, NewNumberPublisher(gStreamer.Publish)) +func (p *Publisher) Uid() string {} - sirkeji.WaitForTermination(gStreamer) -} -``` +func (p *Publisher) Process(event sirkeji.Event) {} -When you run the application, you will **immediately** see working software, a system that can be built brick by brick, and an output like the one below. +func (p *Publisher) Subscribed() {} -```bash -2024/11/26 00:12:38 [logger-1732569158496] subscribed to the streamer -2024/11/26 00:12:38 [number-publisher-1732569158496] subscribed to the streamer -2024/11/26 00:12:39.497028 [number-publisher-1732569158496] *Number*, m: 59 | pl: full -2024/11/26 00:12:40.497705 [number-publisher-1732569158496] *Number*, m: 85 | pl: full -2024/11/26 00:12:41.497917 [number-publisher-1732569158496] *Number*, m: 95 | pl: full -2024/11/26 00:12:42.498675 [number-publisher-1732569158496] *Number*, m: 3 | pl: full +func (p *Publisher) Unsubscribed() {} ``` -Let's create another component that listens for a `Number` event, squares it, and publishes a `SquaredNumber` event. +One way to subscribe components to a stream is as follows: ```go -var SquaredNumber sirkeji.EventType = "SquaredNumber" - -func init() { - sirkeji.RegisterEventType(SquaredNumber) -} - -type SquaredNumberPublisher struct { - uid string - publish func(e sirkeji.Event) -} - -func NewSquaredNumberPublisher(publish func(e sirkeji.Event)) SquaredNumberPublisher { - return SquaredNumberPublisher{ - uid: fmt.Sprintf("squared-number-publisher-%d", time.Now().UnixMilli()), - publish: publish, - } -} - -func (s SquaredNumberPublisher) Uid() string { - return s.uid -} - -func (s SquaredNumberPublisher) Process(event sirkeji.Event) { - if event.Type == Number { - number := event.Payload.(int) - numberSq := number * number - - s.publish(sirkeji.Event{ - Publisher: s.uid, - Type: SquaredNumber, - Meta: strconv.Itoa(numberSq), - Payload: numberSq, - }) - } -} - -func (s SquaredNumberPublisher) OnSubscribed() {} - -func (s SquaredNumberPublisher) OnUnsubscribed() {} - -``` - -Don't forget to attach component to the streamer. +var ( + gStreamer = sirkeji.NewStreamer() +) -```go func main() { sirkeji.Subscribe(gStreamer, sirkeji.NewLogger()) - sirkeji.Subscribe(gStreamer, NewNumberPublisher(gStreamer.Publish)) - sirkeji.Subscribe(gStreamer, NewSquaredNumberPublisher(gStreamer.Publish)) + sirkeji.Subscribe(gStreamer, number.NewPublisher("number-publisher-1", gStreamer.Publish)) + sirkeji.Subscribe(gStreamer, number.NewPublisher("number-publisher-2", gStreamer.Publish)) + sirkeji.Subscribe(gStreamer, squared_number.NewPublisher("squared-number-publisher-1", gStreamer.Publish)) + sirkeji.Subscribe(gStreamer, number_count.NewPublisher("number-count-publisher-1", gStreamer.Publish)) sirkeji.WaitForTermination(gStreamer) } ``` -When you run this code you will see an output like below. - -```bash -2024/11/26 00:22:51 [logger-1732569771508] subscribed to the streamer -2024/11/26 00:22:51 [number-publisher-1732569771508] subscribed to the streamer -2024/11/26 00:22:51 [squared-number-publisher-1732569771508] subscribed to the streamer -2024/11/26 00:22:52.509421 [squared-number-publisher-1732569771508] *SquaredNumber*, m: 8464 | pl: full -2024/11/26 00:22:52.509539 [number-publisher-1732569771508] *Number*, m: 92 | pl: full -2024/11/26 00:22:53.510132 [squared-number-publisher-1732569771508] *SquaredNumber*, m: 2704 | pl: full -2024/11/26 00:22:53.510227 [number-publisher-1732569771508] *Number*, m: 52 | pl: full -2024/11/26 00:22:54.510833 [squared-number-publisher-1732569771508] *SquaredNumber*, m: 6889 | pl: full -``` - *Note: With Sirkeji, you can also subscribe and unsubscribe components dynamically and perform much more complex operations. Please refer to the godoc for details.* ## Contributing diff --git a/example/numbers/README.md b/example/numbers/README.md new file mode 100644 index 0000000..3004fb2 --- /dev/null +++ b/example/numbers/README.md @@ -0,0 +1,40 @@ +# sirkeji + +## numbers example + +This example will use sirkeji to demonstrate, +- creating components with their lifecycle methods +- creating a component with an internal state +- subscribing components to the streamer +- subscribing to the streamer with same component multiple times +- defining events in a central sub-package + +```go +func main() { + sirkeji.Subscribe(gStreamer, sirkeji.NewLogger()) + sirkeji.Subscribe(gStreamer, number.NewPublisher("number-publisher-1", gStreamer.Publish)) + sirkeji.Subscribe(gStreamer, number.NewPublisher("number-publisher-2", gStreamer.Publish)) + sirkeji.Subscribe(gStreamer, squared_number.NewPublisher("squared-number-publisher-1", gStreamer.Publish)) + sirkeji.Subscribe(gStreamer, number_count.NewPublisher("number-count-publisher-1", gStreamer.Publish)) + + sirkeji.WaitForTermination(gStreamer) +} +``` + +Running this package will produce an output like below: +```shell +2024/11/26 11:08:41 [logger-1732608521473] subscribed to the streamer +2024/11/26 11:08:41 [number-publisher-1] subscribed to the streamer +2024/11/26 11:08:41 [number-publisher-2] subscribed to the streamer +2024/11/26 11:08:41 [squared-number-publisher-1] subscribed to the streamer +2024/11/26 11:08:41 [number-count-publisher-1] subscribed to the streamer +2024/11/26 11:08:43.474799 [number-publisher-2] *Number*, m: 646 | pl: full +2024/11/26 11:08:43.475090 [squared-number-publisher-1] *SquaredNumber*, m: 289 | pl: full +2024/11/26 11:08:43.475067 [number-publisher-1] *Number*, m: 17 | pl: full +2024/11/26 11:08:43.475115 [squared-number-publisher-1] *SquaredNumber*, m: 417316 | pl: full +2024/11/26 11:08:45.474784 [number-publisher-2] *Number*, m: 491 | pl: full +2024/11/26 11:08:45.474819 [squared-number-publisher-1] *SquaredNumber*, m: 241081 | pl: full +2024/11/26 11:08:45.474846 [number-publisher-1] *Number*, m: 25 | pl: full +2024/11/26 11:08:45.474886 [squared-number-publisher-1] *SquaredNumber*, m: 625 | pl: full +2024/11/26 11:08:46.474780 [number-count-publisher-1] *NumberCountUpdate*, m: 4 | pl: full +``` \ No newline at end of file diff --git a/example/numbers/events/events.go b/example/numbers/events/events.go new file mode 100644 index 0000000..a36d6c8 --- /dev/null +++ b/example/numbers/events/events.go @@ -0,0 +1,13 @@ +package events + +import "github.com/thisiscetin/sirkeji" + +var ( + Number sirkeji.EventType = "Number" + SquaredNumber sirkeji.EventType = "SquaredNumber" + NumberCountUpdate sirkeji.EventType = "NumberCountUpdate" +) + +func init() { + sirkeji.RegisterEventTypes(Number, SquaredNumber, NumberCountUpdate) +} diff --git a/example/numbers/main.go b/example/numbers/main.go new file mode 100644 index 0000000..16174e4 --- /dev/null +++ b/example/numbers/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "github.com/thisiscetin/sirkeji" + "github.com/thisiscetin/sirkeji/example/numbers/number" + "github.com/thisiscetin/sirkeji/example/numbers/number_count" + "github.com/thisiscetin/sirkeji/example/numbers/squared_number" +) + +var ( + gStreamer = sirkeji.NewStreamer() +) + +func main() { + sirkeji.Subscribe(gStreamer, sirkeji.NewLogger()) + sirkeji.Subscribe(gStreamer, number.NewPublisher("number-publisher-1", gStreamer.Publish)) + sirkeji.Subscribe(gStreamer, number.NewPublisher("number-publisher-2", gStreamer.Publish)) + sirkeji.Subscribe(gStreamer, squared_number.NewPublisher("squared-number-publisher-1", gStreamer.Publish)) + sirkeji.Subscribe(gStreamer, number_count.NewPublisher("number-count-publisher-1", gStreamer.Publish)) + + sirkeji.WaitForTermination(gStreamer) +} diff --git a/example/numbers/number/publisher.go b/example/numbers/number/publisher.go new file mode 100644 index 0000000..aa1610b --- /dev/null +++ b/example/numbers/number/publisher.go @@ -0,0 +1,44 @@ +package number + +import ( + "github.com/thisiscetin/sirkeji" + "github.com/thisiscetin/sirkeji/example/numbers/events" + "math/rand/v2" + "strconv" + "time" +) + +type Publisher struct { + uid string + publish func(e sirkeji.Event) +} + +func (p *Publisher) Uid() string { + return p.uid +} + +func (p *Publisher) Process(event sirkeji.Event) {} + +func (p *Publisher) Subscribed() { + go func() { + for range time.Tick(time.Second * 2) { + n := rand.IntN(1_000) + + p.publish(sirkeji.Event{ + Publisher: p.uid, + Type: events.Number, + Meta: strconv.Itoa(n), + Payload: n, + }) + } + }() +} + +func (p *Publisher) Unsubscribed() {} + +func NewPublisher(uid string, publish func(e sirkeji.Event)) *Publisher { + return &Publisher{ + uid, + publish, + } +} diff --git a/example/numbers/number_count/publisher.go b/example/numbers/number_count/publisher.go new file mode 100644 index 0000000..fb64a0c --- /dev/null +++ b/example/numbers/number_count/publisher.go @@ -0,0 +1,54 @@ +package number_count + +import ( + "github.com/thisiscetin/sirkeji" + "github.com/thisiscetin/sirkeji/example/numbers/events" + "strconv" + "sync" + "time" +) + +type Publisher struct { + uid string + publish func(e sirkeji.Event) + + count int + sync.RWMutex +} + +func (p *Publisher) Uid() string { + return p.uid +} + +func (p *Publisher) Process(event sirkeji.Event) { + if event.Type == events.Number { + p.Lock() + defer p.Unlock() + + p.count++ + } +} + +func (p *Publisher) Subscribed() { + go func() { + for range time.Tick(5 * time.Second) { + p.RLock() + p.publish(sirkeji.Event{ + Publisher: p.uid, + Type: events.NumberCountUpdate, + Meta: strconv.Itoa(p.count), + Payload: p.count, + }) + p.RUnlock() + } + }() +} + +func (p *Publisher) Unsubscribed() {} + +func NewPublisher(uid string, publish func(e sirkeji.Event)) *Publisher { + return &Publisher{ + uid: uid, + publish: publish, + } +} diff --git a/example/numbers/squared_number/publisher.go b/example/numbers/squared_number/publisher.go new file mode 100644 index 0000000..abe47db --- /dev/null +++ b/example/numbers/squared_number/publisher.go @@ -0,0 +1,41 @@ +package squared_number + +import ( + "github.com/thisiscetin/sirkeji" + "github.com/thisiscetin/sirkeji/example/numbers/events" + "strconv" +) + +type Publisher struct { + uid string + publish func(e sirkeji.Event) +} + +func (p *Publisher) Uid() string { + return p.uid +} + +func (p *Publisher) Process(event sirkeji.Event) { + if event.Type == events.Number { + n := event.Payload.(int) + nSquare := n * n + + p.publish(sirkeji.Event{ + Publisher: p.uid, + Type: events.SquaredNumber, + Meta: strconv.Itoa(nSquare), + Payload: nSquare, + }) + } +} + +func (p *Publisher) Subscribed() {} + +func (p *Publisher) Unsubscribed() {} + +func NewPublisher(uid string, publish func(e sirkeji.Event)) *Publisher { + return &Publisher{ + uid: uid, + publish: publish, + } +} diff --git a/logger.go b/logger.go index fcb746d..18288eb 100644 --- a/logger.go +++ b/logger.go @@ -106,8 +106,8 @@ func (l *Logger) Process(event Event) { // // Example: // -// logger.OnSubscribed() -func (l *Logger) OnSubscribed() {} +// logger.Subscribed() +func (l *Logger) Subscribed() {} // OnUnsubscribed is called when the Logger is unsubscribed from a Streamer. // @@ -116,5 +116,5 @@ func (l *Logger) OnSubscribed() {} // // Example: // -// logger.OnUnsubscribed() -func (l *Logger) OnUnsubscribed() {} +// logger.Unsubscribed() +func (l *Logger) Unsubscribed() {} diff --git a/subscriber.go b/subscriber.go index e1fb0a1..a4e2aff 100644 --- a/subscriber.go +++ b/subscriber.go @@ -25,15 +25,15 @@ type Subscriber interface { // - event: The Event instance to be processed by the subscriber. Process(event Event) - // OnSubscribed is called when the subscriber is successfully connected to the Streamer. + // Subscribed is called when the subscriber is successfully connected to the Streamer. // // Use this method to perform any initialization or logging when the subscription is established. - OnSubscribed() + Subscribed() - // OnUnsubscribed is called when the subscriber is disconnected from the Streamer. + // Unsubscribed is called when the subscriber is disconnected from the Streamer. // // Use this method to perform any cleanup or logging when the subscription is terminated. - OnUnsubscribed() + Unsubscribed() } // SubscriptionManager manages the lifecycle of a Subscriber with a Streamer. @@ -107,7 +107,7 @@ func NewSubscriptionManager(streamer Streamer, subscriber Subscriber) (*Subscrip // // Behavior: // - Starts a goroutine to listen for events and route them to the Subscriber's Process method. -// - Calls the Subscriber's OnSubscribed method upon successful subscription. +// - Calls the Subscriber's Subscribed method upon successful subscription. // // Example: // @@ -128,7 +128,7 @@ func (sm *SubscriptionManager) Subscribe() error { } }(ch) - sm.subscriber.OnSubscribed() + sm.subscriber.Subscribed() log.Printf("[%s] subscribed to the streamer\n", sm.subscriber.Uid()) return nil @@ -137,7 +137,7 @@ func (sm *SubscriptionManager) Subscribe() error { // Unsubscribe disconnects the subscriber from the Streamer. // // This method removes the Subscriber from the Streamer, ensuring it no longer -// receives events. It also invokes the Subscriber's OnUnsubscribed method. +// receives events. It also invokes the Subscriber's Unsubscribed method. // // Parameters: // - None. @@ -146,7 +146,7 @@ func (sm *SubscriptionManager) Subscribe() error { // - None. // // Behavior: -// - Calls the Subscriber's OnUnsubscribed method after successfully unsubscribing. +// - Calls the Subscriber's Unsubscribed method after successfully unsubscribing. // // Example: // @@ -154,7 +154,7 @@ func (sm *SubscriptionManager) Subscribe() error { // manager.Unsubscribe() func (sm *SubscriptionManager) Unsubscribe() { sm.streamer.Unsubscribe(sm.subscriber.Uid()) - sm.subscriber.OnUnsubscribed() + sm.subscriber.Unsubscribed() log.Printf("[%s] unsubscribed from the streamer\n", sm.subscriber.Uid()) } diff --git a/subscriber_test.go b/subscriber_test.go index b0d12b8..2a9f301 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -9,10 +9,10 @@ import ( // MockSubscriber is a mock implementation of the Subscriber interface for testing. type MockSubscriber struct { - uid string - processed []Event - onSubscribed bool - onUnsubscribed bool + uid string + processed []Event + subscribed bool + unsubscribed bool sync.Mutex } @@ -34,18 +34,18 @@ func (ms *MockSubscriber) Process(event Event) { ms.processed = append(ms.processed, event) } -func (ms *MockSubscriber) OnSubscribed() { +func (ms *MockSubscriber) Subscribed() { ms.Lock() defer ms.Unlock() - ms.onSubscribed = true + ms.subscribed = true } -func (ms *MockSubscriber) OnUnsubscribed() { +func (ms *MockSubscriber) Unsubscribed() { ms.Lock() defer ms.Unlock() - ms.onUnsubscribed = true + ms.unsubscribed = true } func (ms *MockSubscriber) GetProcessedEvents() []Event { @@ -105,9 +105,9 @@ func TestSubscriptionManagerSubscribe(t *testing.T) { t.Fatalf("unexpected error during subscription: %v", err) } - // Verify that the subscriber's OnSubscribed method was called - if !subscriber.onSubscribed { - t.Errorf("expected OnSubscribed to be called, but it wasn't") + // Verify that the subscriber's Subscribed method was called + if !subscriber.subscribed { + t.Errorf("expected Subscribed to be called, but it wasn't") } // Verify that the subscriber receives published events @@ -137,9 +137,9 @@ func TestSubscriptionManagerUnsubscribe(t *testing.T) { _ = manager.Subscribe() manager.Unsubscribe() - // Verify that the subscriber's OnUnsubscribed method was called - if !subscriber.onUnsubscribed { - t.Errorf("expected OnUnsubscribed to be called, but it wasn't") + // Verify that the subscriber's Unsubscribed method was called + if !subscriber.unsubscribed { + t.Errorf("expected Unsubscribed to be called, but it wasn't") } // Verify that the subscriber no longer receives events