From 35d1a9bb57ae7aa3c9756ae72a95f81a5d084908 Mon Sep 17 00:00:00 2001 From: Paul Larsen Date: Sat, 11 Nov 2023 05:52:47 -0500 Subject: [PATCH] Create an UpdateDispatcher interface to decouple Dispatcher and Updater components (#120) * Make a new interface to abstract away the dispatcher, allowing for custom implementations * Move dispatcher to be an interface * fix lint * fix tests and add comments --- ext/common_test.go | 10 +++++++++- ext/dispatcher.go | 19 ++++++++++++++++--- ext/updater.go | 17 +++++------------ ext/updater_test.go | 16 ++++++++-------- samples/callbackqueryBot/main.go | 18 ++++++++---------- samples/commandBot/main.go | 18 ++++++++---------- samples/conversationBot/main.go | 18 ++++++++---------- samples/echoBot/main.go | 18 ++++++++---------- samples/echoMultiBot/main.go | 19 ++++++++----------- samples/echoWebhookBot/main.go | 18 ++++++++---------- samples/inlinequeryBot/main.go | 18 ++++++++---------- samples/metricsBot/main.go | 2 +- samples/middlewareBot/main.go | 18 ++++++++---------- samples/webappBot/main.go | 18 ++++++++---------- 14 files changed, 111 insertions(+), 116 deletions(-) diff --git a/ext/common_test.go b/ext/common_test.go index c82affd6..091e5845 100644 --- a/ext/common_test.go +++ b/ext/common_test.go @@ -1,6 +1,8 @@ package ext import ( + "errors" + "github.com/PaulSonOfLars/gotgbot/v2" ) @@ -21,11 +23,17 @@ func (d DummyHandler) Name() string { return "dummy" + d.N } +var ErrBadDispatcher = errors.New("can only inject updates if the dispatcher is of type *Dispatcher") + func (u *Updater) InjectUpdate(token string, upd gotgbot.Update) error { bData, ok := u.botMapping.getBot(token) if !ok { return ErrNotFound } - return u.Dispatcher.ProcessUpdate(bData.bot, &upd, nil) + d, ok := u.Dispatcher.(*Dispatcher) + if !ok { + return ErrBadDispatcher + } + return d.ProcessUpdate(bData.bot, &upd, nil) } diff --git a/ext/dispatcher.go b/ext/dispatcher.go index 30ed6f0c..b7a063e0 100644 --- a/ext/dispatcher.go +++ b/ext/dispatcher.go @@ -46,8 +46,18 @@ var ( ContinueGroups = errors.New("group iteration continued") ) +// The UpdateDispatcher interface is used to abstract away common Dispatcher implementations. +// It assumes that all incoming updates come through a JSON channel. +type UpdateDispatcher interface { + Start(b *gotgbot.Bot, updates <-chan json.RawMessage) + Stop() +} + +// The Dispatcher struct is the default UpdateDispatcher implementation. +// It supports grouping of update handlers, allowing for powerful update handling flows. +// Customise the handling of updates by wrapping the Processor struct. type Dispatcher struct { - // Processor defines how to process the raw updates being processed by the Dispatcher. + // Processor defines how to process the raw updates being handled by the Dispatcher. // This can be extended to include additional error handling, metrics, etc. Processor Processor @@ -80,6 +90,9 @@ type Dispatcher struct { waitGroup sync.WaitGroup } +// Ensure compile-time type safety. +var _ UpdateDispatcher = &Dispatcher{} + // DispatcherOpts can be used to configure or override default Dispatcher behaviours. type DispatcherOpts struct { // Processor allows for providing custom Processor interfaces with different behaviours. @@ -109,7 +122,7 @@ type DispatcherOpts struct { MaxRoutines int } -// NewDispatcher creates a new dispatcher, which process and handles incoming updates from the updates channel. +// NewDispatcher creates a new Dispatcher, which process and handles incoming updates from the updates channel. func NewDispatcher(opts *DispatcherOpts) *Dispatcher { var errHandler DispatcherErrorHandler var panicHandler DispatcherPanicHandler @@ -175,7 +188,7 @@ func (d *Dispatcher) MaxUsage() int { // Start to handle incoming updates. // This is a blocking method; it should be called as a goroutine, such that it can receive incoming updates. -func (d *Dispatcher) Start(b *gotgbot.Bot, updates chan json.RawMessage) { +func (d *Dispatcher) Start(b *gotgbot.Bot, updates <-chan json.RawMessage) { // Listen to updates as they come in from the updater. for upd := range updates { d.waitGroup.Add(1) diff --git a/ext/updater.go b/ext/updater.go index 546e5ff6..1f204274 100644 --- a/ext/updater.go +++ b/ext/updater.go @@ -26,7 +26,9 @@ type ErrorFunc func(error) type Updater struct { // Dispatcher is where all the incoming updates are sent to be processed. - Dispatcher *Dispatcher + // The Dispatcher runs in a separate goroutine, allowing for parallel update processing and dispatching. + // Once the Updater has received an update, it sends it to the Dispatcher over a JSON channel. + Dispatcher UpdateDispatcher // UnhandledErrFunc provides more flexibility for dealing with previously unhandled errors, such as failures to get // updates (when long-polling), or failures to unmarshal. @@ -54,23 +56,14 @@ type UpdaterOpts struct { // ErrorLog specifies an optional logger for unexpected behavior from handlers. // If nil, logging is done via the log package's standard logger. ErrorLog *log.Logger - // The dispatcher instance to be used by the updater. - Dispatcher *Dispatcher } -// NewUpdater Creates a new Updater, as well as the necessary structures required for the associated Dispatcher. -func NewUpdater(opts *UpdaterOpts) *Updater { +// NewUpdater Creates a new Updater, as well as a Dispatcher and any optional updater configurations (via UpdaterOpts). +func NewUpdater(dispatcher UpdateDispatcher, opts *UpdaterOpts) *Updater { var unhandledErrFunc ErrorFunc var errLog *log.Logger - // Default dispatcher, no special settings. - dispatcher := NewDispatcher(nil) - if opts != nil { - if opts.Dispatcher != nil { - dispatcher = opts.Dispatcher - } - unhandledErrFunc = opts.UnhandledErrFunc errLog = opts.ErrorLog } diff --git a/ext/updater_test.go b/ext/updater_test.go index 7a4a81d2..72f68ceb 100644 --- a/ext/updater_test.go +++ b/ext/updater_test.go @@ -23,7 +23,7 @@ func TestUpdaterThrowsErrorWhenSameWebhookAddedTwice(t *testing.T) { } d := ext.NewDispatcher(&ext.DispatcherOpts{}) - u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d}) + u := ext.NewUpdater(d, nil) err := u.AddWebhook(b, "test", ext.WebhookOpts{}) if err != nil { @@ -47,7 +47,7 @@ func TestUpdaterSupportsWebhookReAdding(t *testing.T) { } d := ext.NewDispatcher(&ext.DispatcherOpts{}) - u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d}) + u := ext.NewUpdater(d, nil) err := u.AddWebhook(b, "test", ext.WebhookOpts{}) if err != nil { @@ -76,7 +76,7 @@ func TestUpdaterDisallowsEmptyWebhooks(t *testing.T) { } d := ext.NewDispatcher(&ext.DispatcherOpts{}) - u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d}) + u := ext.NewUpdater(d, nil) err := u.AddWebhook(b, "", ext.WebhookOpts{}) if !errors.Is(err, ext.ErrEmptyPath) { @@ -104,7 +104,7 @@ func TestUpdaterAllowsWebhookDeletion(t *testing.T) { } d := ext.NewDispatcher(&ext.DispatcherOpts{}) - u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d}) + u := ext.NewUpdater(d, nil) err := u.StartPolling(b, &ext.PollingOpts{ EnableWebhookDeletion: true, @@ -143,7 +143,7 @@ func TestUpdaterSupportsTwoPollingBots(t *testing.T) { } d := ext.NewDispatcher(&ext.DispatcherOpts{}) - u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d}) + u := ext.NewUpdater(d, nil) err := u.StartPolling(b1, &ext.PollingOpts{ GetUpdatesOpts: &gotgbot.GetUpdatesOpts{ @@ -185,7 +185,7 @@ func TestUpdaterThrowsErrorWhenSameLongPollAddedTwice(t *testing.T) { } d := ext.NewDispatcher(&ext.DispatcherOpts{}) - u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d}) + u := ext.NewUpdater(d, nil) err := u.StartPolling(b, &ext.PollingOpts{ GetUpdatesOpts: &gotgbot.GetUpdatesOpts{ @@ -228,7 +228,7 @@ func TestUpdaterSupportsLongPollReAdding(t *testing.T) { } d := ext.NewDispatcher(&ext.DispatcherOpts{}) - u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d}) + u := ext.NewUpdater(d, nil) err := u.StartPolling(b, &ext.PollingOpts{ GetUpdatesOpts: &gotgbot.GetUpdatesOpts{RequestOpts: reqOpts}, @@ -303,7 +303,7 @@ func BenchmarkUpdaterMultibots(b *testing.B) { func benchmarkUpdaterWithNBots(b *testing.B, numBot int) { d := ext.NewDispatcher(nil) - u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d}) + u := ext.NewUpdater(d, nil) wg := sync.WaitGroup{} d.AddHandler(ext.DummyHandler{F: func(b *gotgbot.Bot, ctx *ext.Context) error { diff --git a/samples/callbackqueryBot/main.go b/samples/callbackqueryBot/main.go index 94c7b9af..35ffb409 100644 --- a/samples/callbackqueryBot/main.go +++ b/samples/callbackqueryBot/main.go @@ -28,17 +28,15 @@ func main() { } // Create updater and dispatcher. - updater := ext.NewUpdater(&ext.UpdaterOpts{ - Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{ - // If an error is returned by a handler, log it and continue going. - Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { - log.Println("an error occurred while handling update:", err.Error()) - return ext.DispatcherActionNoop - }, - MaxRoutines: ext.DefaultMaxRoutines, - }), + dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{ + // If an error is returned by a handler, log it and continue going. + Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { + log.Println("an error occurred while handling update:", err.Error()) + return ext.DispatcherActionNoop + }, + MaxRoutines: ext.DefaultMaxRoutines, }) - dispatcher := updater.Dispatcher + updater := ext.NewUpdater(dispatcher, nil) // /start command to introduce the bot dispatcher.AddHandler(handlers.NewCommand("start", start)) diff --git a/samples/commandBot/main.go b/samples/commandBot/main.go index dd231a44..43ed0d30 100644 --- a/samples/commandBot/main.go +++ b/samples/commandBot/main.go @@ -28,17 +28,15 @@ func main() { } // Create updater and dispatcher. - updater := ext.NewUpdater(&ext.UpdaterOpts{ - Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{ - // If an error is returned by a handler, log it and continue going. - Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { - log.Println("an error occurred while handling update:", err.Error()) - return ext.DispatcherActionNoop - }, - MaxRoutines: ext.DefaultMaxRoutines, - }), + dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{ + // If an error is returned by a handler, log it and continue going. + Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { + log.Println("an error occurred while handling update:", err.Error()) + return ext.DispatcherActionNoop + }, + MaxRoutines: ext.DefaultMaxRoutines, }) - dispatcher := updater.Dispatcher + updater := ext.NewUpdater(dispatcher, nil) // /start command to introduce the bot dispatcher.AddHandler(handlers.NewCommand("start", start)) diff --git a/samples/conversationBot/main.go b/samples/conversationBot/main.go index 8c27c927..6622f0bf 100644 --- a/samples/conversationBot/main.go +++ b/samples/conversationBot/main.go @@ -32,17 +32,15 @@ func main() { } // Create updater and dispatcher. - updater := ext.NewUpdater(&ext.UpdaterOpts{ - Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{ - // If an error is returned by a handler, log it and continue going. - Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { - log.Println("an error occurred while handling update:", err.Error()) - return ext.DispatcherActionNoop - }, - MaxRoutines: ext.DefaultMaxRoutines, - }), + dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{ + // If an error is returned by a handler, log it and continue going. + Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { + log.Println("an error occurred while handling update:", err.Error()) + return ext.DispatcherActionNoop + }, + MaxRoutines: ext.DefaultMaxRoutines, }) - dispatcher := updater.Dispatcher + updater := ext.NewUpdater(dispatcher, nil) dispatcher.AddHandler(handlers.NewConversation( []ext.Handler{handlers.NewCommand("start", start)}, diff --git a/samples/echoBot/main.go b/samples/echoBot/main.go index 0ed876e2..bdffc881 100644 --- a/samples/echoBot/main.go +++ b/samples/echoBot/main.go @@ -36,17 +36,15 @@ func main() { } // Create updater and dispatcher. - updater := ext.NewUpdater(&ext.UpdaterOpts{ - Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{ - // If an error is returned by a handler, log it and continue going. - Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { - log.Println("an error occurred while handling update:", err.Error()) - return ext.DispatcherActionNoop - }, - MaxRoutines: ext.DefaultMaxRoutines, - }), + dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{ + // If an error is returned by a handler, log it and continue going. + Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { + log.Println("an error occurred while handling update:", err.Error()) + return ext.DispatcherActionNoop + }, + MaxRoutines: ext.DefaultMaxRoutines, }) - dispatcher := updater.Dispatcher + updater := ext.NewUpdater(dispatcher, nil) // Add echo handler to reply to all text messages. dispatcher.AddHandler(handlers.NewMessage(message.Text, echo)) diff --git a/samples/echoMultiBot/main.go b/samples/echoMultiBot/main.go index 9e099833..5d11ce06 100644 --- a/samples/echoMultiBot/main.go +++ b/samples/echoMultiBot/main.go @@ -32,18 +32,15 @@ func main() { webhookSecret := os.Getenv("WEBHOOK_SECRET") // Create updater and dispatcher. - updater := ext.NewUpdater(&ext.UpdaterOpts{ - ErrorLog: nil, - Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{ - // If an error is returned by a handler, log it and continue going. - Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { - log.Println("an error occurred while handling update:", err.Error()) - return ext.DispatcherActionNoop - }, - MaxRoutines: ext.DefaultMaxRoutines, - }), + dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{ + // If an error is returned by a handler, log it and continue going. + Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { + log.Println("an error occurred while handling update:", err.Error()) + return ext.DispatcherActionNoop + }, + MaxRoutines: ext.DefaultMaxRoutines, }) - dispatcher := updater.Dispatcher + updater := ext.NewUpdater(dispatcher, nil) // Add stop handler to stop all bots gracefully. dispatcher.AddHandler(handlers.NewCommand("stop", func(b *gotgbot.Bot, ctx *ext.Context) error { diff --git a/samples/echoWebhookBot/main.go b/samples/echoWebhookBot/main.go index 0dd37204..f00d0bdc 100644 --- a/samples/echoWebhookBot/main.go +++ b/samples/echoWebhookBot/main.go @@ -45,17 +45,15 @@ func main() { } // Create updater and dispatcher. - updater := ext.NewUpdater(&ext.UpdaterOpts{ - Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{ - // If an error is returned by a handler, log it and continue going. - Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { - log.Println("an error occurred while handling update:", err.Error()) - return ext.DispatcherActionNoop - }, - MaxRoutines: ext.DefaultMaxRoutines, - }), + dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{ + // If an error is returned by a handler, log it and continue going. + Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { + log.Println("an error occurred while handling update:", err.Error()) + return ext.DispatcherActionNoop + }, + MaxRoutines: ext.DefaultMaxRoutines, }) - dispatcher := updater.Dispatcher + updater := ext.NewUpdater(dispatcher, nil) // Add echo handler to reply to all text messages. dispatcher.AddHandler(handlers.NewMessage(message.Text, echo)) diff --git a/samples/inlinequeryBot/main.go b/samples/inlinequeryBot/main.go index 59048720..2fea5346 100644 --- a/samples/inlinequeryBot/main.go +++ b/samples/inlinequeryBot/main.go @@ -33,17 +33,15 @@ func main() { } // Create updater and dispatcher. - updater := ext.NewUpdater(&ext.UpdaterOpts{ - Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{ - // If an error is returned by a handler, log it and continue going. - Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { - log.Println("an error occurred while handling update:", err.Error()) - return ext.DispatcherActionNoop - }, - MaxRoutines: ext.DefaultMaxRoutines, - }), + dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{ + // If an error is returned by a handler, log it and continue going. + Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { + log.Println("an error occurred while handling update:", err.Error()) + return ext.DispatcherActionNoop + }, + MaxRoutines: ext.DefaultMaxRoutines, }) - dispatcher := updater.Dispatcher + updater := ext.NewUpdater(dispatcher, nil) // Create an inline query handler to reply to all inline queries dispatcher.AddHandler(handlers.NewInlineQuery(inlinequery.All, source)) diff --git a/samples/metricsBot/main.go b/samples/metricsBot/main.go index 4ee057b8..6f3eb47e 100644 --- a/samples/metricsBot/main.go +++ b/samples/metricsBot/main.go @@ -48,7 +48,7 @@ func main() { go monitorDispatcherBuffer(dispatcher) // Create the updater with our customised dispatcher. - updater := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: dispatcher}) + updater := ext.NewUpdater(dispatcher, nil) // Add echo handler to reply to all text messages. dispatcher.AddHandler(handlers.NewMessage(message.Text, echo)) diff --git a/samples/middlewareBot/main.go b/samples/middlewareBot/main.go index 0fc1edf5..d3b05d91 100644 --- a/samples/middlewareBot/main.go +++ b/samples/middlewareBot/main.go @@ -30,17 +30,15 @@ func main() { } // Create updater and dispatcher. - updater := ext.NewUpdater(&ext.UpdaterOpts{ - Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{ - // If an error is returned by a handler, log it and continue going. - Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { - log.Println("an error occurred while handling update:", err.Error()) - return ext.DispatcherActionNoop - }, - MaxRoutines: ext.DefaultMaxRoutines, - }), + dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{ + // If an error is returned by a handler, log it and continue going. + Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { + log.Println("an error occurred while handling update:", err.Error()) + return ext.DispatcherActionNoop + }, + MaxRoutines: ext.DefaultMaxRoutines, }) - dispatcher := updater.Dispatcher + updater := ext.NewUpdater(dispatcher, nil) // Add echo handler to reply to all text messages. dispatcher.AddHandler(handlers.NewMessage(message.Text, echo)) diff --git a/samples/webappBot/main.go b/samples/webappBot/main.go index e9b196f0..26ad7cd1 100644 --- a/samples/webappBot/main.go +++ b/samples/webappBot/main.go @@ -40,17 +40,15 @@ func main() { } // Create updater and dispatcher to handle updates in a simple manner. - updater := ext.NewUpdater(&ext.UpdaterOpts{ - Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{ - // If an error is returned by a handler, log it and continue going. - Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { - log.Println("an error occurred while handling update:", err.Error()) - return ext.DispatcherActionNoop - }, - MaxRoutines: ext.DefaultMaxRoutines, - }), + dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{ + // If an error is returned by a handler, log it and continue going. + Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction { + log.Println("an error occurred while handling update:", err.Error()) + return ext.DispatcherActionNoop + }, + MaxRoutines: ext.DefaultMaxRoutines, }) - dispatcher := updater.Dispatcher + updater := ext.NewUpdater(dispatcher, nil) // /start command to introduce the bot and send the URL dispatcher.AddHandler(handlers.NewCommand("start", func(b *gotgbot.Bot, ctx *ext.Context) error {