From 377a257f3d924f8d98e7519d6b8036d2eba9cdaa Mon Sep 17 00:00:00 2001 From: Bernard Kim Date: Mon, 28 Oct 2024 13:08:52 -0700 Subject: [PATCH 1/4] Support email plugin status sink --- integrations/access/email/app.go | 6 + integrations/access/email/client.go | 9 +- integrations/access/email/config.go | 4 + integrations/access/email/mailers.go | 111 +++++++++++++++++- .../access/email/testlib/mock_mailgun.go | 7 ++ 5 files changed, 130 insertions(+), 7 deletions(-) diff --git a/integrations/access/email/app.go b/integrations/access/email/app.go index 67855505a032e..4e29aca01e5df 100644 --- a/integrations/access/email/app.go +++ b/integrations/access/email/app.go @@ -123,6 +123,7 @@ func (a *App) run(ctx context.Context) error { // init inits plugin func (a *App) init(ctx context.Context) error { + log := logger.Get(ctx) ctx, cancel := context.WithTimeout(ctx, initTimeout) defer cancel() @@ -146,6 +147,11 @@ func (a *App) init(ctx context.Context) error { return trace.Wrap(err) } + log.Debug("Starting client connection health check...") + if err = a.client.CheckHealth(ctx); err != nil { + return trace.Wrap(err, "client connection health check failed") + } + log.Debug("Client connection health check finished ok") return nil } diff --git a/integrations/access/email/client.go b/integrations/access/email/client.go index ce1b8cad48400..b65516962d8c4 100644 --- a/integrations/access/email/client.go +++ b/integrations/access/email/client.go @@ -59,12 +59,12 @@ func NewClient(ctx context.Context, conf Config, clusterName, webProxyAddr strin } if conf.Mailgun != nil { - mailer = NewMailgunMailer(*conf.Mailgun, conf.Delivery.Sender, clusterName) + mailer = NewMailgunMailer(*conf.Mailgun, conf.StatusSink, conf.Delivery.Sender, clusterName, conf.RoleToRecipients[types.Wildcard]) logger.Get(ctx).WithField("domain", conf.Mailgun.Domain).Info("Using Mailgun as email transport") } if conf.SMTP != nil { - mailer = NewSMTPMailer(*conf.SMTP, conf.Delivery.Sender, clusterName) + mailer = NewSMTPMailer(*conf.SMTP, conf.StatusSink, conf.Delivery.Sender, clusterName) logger.Get(ctx).WithFields(logger.Fields{ "host": conf.SMTP.Host, "port": conf.SMTP.Port, @@ -79,6 +79,11 @@ func NewClient(ctx context.Context, conf Config, clusterName, webProxyAddr strin }, nil } +// CheckHealth checks if the Email client connection is healthy. +func (c *Client) CheckHealth(ctx context.Context) error { + return trace.Wrap(c.mailer.CheckHealth(ctx)) +} + // SendNewThreads sends emails on new requests. Returns EmailData. func (c *Client) SendNewThreads(ctx context.Context, recipients []string, reqID string, reqData RequestData) ([]EmailThread, error) { var threads []EmailThread diff --git a/integrations/access/email/config.go b/integrations/access/email/config.go index 772d616a9031c..9d3967a48f75c 100644 --- a/integrations/access/email/config.go +++ b/integrations/access/email/config.go @@ -64,6 +64,10 @@ type Config struct { Delivery DeliveryConfig `toml:"delivery"` RoleToRecipients common.RawRecipientsMap `toml:"role_to_recipients"` Log logger.Config `toml:"log"` + + // StatusSink receives any status updates from the plugin for + // further processing. Status updates will be ignored if not set. + StatusSink common.StatusSink } // LoadConfig reads the config file, initializes a new Config struct object, and returns it. diff --git a/integrations/access/email/mailers.go b/integrations/access/email/mailers.go index 5f690bb70256a..95f7d1d732137 100644 --- a/integrations/access/email/mailers.go +++ b/integrations/access/email/mailers.go @@ -21,19 +21,31 @@ import ( "crypto/rand" "encoding/binary" "fmt" + "net/http" "os" "strconv" "strings" "time" + "github.com/gravitational/teleport/integrations/access/common" + "github.com/gravitational/teleport/integrations/lib/logger" "github.com/gravitational/trace" "github.com/mailgun/mailgun-go/v4" "gopkg.in/mail.v2" ) +const ( + // statusEmitTimeout specifies the max timeout to emit status. + statusEmitTimeout = 10 * time.Second + // mailgunHTTPTimeout specifies the max timeout for mailgun api send request. + mailgunHTTPTimeout = 10 * time.Second +) + // Mailer is an interface to mail sender type Mailer interface { Send(ctx context.Context, id, recipient, body, references string) (string, error) + // CheckHealth checks if the Email client connection is healthy. + CheckHealth(ctx context.Context) error } // SMTPMailer implements SMTP mailer @@ -41,6 +53,7 @@ type SMTPMailer struct { dialer *mail.Dialer sender string clusterName string + sink common.StatusSink } // MailgunMailer implements mailgun mailer @@ -48,23 +61,53 @@ type MailgunMailer struct { mailgun *mailgun.MailgunImpl sender string clusterName string + + // fallbackRecipients specifies the list of default recipients. + // This is only used for inital health check. + fallbackRecipients []string } // NewSMTPMailer inits new SMTP mailer -func NewSMTPMailer(c SMTPConfig, sender, clusterName string) Mailer { +func NewSMTPMailer(c SMTPConfig, sink common.StatusSink, sender, clusterName string) Mailer { dialer := mail.NewDialer(c.Host, c.Port, c.Username, c.Password) dialer.StartTLSPolicy = c.MailStartTLSPolicy - return &SMTPMailer{dialer, sender, clusterName} + return &SMTPMailer{dialer, sender, clusterName, sink} } // NewMailgunMailer inits new Mailgun mailer -func NewMailgunMailer(c MailgunConfig, sender, clusterName string) Mailer { +func NewMailgunMailer(c MailgunConfig, sink common.StatusSink, sender, clusterName string, fallbackRecipients []string) Mailer { m := mailgun.NewMailgun(c.Domain, c.PrivateKey) if c.APIBase != "" { m.SetAPIBase(c.APIBase) } - return &MailgunMailer{m, sender, clusterName} + client := &http.Client{ + Transport: &statusSinkTransport{ + RoundTripper: http.DefaultTransport, + sink: sink, + }, + } + m.SetClient(client) + return &MailgunMailer{ + mailgun: m, + sender: sender, + clusterName: clusterName, + fallbackRecipients: fallbackRecipients, + } +} + +// CheckHealth checks the health of the SMTP service. +func (m *SMTPMailer) CheckHealth(ctx context.Context) error { + log := logger.Get(ctx) + client, err := m.dialer.Dial() + m.emitStatus(ctx, err) + if err != nil { + return trace.Wrap(err) + } + if err := client.Close(); err != nil { + log.Debug("Failed to close client connection after health check") + } + return nil } // Send sends email via SMTP @@ -91,10 +134,10 @@ func (m *SMTPMailer) Send(ctx context.Context, id, recipient, body, references s } err = m.dialer.DialAndSend(msg) + m.emitStatus(ctx, err) if err != nil { return "", trace.Wrap(err) } - return id, nil } @@ -123,6 +166,38 @@ func (m *SMTPMailer) base36(input uint64) string { return strings.ToUpper(strconv.FormatUint(input, 36)) } +// emitStatus emits generic internal server error status. +func (m *SMTPMailer) emitStatus(ctx context.Context, err error) { + if m.sink == nil { + return + } + + ctx, cancel := context.WithTimeout(ctx, statusEmitTimeout) + defer cancel() + + log := logger.Get(ctx) + code := http.StatusOK + if err != nil { + // Returned error is undocumented. Using geneirc error code for all errors. + code = http.StatusInternalServerError + } + if err := m.sink.Emit(ctx, common.StatusFromStatusCode(code)); err != nil { + log.WithError(err).Error("Error while emitting Email plugin status") + } +} + +// CheckHealth checks the health of the Mailgun service. +func (m *MailgunMailer) CheckHealth(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, mailgunHTTPTimeout) + defer cancel() + + msg := m.mailgun.NewMessage(m.sender, "Health Check", "Testing Mailgun API connection...", m.fallbackRecipients...) + msg.SetRequireTLS(true) + msg.EnableTestMode() // Test message submission without delivering to recpients. + _, _, err := m.mailgun.Send(ctx, msg) + return trace.Wrap(err) +} + // Send sends email via Mailgun func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, references string) (string, error) { subject := fmt.Sprintf("%v Role Request %v", m.clusterName, id) @@ -147,3 +222,29 @@ func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, reference return id, nil } + +// statusSinkTransport wraps the Mailgun client transport and +// emits plugin status. +type statusSinkTransport struct { + http.RoundTripper + sink common.StatusSink +} + +// RoundTrip implements the http.RoundTripper interface. +func (t *statusSinkTransport) RoundTrip(req *http.Request) (*http.Response, error) { + log := logger.Get(req.Context()) + resp, err := t.RoundTripper.RoundTrip(req) + if err != nil { + return nil, trace.Wrap(err) + } + if t.sink != nil { + ctx, cancel := context.WithTimeout(req.Context(), statusEmitTimeout) + defer cancel() + + status := common.StatusFromStatusCode(resp.StatusCode) + if err := t.sink.Emit(ctx, status); err != nil { + log.WithError(err).Error("Error while emitting Email plugin status") + } + } + return resp, nil +} diff --git a/integrations/access/email/testlib/mock_mailgun.go b/integrations/access/email/testlib/mock_mailgun.go index 8b0a856d9bc73..58cbbc8ebb098 100644 --- a/integrations/access/email/testlib/mock_mailgun.go +++ b/integrations/access/email/testlib/mock_mailgun.go @@ -63,6 +63,13 @@ func newMockMailgunServer(concurrency int) *mockMailgunServer { id := uuid.New().String() + // The testmode flag is only used during health check. + // Do no create message when in testmode. + if r.PostFormValue("o:testmode") == "yes" { + fmt.Fprintf(w, `{"id": "%v"}`, id) + return + } + message := mockMailgunMessage{ ID: id, Sender: r.PostFormValue("from"), From 131243313d2c6695ab2203b256ea3121066ece5d Mon Sep 17 00:00:00 2001 From: Bernard Kim Date: Mon, 28 Oct 2024 13:58:40 -0700 Subject: [PATCH 2/4] Use constant --- integrations/access/email/mailers.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/integrations/access/email/mailers.go b/integrations/access/email/mailers.go index 95f7d1d732137..f2606c21f0eee 100644 --- a/integrations/access/email/mailers.go +++ b/integrations/access/email/mailers.go @@ -27,11 +27,12 @@ import ( "strings" "time" - "github.com/gravitational/teleport/integrations/access/common" - "github.com/gravitational/teleport/integrations/lib/logger" "github.com/gravitational/trace" "github.com/mailgun/mailgun-go/v4" "gopkg.in/mail.v2" + + "github.com/gravitational/teleport/integrations/access/common" + "github.com/gravitational/teleport/integrations/lib/logger" ) const ( @@ -211,7 +212,7 @@ func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, reference msg.AddHeader("In-Reply-To", refHeader) } - ctx, cancel := context.WithTimeout(ctx, time.Second*10) + ctx, cancel := context.WithTimeout(ctx, mailgunHTTPTimeout) defer cancel() _, id, err := m.mailgun.Send(ctx, msg) From 9b9d4a1322ddfe8dc030cefed9f15e3f4be1d942 Mon Sep 17 00:00:00 2001 From: Bernard Kim Date: Tue, 29 Oct 2024 13:58:49 -0700 Subject: [PATCH 3/4] Address minor feedback --- integrations/access/email/mailers.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/integrations/access/email/mailers.go b/integrations/access/email/mailers.go index f2606c21f0eee..ce679e2a5317b 100644 --- a/integrations/access/email/mailers.go +++ b/integrations/access/email/mailers.go @@ -40,6 +40,8 @@ const ( statusEmitTimeout = 10 * time.Second // mailgunHTTPTimeout specifies the max timeout for mailgun api send request. mailgunHTTPTimeout = 10 * time.Second + // smtpDialerTimeout specifies the max timeout for smtp dialer read/write operations. + smtpDialerTimeout = 10 * time.Second ) // Mailer is an interface to mail sender @@ -72,8 +74,14 @@ type MailgunMailer struct { func NewSMTPMailer(c SMTPConfig, sink common.StatusSink, sender, clusterName string) Mailer { dialer := mail.NewDialer(c.Host, c.Port, c.Username, c.Password) dialer.StartTLSPolicy = c.MailStartTLSPolicy + dialer.Timeout = smtpDialerTimeout - return &SMTPMailer{dialer, sender, clusterName, sink} + return &SMTPMailer{ + dialer: dialer, + sender: sender, + clusterName: clusterName, + sink: sink, + } } // NewMailgunMailer inits new Mailgun mailer @@ -167,8 +175,8 @@ func (m *SMTPMailer) base36(input uint64) string { return strings.ToUpper(strconv.FormatUint(input, 36)) } -// emitStatus emits generic internal server error status. -func (m *SMTPMailer) emitStatus(ctx context.Context, err error) { +// emitStatus emits status based on provided statusErr. +func (m *SMTPMailer) emitStatus(ctx context.Context, statusErr error) { if m.sink == nil { return } @@ -178,8 +186,8 @@ func (m *SMTPMailer) emitStatus(ctx context.Context, err error) { log := logger.Get(ctx) code := http.StatusOK - if err != nil { - // Returned error is undocumented. Using geneirc error code for all errors. + if statusErr != nil { + // Returned error is undocumented. Using generic error code for all errors. code = http.StatusInternalServerError } if err := m.sink.Emit(ctx, common.StatusFromStatusCode(code)); err != nil { @@ -194,7 +202,7 @@ func (m *MailgunMailer) CheckHealth(ctx context.Context) error { msg := m.mailgun.NewMessage(m.sender, "Health Check", "Testing Mailgun API connection...", m.fallbackRecipients...) msg.SetRequireTLS(true) - msg.EnableTestMode() // Test message submission without delivering to recpients. + msg.EnableTestMode() // Test message submission without delivering to recipients. _, _, err := m.mailgun.Send(ctx, msg) return trace.Wrap(err) } From 97d215ae4cdc2b71f58eb2733b0466e60271f1e6 Mon Sep 17 00:00:00 2001 From: Bernard Kim Date: Tue, 29 Oct 2024 16:31:42 -0700 Subject: [PATCH 4/4] Fix typo --- integrations/access/email/mailers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/access/email/mailers.go b/integrations/access/email/mailers.go index ce679e2a5317b..17864322fbc4b 100644 --- a/integrations/access/email/mailers.go +++ b/integrations/access/email/mailers.go @@ -66,7 +66,7 @@ type MailgunMailer struct { clusterName string // fallbackRecipients specifies the list of default recipients. - // This is only used for inital health check. + // This is only used for initial health check. fallbackRecipients []string }