diff --git a/pkg/event_handler/event_handler.go b/pkg/event_handler/event_handler.go index ae4b41d..0f72413 100644 --- a/pkg/event_handler/event_handler.go +++ b/pkg/event_handler/event_handler.go @@ -2,6 +2,7 @@ package event_handler import ( "fmt" + "github.com/port-labs/port-k8s-exporter/pkg/handlers" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/klog/v2" ) @@ -22,7 +23,7 @@ func Start(eventListener IListener, initControllerHandler func() (IStoppableRsyn return eventListener.Run(func() { klog.Infof("Resync request received. Recreating controllers for the new port configuration") - if controllerHandler != nil { + if controllerHandler != (*handlers.ControllersHandler)(nil) { controllerHandler.Stop() } diff --git a/pkg/event_handler/polling/polling.go b/pkg/event_handler/polling/polling.go index 8a7bdc6..770d1a5 100644 --- a/pkg/event_handler/polling/polling.go +++ b/pkg/event_handler/polling/polling.go @@ -75,10 +75,8 @@ func (h *Handler) Run(resync func()) { klog.Infof("Polling event listener iteration after %d seconds. Checking for changes...", h.pollingRate) configuration, err := integration.GetIntegration(h.portClient, h.stateKey) if err != nil { - klog.Errorf("error resyncing: %s", err.Error()) - } - - if reflect.DeepEqual(currentState, configuration) != true { + klog.Errorf("error getting integration: %s", err.Error()) + } else if reflect.DeepEqual(currentState, configuration) != true { klog.Infof("Changes detected. Resyncing...") currentState = configuration resync() diff --git a/pkg/handlers/controllers.go b/pkg/handlers/controllers.go index f7ef18a..db2b722 100644 --- a/pkg/handlers/controllers.go +++ b/pkg/handlers/controllers.go @@ -2,20 +2,19 @@ package handlers import ( "context" - "time" - "github.com/port-labs/port-k8s-exporter/pkg/config" - "github.com/port-labs/port-k8s-exporter/pkg/port/integration" - "github.com/port-labs/port-k8s-exporter/pkg/crd" "github.com/port-labs/port-k8s-exporter/pkg/goutils" "github.com/port-labs/port-k8s-exporter/pkg/k8s" "github.com/port-labs/port-k8s-exporter/pkg/port" "github.com/port-labs/port-k8s-exporter/pkg/port/cli" + "github.com/port-labs/port-k8s-exporter/pkg/port/integration" "github.com/port-labs/port-k8s-exporter/pkg/signal" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/klog/v2" + "sync" + "time" ) type ControllersHandler struct { @@ -24,6 +23,7 @@ type ControllersHandler struct { stateKey string portClient *cli.PortClient stopCh chan struct{} + isStopped bool } func NewControllersHandler(exporterConfig *port.Config, portConfig *port.IntegrationAppConfig, k8sClient *k8s.Client, portClient *cli.PortClient) *ControllersHandler { @@ -71,59 +71,81 @@ func NewControllersHandler(exporterConfig *port.Config, portConfig *port.Integra func (c *ControllersHandler) Handle() { klog.Info("Starting informers") c.informersFactory.Start(c.stopCh) - klog.Info("Waiting for informers cache sync") + + currentEntitiesSets := make([]map[string]interface{}, 0) + shouldDeleteStaleEntities := true + var syncWg sync.WaitGroup + for _, controller := range c.controllers { + controller := controller + + go func() { + <-c.stopCh + klog.Info("Shutting down controllers") + controller.Shutdown() + klog.Info("Exporter exiting") + }() + + klog.Infof("Waiting for informer cache to sync for resource '%s'", controller.Resource.Kind) if err := controller.WaitForCacheSync(c.stopCh); err != nil { klog.Fatalf("Error while waiting for informer cache sync: %s", err.Error()) } - } - currentEntitiesSet := make([]map[string]interface{}, 0) - for _, controller := range c.controllers { - controllerEntitiesSet, rawDataExamples, err := controller.GetEntitiesSet() - if err != nil { - klog.Errorf("error getting controller entities set: %s", err.Error()) - } - currentEntitiesSet = append(currentEntitiesSet, controllerEntitiesSet) - if len(rawDataExamples) > 0 { - err = integration.PostIntegrationKindExample(c.portClient, c.stateKey, controller.Resource.Kind, rawDataExamples) - if err != nil { - klog.Warningf("failed to post integration kind example: %s", err.Error()) + syncWg.Add(1) + go func() { + defer syncWg.Done() + klog.Infof("Starting full initial resync for resource '%s'", controller.Resource.Kind) + initialSyncResult := controller.RunInitialSync() + klog.Infof("Done full initial resync, starting live events sync for resource '%s'", controller.Resource.Kind) + controller.RunEventsSync(1, c.stopCh) + if initialSyncResult.EntitiesSet != nil { + currentEntitiesSets = append(currentEntitiesSets, initialSyncResult.EntitiesSet) } - } - } - - klog.Info("Deleting stale entities") - c.RunDeleteStaleEntities(currentEntitiesSet) - klog.Info("Starting controllers") - for _, controller := range c.controllers { - controller.Run(1, c.stopCh) + if len(initialSyncResult.RawDataExamples) > 0 { + err := integration.PostIntegrationKindExample(c.portClient, c.stateKey, controller.Resource.Kind, initialSyncResult.RawDataExamples) + if err != nil { + klog.Warningf("failed to post integration kind example: %s", err.Error()) + } + } + shouldDeleteStaleEntities = shouldDeleteStaleEntities && initialSyncResult.ShouldDeleteStaleEntities + }() } + syncWg.Wait() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() go func() { <-c.stopCh - klog.Info("Shutting down controllers") - for _, controller := range c.controllers { - controller.Shutdown() - } - klog.Info("Exporter exiting") + cancelCtx() }() + + if shouldDeleteStaleEntities { + klog.Info("Deleting stale entities") + c.runDeleteStaleEntities(ctx, currentEntitiesSets) + klog.Info("Done deleting stale entities") + } else { + klog.Warning("Skipping delete of stale entities due to a failure in getting all current entities from k8s") + } } -func (c *ControllersHandler) RunDeleteStaleEntities(currentEntitiesSet []map[string]interface{}) { - _, err := c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret) +func (c *ControllersHandler) runDeleteStaleEntities(ctx context.Context, currentEntitiesSet []map[string]interface{}) { + _, err := c.portClient.Authenticate(ctx, c.portClient.ClientID, c.portClient.ClientSecret) if err != nil { klog.Errorf("error authenticating with Port: %v", err) } - err = c.portClient.DeleteStaleEntities(context.Background(), c.stateKey, goutils.MergeMaps(currentEntitiesSet...)) + err = c.portClient.DeleteStaleEntities(ctx, c.stateKey, goutils.MergeMaps(currentEntitiesSet...)) if err != nil { klog.Errorf("error deleting stale entities: %s", err.Error()) } - klog.Info("Done deleting stale entities") } func (c *ControllersHandler) Stop() { + if c.isStopped { + return + } + klog.Info("Stopping controllers") close(c.stopCh) + c.isStopped = true } diff --git a/pkg/handlers/controllers_test.go b/pkg/handlers/controllers_test.go new file mode 100644 index 0000000..c8db339 --- /dev/null +++ b/pkg/handlers/controllers_test.go @@ -0,0 +1,540 @@ +package handlers + +import ( + "context" + "errors" + "fmt" + guuid "github.com/google/uuid" + "github.com/port-labs/port-k8s-exporter/pkg/config" + "github.com/port-labs/port-k8s-exporter/pkg/defaults" + "github.com/port-labs/port-k8s-exporter/pkg/k8s" + "github.com/port-labs/port-k8s-exporter/pkg/port" + "github.com/port-labs/port-k8s-exporter/pkg/port/cli" + _ "github.com/port-labs/port-k8s-exporter/test_utils" + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" + apiextensionsv1fake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + "k8s.io/client-go/discovery/cached/memory" + discoveryfake "k8s.io/client-go/discovery/fake" + k8sfake "k8s.io/client-go/dynamic/fake" + fakeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/restmapper" + "strings" + "sync" + "testing" + "time" +) + +var ( + blueprint = "k8s-export-test-bp" + deploymentKind = "apps/v1/deployments" + daemonSetKind = "apps/v1/daemonsets" +) + +type fixture struct { + t *testing.T + controllersHandler *ControllersHandler + k8sClient *k8s.Client + portClient *cli.PortClient +} + +type fixtureConfig struct { + portClientId string + portClientSecret string + stateKey string + sendRawDataExamples *bool + resources []port.Resource + existingObjects []runtime.Object +} + +type resourceMapEntry struct { + list *metav1.APIResourceList + err error +} + +type fakeDiscovery struct { + *discoveryfake.FakeDiscovery + + lock sync.Mutex + groupList *metav1.APIGroupList + groupListErr error + resourceMap map[string]*resourceMapEntry +} + +func (c *fakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { + c.lock.Lock() + defer c.lock.Unlock() + if rl, ok := c.resourceMap[groupVersion]; ok { + return rl.list, rl.err + } + return nil, errors.New("doesn't exist") +} + +func (c *fakeDiscovery) ServerGroups() (*metav1.APIGroupList, error) { + c.lock.Lock() + defer c.lock.Unlock() + if c.groupList == nil { + return nil, errors.New("doesn't exist") + } + return c.groupList, c.groupListErr +} + +func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { + defaultTrue := true + sendRawDataExamples := &defaultTrue + if fixtureConfig.sendRawDataExamples != nil { + sendRawDataExamples = fixtureConfig.sendRawDataExamples + } + + integrationConfig := &port.IntegrationAppConfig{ + DeleteDependents: true, + CreateMissingRelatedEntities: true, + SendRawDataExamples: sendRawDataExamples, + Resources: fixtureConfig.resources, + } + + if fixtureConfig.stateKey != "" { + config.ApplicationConfig.StateKey = fixtureConfig.stateKey + } else { + config.ApplicationConfig.StateKey = "my-k8s-exporter" + } + + applicationConfig := &config.ApplicationConfiguration{ + ConfigFilePath: config.ApplicationConfig.ConfigFilePath, + ResyncInterval: config.ApplicationConfig.ResyncInterval, + PortBaseURL: config.ApplicationConfig.PortBaseURL, + EventListenerType: config.ApplicationConfig.EventListenerType, + CreateDefaultResources: config.ApplicationConfig.CreateDefaultResources, + OverwriteConfigurationOnRestart: config.ApplicationConfig.OverwriteConfigurationOnRestart, + Resources: integrationConfig.Resources, + DeleteDependents: integrationConfig.DeleteDependents, + CreateMissingRelatedEntities: integrationConfig.CreateMissingRelatedEntities, + UpdateEntityOnlyOnDiff: config.ApplicationConfig.UpdateEntityOnlyOnDiff, + PortClientId: config.ApplicationConfig.PortClientId, + PortClientSecret: config.ApplicationConfig.PortClientSecret, + StateKey: config.ApplicationConfig.StateKey, + } + + if fixtureConfig.portClientId != "" { + applicationConfig.PortClientId = fixtureConfig.portClientId + } + if fixtureConfig.portClientSecret != "" { + applicationConfig.PortClientSecret = fixtureConfig.portClientSecret + } + + exporterConfig := &port.Config{ + StateKey: applicationConfig.StateKey, + EventListenerType: applicationConfig.EventListenerType, + CreateDefaultResources: applicationConfig.CreateDefaultResources, + ResyncInterval: applicationConfig.ResyncInterval, + OverwriteConfigurationOnRestart: applicationConfig.OverwriteConfigurationOnRestart, + Resources: applicationConfig.Resources, + DeleteDependents: applicationConfig.DeleteDependents, + CreateMissingRelatedEntities: applicationConfig.CreateMissingRelatedEntities, + } + + groups := make([]metav1.APIGroup, 0) + resourceMap := make(map[string]*resourceMapEntry) + + for _, resource := range integrationConfig.Resources { + gvr := getGvr(resource.Kind) + version := metav1.GroupVersionForDiscovery{Version: gvr.Version, GroupVersion: fmt.Sprintf("%s/%s", gvr.Group, gvr.Version)} + groupFound := false + for i, group := range groups { + if group.Name == gvr.Group { + groupFound = true + versionFound := false + for _, v := range group.Versions { + if v.Version == gvr.Version { + versionFound = true + break + } + } + if !versionFound { + groups[i].Versions = append(groups[i].Versions, version) + } + } + } + if !groupFound { + groups = append(groups, metav1.APIGroup{Name: gvr.Group, Versions: []metav1.GroupVersionForDiscovery{version}}) + } + resourceMapKey := fmt.Sprintf("%s/%s", gvr.Group, gvr.Version) + apiResource := metav1.APIResource{ + Name: gvr.Resource, + Namespaced: true, + Group: gvr.Group, + Version: gvr.Version, + } + if _, ok := resourceMap[resourceMapKey]; ok { + resourceMap[resourceMapKey].list.APIResources = append(resourceMap[resourceMapKey].list.APIResources, apiResource) + } else { + resourceMap[resourceMapKey] = &resourceMapEntry{ + list: &metav1.APIResourceList{ + GroupVersion: resourceMapKey, + APIResources: []metav1.APIResource{apiResource}, + }, + } + } + } + + fakeD := &fakeDiscovery{ + groupList: &metav1.APIGroupList{ + Groups: groups, + }, + resourceMap: resourceMap, + } + + kClient := fakeclientset.NewSimpleClientset() + discoveryClient := discovery.NewDiscoveryClient(fakeD.RESTClient()) + dynamicClient := k8sfake.NewSimpleDynamicClientWithCustomListKinds(runtime.NewScheme(), newGvrToListKind(), fixtureConfig.existingObjects...) + fae := apiextensionsv1fake.FakeApiextensionsV1{Fake: &kClient.Fake} + apiExtensionsClient := apiextensionsv1.New(fae.RESTClient()) + cacheClient := memory.NewMemCacheClient(fakeD) + discoveryMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheClient) + k8sClient := &k8s.Client{DiscoveryClient: discoveryClient, DynamicClient: dynamicClient, DiscoveryMapper: discoveryMapper, ApiExtensionClient: apiExtensionsClient} + portClient := cli.New(applicationConfig) + err := defaults.InitIntegration(portClient, exporterConfig) + if err != nil { + t.Errorf("error initializing integration: %v", err) + } + + controllersHandler := NewControllersHandler(exporterConfig, integrationConfig, k8sClient, portClient) + + return &fixture{ + t: t, + controllersHandler: controllersHandler, + k8sClient: k8sClient, + portClient: portClient, + } +} + +func newResource(selectorQuery string, mappings []port.EntityMapping, kind string) port.Resource { + return port.Resource{ + Kind: kind, + Selector: port.Selector{ + Query: selectorQuery, + }, + Port: port.Port{ + Entity: port.EntityMappings{ + Mappings: mappings, + }, + }, + } +} + +func newDeployment() *appsv1.Deployment { + labels := map[string]string{ + "app": "port-k8s-exporter", + } + return &appsv1.Deployment{ + TypeMeta: v1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "port-k8s-exporter", + Namespace: "port-k8s-exporter", + }, + Spec: appsv1.DeploymentSpec{ + Selector: &v1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "port-k8s-exporter", + Image: "port-k8s-exporter:latest", + }, + }, + }, + }, + }, + } +} + +func newDaemonSet() *appsv1.DaemonSet { + labels := map[string]string{ + "app": "port-k8s-exporter", + } + return &appsv1.DaemonSet{ + TypeMeta: v1.TypeMeta{ + Kind: "DaemonSet", + APIVersion: "apps/v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "port-k8s-exporter-ds", + Namespace: "port-k8s-exporter", + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &v1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "port-k8s-exporter", + Image: "port-k8s-exporter:latest", + }, + }, + }, + }, + }, + } +} + +func newUnstructured(obj interface{}) *unstructured.Unstructured { + res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + panic(err) + } + return &unstructured.Unstructured{Object: res} +} + +func newGvrToListKind() map[schema.GroupVersionResource]string { + return map[schema.GroupVersionResource]string{ + {Group: "apps", Version: "v1", Resource: "deployments"}: "DeploymentList", + {Group: "apps", Version: "v1", Resource: "daemonsets"}: "DaemonSetList", + } +} + +func getGvr(kind string) schema.GroupVersionResource { + s := strings.SplitN(kind, "/", 3) + return schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]} +} + +func getBaseResource(kind string) port.Resource { + return newResource("", []port.EntityMapping{ + { + Identifier: ".metadata.name", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + Icon: "\"Microservice\"", + Team: "\"Test\"", + Properties: map[string]string{ + "text": "\"pod\"", + "num": "1", + "bool": "true", + "obj": ".spec.selector", + "arr": ".spec.template.spec.containers", + }, + Relations: map[string]interface{}{ + "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", + }, + }, + }, kind) +} + +func (f *fixture) createObjects(objects []*unstructured.Unstructured, kind string) { + if objects != nil { + for _, d := range objects { + gvr := getGvr(kind) + _, err := f.k8sClient.DynamicClient.Resource(gvr).Namespace(d.GetNamespace()).Create(context.TODO(), d, metav1.CreateOptions{}) + if err != nil { + f.t.Errorf("error creating object %s: %v", d.GetName(), err) + } + } + } +} + +func (f *fixture) updateObjects(objects []*unstructured.Unstructured, kind string) { + if objects != nil { + for _, d := range objects { + gvr := getGvr(kind) + _, err := f.k8sClient.DynamicClient.Resource(gvr).Namespace(d.GetNamespace()).Update(context.TODO(), d, metav1.UpdateOptions{}) + if err != nil { + f.t.Errorf("error updating object %s: %v", d.GetName(), err) + } + } + } +} + +func (f *fixture) deleteObjects(objects []struct{ kind, namespace, name string }) { + if objects != nil { + for _, d := range objects { + gvr := getGvr(d.kind) + err := f.k8sClient.DynamicClient.Resource(gvr).Namespace(d.namespace).Delete(context.TODO(), d.name, metav1.DeleteOptions{}) + if err != nil { + f.t.Errorf("error deleting object %s: %v", d.name, err) + } + } + } +} + +func (f *fixture) assertObjectsHandled(objects []struct{ kind, name string }) { + assert.Eventually(f.t, func() bool { + integrationKinds, err := f.portClient.GetIntegrationKinds(f.controllersHandler.stateKey) + if err != nil { + return false + } + + for _, obj := range objects { + examples := integrationKinds[obj.kind].Examples + found := false + for _, example := range examples { + if example.Data["metadata"].(map[string]interface{})["name"] == obj.name { + found = true + continue + } + } + if !found { + return false + } + } + + return true + }, time.Second*10, time.Millisecond*500) + + assert.Eventually(f.t, func() bool { + entities, err := f.portClient.SearchEntities(context.Background(), port.SearchBody{ + Rules: []port.Rule{ + { + Property: "$datasource", + Operator: "contains", + Value: "port-k8s-exporter", + }, + { + Property: "$datasource", + Operator: "contains", + Value: fmt.Sprintf("statekey/%s", f.controllersHandler.stateKey), + }, + }, + Combinator: "and", + }) + + for _, obj := range objects { + found := false + for _, entity := range entities { + if entity.Identifier == obj.name { + found = true + continue + } + } + if !found { + return false + } + } + + return err == nil && len(entities) == len(objects) + }, time.Second*10, time.Millisecond*500) +} + +func (f *fixture) runControllersHandle() { + f.controllersHandler.Handle() +} + +func TestSuccessfulControllersHandle(t *testing.T) { + de := newDeployment() + de.Name = guuid.NewString() + da := newDaemonSet() + da.Name = guuid.NewString() + resources := []port.Resource{getBaseResource(deploymentKind), getBaseResource(daemonSetKind)} + f := newFixture(t, &fixtureConfig{resources: resources, existingObjects: []runtime.Object{newUnstructured(de), newUnstructured(da)}, stateKey: guuid.NewString()}) + defer f.portClient.DeleteIntegration(f.controllersHandler.stateKey) + + // To test later that the delete stale entities is working + f.portClient.CreateEntity(context.Background(), &port.EntityRequest{Blueprint: blueprint, Identifier: guuid.NewString()}, "", false) + + f.runControllersHandle() + + f.assertObjectsHandled([]struct{ kind, name string }{{kind: deploymentKind, name: de.Name}, {kind: daemonSetKind, name: da.Name}}) + + nde := newDeployment() + nde.Name = guuid.NewString() + f.createObjects([]*unstructured.Unstructured{newUnstructured(nde)}, deploymentKind) + + nda := newDaemonSet() + nda.Name = guuid.NewString() + f.createObjects([]*unstructured.Unstructured{newUnstructured(nda)}, daemonSetKind) + + assert.Eventually(t, func() bool { + for _, eid := range []string{nde.Name, nda.Name} { + _, err := f.portClient.ReadEntity(context.Background(), eid, blueprint) + if err != nil { + return false + } + } + return true + }, time.Second*10, time.Millisecond*500) + + nde.Spec.Selector.MatchLabels["app"] = "new-label" + f.updateObjects([]*unstructured.Unstructured{newUnstructured(nde)}, deploymentKind) + da.Spec.Selector.MatchLabels["app"] = "new-label" + f.updateObjects([]*unstructured.Unstructured{newUnstructured(da)}, daemonSetKind) + + assert.Eventually(t, func() bool { + entity, err := f.portClient.ReadEntity(context.Background(), nde.Name, blueprint) + if err != nil || entity.Properties["obj"].(map[string]interface{})["matchLabels"].(map[string]interface{})["app"] != nde.Spec.Selector.MatchLabels["app"] { + return false + } + entity, err = f.portClient.ReadEntity(context.Background(), da.Name, blueprint) + return err == nil && entity.Properties["obj"].(map[string]interface{})["matchLabels"].(map[string]interface{})["app"] == nde.Spec.Selector.MatchLabels["app"] + }, time.Second*10, time.Millisecond*500) + + f.deleteObjects([]struct{ kind, namespace, name string }{ + {kind: deploymentKind, namespace: de.Namespace, name: de.Name}, {kind: daemonSetKind, namespace: da.Namespace, name: da.Name}, + {kind: deploymentKind, namespace: nde.Namespace, name: nde.Name}, {kind: daemonSetKind, namespace: nda.Namespace, name: nda.Name}}) + + assert.Eventually(t, func() bool { + for _, eid := range []string{de.Name, da.Name, nde.Name, nda.Name} { + _, err := f.portClient.ReadEntity(context.Background(), eid, blueprint) + if err == nil || !strings.Contains(err.Error(), "was not found") { + return false + } + } + return true + }, time.Second*10, time.Millisecond*500) +} + +func TestControllersHandleTolerateFailure(t *testing.T) { + resources := []port.Resource{getBaseResource(deploymentKind)} + f := newFixture(t, &fixtureConfig{resources: resources, existingObjects: []runtime.Object{}}) + + f.runControllersHandle() + + invalidId := fmt.Sprintf("%s!@#", guuid.NewString()) + d := newDeployment() + d.Name = invalidId + f.createObjects([]*unstructured.Unstructured{newUnstructured(d)}, deploymentKind) + + id := guuid.NewString() + d.Name = id + f.createObjects([]*unstructured.Unstructured{newUnstructured(d)}, deploymentKind) + + assert.Eventually(t, func() bool { + _, err := f.portClient.ReadEntity(context.Background(), id, blueprint) + return err == nil + }, time.Second*5, time.Millisecond*500) + + f.deleteObjects([]struct{ kind, namespace, name string }{{kind: deploymentKind, namespace: d.Namespace, name: d.Name}}) + + assert.Eventually(t, func() bool { + _, err := f.portClient.ReadEntity(context.Background(), id, blueprint) + return err != nil && strings.Contains(err.Error(), "was not found") + }, time.Second*5, time.Millisecond*500) +} + +func TestControllersHandler_Stop(t *testing.T) { + resources := []port.Resource{getBaseResource(deploymentKind)} + f := newFixture(t, &fixtureConfig{resources: resources, existingObjects: []runtime.Object{}}) + + f.controllersHandler.Stop() + assert.True(t, f.controllersHandler.isStopped) + f.controllersHandler.Stop() + assert.True(t, f.controllersHandler.isStopped) + assert.Panics(t, func() { close(f.controllersHandler.stopCh) }) +} diff --git a/pkg/jq/parser.go b/pkg/jq/parser.go index 3edfa51..2e1dad6 100644 --- a/pkg/jq/parser.go +++ b/pkg/jq/parser.go @@ -117,17 +117,23 @@ func ParseMapInterface(jqQueries map[string]string, obj interface{}) (map[string return mapInterface, nil } -func ParseRelations(jqQueries map[string]interface{}, obj interface{}) (map[string]interface{}, error) { +func ParseMapRecursively(jqQueries map[string]interface{}, obj interface{}) (map[string]interface{}, error) { mapInterface := make(map[string]interface{}, len(jqQueries)) for key, jqQuery := range jqQueries { if reflect.TypeOf(jqQuery).Kind() == reflect.String { - queryRes, _ := ParseMapInterface(map[string]string{key: jqQuery.(string)}, obj) + queryRes, err := ParseMapInterface(map[string]string{key: jqQuery.(string)}, obj) + if err != nil { + return nil, err + } mapInterface = goutils.MergeMaps(mapInterface, queryRes) } else if reflect.TypeOf(jqQuery).Kind() == reflect.Map { for mapKey, mapValue := range jqQuery.(map[string]interface{}) { - queryRes, _ := ParseRelations(map[string]interface{}{mapKey: mapValue}, obj) + queryRes, err := ParseMapRecursively(map[string]interface{}{mapKey: mapValue}, obj) + if err != nil { + return nil, err + } for queryKey, queryVal := range queryRes { if mapInterface[key] == nil { mapInterface[key] = make(map[string]interface{}) @@ -139,13 +145,15 @@ func ParseRelations(jqQueries map[string]interface{}, obj interface{}) (map[stri jqArrayValue := reflect.ValueOf(jqQuery) relations := make([]interface{}, jqArrayValue.Len()) for i := 0; i < jqArrayValue.Len(); i++ { - relation, err := ParseRelations(map[string]interface{}{key: jqArrayValue.Index(i).Interface()}, obj) + relation, err := ParseMapRecursively(map[string]interface{}{key: jqArrayValue.Index(i).Interface()}, obj) if err != nil { return nil, err } relations[i] = relation[key] } mapInterface[key] = relations + } else { + return nil, fmt.Errorf("invalid jq query type '%T'", jqQuery) } } diff --git a/pkg/jq/parser_test.go b/pkg/jq/parser_test.go new file mode 100644 index 0000000..f091b14 --- /dev/null +++ b/pkg/jq/parser_test.go @@ -0,0 +1,84 @@ +package jq + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "testing" + + "github.com/port-labs/port-k8s-exporter/pkg/port" + _ "github.com/port-labs/port-k8s-exporter/test_utils" +) + +var ( + blueprint = "k8s-export-test-bp" +) + +func TestJqSearchRelation(t *testing.T) { + + mapping := []port.EntityMapping{ + { + Identifier: ".metadata.name", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + Icon: "\"Microservice\"", + Team: "\"Test\"", + Properties: map[string]string{}, + Relations: map[string]interface{}{ + "k8s-relation": map[string]interface{}{ + "combinator": "\"or\"", + "rules": []interface{}{ + map[string]interface{}{ + "property": "\"$identifier\"", + "operator": "\"=\"", + "value": "\"e_AgPMYvq1tAs8TuqM\"", + }, + }, + }, + }, + }, + } + res, _ := ParseMapRecursively(mapping[0].Relations, nil) + assert.Equal(t, res, map[string]interface{}{ + "k8s-relation": map[string]interface{}{ + "combinator": "or", + "rules": []interface{}{ + map[string]interface{}{ + "property": "$identifier", + "operator": "=", + "value": "e_AgPMYvq1tAs8TuqM", + }, + }, + }, + }) + +} + +func TestJqSearchIdentifier(t *testing.T) { + + mapping := []port.EntityMapping{ + { + Identifier: map[string]interface{}{ + "combinator": "\"and\"", + "rules": []interface{}{ + map[string]interface{}{ + "property": "\"prop1\"", + "operator": "\"in\"", + "value": ".values", + }, + }, + }, + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + }, + } + res, _ := ParseMapRecursively(mapping[0].Identifier.(map[string]interface{}), map[string]interface{}{"values": []string{"val1", "val2"}}) + assert.Equal(t, res, map[string]interface{}{ + "combinator": "and", + "rules": []interface{}{ + map[string]interface{}{ + "property": "prop1", + "operator": "in", + "value": []string{"val1", "val2"}, + }, + }, + }) + +} diff --git a/pkg/k8s/controller.go b/pkg/k8s/controller.go index 92d8b63..443e0b3 100644 --- a/pkg/k8s/controller.go +++ b/pkg/k8s/controller.go @@ -3,23 +3,19 @@ package k8s import ( "context" "fmt" + "github.com/port-labs/port-k8s-exporter/pkg/goutils" + "github.com/port-labs/port-k8s-exporter/pkg/port/entity" + "reflect" "time" - "hash/fnv" - "strconv" - "github.com/port-labs/port-k8s-exporter/pkg/config" "github.com/port-labs/port-k8s-exporter/pkg/jq" "github.com/port-labs/port-k8s-exporter/pkg/port" "github.com/port-labs/port-k8s-exporter/pkg/port/cli" - "github.com/port-labs/port-k8s-exporter/pkg/port/mapping" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" - "encoding/json" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" @@ -42,13 +38,22 @@ type EventItem struct { ActionType EventActionType } +type SyncResult struct { + EntitiesSet map[string]interface{} + RawDataExamples []interface{} + ShouldDeleteStaleEntities bool +} + type Controller struct { - Resource port.AggregatedResource - portClient *cli.PortClient - integrationConfig *port.IntegrationAppConfig - informer cache.SharedIndexInformer - lister cache.GenericLister - workqueue workqueue.RateLimitingInterface + Resource port.AggregatedResource + portClient *cli.PortClient + integrationConfig *port.IntegrationAppConfig + informer cache.SharedIndexInformer + lister cache.GenericLister + eventHandler cache.ResourceEventHandlerRegistration + eventsWorkqueue workqueue.RateLimitingInterface + initialSyncWorkqueue workqueue.RateLimitingInterface + isInitialSyncDone bool } func NewController(resource port.AggregatedResource, informer informers.GenericInformer, integrationConfig *port.IntegrationAppConfig, applicationConfig *config.ApplicationConfiguration) *Controller { @@ -58,22 +63,32 @@ func NewController(resource port.AggregatedResource, informer informers.GenericI cli.WithDeleteDependents(integrationConfig.DeleteDependents)(portClient) cli.WithCreateMissingRelatedEntities(integrationConfig.CreateMissingRelatedEntities)(portClient) controller := &Controller{ - Resource: resource, - portClient: portClient, - integrationConfig: integrationConfig, - informer: informer.Informer(), - lister: informer.Lister(), - workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + Resource: resource, + portClient: portClient, + integrationConfig: integrationConfig, + informer: informer.Informer(), + lister: informer.Lister(), + initialSyncWorkqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + eventsWorkqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), } - controller.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + controller.eventHandler, _ = controller.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { var err error var item EventItem item.ActionType = CreateAction item.Key, err = cache.MetaNamespaceKeyFunc(obj) - if err == nil { - controller.workqueue.Add(item) + if err != nil { + return + } + + if controller.isInitialSyncDone || controller.eventHandler.HasSynced() { + if !controller.isInitialSyncDone { + controller.isInitialSyncDone = true + } + controller.eventsWorkqueue.Add(item) + } else { + controller.initialSyncWorkqueue.Add(item) } }, UpdateFunc: func(old interface{}, new interface{}) { @@ -81,11 +96,12 @@ func NewController(resource port.AggregatedResource, informer informers.GenericI var item EventItem item.ActionType = UpdateAction item.Key, err = cache.MetaNamespaceKeyFunc(new) - if err == nil { + if err != nil { + return + } - if controller.shouldSendUpdateEvent(old, new, integrationConfig.UpdateEntityOnlyOnDiff == nil || *(integrationConfig.UpdateEntityOnlyOnDiff)) { - controller.workqueue.Add(item) - } + if controller.shouldSendUpdateEvent(old, new, integrationConfig.UpdateEntityOnlyOnDiff == nil || *(integrationConfig.UpdateEntityOnlyOnDiff)) { + controller.eventsWorkqueue.Add(item) } }, DeleteFunc: func(obj interface{}) { @@ -93,11 +109,13 @@ func NewController(resource port.AggregatedResource, informer informers.GenericI var item EventItem item.ActionType = DeleteAction item.Key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err == nil { - err = controller.objectHandler(obj, item) - if err != nil { - klog.Errorf("Error deleting item '%s' of resource '%s': %s", item.Key, resource.Kind, err.Error()) - } + if err != nil { + return + } + + _, err = controller.objectHandler(obj, item) + if err != nil { + klog.Errorf("Error deleting item '%s' of resource '%s': %s", item.Key, resource.Kind, err.Error()) } }, }) @@ -106,13 +124,11 @@ func NewController(resource port.AggregatedResource, informer informers.GenericI } func (c *Controller) Shutdown() { - klog.Infof("Shutting down controller for resource '%s'", c.Resource.Kind) - c.workqueue.ShutDown() - klog.Infof("Closed controller for resource '%s'", c.Resource.Kind) + c.initialSyncWorkqueue.ShutDown() + c.eventsWorkqueue.ShutDown() } func (c *Controller) WaitForCacheSync(stopCh <-chan struct{}) error { - klog.Infof("Waiting for informer cache to sync for resource '%s'", c.Resource.Kind) if ok := cache.WaitForCacheSync(stopCh, c.informer.HasSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } @@ -120,106 +136,154 @@ func (c *Controller) WaitForCacheSync(stopCh <-chan struct{}) error { return nil } -func (c *Controller) Run(workers int, stopCh <-chan struct{}) { +func (c *Controller) RunInitialSync() *SyncResult { + entitiesSet := make(map[string]interface{}) + rawDataExamples := make([]interface{}, 0) + shouldDeleteStaleEntities := true + shouldContinue := true + requeueCounter := 0 + var requeueCounterDiff int + var syncResult *SyncResult + for shouldContinue && (requeueCounter > 0 || c.initialSyncWorkqueue.Len() > 0 || !c.eventHandler.HasSynced()) { + syncResult, requeueCounterDiff, shouldContinue = c.processNextWorkItem(c.initialSyncWorkqueue) + requeueCounter += requeueCounterDiff + if syncResult != nil { + entitiesSet = goutils.MergeMaps(entitiesSet, syncResult.EntitiesSet) + amountOfExamplesToAdd := min(len(syncResult.RawDataExamples), MaxRawDataExamplesToSend-len(rawDataExamples)) + rawDataExamples = append(rawDataExamples, syncResult.RawDataExamples[:amountOfExamplesToAdd]...) + shouldDeleteStaleEntities = shouldDeleteStaleEntities && syncResult.ShouldDeleteStaleEntities + } + } + + return &SyncResult{ + EntitiesSet: entitiesSet, + RawDataExamples: rawDataExamples, + ShouldDeleteStaleEntities: shouldDeleteStaleEntities, + } +} + +func (c *Controller) RunEventsSync(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - klog.Infof("Starting workers for resource '%s'", c.Resource.Kind) for i := 0; i < workers; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(func() { + shouldContinue := true + for shouldContinue { + _, _, shouldContinue = c.processNextWorkItem(c.eventsWorkqueue) + } + }, time.Second, stopCh) } - klog.Infof("Started workers for resource '%s'", c.Resource.Kind) } -func (c *Controller) runWorker() { - for c.processNextWorkItem() { +func (c *Controller) processNextWorkItem(workqueue workqueue.RateLimitingInterface) (*SyncResult, int, bool) { + permanentErrorSyncResult := &SyncResult{ + EntitiesSet: make(map[string]interface{}), + RawDataExamples: make([]interface{}, 0), + ShouldDeleteStaleEntities: false, } -} -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() + obj, shutdown := workqueue.Get() if shutdown { - return false + return permanentErrorSyncResult, 0, false } - err := func(obj interface{}) error { - defer c.workqueue.Done(obj) + syncResult, requeueCounterDiff, err := func(obj interface{}) (*SyncResult, int, error) { + defer workqueue.Done(obj) + + numRequeues := workqueue.NumRequeues(obj) + requeueCounterDiff := 0 + if numRequeues > 0 { + requeueCounterDiff = -1 + } item, ok := obj.(EventItem) if !ok { - c.workqueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected event item of resource '%s' in workqueue but got %#v", c.Resource.Kind, obj)) - return nil + workqueue.Forget(obj) + return permanentErrorSyncResult, requeueCounterDiff, fmt.Errorf("expected event item of resource '%s' in workqueue but got %#v", c.Resource.Kind, obj) } - if err := c.syncHandler(item); err != nil { - if c.workqueue.NumRequeues(obj) >= MaxNumRequeues { - utilruntime.HandleError(fmt.Errorf("error syncing '%s' of resource '%s': %s, give up after %d requeues", item.Key, c.Resource.Kind, err.Error(), MaxNumRequeues)) - return nil + syncResult, err := c.syncHandler(item) + if err != nil { + if numRequeues >= MaxNumRequeues { + workqueue.Forget(obj) + return syncResult, requeueCounterDiff, fmt.Errorf("error syncing '%s' of resource '%s': %s, give up after %d requeues", item.Key, c.Resource.Kind, err.Error(), MaxNumRequeues) } - c.workqueue.AddRateLimited(obj) - return fmt.Errorf("error syncing '%s' of resource '%s': %s, requeuing", item.Key, c.Resource.Kind, err.Error()) + if numRequeues == 0 { + requeueCounterDiff = 1 + } else { + requeueCounterDiff = 0 + } + workqueue.AddRateLimited(obj) + return nil, requeueCounterDiff, fmt.Errorf("error syncing '%s' of resource '%s': %s, requeuing", item.Key, c.Resource.Kind, err.Error()) } - c.workqueue.Forget(obj) - return nil + workqueue.Forget(obj) + return syncResult, requeueCounterDiff, nil }(obj) if err != nil { utilruntime.HandleError(err) - return true } - return true + return syncResult, requeueCounterDiff, true } -func (c *Controller) syncHandler(item EventItem) error { +func (c *Controller) syncHandler(item EventItem) (*SyncResult, error) { obj, exists, err := c.informer.GetIndexer().GetByKey(item.Key) if err != nil { - return fmt.Errorf("error fetching object with key '%s' from informer cache: %v", item.Key, err) + return nil, fmt.Errorf("error fetching object with key '%s' from informer cache: %v", item.Key, err) } if !exists { utilruntime.HandleError(fmt.Errorf("'%s' in work queue no longer exists", item.Key)) - return nil + return nil, nil } - err = c.objectHandler(obj, item) - if err != nil { - return fmt.Errorf("error handling object with key '%s': %v", item.Key, err) - } - - return nil + return c.objectHandler(obj, item) } -func (c *Controller) objectHandler(obj interface{}, item EventItem) error { - _, err := c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret) - if err != nil { - return fmt.Errorf("error authenticating with Port: %v", err) - } - +func (c *Controller) objectHandler(obj interface{}, item EventItem) (*SyncResult, error) { errors := make([]error, 0) + entitiesSet := make(map[string]interface{}) + rawDataExamplesToReturn := make([]interface{}, 0) for _, kindConfig := range c.Resource.KindConfigs { - portEntities, _, err := c.getObjectEntities(obj, kindConfig.Selector, kindConfig.Port.Entity.Mappings, kindConfig.Port.ItemsToParse) + portEntities, rawDataExamples, err := c.getObjectEntities(obj, kindConfig.Selector, kindConfig.Port.Entity.Mappings, kindConfig.Port.ItemsToParse) if err != nil { + entitiesSet = nil utilruntime.HandleError(fmt.Errorf("error getting entities for object key '%s': %v", item.Key, err)) continue } + if rawDataExamplesToReturn != nil { + amountOfExamplesToAdd := min(len(rawDataExamples), MaxRawDataExamplesToSend-len(rawDataExamplesToReturn)) + rawDataExamplesToReturn = append(rawDataExamplesToReturn, rawDataExamples[:amountOfExamplesToAdd]...) + } + for _, portEntity := range portEntities { - err = c.entityHandler(portEntity, item.ActionType) + handledEntity, err := c.entityHandler(portEntity, item.ActionType) if err != nil { errors = append(errors, err) + entitiesSet = nil + } + + if entitiesSet != nil && item.ActionType != DeleteAction { + entitiesSet[c.portClient.GetEntityIdentifierKey(handledEntity)] = nil } } } + var err error if len(errors) > 0 { - return fmt.Errorf("error handling entity for object key '%s': %v", item.Key, errors) + err = fmt.Errorf("error handling entity for object key '%s': %v", item.Key, errors) } - return nil + return &SyncResult{ + EntitiesSet: entitiesSet, + RawDataExamples: rawDataExamplesToReturn, + ShouldDeleteStaleEntities: entitiesSet != nil, + }, err } func isPassSelector(obj interface{}, selector port.Selector) (bool, error) { @@ -235,20 +299,7 @@ func isPassSelector(obj interface{}, selector port.Selector) (bool, error) { return selectorResult, err } -func mapEntities(obj interface{}, mappings []port.EntityMapping) ([]port.Entity, error) { - entities := make([]port.Entity, 0, len(mappings)) - for _, entityMapping := range mappings { - portEntity, err := mapping.NewEntity(obj, entityMapping) - if err != nil { - return nil, fmt.Errorf("invalid entity mapping '%#v': %v", entityMapping, err) - } - entities = append(entities, *portEntity) - } - - return entities, nil -} - -func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, mappings []port.EntityMapping, itemsToParse string) ([]port.Entity, []interface{}, error) { +func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, mappings []port.EntityMapping, itemsToParse string) ([]port.EntityRequest, []interface{}, error) { unstructuredObj, ok := obj.(*unstructured.Unstructured) if !ok { return nil, nil, fmt.Errorf("error casting to unstructured") @@ -259,7 +310,7 @@ func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, return nil, nil, fmt.Errorf("error converting from unstructured: %v", err) } - entities := make([]port.Entity, 0, len(mappings)) + entities := make([]port.EntityRequest, 0, len(mappings)) objectsToMap := make([]interface{}, 0) if itemsToParse == "" { @@ -297,7 +348,7 @@ func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, if *c.integrationConfig.SendRawDataExamples && len(rawDataExamples) < MaxRawDataExamplesToSend { rawDataExamples = append(rawDataExamples, objectToMap) } - currentEntities, err := mapEntities(objectToMap, mappings) + currentEntities, err := entity.MapEntities(objectToMap, mappings) if err != nil { return nil, nil, err } @@ -309,117 +360,42 @@ func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, return entities, rawDataExamples, nil } -func checkIfOwnEntity(entity port.Entity, portClient *cli.PortClient) (*bool, error) { - portEntities, err := portClient.SearchEntities(context.Background(), port.SearchBody{ - Rules: []port.Rule{ - { - Property: "$datasource", - Operator: "contains", - Value: "port-k8s-exporter", - }, - { - Property: "$identifier", - Operator: "=", - Value: entity.Identifier, - }, - { - Property: "$datasource", - Operator: "contains", - Value: fmt.Sprintf("statekey/%s", config.ApplicationConfig.StateKey), - }, - { - Property: "$blueprint", - Operator: "=", - Value: entity.Blueprint, - }, - }, - Combinator: "and", - }) +func (c *Controller) entityHandler(portEntity port.EntityRequest, action EventActionType) (*port.Entity, error) { + _, err := c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret) if err != nil { - return nil, err + return nil, fmt.Errorf("error authenticating with Port: %v", err) } - if len(portEntities) > 0 { - result := true - return &result, nil - } - result := false - return &result, nil -} - -func (c *Controller) entityHandler(portEntity port.Entity, action EventActionType) error { switch action { case CreateAction, UpdateAction: - _, err := c.portClient.CreateEntity(context.Background(), &portEntity, "", c.portClient.CreateMissingRelatedEntities) + upsertedEntity, err := c.portClient.CreateEntity(context.Background(), &portEntity, "", c.portClient.CreateMissingRelatedEntities) if err != nil { - return fmt.Errorf("error upserting Port entity '%s' of blueprint '%s': %v", portEntity.Identifier, portEntity.Blueprint, err) + return nil, fmt.Errorf("error upserting Port entity '%s' of blueprint '%s': %v", portEntity.Identifier, portEntity.Blueprint, err) } - klog.V(0).Infof("Successfully upserted entity '%s' of blueprint '%s'", portEntity.Identifier, portEntity.Blueprint) + klog.V(0).Infof("Successfully upserted entity '%s' of blueprint '%s'", upsertedEntity.Identifier, upsertedEntity.Blueprint) + return upsertedEntity, nil case DeleteAction: - result, err := checkIfOwnEntity(portEntity, c.portClient) + if reflect.TypeOf(portEntity.Identifier).Kind() != reflect.String { + return nil, nil + } + + result, err := entity.CheckIfOwnEntity(portEntity, c.portClient) if err != nil { - return fmt.Errorf("error checking if entity '%s' of blueprint '%s' is owned by this exporter: %v", portEntity.Identifier, portEntity.Blueprint, err) + return nil, fmt.Errorf("error checking if entity '%s' of blueprint '%s' is owned by this exporter: %v", portEntity.Identifier, portEntity.Blueprint, err) } if *result { - err := c.portClient.DeleteEntity(context.Background(), portEntity.Identifier, portEntity.Blueprint, c.portClient.DeleteDependents) + err := c.portClient.DeleteEntity(context.Background(), portEntity.Identifier.(string), portEntity.Blueprint, c.portClient.DeleteDependents) if err != nil { - return fmt.Errorf("error deleting Port entity '%s' of blueprint '%s': %v", portEntity.Identifier, portEntity.Blueprint, err) + return nil, fmt.Errorf("error deleting Port entity '%s' of blueprint '%s': %v", portEntity.Identifier, portEntity.Blueprint, err) } klog.V(0).Infof("Successfully deleted entity '%s' of blueprint '%s'", portEntity.Identifier, portEntity.Blueprint) } else { klog.Warningf("trying to delete entity but didn't find it in port with this exporter ownership, entity id: '%s', blueprint:'%s'", portEntity.Identifier, portEntity.Blueprint) } - } - return nil -} - -func (c *Controller) GetEntitiesSet() (map[string]interface{}, []interface{}, error) { - k8sEntitiesSet := map[string]interface{}{} - objects, err := c.lister.List(labels.Everything()) - if err != nil { - return nil, nil, fmt.Errorf("error listing K8s objects of resource '%s': %v", c.Resource.Kind, err) - } - - rawDataExamples := make([]interface{}, 0) - for _, obj := range objects { - for _, kindConfig := range c.Resource.KindConfigs { - mappings := make([]port.EntityMapping, 0, len(kindConfig.Port.Entity.Mappings)) - for _, m := range kindConfig.Port.Entity.Mappings { - mappings = append(mappings, port.EntityMapping{ - Identifier: m.Identifier, - Blueprint: m.Blueprint, - }) - } - entities, examples, err := c.getObjectEntities(obj, kindConfig.Selector, mappings, kindConfig.Port.ItemsToParse) - if err != nil { - return nil, nil, fmt.Errorf("error getting entities of object: %v", err) - } - for _, entity := range entities { - k8sEntitiesSet[c.portClient.GetEntityIdentifierKey(&entity)] = nil - } - rawDataExamples = append(rawDataExamples, examples[:min(len(examples), MaxRawDataExamplesToSend-len(rawDataExamples))]...) - } - } - - return k8sEntitiesSet, rawDataExamples, nil -} - -func hashAllEntities(entities []port.Entity) (string, error) { - h := fnv.New64a() - for _, entity := range entities { - entityBytes, err := json.Marshal(entity) - if err != nil { - return "", err - } - _, err = h.Write(entityBytes) - if err != nil { - return "", err - } - } - return strconv.FormatUint(h.Sum64(), 10), nil + return nil, nil } func (c *Controller) shouldSendUpdateEvent(old interface{}, new interface{}, updateEntityOnlyOnDiff bool) bool { @@ -438,12 +414,12 @@ func (c *Controller) shouldSendUpdateEvent(old interface{}, new interface{}, upd klog.Errorf("Error getting new entities: %v", err) return true } - oldEntitiesHash, err := hashAllEntities(oldEntities) + oldEntitiesHash, err := entity.HashAllEntities(oldEntities) if err != nil { klog.Errorf("Error hashing old entities: %v", err) return true } - newEntitiesHash, err := hashAllEntities(newEntities) + newEntitiesHash, err := entity.HashAllEntities(newEntities) if err != nil { klog.Errorf("Error hashing new entities: %v", err) return true diff --git a/pkg/k8s/controller_test.go b/pkg/k8s/controller_test.go index a4cdf74..7f3475f 100644 --- a/pkg/k8s/controller_test.go +++ b/pkg/k8s/controller_test.go @@ -3,36 +3,38 @@ package k8s import ( "context" "fmt" + guuid "github.com/google/uuid" + "github.com/port-labs/port-k8s-exporter/pkg/signal" "reflect" "strings" "testing" "time" - "github.com/port-labs/port-k8s-exporter/pkg/jq" - "github.com/stretchr/testify/assert" - "github.com/port-labs/port-k8s-exporter/pkg/config" "github.com/port-labs/port-k8s-exporter/pkg/port" _ "github.com/port-labs/port-k8s-exporter/test_utils" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - k8sfake "k8s.io/client-go/dynamic/fake" - + "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic/dynamicinformer" + k8sfake "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/tools/cache" ) var ( noResyncPeriodFunc = func() time.Duration { return 0 } + blueprint = "k8s-export-test-bp" ) type fixture struct { t *testing.T controller *Controller + kubeClient *k8sfake.FakeDynamicClient } type fixtureConfig struct { @@ -41,7 +43,7 @@ type fixtureConfig struct { stateKey string sendRawDataExamples *bool resource port.Resource - objects []runtime.Object + existingObjects []runtime.Object } func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { @@ -57,7 +59,6 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { SendRawDataExamples: sendRawDataExamples, Resources: []port.Resource{fixtureConfig.resource}, } - kubeclient := k8sfake.NewSimpleDynamicClient(runtime.NewScheme()) newConfig := &config.ApplicationConfiguration{ ConfigFilePath: config.ApplicationConfig.ConfigFilePath, @@ -85,9 +86,14 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { newConfig.StateKey = fixtureConfig.stateKey } + kubeClient := k8sfake.NewSimpleDynamicClientWithCustomListKinds(runtime.NewScheme(), newGvrToListKind(), fixtureConfig.existingObjects...) + controller := newController(t, fixtureConfig.resource, kubeClient, interationConfig, newConfig) + controller.portClient.Authenticate(context.Background(), newConfig.PortClientId, newConfig.PortClientSecret) + return &fixture{ t: t, - controller: newController(fixtureConfig.resource, fixtureConfig.objects, kubeclient, interationConfig, newConfig), + controller: controller, + kubeClient: kubeClient, } } @@ -110,6 +116,10 @@ func newDeployment() *appsv1.Deployment { "app": "port-k8s-exporter", } return &appsv1.Deployment{ + TypeMeta: v1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, ObjectMeta: v1.ObjectMeta{ Name: "port-k8s-exporter", Namespace: "port-k8s-exporter", @@ -141,6 +151,10 @@ func newDeploymentWithCustomLabels(generation int64, labels map[string]string, ) *appsv1.Deployment { return &appsv1.Deployment{ + TypeMeta: v1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, ObjectMeta: v1.ObjectMeta{ Name: "port-k8s-exporter", Namespace: "port-k8s-exporter", @@ -177,48 +191,31 @@ func newUnstructured(obj interface{}) *unstructured.Unstructured { return &unstructured.Unstructured{Object: res} } -func newController(resource port.Resource, objects []runtime.Object, kubeclient *k8sfake.FakeDynamicClient, integrationConfig *port.IntegrationAppConfig, applicationConfig *config.ApplicationConfiguration) *Controller { - k8sI := dynamicinformer.NewDynamicSharedInformerFactory(kubeclient, noResyncPeriodFunc()) - s := strings.SplitN(resource.Kind, "/", 3) - gvr := schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]} - informer := k8sI.ForResource(gvr) - kindConfig := port.KindConfig{Selector: resource.Selector, Port: resource.Port} - c := NewController(port.AggregatedResource{Kind: resource.Kind, KindConfigs: []port.KindConfig{kindConfig}}, informer, integrationConfig, applicationConfig) - - for _, d := range objects { - informer.Informer().GetIndexer().Add(d) +func newGvrToListKind() map[schema.GroupVersionResource]string { + return map[schema.GroupVersionResource]string{ + {Group: "apps", Version: "v1", Resource: "deployments"}: "DeploymentList", } - - return c } -func (f *fixture) runControllerSyncHandler(item EventItem, expectError bool) { - err := f.controller.syncHandler(item) - if !expectError && err != nil { - f.t.Errorf("error syncing item: %v", err) - } else if expectError && err == nil { - f.t.Error("expected error syncing item, got nil") - } - -} +func newController(t *testing.T, resource port.Resource, kubeClient *k8sfake.FakeDynamicClient, integrationConfig *port.IntegrationAppConfig, applicationConfig *config.ApplicationConfiguration) *Controller { + informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(kubeClient, noResyncPeriodFunc()) + gvr := getGvr(resource.Kind) + informer := informerFactory.ForResource(gvr) + kindConfig := port.KindConfig{Selector: resource.Selector, Port: resource.Port} + controller := NewController(port.AggregatedResource{Kind: resource.Kind, KindConfigs: []port.KindConfig{kindConfig}}, informer, integrationConfig, applicationConfig) + ctx := context.Background() -func (f *fixture) runControllerGetEntitiesSet(expectedEntitiesSet map[string]interface{}, expectedExamples []interface{}, expectError bool) { - entitiesSet, examples, err := f.controller.GetEntitiesSet() - if !expectError && err != nil { - f.t.Errorf("error syncing item: %v", err) - } else if expectError && err == nil { - f.t.Error("expected error syncing item, got nil") + informerFactory.Start(ctx.Done()) + if synced := informerFactory.WaitForCacheSync(ctx.Done()); !synced[gvr] { + t.Errorf("informer for %s hasn't synced", gvr) } - eq := reflect.DeepEqual(entitiesSet, expectedEntitiesSet) - if !eq { - f.t.Errorf("expected entities set: %v, got: %v", expectedEntitiesSet, entitiesSet) - } + return controller +} - eq = reflect.DeepEqual(examples, expectedExamples) - if !eq { - f.t.Errorf("expected raw data examples: %v, got: %v", expectedExamples, examples) - } +func getGvr(kind string) schema.GroupVersionResource { + s := strings.SplitN(kind, "/", 3) + return schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]} } func getKey(deployment *appsv1.Deployment, t *testing.T) string { @@ -230,13 +227,11 @@ func getKey(deployment *appsv1.Deployment, t *testing.T) string { return key } -func TestCreateDeployment(t *testing.T) { - d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} - resource := newResource("", []port.EntityMapping{ +func getBaseDeploymentResource() port.Resource { + return newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), Icon: "\"Microservice\"", Team: "\"Test\"", Properties: map[string]string{ @@ -251,135 +246,259 @@ func TestCreateDeployment(t *testing.T) { }, }, }) - item := EventItem{Key: getKey(d, t), ActionType: CreateAction} +} + +func (f *fixture) createObjects(objects []*unstructured.Unstructured) { + gvr := getGvr(f.controller.Resource.Kind) + currentNumEventsInQueue := f.controller.eventsWorkqueue.Len() + if objects != nil { + for _, d := range objects { + _, err := f.kubeClient.Resource(gvr).Namespace(d.GetNamespace()).Create(context.TODO(), d, metav1.CreateOptions{}) + if err != nil { + f.t.Errorf("error creating object %s: %v", d.GetName(), err) + } + } - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerSyncHandler(item, false) + assert.Eventually(f.t, func() bool { + return f.controller.eventsWorkqueue.Len() == currentNumEventsInQueue+len(objects) + }, time.Second*2, time.Millisecond*100) + } } -func TestJqSearchRelation(t *testing.T) { +func (f *fixture) updateObjects(objects []*unstructured.Unstructured) { + gvr := getGvr(f.controller.Resource.Kind) + currentNumEventsInQueue := f.controller.eventsWorkqueue.Len() + if objects != nil { + for _, d := range objects { + _, err := f.kubeClient.Resource(gvr).Namespace(d.GetNamespace()).Update(context.TODO(), d, metav1.UpdateOptions{}) + if err != nil { + f.t.Errorf("error updating object %s: %v", d.GetName(), err) + } + } - mapping := []port.EntityMapping{ - { - Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", - Icon: "\"Microservice\"", - Team: "\"Test\"", - Properties: map[string]string{}, - Relations: map[string]interface{}{ - "k8s-relation": map[string]interface{}{ - "combinator": "\"or\"", - "rules": []interface{}{ - map[string]interface{}{ - "property": "\"$identifier\"", - "operator": "\"=\"", - "value": "\"e_AgPMYvq1tAs8TuqM\"", - }, - }, - }, - }, - }, + assert.Eventually(f.t, func() bool { + return f.controller.eventsWorkqueue.Len() == currentNumEventsInQueue+len(objects) + }, time.Second*2, time.Millisecond*100) } - res, _ := jq.ParseRelations(mapping[0].Relations, nil) - assert.Equal(t, res, map[string]interface{}{ - "k8s-relation": map[string]interface{}{ - "combinator": "or", - "rules": []interface{}{ - map[string]interface{}{ - "property": "$identifier", - "operator": "=", - "value": "e_AgPMYvq1tAs8TuqM", - }, - }, - }, +} + +func (f *fixture) deleteObjects(objects []struct{ namespace, name string }) { + gvr := getGvr(f.controller.Resource.Kind) + if objects != nil { + for _, d := range objects { + err := f.kubeClient.Resource(gvr).Namespace(d.namespace).Delete(context.TODO(), d.name, metav1.DeleteOptions{}) + if err != nil { + f.t.Errorf("error deleting object %s: %v", d.name, err) + } + } + } +} + +func (f *fixture) assertSyncResult(result *SyncResult, expectedResult *SyncResult) { + assert.True(f.t, reflect.DeepEqual(result.EntitiesSet, expectedResult.EntitiesSet), fmt.Sprintf("expected entities set: %v, got: %v", expectedResult.EntitiesSet, result.EntitiesSet)) + assert.True(f.t, reflect.DeepEqual(result.RawDataExamples, expectedResult.RawDataExamples), fmt.Sprintf("expected raw data examples: %v, got: %v", expectedResult.RawDataExamples, result.RawDataExamples)) + assert.True(f.t, result.ShouldDeleteStaleEntities == expectedResult.ShouldDeleteStaleEntities, fmt.Sprintf("expected should delete stale entities: %v, got: %v", expectedResult.ShouldDeleteStaleEntities, result.ShouldDeleteStaleEntities)) +} + +func (f *fixture) runControllerSyncHandler(item EventItem, expectedResult *SyncResult, expectError bool) { + syncResult, err := f.controller.syncHandler(item) + if !expectError && err != nil { + f.t.Errorf("error syncing item: %v", err) + } else if expectError && err == nil { + f.t.Error("expected error syncing item, got nil") + } + + f.assertSyncResult(syncResult, expectedResult) +} + +func (f *fixture) runControllerInitialSync(expectedResult *SyncResult) { + syncResult := f.controller.RunInitialSync() + + f.assertSyncResult(syncResult, expectedResult) +} + +func (f *fixture) runControllerEventsSync() { + f.controller.RunEventsSync(1, signal.SetupSignalHandler()) +} + +func TestSuccessfulRunInitialSync(t *testing.T) { + ud1 := newUnstructured(newDeployment()) + ud1.SetName("deployment1") + ud2 := newUnstructured(newDeployment()) + ud2.SetName("deployment2") + f := newFixture(t, &fixtureConfig{resource: getBaseDeploymentResource(), existingObjects: []runtime.Object{ud1, ud2}}) + f.runControllerInitialSync(&SyncResult{ + EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, ud1.GetName()): nil, fmt.Sprintf("%s;%s", blueprint, ud2.GetName()): nil}, + RawDataExamples: []interface{}{ud1.Object, ud2.Object}, ShouldDeleteStaleEntities: true, }) +} +func TestRunInitialSyncWithSelectorQuery(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + notSelectedResource := getBaseDeploymentResource() + notSelectedResource.Selector.Query = ".metadata.name != \"port-k8s-exporter\"" + f := newFixture(t, &fixtureConfig{resource: notSelectedResource, existingObjects: []runtime.Object{ud}}) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{}, ShouldDeleteStaleEntities: true}) } -func TestCreateDeploymentWithSearchRelation(t *testing.T) { +func TestRunInitialSyncWithBadPropMapping(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} - item := EventItem{Key: getKey(d, t), ActionType: CreateAction} + ud := newUnstructured(d) + badPropMappingResource := getBaseDeploymentResource() + badPropMappingResource.Port.Entity.Mappings[0].Properties["text"] = "bad-jq" + f := newFixture(t, &fixtureConfig{resource: badPropMappingResource, existingObjects: []runtime.Object{ud}}) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{}, ShouldDeleteStaleEntities: false}) +} + +func TestRunInitialSyncWithBadEntity(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + badEntityResource := getBaseDeploymentResource() + badEntityResource.Port.Entity.Mappings[0].Identifier = "\"!@#\"" + f := newFixture(t, &fixtureConfig{resource: badEntityResource, existingObjects: []runtime.Object{ud}}) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: false}) +} + +func TestRunInitialSyncWithBadSelector(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + badSelectorResource := getBaseDeploymentResource() + badSelectorResource.Selector.Query = "bad-jq" + f := newFixture(t, &fixtureConfig{resource: badSelectorResource, existingObjects: []runtime.Object{ud}}) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{}, ShouldDeleteStaleEntities: false}) +} + +func TestRunEventsSyncWithCreateEvent(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + id := guuid.NewString() resource := newResource("", []port.EntityMapping{ { - Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", - Icon: "\"Microservice\"", - Team: "\"Test\"", - Properties: map[string]string{ - "text": "\"pod\"", - "num": "1", - "bool": "true", - "obj": ".spec.selector", - "arr": ".spec.template.spec.containers", - }, - Relations: map[string]interface{}{ - "k8s-relation": map[string]interface{}{ - "combinator": "\"or\"", - "rules": []interface{}{ - map[string]interface{}{ - "property": "\"$identifier\"", - "operator": "\"=\"", - "value": "\"e_AgPMYvq1tAs8TuqM\"", - }, - map[string]interface{}{ - "property": "\"$identifier\"", - "operator": "\"=\"", - "value": ".metadata.name", - }, - }, - }, - }, + Identifier: fmt.Sprintf("\"%s\"", id), + Blueprint: fmt.Sprintf("\"%s\"", blueprint), }, }) - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerSyncHandler(item, false) + f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{}}) + + f.createObjects([]*unstructured.Unstructured{ud}) + defer f.controller.portClient.DeleteEntity(context.Background(), id, blueprint, true) + f.runControllerEventsSync() + + assert.Eventually(t, func() bool { + _, err := f.controller.portClient.ReadEntity(context.Background(), id, blueprint) + return err == nil + }, time.Second*5, time.Millisecond*500) } -func TestUpdateDeployment(t *testing.T) { +func TestRunEventsSyncWithUpdateEvent(t *testing.T) { + id := guuid.NewString() + resource := getBaseDeploymentResource() + resource.Port.Entity.Mappings[0].Identifier = fmt.Sprintf("\"%s\"", id) + resource.Port.Entity.Mappings[0].Properties["bool"] = ".spec.selector.matchLabels.app == \"new-label\"" + d := newDeployment() + ud := newUnstructured(d) + f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{ud}}) + + defer f.controller.portClient.DeleteEntity(context.Background(), id, blueprint, true) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, id): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}) + assert.Eventually(t, func() bool { + entity, err := f.controller.portClient.ReadEntity(context.Background(), id, blueprint) + return err == nil && entity.Properties["bool"] == false + }, time.Second*5, time.Millisecond*500) + + d.Spec.Selector.MatchLabels["app"] = "new-label" + f.updateObjects([]*unstructured.Unstructured{newUnstructured(d)}) + f.runControllerEventsSync() + + assert.Eventually(t, func() bool { + entity, err := f.controller.portClient.ReadEntity(context.Background(), id, blueprint) + return err == nil && entity.Properties["bool"] == true + }, time.Second*5, time.Millisecond*500) +} + +func TestRunEventsSyncWithDeleteEvent(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) resource := newResource("", []port.EntityMapping{ { - Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", - Icon: "\"Microservice\"", - Team: "[\"Test\", \"Test2\"]", - Properties: map[string]string{ - "text": "\"pod\"", - "num": "1", - "bool": "true", - "obj": ".spec.selector", - "arr": ".spec.template.spec.containers", - }, - Relations: map[string]interface{}{ - "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", - }, + Identifier: "\"entityToBeDeleted\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), }, }) - item := EventItem{Key: getKey(d, t), ActionType: UpdateAction} + f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{ud}}) + + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, "entityToBeDeleted"): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}) + f.deleteObjects([]struct{ namespace, name string }{{namespace: d.Namespace, name: d.Name}}) + + assert.Eventually(t, func() bool { + _, err := f.controller.portClient.ReadEntity(context.Background(), "entityToBeDeleted", blueprint) + return err != nil && strings.Contains(err.Error(), "was not found") + }, time.Second*5, time.Millisecond*500) - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerSyncHandler(item, false) +} + +func TestCreateDeployment(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + resource := getBaseDeploymentResource() + item := EventItem{Key: getKey(d, t), ActionType: CreateAction} + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) +} + +func TestCreateDeploymentWithSearchRelation(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + item := EventItem{Key: getKey(d, t), ActionType: CreateAction} + resource := getBaseDeploymentResource() + resource.Port.Entity.Mappings[0].Relations = map[string]interface{}{ + "k8s-relation": map[string]interface{}{ + "combinator": "\"or\"", + "rules": []interface{}{ + map[string]interface{}{ + "property": "\"$identifier\"", + "operator": "\"=\"", + "value": "\"e_AgPMYvq1tAs8TuqM\"", + }, + map[string]interface{}{ + "property": "\"$identifier\"", + "operator": "\"=\"", + "value": ".metadata.name", + }, + }, + }, + } + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) +} + +func TestUpdateDeployment(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + resource := getBaseDeploymentResource() + item := EventItem{Key: getKey(d, t), ActionType: UpdateAction} + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } func TestDeleteDeploymentSameOwner(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) resource := newResource("", []port.EntityMapping{ { Identifier: "\"entityWithSameOwner\"", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), }, }) createItem := EventItem{Key: getKey(d, t), ActionType: CreateAction} item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} + f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{ud}}) - f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, objects: objects}) - f.runControllerSyncHandler(createItem, false) + f.runControllerSyncHandler(createItem, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;entityWithSameOwner", blueprint): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) - f.runControllerSyncHandler(item, false) - _, err := f.controller.portClient.ReadEntity(context.Background(), "entityWithSameOwner", "k8s-export-test-bp") + _, err := f.controller.portClient.ReadEntity(context.Background(), "entityWithSameOwner", blueprint) if err != nil && !strings.Contains(err.Error(), "was not found") { t.Errorf("expected entity to be deleted") } @@ -387,21 +506,21 @@ func TestDeleteDeploymentSameOwner(t *testing.T) { func TestDeleteDeploymentDifferentOwner(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) resource := newResource("", []port.EntityMapping{ { Identifier: "\"entityWithDifferentOwner\"", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), }, }) createItem := EventItem{Key: getKey(d, t), ActionType: CreateAction} item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} + f := newFixture(t, &fixtureConfig{stateKey: "non_exist_statekey", resource: resource, existingObjects: []runtime.Object{ud}}) - f := newFixture(t, &fixtureConfig{stateKey: "non_exist_statekey", resource: resource, objects: objects}) - f.runControllerSyncHandler(createItem, false) + f.runControllerSyncHandler(createItem, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;entityWithDifferentOwner", blueprint): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) - f.runControllerSyncHandler(item, false) - _, err := f.controller.portClient.ReadEntity(context.Background(), "entityWithDifferentOwner", "k8s-export-test-bp") + _, err := f.controller.portClient.ReadEntity(context.Background(), "entityWithDifferentOwner", blueprint) if err != nil && strings.Contains(err.Error(), "was not found") { t.Errorf("expected entity to exist") } @@ -409,74 +528,44 @@ func TestDeleteDeploymentDifferentOwner(t *testing.T) { func TestSelectorQueryFilterDeployment(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) resource := newResource(".metadata.name != \"port-k8s-exporter\"", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"wrong-k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"wrong-%s\"", blueprint), }, }) item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerSyncHandler(item, false) + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{}, ShouldDeleteStaleEntities: true}, false) } func TestFailPortAuth(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) resource := newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), }, }) item := EventItem{Key: getKey(d, t), ActionType: CreateAction} - - f := newFixture(t, &fixtureConfig{portClientId: "wrongclientid", portClientSecret: "wrongclientsecret", resource: resource, objects: objects}) - f.runControllerSyncHandler(item, true) + f := newFixture(t, &fixtureConfig{portClientId: "wrongclientid", portClientSecret: "wrongclientsecret", resource: resource, existingObjects: []runtime.Object{ud}}) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: nil, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: false}, true) } func TestFailDeletePortEntity(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) resource := newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"wrong-k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"wrong-%s\"", blueprint), }, }) item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerSyncHandler(item, false) -} - -func TestGetEntitiesSet(t *testing.T) { - d := newUnstructured(newDeployment()) - var structuredObj interface{} - err := runtime.DefaultUnstructuredConverter.FromUnstructured(d.Object, &structuredObj) - if err != nil { - t.Errorf("Error from unstructured: %s", err.Error()) - } - - objects := []runtime.Object{d} - resource := newResource("", []port.EntityMapping{ - { - Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", - }, - }) - expectedEntitiesSet := map[string]interface{}{ - "k8s-export-test-bp;port-k8s-exporter": nil, - } - - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerGetEntitiesSet(expectedEntitiesSet, []interface{}{structuredObj}, false) - - sendRawDataExamples := false - f = newFixture(t, &fixtureConfig{sendRawDataExamples: &sendRawDataExamples, resource: resource, objects: objects}) - f.runControllerGetEntitiesSet(expectedEntitiesSet, []interface{}{}, false) + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { @@ -490,7 +579,7 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), Icon: "\"Microservice\"", Team: "\"Test\"", Properties: map[string]string{ @@ -508,7 +597,7 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), Icon: "\"Microservice\"", Team: "\"Test\"", Properties: map[string]string{}, @@ -516,7 +605,7 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { }, { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), Icon: "\"Microservice\"", Team: "\"Test\"", Properties: map[string]string{ @@ -534,7 +623,7 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { for _, mapping := range fullMapping { - controllerWithFullMapping := newFixture(t, &fixtureConfig{resource: mapping, objects: []runtime.Object{}}).controller + controllerWithFullMapping := newFixture(t, &fixtureConfig{resource: mapping, existingObjects: []runtime.Object{}}).controller // Test changes in each individual property properties := map[string]Property{ @@ -572,3 +661,48 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { assert.True(t, result, fmt.Sprintf("Expected true when labels changes and feature flag is off")) } } + +func TestCreateDeploymentWithSearchIdentifier(t *testing.T) { + id := guuid.NewString() + randTxt := guuid.NewString() + d := newDeployment() + ud := newUnstructured(d) + resource := getBaseDeploymentResource() + resource.Port.Entity.Mappings[0].Identifier = fmt.Sprintf("\"%s\"", id) + resource.Port.Entity.Mappings[0].Properties["text"] = fmt.Sprintf("\"%s\"", randTxt) + resource.Port.Entity.Mappings[0].Properties["bool"] = "true" + item := EventItem{Key: getKey(d, t), ActionType: CreateAction} + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, id): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + + entity, err := f.controller.portClient.ReadEntity(context.Background(), id, blueprint) + if err != nil { + t.Errorf("error reading entity: %v", err) + } + assert.True(t, entity.Properties["bool"] == true, fmt.Sprintf("expected bool to be true, got: %v", entity.Properties["bool"])) + + item = EventItem{Key: getKey(d, t), ActionType: UpdateAction} + resource.Port.Entity.Mappings[0].Identifier = map[string]interface{}{ + "combinator": "\"and\"", + "rules": []interface{}{ + map[string]interface{}{ + "property": "\"text\"", + "operator": "\"=\"", + "value": fmt.Sprintf("\"%s\"", randTxt), + }, + }} + resource.Port.Entity.Mappings[0].Properties["bool"] = "false" + f = newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, id): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + + entity, err = f.controller.portClient.ReadEntity(context.Background(), id, blueprint) + if err != nil { + t.Errorf("error reading entity: %v", err) + } + assert.True(t, entity.Properties["bool"] == false, fmt.Sprintf("expected bool to be false, got: %v", entity.Properties["bool"])) + + deleteItem := EventItem{Key: getKey(d, t), ActionType: DeleteAction} + f.runControllerSyncHandler(deleteItem, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) +} diff --git a/pkg/port/cli/entity.go b/pkg/port/cli/entity.go index 7d3ce53..6103358 100644 --- a/pkg/port/cli/entity.go +++ b/pkg/port/cli/entity.go @@ -51,7 +51,7 @@ func (c *PortClient) ReadEntity(ctx context.Context, id string, blueprint string return &pb.Entity, nil } -func (c *PortClient) CreateEntity(ctx context.Context, e *port.Entity, runID string, createMissingRelatedEntities bool) (*port.Entity, error) { +func (c *PortClient) CreateEntity(ctx context.Context, e *port.EntityRequest, runID string, createMissingRelatedEntities bool) (*port.Entity, error) { pb := &port.ResponseBody{} resp, err := c.Client.R(). SetBody(e). diff --git a/pkg/port/cli/integration.go b/pkg/port/cli/integration.go index 3130a46..dad75f3 100644 --- a/pkg/port/cli/integration.go +++ b/pkg/port/cli/integration.go @@ -96,3 +96,17 @@ func (c *PortClient) PostIntegrationKindExample(stateKey string, kind string, ex } return nil } + +func (c *PortClient) GetIntegrationKinds(stateKey string) (map[string]port.IntegrationKind, error) { + pb := &port.IntegrationKindsResponse{} + resp, err := c.Client.R(). + SetResult(&pb). + Get(fmt.Sprintf("v1/integration/%s/kinds", stateKey)) + if err != nil { + return nil, err + } + if !pb.OK { + return nil, fmt.Errorf("failed to get integration kinds, got: %s", resp.Body()) + } + return pb.Data, nil +} diff --git a/pkg/port/entity/entity.go b/pkg/port/entity/entity.go new file mode 100644 index 0000000..f3ed326 --- /dev/null +++ b/pkg/port/entity/entity.go @@ -0,0 +1,130 @@ +package entity + +import ( + "context" + "encoding/json" + "fmt" + "github.com/port-labs/port-k8s-exporter/pkg/config" + "github.com/port-labs/port-k8s-exporter/pkg/jq" + "github.com/port-labs/port-k8s-exporter/pkg/port" + "github.com/port-labs/port-k8s-exporter/pkg/port/cli" + "hash/fnv" + "reflect" + "strconv" +) + +func CheckIfOwnEntity(entity port.EntityRequest, portClient *cli.PortClient) (*bool, error) { + portEntities, err := portClient.SearchEntities(context.Background(), port.SearchBody{ + Rules: []port.Rule{ + { + Property: "$datasource", + Operator: "contains", + Value: "port-k8s-exporter", + }, + { + Property: "$identifier", + Operator: "=", + Value: entity.Identifier, + }, + { + Property: "$datasource", + Operator: "contains", + Value: fmt.Sprintf("statekey/%s", config.ApplicationConfig.StateKey), + }, + { + Property: "$blueprint", + Operator: "=", + Value: entity.Blueprint, + }, + }, + Combinator: "and", + }) + if err != nil { + return nil, err + } + + if len(portEntities) > 0 { + result := true + return &result, nil + } + result := false + return &result, nil +} + +func HashAllEntities(entities []port.EntityRequest) (string, error) { + h := fnv.New64a() + for _, entity := range entities { + entityBytes, err := json.Marshal(entity) + if err != nil { + return "", err + } + _, err = h.Write(entityBytes) + if err != nil { + return "", err + } + } + return strconv.FormatUint(h.Sum64(), 10), nil +} + +func MapEntities(obj interface{}, mappings []port.EntityMapping) ([]port.EntityRequest, error) { + entities := make([]port.EntityRequest, 0, len(mappings)) + for _, entityMapping := range mappings { + portEntity, err := newEntityRequest(obj, entityMapping) + if err != nil { + return nil, fmt.Errorf("invalid entity mapping '%#v': %v", entityMapping, err) + } + entities = append(entities, *portEntity) + } + + return entities, nil +} + +func newEntityRequest(obj interface{}, mapping port.EntityMapping) (*port.EntityRequest, error) { + var err error + entity := &port.EntityRequest{} + + if reflect.TypeOf(mapping.Identifier).Kind() == reflect.String { + entity.Identifier, err = jq.ParseString(mapping.Identifier.(string), obj) + } else if reflect.TypeOf(mapping.Identifier).Kind() == reflect.Map { + entity.Identifier, err = jq.ParseMapRecursively(mapping.Identifier.(map[string]interface{}), obj) + } else { + return nil, fmt.Errorf("invalid identifier type '%T'", mapping.Identifier) + } + + if err != nil { + return nil, err + } + if mapping.Title != "" { + entity.Title, err = jq.ParseString(mapping.Title, obj) + if err != nil { + return nil, err + } + } + entity.Blueprint, err = jq.ParseString(mapping.Blueprint, obj) + if err != nil { + return nil, err + } + if mapping.Team != "" { + entity.Team, err = jq.ParseInterface(mapping.Team, obj) + if err != nil { + return nil, err + } + } + if mapping.Icon != "" { + entity.Icon, err = jq.ParseString(mapping.Icon, obj) + if err != nil { + return nil, err + } + } + entity.Properties, err = jq.ParseMapInterface(mapping.Properties, obj) + if err != nil { + return nil, err + } + entity.Relations, err = jq.ParseMapRecursively(mapping.Relations, obj) + if err != nil { + return nil, err + } + + return entity, err + +} diff --git a/pkg/port/mapping/entity.go b/pkg/port/mapping/entity.go deleted file mode 100644 index 1cb4d70..0000000 --- a/pkg/port/mapping/entity.go +++ /dev/null @@ -1,48 +0,0 @@ -package mapping - -import ( - "github.com/port-labs/port-k8s-exporter/pkg/jq" - "github.com/port-labs/port-k8s-exporter/pkg/port" -) - -func NewEntity(obj interface{}, mapping port.EntityMapping) (*port.Entity, error) { - var err error - entity := &port.Entity{} - entity.Identifier, err = jq.ParseString(mapping.Identifier, obj) - if err != nil { - return &port.Entity{}, err - } - if mapping.Title != "" { - entity.Title, err = jq.ParseString(mapping.Title, obj) - if err != nil { - return &port.Entity{}, err - } - } - entity.Blueprint, err = jq.ParseString(mapping.Blueprint, obj) - if err != nil { - return &port.Entity{}, err - } - if mapping.Team != "" { - entity.Team, err = jq.ParseInterface(mapping.Team, obj) - if err != nil { - return &port.Entity{}, err - } - } - if mapping.Icon != "" { - entity.Icon, err = jq.ParseString(mapping.Icon, obj) - if err != nil { - return &port.Entity{}, err - } - } - entity.Properties, err = jq.ParseMapInterface(mapping.Properties, obj) - if err != nil { - return &port.Entity{}, err - } - entity.Relations, err = jq.ParseRelations(mapping.Relations, obj) - if err != nil { - return &port.Entity{}, err - } - - return entity, err - -} diff --git a/pkg/port/models.go b/pkg/port/models.go index 7ddfb0d..8e2edff 100644 --- a/pkg/port/models.go +++ b/pkg/port/models.go @@ -38,6 +38,15 @@ type ( UpdatedAt *time.Time `json:"updatedAt,omitempty"` } + Example struct { + Id string `json:"_id,omitempty"` + Data map[string]any `json:"data,omitempty"` + } + + IntegrationKind struct { + Examples []Example `json:"examples"` + } + Property struct { Type string `json:"type,omitempty"` Title string `json:"title,omitempty"` @@ -220,9 +229,14 @@ type ResponseBody struct { Pages Page `json:"pages"` } +type IntegrationKindsResponse struct { + OK bool `json:"ok"` + Data map[string]IntegrationKind `json:"data"` +} + type EntityMapping struct { - Identifier string `json:"identifier" yaml:"identifier"` - Title string `json:"title" yaml:"title"` + Identifier interface{} `json:"identifier" yaml:"identifier"` + Title string `json:"title,omitempty" yaml:"title,omitempty"` Blueprint string `json:"blueprint" yaml:"blueprint"` Icon string `json:"icon,omitempty" yaml:"icon,omitempty"` Team string `json:"team,omitempty" yaml:"team,omitempty"` @@ -230,6 +244,16 @@ type EntityMapping struct { Relations map[string]interface{} `json:"relations,omitempty" yaml:"relations,omitempty"` } +type EntityRequest struct { + Identifier interface{} `json:"identifier" yaml:"identifier"` + Title string `json:"title,omitempty" yaml:"title,omitempty"` + Blueprint string `json:"blueprint" yaml:"blueprint"` + Icon string `json:"icon,omitempty" yaml:"icon,omitempty"` + Team interface{} `json:"team,omitempty" yaml:"team,omitempty"` + Properties map[string]interface{} `json:"properties,omitempty" yaml:"properties,omitempty"` + Relations map[string]interface{} `json:"relations,omitempty" yaml:"relations,omitempty"` +} + type EntityMappings struct { Mappings []EntityMapping `json:"mappings" yaml:"mappings"` }