Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v17] Support email plugin status sink #48601

Merged
merged 4 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions integrations/access/email/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (a *App) run(ctx context.Context) error {

// init inits plugin
func (a *App) init(ctx context.Context) error {
log := logger.Get(ctx)
ctx, cancel := context.WithTimeout(ctx, initTimeout)
defer cancel()

Expand All @@ -146,6 +147,11 @@ func (a *App) init(ctx context.Context) error {
return trace.Wrap(err)
}

log.Debug("Starting client connection health check...")
if err = a.client.CheckHealth(ctx); err != nil {
return trace.Wrap(err, "client connection health check failed")
}
log.Debug("Client connection health check finished ok")
return nil
}

Expand Down
9 changes: 7 additions & 2 deletions integrations/access/email/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func NewClient(ctx context.Context, conf Config, clusterName, webProxyAddr strin
}

if conf.Mailgun != nil {
mailer = NewMailgunMailer(*conf.Mailgun, conf.Delivery.Sender, clusterName)
mailer = NewMailgunMailer(*conf.Mailgun, conf.StatusSink, conf.Delivery.Sender, clusterName, conf.RoleToRecipients[types.Wildcard])
logger.Get(ctx).WithField("domain", conf.Mailgun.Domain).Info("Using Mailgun as email transport")
}

if conf.SMTP != nil {
mailer = NewSMTPMailer(*conf.SMTP, conf.Delivery.Sender, clusterName)
mailer = NewSMTPMailer(*conf.SMTP, conf.StatusSink, conf.Delivery.Sender, clusterName)
logger.Get(ctx).WithFields(logger.Fields{
"host": conf.SMTP.Host,
"port": conf.SMTP.Port,
Expand All @@ -79,6 +79,11 @@ func NewClient(ctx context.Context, conf Config, clusterName, webProxyAddr strin
}, nil
}

// CheckHealth checks if the Email client connection is healthy.
func (c *Client) CheckHealth(ctx context.Context) error {
return trace.Wrap(c.mailer.CheckHealth(ctx))
}

// SendNewThreads sends emails on new requests. Returns EmailData.
func (c *Client) SendNewThreads(ctx context.Context, recipients []string, reqID string, reqData RequestData) ([]EmailThread, error) {
var threads []EmailThread
Expand Down
4 changes: 4 additions & 0 deletions integrations/access/email/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ type Config struct {
Delivery DeliveryConfig `toml:"delivery"`
RoleToRecipients common.RawRecipientsMap `toml:"role_to_recipients"`
Log logger.Config `toml:"log"`

// StatusSink receives any status updates from the plugin for
// further processing. Status updates will be ignored if not set.
StatusSink common.StatusSink
}

// LoadConfig reads the config file, initializes a new Config struct object, and returns it.
Expand Down
122 changes: 116 additions & 6 deletions integrations/access/email/mailers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/rand"
"encoding/binary"
"fmt"
"net/http"
"os"
"strconv"
"strings"
Expand All @@ -29,42 +30,93 @@ import (
"github.com/gravitational/trace"
"github.com/mailgun/mailgun-go/v4"
"gopkg.in/mail.v2"

"github.com/gravitational/teleport/integrations/access/common"
"github.com/gravitational/teleport/integrations/lib/logger"
)

const (
// statusEmitTimeout specifies the max timeout to emit status.
statusEmitTimeout = 10 * time.Second
// mailgunHTTPTimeout specifies the max timeout for mailgun api send request.
mailgunHTTPTimeout = 10 * time.Second
// smtpDialerTimeout specifies the max timeout for smtp dialer read/write operations.
smtpDialerTimeout = 10 * time.Second
)

// Mailer is an interface to mail sender
type Mailer interface {
Send(ctx context.Context, id, recipient, body, references string) (string, error)
// CheckHealth checks if the Email client connection is healthy.
CheckHealth(ctx context.Context) error
}

// SMTPMailer implements SMTP mailer
type SMTPMailer struct {
dialer *mail.Dialer
sender string
clusterName string
sink common.StatusSink
}

// MailgunMailer implements mailgun mailer
type MailgunMailer struct {
mailgun *mailgun.MailgunImpl
sender string
clusterName string

// fallbackRecipients specifies the list of default recipients.
// This is only used for initial health check.
fallbackRecipients []string
}

// NewSMTPMailer inits new SMTP mailer
func NewSMTPMailer(c SMTPConfig, sender, clusterName string) Mailer {
func NewSMTPMailer(c SMTPConfig, sink common.StatusSink, sender, clusterName string) Mailer {
dialer := mail.NewDialer(c.Host, c.Port, c.Username, c.Password)
dialer.StartTLSPolicy = c.MailStartTLSPolicy
dialer.Timeout = smtpDialerTimeout

return &SMTPMailer{dialer, sender, clusterName}
return &SMTPMailer{
dialer: dialer,
sender: sender,
clusterName: clusterName,
sink: sink,
}
}

// NewMailgunMailer inits new Mailgun mailer
func NewMailgunMailer(c MailgunConfig, sender, clusterName string) Mailer {
func NewMailgunMailer(c MailgunConfig, sink common.StatusSink, sender, clusterName string, fallbackRecipients []string) Mailer {
m := mailgun.NewMailgun(c.Domain, c.PrivateKey)
if c.APIBase != "" {
m.SetAPIBase(c.APIBase)
}
return &MailgunMailer{m, sender, clusterName}
client := &http.Client{
Transport: &statusSinkTransport{
RoundTripper: http.DefaultTransport,
sink: sink,
},
}
m.SetClient(client)
return &MailgunMailer{
mailgun: m,
sender: sender,
clusterName: clusterName,
fallbackRecipients: fallbackRecipients,
}
}

// CheckHealth checks the health of the SMTP service.
func (m *SMTPMailer) CheckHealth(ctx context.Context) error {
log := logger.Get(ctx)
client, err := m.dialer.Dial()
m.emitStatus(ctx, err)
if err != nil {
return trace.Wrap(err)
}
if err := client.Close(); err != nil {
log.Debug("Failed to close client connection after health check")
}
return nil
}

// Send sends email via SMTP
Expand All @@ -91,10 +143,10 @@ func (m *SMTPMailer) Send(ctx context.Context, id, recipient, body, references s
}

err = m.dialer.DialAndSend(msg)
m.emitStatus(ctx, err)
if err != nil {
return "", trace.Wrap(err)
}

return id, nil
}

Expand Down Expand Up @@ -123,6 +175,38 @@ func (m *SMTPMailer) base36(input uint64) string {
return strings.ToUpper(strconv.FormatUint(input, 36))
}

// emitStatus emits status based on provided statusErr.
func (m *SMTPMailer) emitStatus(ctx context.Context, statusErr error) {
if m.sink == nil {
return
}

ctx, cancel := context.WithTimeout(ctx, statusEmitTimeout)
defer cancel()

log := logger.Get(ctx)
code := http.StatusOK
if statusErr != nil {
// Returned error is undocumented. Using generic error code for all errors.
code = http.StatusInternalServerError
}
if err := m.sink.Emit(ctx, common.StatusFromStatusCode(code)); err != nil {
log.WithError(err).Error("Error while emitting Email plugin status")
}
}

// CheckHealth checks the health of the Mailgun service.
func (m *MailgunMailer) CheckHealth(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, mailgunHTTPTimeout)
defer cancel()

msg := m.mailgun.NewMessage(m.sender, "Health Check", "Testing Mailgun API connection...", m.fallbackRecipients...)
msg.SetRequireTLS(true)
msg.EnableTestMode() // Test message submission without delivering to recipients.
_, _, err := m.mailgun.Send(ctx, msg)
return trace.Wrap(err)
}

// Send sends email via Mailgun
func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, references string) (string, error) {
subject := fmt.Sprintf("%v Role Request %v", m.clusterName, id)
Expand All @@ -136,7 +220,7 @@ func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, reference
msg.AddHeader("In-Reply-To", refHeader)
}

ctx, cancel := context.WithTimeout(ctx, time.Second*10)
ctx, cancel := context.WithTimeout(ctx, mailgunHTTPTimeout)
defer cancel()

_, id, err := m.mailgun.Send(ctx, msg)
Expand All @@ -147,3 +231,29 @@ func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, reference

return id, nil
}

// statusSinkTransport wraps the Mailgun client transport and
// emits plugin status.
type statusSinkTransport struct {
http.RoundTripper
sink common.StatusSink
}

// RoundTrip implements the http.RoundTripper interface.
func (t *statusSinkTransport) RoundTrip(req *http.Request) (*http.Response, error) {
log := logger.Get(req.Context())
resp, err := t.RoundTripper.RoundTrip(req)
if err != nil {
return nil, trace.Wrap(err)
}
if t.sink != nil {
ctx, cancel := context.WithTimeout(req.Context(), statusEmitTimeout)
defer cancel()

status := common.StatusFromStatusCode(resp.StatusCode)
if err := t.sink.Emit(ctx, status); err != nil {
log.WithError(err).Error("Error while emitting Email plugin status")
}
}
return resp, nil
}
7 changes: 7 additions & 0 deletions integrations/access/email/testlib/mock_mailgun.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ func newMockMailgunServer(concurrency int) *mockMailgunServer {

id := uuid.New().String()

// The testmode flag is only used during health check.
// Do no create message when in testmode.
if r.PostFormValue("o:testmode") == "yes" {
fmt.Fprintf(w, `{"id": "%v"}`, id)
return
}

message := mockMailgunMessage{
ID: id,
Sender: r.PostFormValue("from"),
Expand Down
Loading