fix: dynamic cluster distribution issue 20965, update the shard… #21042

Open · wants to merge 1 commit into master
25 changes: 24 additions & 1 deletion controller/appcontroller.go
@@ -288,9 +288,32 @@ func NewApplicationController(
return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas)
}
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
- if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
+ shard, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard)
+ if err != nil {
return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %w", err)
}

// update the shard number in the clusterSharding, and resync all applications if the shard number is updated
if ctrl.clusterSharding.UpdateShard(shard) {
// update shard number in stateCache
ctrl.stateCache.UpdateShard(shard)

// resync all applications
apps, err := ctrl.appLister.List(labels.Everything())
if err != nil {
return err
}
for _, app := range apps {
if !ctrl.canProcessApp(app) {
continue
}
key, err := cache.MetaNamespaceKeyFunc(app)
if err == nil {
ctrl.appRefreshQueue.AddRateLimited(key)
ctrl.clusterSharding.AddApp(app)
}
Contributor:

Combine the two blocks, since they check for the same condition. Or, do continue if err != nil. We also probably wanna log the error.

Author:

Combined!
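
For reference, a minimal sketch of what the combined loop looks like after that suggestion — the log wording is illustrative (not taken from the merged commit) and it assumes the controller's logrus logger is in scope as log:

for _, app := range apps {
	if !ctrl.canProcessApp(app) {
		continue
	}
	key, err := cache.MetaNamespaceKeyFunc(app)
	if err != nil {
		// log and skip the app instead of silently dropping the error
		log.Warnf("failed to get queue key for app %q: %v", app.Name, err)
		continue
	}
	ctrl.appRefreshQueue.AddRateLimited(key)
	ctrl.clusterSharding.AddApp(app)
}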

}
}
}
}
return nil
7 changes: 7 additions & 0 deletions controller/cache/cache.go
@@ -153,6 +153,8 @@ type LiveStateCache interface {
GetClustersInfo() []clustercache.ClusterInfo
// Init must be executed before cache can be used
Init() error
// UpdateShard will update the shard of ClusterSharding when the shard has changed.
UpdateShard(shard int) bool
}

type ObjectUpdatedHandler = func(managedByApp map[string]bool, ref v1.ObjectReference)
@@ -912,3 +914,8 @@ func (c *liveStateCache) GetClustersInfo() []clustercache.ClusterInfo {
func (c *liveStateCache) GetClusterCache(server string) (clustercache.ClusterCache, error) {
return c.getSyncedCluster(server)
}

// UpdateShard will update the shard of ClusterSharding when the shard has changed.
func (c *liveStateCache) UpdateShard(shard int) bool {
return c.clusterSharding.UpdateShard(shard)
}
18 changes: 18 additions & 0 deletions controller/cache/mocks/LiveStateCache.go

Some generated files are not rendered by default.

32 changes: 32 additions & 0 deletions controller/sharding/cache.go
@@ -19,6 +19,8 @@ type ClusterShardingCache interface {
UpdateApp(a *v1alpha1.Application)
IsManagedCluster(c *v1alpha1.Cluster) bool
GetDistribution() map[string]int
GetAppDistribution() map[string]int
UpdateShard(shard int) bool
}

type ClusterSharding struct {
@@ -243,3 +245,33 @@ func (sharding *ClusterSharding) UpdateApp(a *v1alpha1.Application) {
log.Debugf("Skipping sharding distribution update. No relevant changes")
}
}

// GetAppDistribution must not be called from a DistributionFunction because
// it could cause a deadlock when updateDistribution is called.
func (sharding *ClusterSharding) GetAppDistribution() map[string]int {
sharding.lock.RLock()
clusters := sharding.Clusters
apps := sharding.Apps
sharding.lock.RUnlock()

appDistribution := make(map[string]int, len(clusters))

for _, a := range apps {
// incrementing a missing key is safe: the zero value is 0
appDistribution[a.Spec.Destination.Server]++
}
return appDistribution
}
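
To make the deadlock note above concrete: sync.RWMutex is not reentrant, so if updateDistribution takes the write lock and then calls a DistributionFunction that in turn calls GetAppDistribution, the nested RLock blocks forever. A stripped-down illustration of the hazard (not the controller code):

import "sync"

var mu sync.RWMutex

func updateDistribution(distribute func()) {
	mu.Lock() // write lock held for the whole update...
	defer mu.Unlock()
	distribute() // ...while calling back into the distribution function
}

func snapshot() {
	mu.RLock() // deadlocks here if called from distribute(): the writer is never released
	defer mu.RUnlock()
}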

// UpdateShard will update the shard of ClusterSharding when the shard has changed.
func (sharding *ClusterSharding) UpdateShard(shard int) bool {
// mutating Shard requires the write lock, not RLock; taking it
// around the comparison as well keeps the read race-free
sharding.lock.Lock()
defer sharding.lock.Unlock()
if shard != sharding.Shard {
sharding.Shard = shard
return true
}
return false
}
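
A quick check of the contract — true exactly when the shard changes — could look like this hypothetical test (not part of the PR):

func TestUpdateShardReportsChange(t *testing.T) {
	s := &ClusterSharding{Shard: 0}
	if !s.UpdateShard(1) {
		t.Fatal("expected true when the shard changes, so callers know to resync")
	}
	if s.UpdateShard(1) {
		t.Fatal("expected false when the shard is unchanged")
	}
}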
5 changes: 4 additions & 1 deletion controller/sharding/sharding.go
@@ -490,7 +490,10 @@ func GetClusterSharding(kubeClient kubernetes.Interface, settingsMgr *settings.S
err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %w", err)
break
}
log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
// if `err == nil`, should not log the following warning message
if err != nil {
log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
}
}
errors.CheckError(err)
} else {
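
For context, the guard matters because of the shape of the surrounding retry loop — roughly the following, reconstructed and simplified from the snippet above (the loop bound, the replicas variable, and the kubeerrors alias are assumptions):

for i := 0; i <= 3; i++ {
	shard, err = GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicas, shard)
	if err != nil && !kubeerrors.IsConflict(err) {
		// a non-conflict error is fatal, stop retrying
		err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %w", err)
		break
	}
	// the patch adds this guard so a successful attempt (err == nil) stays quiet
	if err != nil {
		log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
	}
}
errors.CheckError(err)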