From 6a84965dfcccb60bfa25486c865c6a9f7b87f286 Mon Sep 17 00:00:00 2001 From: Bernard Kim Date: Thu, 7 Nov 2024 11:12:52 -0800 Subject: [PATCH] [v17] Support email plugin status sink (#48601) * Support email plugin status sink * Use constant * Address minor feedback * Fix typo --- integrations/access/email/app.go | 6 + integrations/access/email/client.go | 9 +- integrations/access/email/config.go | 4 + integrations/access/email/mailers.go | 122 +++++++++++++++++- .../access/email/testlib/mock_mailgun.go | 7 + 5 files changed, 140 insertions(+), 8 deletions(-) diff --git a/integrations/access/email/app.go b/integrations/access/email/app.go index 0f37f174c197a..f5b335dfe44d8 100644 --- a/integrations/access/email/app.go +++ b/integrations/access/email/app.go @@ -123,6 +123,7 @@ func (a *App) run(ctx context.Context) error { // init inits plugin func (a *App) init(ctx context.Context) error { + log := logger.Get(ctx) ctx, cancel := context.WithTimeout(ctx, initTimeout) defer cancel() @@ -146,6 +147,11 @@ func (a *App) init(ctx context.Context) error { return trace.Wrap(err) } + log.Debug("Starting client connection health check...") + if err = a.client.CheckHealth(ctx); err != nil { + return trace.Wrap(err, "client connection health check failed") + } + log.Debug("Client connection health check finished ok") return nil } diff --git a/integrations/access/email/client.go b/integrations/access/email/client.go index ce1b8cad48400..b65516962d8c4 100644 --- a/integrations/access/email/client.go +++ b/integrations/access/email/client.go @@ -59,12 +59,12 @@ func NewClient(ctx context.Context, conf Config, clusterName, webProxyAddr strin } if conf.Mailgun != nil { - mailer = NewMailgunMailer(*conf.Mailgun, conf.Delivery.Sender, clusterName) + mailer = NewMailgunMailer(*conf.Mailgun, conf.StatusSink, conf.Delivery.Sender, clusterName, conf.RoleToRecipients[types.Wildcard]) logger.Get(ctx).WithField("domain", conf.Mailgun.Domain).Info("Using Mailgun as email transport") } if conf.SMTP != nil { - mailer = NewSMTPMailer(*conf.SMTP, conf.Delivery.Sender, clusterName) + mailer = NewSMTPMailer(*conf.SMTP, conf.StatusSink, conf.Delivery.Sender, clusterName) logger.Get(ctx).WithFields(logger.Fields{ "host": conf.SMTP.Host, "port": conf.SMTP.Port, @@ -79,6 +79,11 @@ func NewClient(ctx context.Context, conf Config, clusterName, webProxyAddr strin }, nil } +// CheckHealth checks if the Email client connection is healthy. +func (c *Client) CheckHealth(ctx context.Context) error { + return trace.Wrap(c.mailer.CheckHealth(ctx)) +} + // SendNewThreads sends emails on new requests. Returns EmailData. func (c *Client) SendNewThreads(ctx context.Context, recipients []string, reqID string, reqData RequestData) ([]EmailThread, error) { var threads []EmailThread diff --git a/integrations/access/email/config.go b/integrations/access/email/config.go index 772d616a9031c..9d3967a48f75c 100644 --- a/integrations/access/email/config.go +++ b/integrations/access/email/config.go @@ -64,6 +64,10 @@ type Config struct { Delivery DeliveryConfig `toml:"delivery"` RoleToRecipients common.RawRecipientsMap `toml:"role_to_recipients"` Log logger.Config `toml:"log"` + + // StatusSink receives any status updates from the plugin for + // further processing. Status updates will be ignored if not set. + StatusSink common.StatusSink } // LoadConfig reads the config file, initializes a new Config struct object, and returns it. diff --git a/integrations/access/email/mailers.go b/integrations/access/email/mailers.go index 5f690bb70256a..17864322fbc4b 100644 --- a/integrations/access/email/mailers.go +++ b/integrations/access/email/mailers.go @@ -21,6 +21,7 @@ import ( "crypto/rand" "encoding/binary" "fmt" + "net/http" "os" "strconv" "strings" @@ -29,11 +30,25 @@ import ( "github.com/gravitational/trace" "github.com/mailgun/mailgun-go/v4" "gopkg.in/mail.v2" + + "github.com/gravitational/teleport/integrations/access/common" + "github.com/gravitational/teleport/integrations/lib/logger" +) + +const ( + // statusEmitTimeout specifies the max timeout to emit status. + statusEmitTimeout = 10 * time.Second + // mailgunHTTPTimeout specifies the max timeout for mailgun api send request. + mailgunHTTPTimeout = 10 * time.Second + // smtpDialerTimeout specifies the max timeout for smtp dialer read/write operations. + smtpDialerTimeout = 10 * time.Second ) // Mailer is an interface to mail sender type Mailer interface { Send(ctx context.Context, id, recipient, body, references string) (string, error) + // CheckHealth checks if the Email client connection is healthy. + CheckHealth(ctx context.Context) error } // SMTPMailer implements SMTP mailer @@ -41,6 +56,7 @@ type SMTPMailer struct { dialer *mail.Dialer sender string clusterName string + sink common.StatusSink } // MailgunMailer implements mailgun mailer @@ -48,23 +64,59 @@ type MailgunMailer struct { mailgun *mailgun.MailgunImpl sender string clusterName string + + // fallbackRecipients specifies the list of default recipients. + // This is only used for initial health check. + fallbackRecipients []string } // NewSMTPMailer inits new SMTP mailer -func NewSMTPMailer(c SMTPConfig, sender, clusterName string) Mailer { +func NewSMTPMailer(c SMTPConfig, sink common.StatusSink, sender, clusterName string) Mailer { dialer := mail.NewDialer(c.Host, c.Port, c.Username, c.Password) dialer.StartTLSPolicy = c.MailStartTLSPolicy + dialer.Timeout = smtpDialerTimeout - return &SMTPMailer{dialer, sender, clusterName} + return &SMTPMailer{ + dialer: dialer, + sender: sender, + clusterName: clusterName, + sink: sink, + } } // NewMailgunMailer inits new Mailgun mailer -func NewMailgunMailer(c MailgunConfig, sender, clusterName string) Mailer { +func NewMailgunMailer(c MailgunConfig, sink common.StatusSink, sender, clusterName string, fallbackRecipients []string) Mailer { m := mailgun.NewMailgun(c.Domain, c.PrivateKey) if c.APIBase != "" { m.SetAPIBase(c.APIBase) } - return &MailgunMailer{m, sender, clusterName} + client := &http.Client{ + Transport: &statusSinkTransport{ + RoundTripper: http.DefaultTransport, + sink: sink, + }, + } + m.SetClient(client) + return &MailgunMailer{ + mailgun: m, + sender: sender, + clusterName: clusterName, + fallbackRecipients: fallbackRecipients, + } +} + +// CheckHealth checks the health of the SMTP service. +func (m *SMTPMailer) CheckHealth(ctx context.Context) error { + log := logger.Get(ctx) + client, err := m.dialer.Dial() + m.emitStatus(ctx, err) + if err != nil { + return trace.Wrap(err) + } + if err := client.Close(); err != nil { + log.Debug("Failed to close client connection after health check") + } + return nil } // Send sends email via SMTP @@ -91,10 +143,10 @@ func (m *SMTPMailer) Send(ctx context.Context, id, recipient, body, references s } err = m.dialer.DialAndSend(msg) + m.emitStatus(ctx, err) if err != nil { return "", trace.Wrap(err) } - return id, nil } @@ -123,6 +175,38 @@ func (m *SMTPMailer) base36(input uint64) string { return strings.ToUpper(strconv.FormatUint(input, 36)) } +// emitStatus emits status based on provided statusErr. +func (m *SMTPMailer) emitStatus(ctx context.Context, statusErr error) { + if m.sink == nil { + return + } + + ctx, cancel := context.WithTimeout(ctx, statusEmitTimeout) + defer cancel() + + log := logger.Get(ctx) + code := http.StatusOK + if statusErr != nil { + // Returned error is undocumented. Using generic error code for all errors. + code = http.StatusInternalServerError + } + if err := m.sink.Emit(ctx, common.StatusFromStatusCode(code)); err != nil { + log.WithError(err).Error("Error while emitting Email plugin status") + } +} + +// CheckHealth checks the health of the Mailgun service. +func (m *MailgunMailer) CheckHealth(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, mailgunHTTPTimeout) + defer cancel() + + msg := m.mailgun.NewMessage(m.sender, "Health Check", "Testing Mailgun API connection...", m.fallbackRecipients...) + msg.SetRequireTLS(true) + msg.EnableTestMode() // Test message submission without delivering to recipients. + _, _, err := m.mailgun.Send(ctx, msg) + return trace.Wrap(err) +} + // Send sends email via Mailgun func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, references string) (string, error) { subject := fmt.Sprintf("%v Role Request %v", m.clusterName, id) @@ -136,7 +220,7 @@ func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, reference msg.AddHeader("In-Reply-To", refHeader) } - ctx, cancel := context.WithTimeout(ctx, time.Second*10) + ctx, cancel := context.WithTimeout(ctx, mailgunHTTPTimeout) defer cancel() _, id, err := m.mailgun.Send(ctx, msg) @@ -147,3 +231,29 @@ func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, reference return id, nil } + +// statusSinkTransport wraps the Mailgun client transport and +// emits plugin status. +type statusSinkTransport struct { + http.RoundTripper + sink common.StatusSink +} + +// RoundTrip implements the http.RoundTripper interface. +func (t *statusSinkTransport) RoundTrip(req *http.Request) (*http.Response, error) { + log := logger.Get(req.Context()) + resp, err := t.RoundTripper.RoundTrip(req) + if err != nil { + return nil, trace.Wrap(err) + } + if t.sink != nil { + ctx, cancel := context.WithTimeout(req.Context(), statusEmitTimeout) + defer cancel() + + status := common.StatusFromStatusCode(resp.StatusCode) + if err := t.sink.Emit(ctx, status); err != nil { + log.WithError(err).Error("Error while emitting Email plugin status") + } + } + return resp, nil +} diff --git a/integrations/access/email/testlib/mock_mailgun.go b/integrations/access/email/testlib/mock_mailgun.go index 8b0a856d9bc73..58cbbc8ebb098 100644 --- a/integrations/access/email/testlib/mock_mailgun.go +++ b/integrations/access/email/testlib/mock_mailgun.go @@ -63,6 +63,13 @@ func newMockMailgunServer(concurrency int) *mockMailgunServer { id := uuid.New().String() + // The testmode flag is only used during health check. + // Do no create message when in testmode. + if r.PostFormValue("o:testmode") == "yes" { + fmt.Fprintf(w, `{"id": "%v"}`, id) + return + } + message := mockMailgunMessage{ ID: id, Sender: r.PostFormValue("from"),