Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add access monitoring rules to msteams plugins #47638

Merged
merged 9 commits into from
Oct 22, 2024
96 changes: 78 additions & 18 deletions integrations/access/msteams/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package msteams
import (
"context"
"log/slog"
"slices"
"time"

"github.com/gravitational/trace"

"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/integrations/access/accessmonitoring"
"github.com/gravitational/teleport/integrations/access/common"
"github.com/gravitational/teleport/integrations/access/common/teleport"
"github.com/gravitational/teleport/integrations/lib"
Expand Down Expand Up @@ -53,7 +55,8 @@ type App struct {
watcherJob lib.ServiceJob
pd *pd.CompareAndSwap[PluginData]

log *slog.Logger
log *slog.Logger
accessMonitoringRules *accessmonitoring.RuleHandler

*lib.Process
}
Expand Down Expand Up @@ -85,13 +88,11 @@ func (a *App) Run(ctx context.Context) error {
}

a.Process = lib.NewProcess(ctx)
a.watcherJob, err = a.newWatcherJob()
if err != nil {
return trace.Wrap(err)
}

a.SpawnCriticalJob(a.mainJob)
a.SpawnCriticalJob(a.watcherJob)

select {
case <-ctx.Done():
Expand All @@ -116,10 +117,14 @@ func (a *App) init(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, initTimeout)
defer cancel()

var err error
a.apiClient, err = common.GetTeleportClient(ctx, a.conf.Teleport)
if err != nil {
return trace.Wrap(err)
if a.conf.Client != nil {
a.apiClient = a.conf.Client
} else {
var err error
a.apiClient, err = common.GetTeleportClient(ctx, a.conf.Teleport)
if err != nil {
return trace.Wrap(err)
}
}

a.pd = pd.NewCAS(
Expand All @@ -145,6 +150,24 @@ func (a *App) init(ctx context.Context) error {
return trace.Wrap(err)
}

a.accessMonitoringRules = accessmonitoring.NewRuleHandler(accessmonitoring.RuleHandlerConfig{
marcoandredinis marked this conversation as resolved.
Show resolved Hide resolved
Client: a.apiClient,
PluginName: pluginName,
// Map msteams.RecipientData onto the common recipient type used
// by the access monitoring rules watcher.
FetchRecipientCallback: func(ctx context.Context, name string) (*common.Recipient, error) {
msTeamsRecipient, err := a.bot.FetchRecipient(ctx, name)
if err != nil {
return nil, trace.Wrap(err)
}
return &common.Recipient{
Name: name,
ID: msTeamsRecipient.ID,
Kind: string(msTeamsRecipient.Kind),
}, nil
},
Comment on lines +158 to +168
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This plugin might be very slow with AMRs and we might want to increase the timeout to more than 5 seconds. Fetching recipients installs the app for every recipient, which can take several seconds and can fail.

We mitigated this by fetching every recipient on startup, this allowed to avoid timeouts and fail fast. A safer way to do this would be to tentatively fetch recipients when loading the AMR. We would still miss the fail fast, but at least we'd avoid the timeouts.

We can merge this as-is (with an increase timeout, I'd bump to at least 15 sec) and open an issue to improve the AMR logic later if that's still an issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increased the timeout for now, would installing the app for all possible recipients when initializing the rule cache at the start and then again whenever a new one is found be what you mean for the potential fix?

})

return a.initBot(ctx)
}

Expand Down Expand Up @@ -187,27 +210,52 @@ func (a *App) initBot(ctx context.Context) error {
return nil
}

// newWatcherJob creates WatcherJob
func (a *App) newWatcherJob() (lib.ServiceJob, error) {
return watcherjob.NewJob(
// run starts the main process
func (a *App) run(ctx context.Context) error {

process := lib.MustGetProcess(ctx)

watchKinds := []types.WatchKind{
{Kind: types.KindAccessRequest},
{Kind: types.KindAccessMonitoringRule},
}
acceptedWatchKinds := make([]string, 0, len(watchKinds))
watcherJob, err := watcherjob.NewJobWithConfirmedWatchKinds(
a.apiClient,
watcherjob.Config{
Watch: types.Watch{
Kinds: []types.WatchKind{{Kind: types.KindAccessRequest}},
},
Watch: types.Watch{Kinds: watchKinds, AllowPartialSuccess: true},
EventFuncTimeout: handlerTimeout,
},
a.onWatcherEvent,
func(ws types.WatchStatus) {
for _, watchKind := range ws.GetKinds() {
acceptedWatchKinds = append(acceptedWatchKinds, watchKind.Kind)
}
},
)
}

// run starts the main process
func (a *App) run(ctx context.Context) error {
ok, err := a.watcherJob.WaitReady(ctx)
if err != nil {
return trace.Wrap(err)
}

process.SpawnCriticalJob(watcherJob)

ok, err := watcherJob.WaitReady(ctx)
if err != nil {
return trace.Wrap(err)
}
if len(acceptedWatchKinds) == 0 {
return trace.BadParameter("failed to initialize watcher for all the required resources: %+v",
watchKinds)
}
// Check if KindAccessMonitoringRule resources are being watched,
// the role the plugin is running as may not have access.
if slices.Contains(acceptedWatchKinds, types.KindAccessMonitoringRule) {
if err := a.accessMonitoringRules.InitAccessMonitoringRulesCache(ctx); err != nil {
return trace.Wrap(err, "initializing Access Monitoring Rule cache")
}
}
a.watcherJob = watcherJob
a.watcherJob.SetReady(ok)
Comment on lines +240 to +258
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code contains two races:

  • we start processing AR events while the AMR cache is not inited
  • we start processing AMR events while the AMR cache is not inited (cache should be inited when receiving the watcher INIT event)

This will lead to two issues:

  • ARs processed early after plugin startup are not sent to the correct recipients
  • AMRs created/updated early after plugin startup are ignored by the plugin

As those issues also exist in the common AMR plugin implementation we might decide to merge the PR anyway and fix those later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting queuing up incoming ARs and AMRs for processing until the init is done?

Copy link
Contributor

@hugoShaka hugoShaka Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the AR vs AMR race I'm not sure yet, I would tend to create the AMR watcher first, then based on the result, setup AR watching. I'm worried this approach could lead to out-of-order events but I'm not even sure Teleport protects against them in the first place with a dual-listener. I'll ask @tigrato and @espadolini if we have strong ordering guarantees in the auth event stream.

For the AMR init race, I would edit the watcherjob logic to make it run the cache init login on the INIT event. This is the proper way of initializing a cache in Teleport, but the watcherjob lib currently swallows the init event because it was not designed to maintain a local cache but only receive ARs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no protection against out of order events even with a single listener: watcher events are guaranteed to be delivered in order only with respects to each individual item's events, with no guarantee of ordering across items.

FWIW the integrations/lib/watcherjob.job seems to take full advantage of that, sequentially applying events for the same kind and name but processing events for different resources concurrently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And yeah, any replica fed by a watcher should open the watcher, wait for the OpInit event which is guaranteed to be the first event yielded by the watcher, then fetch the current state, then apply events in order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So plan for now is merge this then create a sperate issue and PR for the fixes to the access monitoring rule cache?
@hugoShaka

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, all other plugins already have broken watchers so one more won't be an issue.

if ok {
a.log.InfoContext(ctx, "Plugin is ready")
} else {
Expand Down Expand Up @@ -243,6 +291,10 @@ func (a *App) checkTeleportVersion(ctx context.Context) (proto.PingResponse, err
// onWatcherEvent called when an access request event is received
func (a *App) onWatcherEvent(ctx context.Context, event types.Event) error {
kind := event.Resource.GetKind()
if kind == types.KindAccessMonitoringRule {
return trace.Wrap(a.accessMonitoringRules.HandleAccessMonitoringRule(ctx, event))
}

if kind != types.KindAccessRequest {
return trace.Errorf("unexpected kind %s", kind)
}
Expand Down Expand Up @@ -480,6 +532,14 @@ func (a *App) getMessageRecipients(ctx context.Context, req types.AccessRequest)
recipientSet := stringset.New()

a.log.DebugContext(ctx, "Getting suggested reviewer recipients")
accessRuleRecipients := a.accessMonitoringRules.RecipientsFromAccessMonitoringRules(ctx, req)
accessRuleRecipients.ForEach(func(r common.Recipient) {
recipientSet.Add(r.Name)
})
if recipientSet.Len() != 0 {
return recipientSet.ToSlice()
}

var validEmailsSuggReviewers []string
for _, reviewer := range req.GetSuggestedReviewers() {
if !lib.IsEmail(reviewer) {
Expand Down
80 changes: 71 additions & 9 deletions integrations/access/msteams/testlib/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

accessmonitoringrulesv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1"
v1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/integrations/access/common"
"github.com/gravitational/teleport/integrations/access/msteams"
Expand All @@ -37,7 +39,7 @@ import (
"github.com/gravitational/teleport/integrations/lib/testing/integration"
)

// MsTeamsBaseSuite is the Slack access plugin test suite.
// MsTeamsBaseSuite is the MsTeams access plugin test suite.
// It implements the testify.TestingSuite interface.
type MsTeamsBaseSuite struct {
*integration.AccessRequestSuite
Expand All @@ -51,16 +53,19 @@ type MsTeamsBaseSuite struct {
reviewer2TeamsUser msapi.User
}

// SetupTest starts a fake Slack, generates the plugin configuration, and loads
// the fixtures in Slack. It runs for each test.
// SetupTest starts a fake MsTeams, generates the plugin configuration, and loads
// the fixtures in MsTeams. It runs for each test.
func (s *MsTeamsBaseSuite) SetupTest() {
t := s.T()

err := logger.Setup(logger.Config{Severity: "debug"})
require.NoError(t, err)
s.raceNumber = runtime.GOMAXPROCS(0)

s.fakeTeams = NewFakeTeams(s.raceNumber)
t.Cleanup(s.fakeTeams.Close)

// We need requester users as well, the slack plugin sends messages to users
// We need requester users as well, the MsTeams plugin sends messages to users
// when their access request got approved.
s.requesterOSSTeamsUser = s.fakeTeams.StoreUser(msapi.User{Name: "Requester OSS", Mail: integration.RequesterOSSUserName})
s.requester1TeamsUser = s.fakeTeams.StoreUser(msapi.User{Name: "Requester Ent", Mail: integration.Requester1UserName})
Expand All @@ -71,16 +76,17 @@ func (s *MsTeamsBaseSuite) SetupTest() {

var conf msteams.Config
conf.Teleport = s.TeleportConfig()
apiClient, err := common.GetTeleportClient(context.Background(), s.TeleportConfig())
require.NoError(t, err)
conf.Client = apiClient
conf.StatusSink = s.fakeStatusSink
conf.MSAPI = s.fakeTeams.Config
conf.MSAPI.SetBaseURLs(s.fakeTeams.URL(), s.fakeTeams.URL(), s.fakeTeams.URL())
conf.Log = logger.Config{
Severity: "debug",
}

s.appConfig = &conf
}

// startApp starts the Slack plugin, waits for it to become ready and returns.
// startApp starts the MsTeams plugin, waits for it to become ready and returns.
func (s *MsTeamsBaseSuite) startApp() {
s.T().Helper()
t := s.T()
Expand Down Expand Up @@ -414,7 +420,9 @@ func (s *MsTeamsSuiteEnterprise) TestRace() {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
t.Cleanup(cancel)

s.appConfig.Log.Severity = "debug" // Turn off noisy debug logging
err := logger.Setup(logger.Config{Severity: "info"}) // Turn off noisy debug logging
require.NoError(t, err)

s.startApp()

var (
Expand Down Expand Up @@ -527,3 +535,57 @@ func (s *MsTeamsSuiteEnterprise) TestRace() {
return next
})
}

func (s *MsTeamsSuiteOSS) TestRecipientsFromAccessMonitoringRule() {
t := s.T()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
t.Cleanup(cancel)

s.startApp()

_, err := s.ClientByName(integration.RulerUserName).
AccessMonitoringRulesClient().
CreateAccessMonitoringRule(ctx, &accessmonitoringrulesv1.AccessMonitoringRule{
Kind: types.KindAccessMonitoringRule,
Version: types.V1,
Metadata: &v1.Metadata{
Name: "test-msteams-amr",
},
Spec: &accessmonitoringrulesv1.AccessMonitoringRuleSpec{
Subjects: []string{types.KindAccessRequest},
Condition: "!is_empty(access_request.spec.roles)",
Notification: &accessmonitoringrulesv1.Notification{
Name: "msteams",
Recipients: []string{
s.reviewer1TeamsUser.ID,
s.reviewer2TeamsUser.Mail,
},
},
},
})
assert.NoError(t, err)

// Test execution: create an access request
req := s.CreateAccessRequest(ctx, integration.RequesterOSSUserName, nil)

s.checkPluginData(ctx, req.GetName(), func(data msteams.PluginData) bool {
return len(data.TeamsData) > 0
})

title := "Access Request " + req.GetName()
msgs, err := s.getNewMessages(ctx, 2)
require.NoError(t, err)

var body1 testTeamsMessage
require.NoError(t, json.Unmarshal([]byte(msgs[0].Body), &body1))
body1.checkTitle(t, title)
require.Equal(t, msgs[0].RecipientID, s.reviewer1TeamsUser.ID)

var body2 testTeamsMessage
require.NoError(t, json.Unmarshal([]byte(msgs[1].Body), &body2))
body1.checkTitle(t, title)
require.Equal(t, msgs[1].RecipientID, s.reviewer2TeamsUser.ID)

assert.NoError(t, s.ClientByName(integration.RulerUserName).
AccessMonitoringRulesClient().DeleteAccessMonitoringRule(ctx, "test-msteams-amr"))
}
Loading