Skip to content

Commit

Permalink
Code review markups.
Browse files Browse the repository at this point in the history
  • Loading branch information
fasaxc committed Jun 9, 2017
1 parent c260381 commit e9f4332
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
9 changes: 7 additions & 2 deletions pkg/k8s/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ func PollK8sForConnectionLimit(
configParams.K8sPortName,
)
if tErr != nil || numTyphas <= 0 {
logCxt.WithError(tErr).Warn("Failed to get number of Typhas")
logCxt.WithError(tErr).WithField("numTyphas", numTyphas).Warn(
"Failed to get number of Typhas")
}
// Get the number of nodes as an estimate for the number of Felix connections we should expect.
numNodes, nErr := k8sAPI.GetNumNodes()
if nErr != nil || numNodes <= 0 {
logCxt.WithError(nErr).Warn("Failed to get number of nodes")
logCxt.WithError(nErr).WithField("numNodes", numNodes).Warn(
"Failed to get number of nodes")
}

target := configParams.MaxConnectionsUpperLimit
Expand Down Expand Up @@ -91,6 +93,9 @@ func CalculateMaxConnLimit(configParams *config.Config, numTyphas, numNodes int)
target = configParams.MaxConnectionsUpperLimit
return
}
// We subtract 1 from the number of Typhas when calculating the fraction to allow for one Typha
// dying during a rolling upgrade, for example. That does mean our load will be less even but
// it reduces the number of expensive disconnections. We add 20% to give some further headroom.
candidate := 1 + numNodes*120/(numTyphas-1)/100
if candidate > target {
reason = "fraction+20%"
Expand Down
20 changes: 9 additions & 11 deletions pkg/syncserver/sync_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (s *Server) serve(cxt context.Context) {
logCxt.WithError(err).Panic("Failed to accept connection")
}

if s.overConnLimit() {
if s.atConnLimit() {
logCxt.WithField("conn", conn.RemoteAddr()).Warn(
"Too many active connections, dropping incoming connection.")
counterNumConnectionsRejected.Inc()
Expand Down Expand Up @@ -251,18 +251,19 @@ func (s *Server) serve(cxt context.Context) {
encoder: gob.NewEncoder(conn),
readC: make(chan interface{}),
}

// Track the connection's lifetime in connIDToConn so we can kill it later if needed.
s.recordConnection(connection)
// Defer to the connection-handler.
go connection.handle()
// Clean up the entry in connIDToConn as soon as the context is canceled.
go func() {
// Track the connection's lifetime in connIDToConn so we can kill it later if needed.
s.recordConnection(connection)
defer s.discardConnection(connection)
// Defer to the connection to do the handling.
connection.handle()
<-connCxt.Done()
s.discardConnection(connection)
}()
}
}

func (s *Server) overConnLimit() bool {
func (s *Server) atConnLimit() bool {
s.connTrackingLock.Lock()
defer s.connTrackingLock.Unlock()
return len(s.connIDToConn) >= s.maxConns
Expand Down Expand Up @@ -314,9 +315,6 @@ func (s *Server) governNumberOfConnections(cxt context.Context) {
}).Warn("Currently have too many connections, terminating one at random.")
conn.cancelCxt()
counterNumConnectionsDropped.Inc()
// Remove from the map now so that the connection count immediately drops;
// otherwise we might drop too many connections.
delete(s.connIDToConn, connID)
break
}
}
Expand Down

0 comments on commit e9f4332

Please sign in to comment.