From fd4b2b60d27c3e15cfaa71ae051a99732e6d723e Mon Sep 17 00:00:00 2001 From: Peter Van Bouwel Date: Tue, 17 Dec 2024 08:52:47 +0100 Subject: [PATCH] feature: localPolicyRetriever cache invalidation [https://github.com/VITObelgium/fakes3pp/issues/15] --- cmd/policy-retrieval.go | 155 ++++++++++++++++++++++++++++++++++- cmd/policy-retrieval_test.go | 4 + 2 files changed, 156 insertions(+), 3 deletions(-) diff --git a/cmd/policy-retrieval.go b/cmd/policy-retrieval.go index 671723d..3dd2482 100644 --- a/cmd/policy-retrieval.go +++ b/cmd/policy-retrieval.go @@ -3,9 +3,12 @@ package cmd import ( "bytes" "fmt" + "log/slog" "path/filepath" "sync" "text/template" + + "github.com/fsnotify/fsnotify" ) const PathSeparator = "/" @@ -14,12 +17,54 @@ const policySuffix = ".json.tmpl" type LocalPolicyRetriever struct{ rolePolicyPath string + + //To communicate cache invalidation. + pm *PolicyManager + + //To monitor file system changes + watcher *fsnotify.Watcher } func NewLocalPolicyRetriever(stsRolePolicyPath string) *LocalPolicyRetriever { - return &LocalPolicyRetriever{ + var lp *LocalPolicyRetriever + + var fileDeleted fileCallback = func(fileName string) { + if lp.pm == nil { + slog.Warn("There was no Policy Manager for local retriever to handle file deletion", "retriever", lp) + } else { + arn, err := lp.getPolicyArn(fileName) + if err != nil { + slog.Error("Could not get arn", "filename", fileName) + } + slog.Info("Remove policy", "arn", arn) + lp.pm.deletePolicyCacheEntry(arn) + } + } + + var fileUpdated fileCallback = func(fileName string) { + if lp.pm == nil { + slog.Warn("There was no Policy Manager for local retriever to handle file update", "retriever", lp) + } else { + arn, err := lp.getPolicyArn(fileName) + if err != nil { + slog.Error("Could not get arn", "filename", fileName) + } + slog.Info("Reload policy", "arn", arn) + lp.pm.deletePolicyCacheEntry(arn) + _, err = lp.pm.getPolicyTemplate(arn) + if err != nil { + slog.Warn("Could not get policy", "policyArn", arn) + } + } + } + + watcher := createFileWatcherAndStartWatching(fileUpdated, fileDeleted) + lp = &LocalPolicyRetriever{ rolePolicyPath: stsRolePolicyPath, + watcher: watcher, } + + return lp } func (r *LocalPolicyRetriever) getPolicyPathPrefix() (string) { @@ -31,6 +76,22 @@ func (r *LocalPolicyRetriever) getPolicyPath(arn string) (string) { return fmt.Sprintf("%s%s%s", r.getPolicyPathPrefix(), safeRoleArn, policySuffix) } +func (r *LocalPolicyRetriever) getPolicyArn(filePath string) (string, error) { + prefix := r.getPolicyPathPrefix() + suffix := policySuffix + + if len(suffix) > len(filePath) || len(prefix) > len(filePath) - len(suffix) { + slog.Warn("Invalid file path for policy", "filepath", filePath) + } + + safePolicyName := filePath[len(prefix):len(filePath) - len(suffix)] + policyArn, err := b32_decode(safePolicyName) + if err != nil { + return "", err + } + return policyArn, nil +} + func (r LocalPolicyRetriever) retrieveAllIdentifiers() ([]string, error) { prefix := r.getPolicyPathPrefix() suffix := policySuffix @@ -50,19 +111,31 @@ func (r LocalPolicyRetriever) retrieveAllIdentifiers() ([]string, error) { } func (r *LocalPolicyRetriever) retrievePolicyStr(arn string) (string, error) { - c, err := readFileFull(r.getPolicyPath(arn)) + filePath := r.getPolicyPath(arn) + startWatching(r.watcher, filePath) // For cache invalidation + c, err := readFileFull(filePath) if err != nil { return "", err } return string(c), err } +func (r *LocalPolicyRetriever) registerPolicyManager(pm *PolicyManager) { + r.pm = pm +} + type PolicyRetriever interface { //Retrieve the policy content based out of an identifier which can be an AWS ARN retrievePolicyStr(string) (string, error) //Get all policy identifiers retrieveAllIdentifiers() ([]string, error) + + //Set PolicyManager + //Each policy retriever can be used by 1 policy Manager when the policy manager gets + //created with a policy retriever it will register itself using this method this allows + //The retriever to do calls to the policy manager for example to communicate policy changes + registerPolicyManager(pm *PolicyManager) } type PolicyManager struct { @@ -176,10 +249,86 @@ func (m *PolicyManager) GetPolicy(arn string, data *PolicySessionData) (string, return buf.String(), nil } +func (m *PolicyManager) deletePolicyCacheEntry(arn string) { + m.tMux.Lock() + defer m.tMux.Unlock() + _, exists := m.templates[arn] + if !exists { + return + } else { + delete(m.templates, arn) + } +} + func NewPolicyManager(r PolicyRetriever) *PolicyManager{ - return &PolicyManager{ + pm := &PolicyManager{ retriever: r, templates: map[string]*template.Template{}, tMux: &sync.RWMutex{}, } + r.registerPolicyManager(pm) + return pm +} + +//A callback function that takes a filepath to action a change to a file. +type fileCallback func(string) () + + +//Start a watcher to keep an eye on files +// +//This will start watching later on +func createFileWatcherAndStartWatching(fileChanged, fileDeleted fileCallback) (*fsnotify.Watcher) { + //See https://github.com/fsnotify/fsnotify + watcher, err := fsnotify.NewWatcher() + if err != nil { + slog.Error("Could not create new watcher", "error", err) + } + + // Start listening for events. + go func() { + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + slog.Debug("Config watcher event", "event", event) + if event.Has(fsnotify.Write) { + slog.Debug("Write notification", "event", event) + fileChanged(event.Name) + } + if event.Has(fsnotify.Remove) { + slog.Debug("Deletion notification", "event", event) + fileDeleted(event.Name) + // See https://ahmet.im/blog/kubernetes-inotify/ + restartWatching(watcher, event.Name) + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + slog.Warn("error with file watcher", "error", err) + } + } + }() + return watcher +} + +func startWatching(watcher *fsnotify.Watcher, fileName string) { + err := watcher.Add(fileName) + if err != nil { + slog.Error("Could not add watcher", "filename", fileName, "error", err) + } else { + slog.Debug("Started watching file", "filename", fileName) + } +} + +func restartWatching(watcher *fsnotify.Watcher, fileName string) { + err := watcher.Remove(fileName) + if err != nil { + slog.Debug("Wanted to stop watching file but watcher was gone", "filename", fileName) + } else { + slog.Debug("Stopped watching file", "filename", fileName) + } + startWatching(watcher, fileName) } \ No newline at end of file diff --git a/cmd/policy-retrieval_test.go b/cmd/policy-retrieval_test.go index a344b97..fb7d442 100644 --- a/cmd/policy-retrieval_test.go +++ b/cmd/policy-retrieval_test.go @@ -57,6 +57,10 @@ func (r TestPolicyRetriever) retrievePolicyStr(arn string) (string, error) { return policy, nil } +func (r TestPolicyRetriever) registerPolicyManager(pm *PolicyManager) { + //Cache invalidation is not a thing for testpolicy retriever so no need to keep PolicyManager +} + func (r TestPolicyRetriever) retrieveAllIdentifiers() ([]string, error) { keys := make([]string, len(r.testPolicies))