Skip to content

Commit

Permalink
DELETE op
Browse files Browse the repository at this point in the history
  • Loading branch information
krhitesh7 committed Apr 19, 2024
1 parent 5a643f3 commit 30aafe0
Showing 1 changed file with 47 additions and 23 deletions.
70 changes: 47 additions & 23 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package cache
import (
"context"
"fmt"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"

Check failure on line 20 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)

Check failure on line 20 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

Check failure on line 26 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)

Check failure on line 26 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
Expand Down Expand Up @@ -95,7 +97,7 @@ type SnapshotCache interface {

BatchUpsertResources(ctx context.Context, typ string, resourcesUpserted map[string]map[string]types.Resource) error

DeleteResources(ctx context.Context, node string, typ string, resourcesDeleted map[string]types.Resource) error
DeleteResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error

Check failure on line 100 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)

Check failure on line 100 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
}

type snapshotCache struct {
Expand Down Expand Up @@ -360,37 +362,59 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty
return nil
}

func (cache *snapshotCache) DeleteResources(ctx context.Context, node string, typ string, resourcesDeleted map[string]types.Resource) error {
func (cache *snapshotCache) DeleteResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error {

Check failure on line 365 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)

Check failure on line 365 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
cache.mu.Lock()
defer cache.mu.Unlock()

if snapshot, ok := cache.snapshots[node]; ok {
// Add new/updated resources to the Resources map
index := GetResponseType(typ)
currentResources := snapshot.(*Snapshot).Resources[index]
currentVersion := cache.ParseSystemVersionInfo(currentResources.Version)
resourceToDelete := resourcesToDeleted[0]
resourceToDeleteParts := strings.Split(resourcesToDeleted[0], "/")
serviceName := resourceToDeleteParts[4]
zone := resourceToDeleteParts[5]
portString := strings.Split(resourcesToDeleted[0], "_")[1]
claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", resource.EndpointType, serviceName, portString)

cache.log.Infof("DeleteResources claName=%s", claName)

for _, snapshot := range cache.snapshots {
didModify := false
currentResources := snapshot.(*Snapshot).Resources[types.Endpoint]
if rsc, found := currentResources.Items[claName]; found {
cla := rsc.Resource.(*endpoint.ClusterLoadAssignment)
for i, _ := range cla.Endpoints {

Check failure on line 383 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)

Check failure on line 383 in pkg/cache/v3/simple.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
if cla.Endpoints[i].Locality.Zone == zone {
newEndpoints := make([]*endpoint.LbEndpoint, 0)
for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints {
if resourceToDelete == GetResourceName(lbEndpoint) {
didModify = true
cache.log.Infof("Removed endpoint %s", resourceToDelete)
continue
}
newEndpoints = append(newEndpoints, lbEndpoint)
}

for name, _ := range resourcesDeleted {
if _, found := currentResources.Items[name]; found {
delete(currentResources.Items, name)
cla.Endpoints[i].LbEndpoints = newEndpoints
}
}
}

// Change in version
currentVersion++
currentResources.Version = fmt.Sprintf("%d", currentVersion)
currentResources.Items[claName] = types.ResourceWithTTL{
Resource: cla,
}
}

// Update
snapshot.(*Snapshot).Resources[index] = currentResources
cache.snapshots[node] = snapshot
if didModify {
// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()
_ = cache.respondDeltaWatches(ctx, info, snapshot)
info.mu.Unlock()
}

// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()
defer info.mu.Unlock()
currentVersion := cache.ParseSystemVersionInfo(currentResources.Version)
currentVersion++
currentResources.Version = fmt.Sprintf("%d", currentVersion)

// Respond to delta watches for the node.
return cache.respondDeltaWatches(ctx, info, snapshot)
snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources
cache.snapshots[node] = snapshot
}
}

Expand Down

0 comments on commit 30aafe0

Please sign in to comment.