Skip to content

Commit

Permalink
remove batch peer update
Browse files Browse the repository at this point in the history
  • Loading branch information
yabinma committed Nov 21, 2024
1 parent a11e2dd commit a13c907
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 66 deletions.
51 changes: 11 additions & 40 deletions mq/publishers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import (
"golang.org/x/exp/slog"
)

var batchSize = servercfg.GetPeerUpdateBatchSize()
var batchUpdate = servercfg.GetBatchPeerUpdate()

var running bool

// PublishPeerUpdate --- determines and publishes a peer update to all the hosts
Expand Down Expand Up @@ -56,44 +53,18 @@ func PublishPeerUpdate(replacePeers bool) error {
return err
}

//if batch peer update disabled
if !batchUpdate {
for _, host := range hosts {
host := host
go func(host models.Host) {
if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
id := host.Name
if host.ID != uuid.Nil {
id = host.ID.String()
}
slog.Error("failed to publish peer update to host", id, ": ", err)
}
}(host)
}
running = false
slog.Error("leaving PublishPeerUpdate, time cost: ", "Debug", time.Now().Unix()-t1)
return nil
}

//if batch peer update enabled
batchHost := BatchItems(hosts, batchSize)
var wg sync.WaitGroup
for _, v := range batchHost {
hostLen := len(v)
wg.Add(hostLen)
for i := 0; i < hostLen; i++ {
host := hosts[i]
go func(host models.Host) {
if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
id := host.Name
if host.ID != uuid.Nil {
id = host.ID.String()
}
slog.Error("failed to publish peer update to host", id, ": ", err)
for _, host := range hosts {
host := host
time.Sleep(1 * time.Millisecond)
go func(host models.Host) {
if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
id := host.Name
if host.ID != uuid.Nil {
id = host.ID.String()
}
}(host)
}
wg.Wait()
slog.Error("failed to publish peer update to host", id, ": ", err)
}
}(host)
}
running = false
slog.Error("leaving PublishPeerUpdate, time cost: ", "Debug", time.Now().Unix()-t1)
Expand Down
4 changes: 0 additions & 4 deletions scripts/netmaker.default.env
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ EMAIL_SENDER_ADDR=
EMAIL_SENDER_USER=
# sender smtp password
EMAIL_SENDER_PASSWORD=
# if batch peer update enable or not
PEER_UPDATE_BATCH=true
# batch peer update size when PEER_UPDATE_BATCH is enabled
PEER_UPDATE_BATCH_SIZE=50
# default domain for internal DNS lookup
DEFAULT_DOMAIN=netmaker.hosted
# managed dns setting, set to true to resolve dns entries on netmaker network
Expand Down
22 changes: 0 additions & 22 deletions servercfg/serverconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,28 +686,6 @@ func validateDomain(domain string) bool {
return exp.MatchString(domain)
}

// GetBatchPeerUpdate - if batch peer update
func GetBatchPeerUpdate() bool {
enabled := true
if os.Getenv("PEER_UPDATE_BATCH") != "" {
enabled = os.Getenv("PEER_UPDATE_BATCH") == "true"
}
return enabled
}

// GetPeerUpdateBatchSize - get the batch size for peer update
func GetPeerUpdateBatchSize() int {
//default 50
batchSize := 50
if os.Getenv("PEER_UPDATE_BATCH_SIZE") != "" {
b, e := strconv.Atoi(os.Getenv("PEER_UPDATE_BATCH_SIZE"))
if e == nil && b > 0 && b < 1000 {
batchSize = b
}
}
return batchSize
}

// GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX
func GetEmqxRestEndpoint() string {
return os.Getenv("EMQX_REST_ENDPOINT")
Expand Down

0 comments on commit a13c907

Please sign in to comment.