Skip to content

Commit

Permalink
Create an UpdateDispatcher interface to decouple Dispatcher and Updat…
Browse files Browse the repository at this point in the history
…er components (#120)

* Make a new interface to abstract away the dispatcher, allowing for custom implementations

* Move dispatcher to be an interface

* fix lint

* fix tests and add comments
  • Loading branch information
PaulSonOfLars authored Nov 11, 2023
1 parent c4b9ec3 commit 35d1a9b
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 116 deletions.
10 changes: 9 additions & 1 deletion ext/common_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ext

import (
"errors"

"github.com/PaulSonOfLars/gotgbot/v2"
)

Expand All @@ -21,11 +23,17 @@ func (d DummyHandler) Name() string {
return "dummy" + d.N
}

var ErrBadDispatcher = errors.New("can only inject updates if the dispatcher is of type *Dispatcher")

func (u *Updater) InjectUpdate(token string, upd gotgbot.Update) error {
bData, ok := u.botMapping.getBot(token)
if !ok {
return ErrNotFound
}

return u.Dispatcher.ProcessUpdate(bData.bot, &upd, nil)
d, ok := u.Dispatcher.(*Dispatcher)
if !ok {
return ErrBadDispatcher
}
return d.ProcessUpdate(bData.bot, &upd, nil)
}
19 changes: 16 additions & 3 deletions ext/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,18 @@ var (
ContinueGroups = errors.New("group iteration continued")
)

// The UpdateDispatcher interface is used to abstract away common Dispatcher implementations.
// It assumes that all incoming updates come through a JSON channel.
type UpdateDispatcher interface {
Start(b *gotgbot.Bot, updates <-chan json.RawMessage)
Stop()
}

// The Dispatcher struct is the default UpdateDispatcher implementation.
// It supports grouping of update handlers, allowing for powerful update handling flows.
// Customise the handling of updates by wrapping the Processor struct.
type Dispatcher struct {
// Processor defines how to process the raw updates being processed by the Dispatcher.
// Processor defines how to process the raw updates being handled by the Dispatcher.
// This can be extended to include additional error handling, metrics, etc.
Processor Processor

Expand Down Expand Up @@ -80,6 +90,9 @@ type Dispatcher struct {
waitGroup sync.WaitGroup
}

// Ensure compile-time type safety.
var _ UpdateDispatcher = &Dispatcher{}

// DispatcherOpts can be used to configure or override default Dispatcher behaviours.
type DispatcherOpts struct {
// Processor allows for providing custom Processor interfaces with different behaviours.
Expand Down Expand Up @@ -109,7 +122,7 @@ type DispatcherOpts struct {
MaxRoutines int
}

// NewDispatcher creates a new dispatcher, which process and handles incoming updates from the updates channel.
// NewDispatcher creates a new Dispatcher, which process and handles incoming updates from the updates channel.
func NewDispatcher(opts *DispatcherOpts) *Dispatcher {
var errHandler DispatcherErrorHandler
var panicHandler DispatcherPanicHandler
Expand Down Expand Up @@ -175,7 +188,7 @@ func (d *Dispatcher) MaxUsage() int {

// Start to handle incoming updates.
// This is a blocking method; it should be called as a goroutine, such that it can receive incoming updates.
func (d *Dispatcher) Start(b *gotgbot.Bot, updates chan json.RawMessage) {
func (d *Dispatcher) Start(b *gotgbot.Bot, updates <-chan json.RawMessage) {
// Listen to updates as they come in from the updater.
for upd := range updates {
d.waitGroup.Add(1)
Expand Down
17 changes: 5 additions & 12 deletions ext/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ type ErrorFunc func(error)

type Updater struct {
// Dispatcher is where all the incoming updates are sent to be processed.
Dispatcher *Dispatcher
// The Dispatcher runs in a separate goroutine, allowing for parallel update processing and dispatching.
// Once the Updater has received an update, it sends it to the Dispatcher over a JSON channel.
Dispatcher UpdateDispatcher

// UnhandledErrFunc provides more flexibility for dealing with previously unhandled errors, such as failures to get
// updates (when long-polling), or failures to unmarshal.
Expand Down Expand Up @@ -54,23 +56,14 @@ type UpdaterOpts struct {
// ErrorLog specifies an optional logger for unexpected behavior from handlers.
// If nil, logging is done via the log package's standard logger.
ErrorLog *log.Logger
// The dispatcher instance to be used by the updater.
Dispatcher *Dispatcher
}

// NewUpdater Creates a new Updater, as well as the necessary structures required for the associated Dispatcher.
func NewUpdater(opts *UpdaterOpts) *Updater {
// NewUpdater Creates a new Updater, as well as a Dispatcher and any optional updater configurations (via UpdaterOpts).
func NewUpdater(dispatcher UpdateDispatcher, opts *UpdaterOpts) *Updater {
var unhandledErrFunc ErrorFunc
var errLog *log.Logger

// Default dispatcher, no special settings.
dispatcher := NewDispatcher(nil)

if opts != nil {
if opts.Dispatcher != nil {
dispatcher = opts.Dispatcher
}

unhandledErrFunc = opts.UnhandledErrFunc
errLog = opts.ErrorLog
}
Expand Down
16 changes: 8 additions & 8 deletions ext/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestUpdaterThrowsErrorWhenSameWebhookAddedTwice(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.AddWebhook(b, "test", ext.WebhookOpts{})
if err != nil {
Expand All @@ -47,7 +47,7 @@ func TestUpdaterSupportsWebhookReAdding(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.AddWebhook(b, "test", ext.WebhookOpts{})
if err != nil {
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestUpdaterDisallowsEmptyWebhooks(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.AddWebhook(b, "", ext.WebhookOpts{})
if !errors.Is(err, ext.ErrEmptyPath) {
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestUpdaterAllowsWebhookDeletion(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.StartPolling(b, &ext.PollingOpts{
EnableWebhookDeletion: true,
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestUpdaterSupportsTwoPollingBots(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.StartPolling(b1, &ext.PollingOpts{
GetUpdatesOpts: &gotgbot.GetUpdatesOpts{
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestUpdaterThrowsErrorWhenSameLongPollAddedTwice(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.StartPolling(b, &ext.PollingOpts{
GetUpdatesOpts: &gotgbot.GetUpdatesOpts{
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestUpdaterSupportsLongPollReAdding(t *testing.T) {
}

d := ext.NewDispatcher(&ext.DispatcherOpts{})
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

err := u.StartPolling(b, &ext.PollingOpts{
GetUpdatesOpts: &gotgbot.GetUpdatesOpts{RequestOpts: reqOpts},
Expand Down Expand Up @@ -303,7 +303,7 @@ func BenchmarkUpdaterMultibots(b *testing.B) {

func benchmarkUpdaterWithNBots(b *testing.B, numBot int) {
d := ext.NewDispatcher(nil)
u := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: d})
u := ext.NewUpdater(d, nil)

wg := sync.WaitGroup{}
d.AddHandler(ext.DummyHandler{F: func(b *gotgbot.Bot, ctx *ext.Context) error {
Expand Down
18 changes: 8 additions & 10 deletions samples/callbackqueryBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// /start command to introduce the bot
dispatcher.AddHandler(handlers.NewCommand("start", start))
Expand Down
18 changes: 8 additions & 10 deletions samples/commandBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// /start command to introduce the bot
dispatcher.AddHandler(handlers.NewCommand("start", start))
Expand Down
18 changes: 8 additions & 10 deletions samples/conversationBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

dispatcher.AddHandler(handlers.NewConversation(
[]ext.Handler{handlers.NewCommand("start", start)},
Expand Down
18 changes: 8 additions & 10 deletions samples/echoBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// Add echo handler to reply to all text messages.
dispatcher.AddHandler(handlers.NewMessage(message.Text, echo))
Expand Down
19 changes: 8 additions & 11 deletions samples/echoMultiBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,15 @@ func main() {
webhookSecret := os.Getenv("WEBHOOK_SECRET")

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
ErrorLog: nil,
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// Add stop handler to stop all bots gracefully.
dispatcher.AddHandler(handlers.NewCommand("stop", func(b *gotgbot.Bot, ctx *ext.Context) error {
Expand Down
18 changes: 8 additions & 10 deletions samples/echoWebhookBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// Add echo handler to reply to all text messages.
dispatcher.AddHandler(handlers.NewMessage(message.Text, echo))
Expand Down
18 changes: 8 additions & 10 deletions samples/inlinequeryBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,15 @@ func main() {
}

// Create updater and dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{
Dispatcher: ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
}),
dispatcher := ext.NewDispatcher(&ext.DispatcherOpts{
// If an error is returned by a handler, log it and continue going.
Error: func(b *gotgbot.Bot, ctx *ext.Context, err error) ext.DispatcherAction {
log.Println("an error occurred while handling update:", err.Error())
return ext.DispatcherActionNoop
},
MaxRoutines: ext.DefaultMaxRoutines,
})
dispatcher := updater.Dispatcher
updater := ext.NewUpdater(dispatcher, nil)

// Create an inline query handler to reply to all inline queries
dispatcher.AddHandler(handlers.NewInlineQuery(inlinequery.All, source))
Expand Down
2 changes: 1 addition & 1 deletion samples/metricsBot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {
go monitorDispatcherBuffer(dispatcher)

// Create the updater with our customised dispatcher.
updater := ext.NewUpdater(&ext.UpdaterOpts{Dispatcher: dispatcher})
updater := ext.NewUpdater(dispatcher, nil)

// Add echo handler to reply to all text messages.
dispatcher.AddHandler(handlers.NewMessage(message.Text, echo))
Expand Down
Loading

0 comments on commit 35d1a9b

Please sign in to comment.