-
Notifications
You must be signed in to change notification settings - Fork 0
/
backend.go
133 lines (95 loc) · 5.67 KB
/
backend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package courier
import (
"context"
"fmt"
"strings"
"github.com/gomodule/redigo/redis"
"github.com/nyaruka/gocommon/urns"
)
// BackendConstructorFunc defines a function to create a particular backend type. It takes
// the *Config to build the backend from and returns the constructed Backend. Constructors
// are registered by name with RegisterBackend and invoked by NewBackend
type BackendConstructorFunc func(*Config) Backend
// Backend represents the part of Courier that deals with looking up and writing channels and results
type Backend interface {
	// Start starts the backend and opens any db connections it needs
	Start() error

	// Stop stops any backend processes
	Stop() error

	// Cleanup closes any active connections to databases
	Cleanup() error

	// GetChannel returns the channel with the passed in type and UUID
	GetChannel(context.Context, ChannelType, ChannelUUID) (Channel, error)

	// GetChannelByAddress returns the channel with the passed in type and address
	GetChannelByAddress(context.Context, ChannelType, ChannelAddress) (Channel, error)

	// GetContact returns (or creates) the contact for the passed in channel and URN
	GetContact(ctx context.Context, channel Channel, urn urns.URN, auth string, name string) (Contact, error)

	// AddURNtoContact adds a URN to the passed in contact
	AddURNtoContact(ctx context.Context, channel Channel, contact Contact, urn urns.URN) (urns.URN, error)

	// RemoveURNfromContact removes a URN from the passed in contact
	RemoveURNfromContact(ctx context.Context, channel Channel, contact Contact, urn urns.URN) (urns.URN, error)

	// DeleteMsgWithExternalID deletes a message when we receive an event that it should be deleted
	DeleteMsgWithExternalID(ctx context.Context, channel Channel, externalID string) error

	// NewIncomingMsg creates a new message from the given params
	NewIncomingMsg(channel Channel, urn urns.URN, text string) Msg

	// WriteMsg writes the passed in message to our backend
	WriteMsg(context.Context, Msg) error

	// NewMsgStatusForID creates a new Status object for the given message id
	NewMsgStatusForID(Channel, MsgID, MsgStatusValue) MsgStatus

	// NewMsgStatusForExternalID creates a new Status object for the given external id
	NewMsgStatusForExternalID(Channel, string, MsgStatusValue) MsgStatus

	// WriteMsgStatus writes the passed in status update to our backend
	WriteMsgStatus(context.Context, MsgStatus) error

	// NewChannelEvent creates a new channel event for the given channel and event type
	NewChannelEvent(Channel, ChannelEventType, urns.URN) ChannelEvent

	// WriteChannelEvent writes the passed in channel event, returning any error
	WriteChannelEvent(context.Context, ChannelEvent) error

	// WriteChannelLogs writes the passed in channel logs to our backend
	WriteChannelLogs(context.Context, []*ChannelLog) error

	// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call MarkOutgoingMsgComplete with the
	// returned message when they have dealt with the message (regardless of whether it was sent or not)
	PopNextOutgoingMsg(context.Context) (Msg, error)

	// WasMsgSent returns whether the backend thinks the passed in message was already sent. This can be used in cases where
	// a backend wants to implement a failsafe against double sending messages (say if they were double queued)
	WasMsgSent(context.Context, MsgID) (bool, error)

	// ClearMsgSent clears any internal status that a message was previously sent. This can be used in the case where
	// a message is being forced to be resent by a user
	ClearMsgSent(context.Context, MsgID) error

	// GetActivePurges gets a list of all active purges
	GetActivePurges(context.Context) ([]string, error)

	// GetCurrentQueuesForChannel loops through the active and throttled queues to find the current queue keys
	// for a specific channel
	GetCurrentQueuesForChannel(context.Context, ChannelUUID) ([]string, error)

	// PrepareQueuesForPurge prepares a list of queues for a purge by renaming them and adding them to the
	// active purge list
	PrepareQueuesForPurge(context.Context, []string) ([]string, error)

	// PopMsgs pops n messages from a queue without checking if the message is able to be popped, if there is
	// throttling, etc.
	PopMsgs(context.Context, string, int) ([]Msg, error)

	// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
	// of errors during sending as it will manage the number of active workers per channel. The optional status parameter can be
	// used to determine any sort of deduping of msg sends
	MarkOutgoingMsgComplete(context.Context, Msg, MsgStatus)

	// SetFlowSessionTimeoutByMsgId sets the timeout of the flow session for the given msg ID, if that session is waiting
	SetFlowSessionTimeoutByMsgId(context.Context, MsgID) error

	// CheckExternalIDSeen checks if an external ID has been seen in a period
	CheckExternalIDSeen(Msg) Msg

	// WriteExternalIDSeen marks an external ID as seen for a period
	WriteExternalIDSeen(Msg)

	// Health returns a string describing any health problems the backend has, or empty string if all is well
	Health() string

	// Status returns a string describing the current status, this can detail queue sizes or other attributes
	Status() string

	// Heartbeat is called every minute, it can be used by backends to log status to a dashboard such as librato
	Heartbeat() error

	// RedisPool returns the redisPool for this backend
	RedisPool() *redis.Pool
}
// NewBackend builds the backend whose type is named by config.Backend. The name is
// lowercased before the registry lookup; if no constructor is registered under it an
// error naming the requested type is returned, otherwise the constructor is invoked
// with config and its result returned
func NewBackend(config *Config) (Backend, error) {
	key := strings.ToLower(config.Backend)
	if construct, ok := registeredBackends[key]; ok {
		return construct(config), nil
	}
	return nil, fmt.Errorf("no such backend type: '%s'", config.Backend)
}
// RegisterBackend stores constructorFunc as the constructor for backendType, the type
// name is lowercased before being used as the registry key. Called by individual
// backends in their init() func
func RegisterBackend(backendType string, constructorFunc BackendConstructorFunc) {
	key := strings.ToLower(backendType)
	registeredBackends[key] = constructorFunc
}
// registeredBackends holds the constructor for each registered backend type, keyed by
// the lowercased type name
var registeredBackends = map[string]BackendConstructorFunc{}