diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e960c2e8..451a8b274 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [#1838](https://github.com/NibiruChain/nibiru/pull/1838) - feat(eth): Go-ethereum, crypto, encoding, and unit tests for evm/types - [#1841](https://github.com/NibiruChain/nibiru/pull/1841) - feat(eth): Collections encoders for bytes, Ethereum addresses, and Ethereum hashes - [#1847](https://github.com/NibiruChain/nibiru/pull/1847) - fix(docker-chaosnet): release snapshot docker build failed CI. +- [#1855](https://github.com/NibiruChain/nibiru/pull/1855) - feat(eth-pubsub): Implement in-memory EventBus for real-time topic management and event distribution #### Dapp modules: perp, spot, etc diff --git a/eth/rpc/pubsub/pubsub.go b/eth/rpc/pubsub/pubsub.go new file mode 100644 index 000000000..f5baabd54 --- /dev/null +++ b/eth/rpc/pubsub/pubsub.go @@ -0,0 +1,174 @@ +// Copyright (c) 2023-2024 Nibi, Inc. +package pubsub + +import ( + "sync" + "sync/atomic" + + "github.com/pkg/errors" + + coretypes "github.com/cometbft/cometbft/rpc/core/types" +) + +type UnsubscribeFunc func() + +// EventBus manages topics and subscriptions. A "topic" is a named channel of +// communication. A "subscription" is the action taken by a subscriber to express +// interest in receiving messages broadcasted from a specific topic. +type EventBus interface { + // AddTopic: Adds a new topic with the specified name and message source + AddTopic(name string, src <-chan coretypes.ResultEvent) error + // RemoveTopic: Removes the specified topic and all its related data, + // ensuring clean up of resources. + RemoveTopic(name string) + Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) + Topics() []string +} + +// memEventBus is an implemention of the `EventBus` interface. +type memEventBus struct { + topics map[string]<-chan coretypes.ResultEvent + topicsMux *sync.RWMutex + subscribers map[string]map[uint64]chan<- coretypes.ResultEvent + subscribersMux *sync.RWMutex + currentUniqueID uint64 +} + +// NewEventBus returns a fresh imlpemention of `memEventBus`, which implements +// the `EventBus` interface for managing Ethereum topics and subscriptions. +func NewEventBus() EventBus { + return &memEventBus{ + topics: make(map[string]<-chan coretypes.ResultEvent), + topicsMux: new(sync.RWMutex), + subscribers: make(map[string]map[uint64]chan<- coretypes.ResultEvent), + subscribersMux: new(sync.RWMutex), + } +} + +// GenUniqueID atomically increments and returns a unique identifier for a new subscriber. +// This ID is used internally to manage subscriber-specific channels. +func (m *memEventBus) GenUniqueID() uint64 { + return atomic.AddUint64(&m.currentUniqueID, 1) +} + +// Topics returns a list of all topics currently managed by the EventBus. The +// list is safe for concurrent access and is a snapshot of current topic names. +func (m *memEventBus) Topics() (topics []string) { + m.topicsMux.RLock() + defer m.topicsMux.RUnlock() + + topics = make([]string, 0, len(m.topics)) + for topicName := range m.topics { + topics = append(topics, topicName) + } + + return topics +} + +// AddTopic adds a new topic with the specified name and message source +func (m *memEventBus) AddTopic(name string, src <-chan coretypes.ResultEvent) error { + m.topicsMux.RLock() + _, ok := m.topics[name] + m.topicsMux.RUnlock() + + if ok { + return errors.New("topic already registered") + } + + m.topicsMux.Lock() + m.topics[name] = src + m.topicsMux.Unlock() + + go m.publishTopic(name, src) + + return nil +} + +// RemoveTopic: Removes the specified topic and all its related data, ensuring +// clean up of resources. +func (m *memEventBus) RemoveTopic(name string) { + m.topicsMux.Lock() + delete(m.topics, name) + m.topicsMux.Unlock() +} + +// Subscribe attempts to create a subscription to the specified topic. It returns +// a channel to receive messages, a function to unsubscribe, and an error if the +// topic does not exist. +func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) { + m.topicsMux.RLock() + _, ok := m.topics[name] + m.topicsMux.RUnlock() + + if !ok { + return nil, nil, errors.Errorf("topic not found: %s", name) + } + + ch := make(chan coretypes.ResultEvent) + m.subscribersMux.Lock() + defer m.subscribersMux.Unlock() + + id := m.GenUniqueID() + if _, ok := m.subscribers[name]; !ok { + m.subscribers[name] = make(map[uint64]chan<- coretypes.ResultEvent) + } + m.subscribers[name][id] = ch + + unsubscribe := func() { + m.subscribersMux.Lock() + defer m.subscribersMux.Unlock() + delete(m.subscribers[name], id) + } + + return ch, unsubscribe, nil +} + +func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) { + for { + msg, ok := <-src + if !ok { + m.closeAllSubscribers(name) + m.topicsMux.Lock() + delete(m.topics, name) + m.topicsMux.Unlock() + return + } + m.publishAllSubscribers(name, msg) + } +} + +// closeAllSubscribers closes all subscriber channels associated with the +// specified topic and removes the topic from the subscribers map. This function +// is typically called when a topic is deleted or no longer available to ensure +// all resources are released properly and to prevent goroutine leaks. It ensures +// thread-safe execution by locking around the operation. +func (m *memEventBus) closeAllSubscribers(name string) { + m.subscribersMux.Lock() + defer m.subscribersMux.Unlock() + + subscribers := m.subscribers[name] + delete(m.subscribers, name) + // #nosec G705 + for _, sub := range subscribers { + close(sub) + } +} + +// publishAllSubscribers sends a message to all subscribers of the specified +// topic. It uses a non-blocking send operation to deliver the message to +// subscriber channels. If a subscriber's channel is not ready to receive the +// message (i.e., the channel is full), the message is skipped for that +// subscriber to avoid blocking the publisher. This function ensures thread-safe +// access to subscribers by using a read lock. +func (m *memEventBus) publishAllSubscribers(name string, msg coretypes.ResultEvent) { + m.subscribersMux.RLock() + defer m.subscribersMux.RUnlock() + subscribers := m.subscribers[name] + // #nosec G705 + for _, sub := range subscribers { + select { + case sub <- msg: + default: + } + } +} diff --git a/eth/rpc/pubsub/pubsub_test.go b/eth/rpc/pubsub/pubsub_test.go new file mode 100644 index 000000000..e03de763d --- /dev/null +++ b/eth/rpc/pubsub/pubsub_test.go @@ -0,0 +1,155 @@ +package pubsub + +import ( + "log" + "sort" + "sync" + "testing" + "time" + + rpccore "github.com/cometbft/cometbft/rpc/core/types" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +// subscribeAndPublish: Helper function used to perform concurrent subscription +// and publishing actions. It concurrently subscribes multiple clients to the +// specified topic and simultanesouly sends an empty message to the topic channel +// for each subscription. +func subscribeAndPublish(t *testing.T, eb EventBus, topic string, topicChan chan rpccore.ResultEvent) { + var ( + wg sync.WaitGroup + subscribersCount = 50 + emptyMsg = rpccore.ResultEvent{} + ) + for i := 0; i < subscribersCount; i++ { + wg.Add(1) + // concurrently subscribe to the topic + go func() { + defer wg.Done() + _, _, err := eb.Subscribe(topic) + require.NoError(t, err) + }() + + // send events to the topic + wg.Add(1) + go func() { + defer wg.Done() + topicChan <- emptyMsg + }() + } + wg.Wait() +} + +type SuitePubsub struct { + suite.Suite +} + +func TestSuitePubsub(t *testing.T) { + suite.Run(t, new(SuitePubsub)) +} + +func (s *SuitePubsub) TestAddTopic() { + q := NewEventBus() + // dummy vars + topicA := "guard" + topicB := "cream" + + s.NoError(q.AddTopic(topicA, make(<-chan rpccore.ResultEvent))) + s.NoError(q.AddTopic(topicB, make(<-chan rpccore.ResultEvent))) + s.Error(q.AddTopic(topicB, make(<-chan rpccore.ResultEvent))) + + topics := q.Topics() + sort.Strings(topics) // cream should be first + s.Require().EqualValues([]string{topicB, topicA}, topics) +} + +func (s *SuitePubsub) TestSubscribe() { + q := NewEventBus() + + // dummy vars + topicA := "0xfoo" + topicB := "blockchain" + + srcA := make(chan rpccore.ResultEvent) + err := q.AddTopic(topicA, srcA) + s.NoError(err) + + srcB := make(chan rpccore.ResultEvent) + err = q.AddTopic(topicB, srcB) + s.NoError(err) + + // subscriber channels + subChanA, _, err := q.Subscribe(topicA) + s.NoError(err) + subChanB1, _, err := q.Subscribe(topicB) + s.NoError(err) + subChanB2, _, err := q.Subscribe(topicB) + s.NoError(err) + + wg := new(sync.WaitGroup) + wg.Add(4) + + emptyMsg := rpccore.ResultEvent{} + go func() { + defer wg.Done() + msg := <-subChanA + log.Println(topicA+":", msg) + s.EqualValues(emptyMsg, msg) + }() + + go func() { + defer wg.Done() + msg := <-subChanB1 + log.Println(topicB+":", msg) + s.EqualValues(emptyMsg, msg) + }() + + go func() { + defer wg.Done() + msg := <-subChanB2 + log.Println(topicB+"2:", msg) + s.EqualValues(emptyMsg, msg) + }() + + go func() { + defer wg.Done() + + time.Sleep(time.Second) + + close(srcA) + close(srcB) + }() + + wg.Wait() + time.Sleep(time.Second) +} + +// TestConcurrentSubscribeAndPublish: Stress tests the module to make sure that +// operations are handled properly under concurrent access. +func (s *SuitePubsub) TestConcurrentSubscribeAndPublish() { + var ( + wg sync.WaitGroup + eb = NewEventBus() + topicName = "topic-name" + topicCh = make(chan rpccore.ResultEvent) + runsCount = 5 + ) + + err := eb.AddTopic(topicName, topicCh) + s.Require().NoError(err) + + for i := 0; i < runsCount; i++ { + subscribeAndPublish(s.T(), eb, topicName, topicCh) + } + + // close channel to make test end + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(2 * time.Second) + close(topicCh) + }() + + wg.Wait() +}