feat: Implement connection manager #1021
base: development
Conversation
urls := shuffle(m.unusedURLs())
urls = append(urls, m.urls...)
I don't get it here: aren't the URLs returned from unusedURLs already in m.urls? Why not just use m.urls? And why shuffle them?
When creating new connections, I want to prioritize unused URLs, but I won't restrict attempts to them if all unused URLs are down. Also, unusedURLs() can return 0 URLs.
Shuffling was implemented to distribute load. Without shuffling, when an RPC node goes down, all nodes would attempt to connect to the same URL, causing spikes and high load.
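As a rough sketch of that ordering (assuming the shuffle, unusedURLs and createConnection helpers from this PR; duplicates are harmless because the slice is only an attempt order):

// Prefer unused URLs (shuffled to spread load across nodes), but fall
// back to every configured URL if all of the unused ones are down.
candidates := shuffle(m.unusedURLs()) // may be empty
candidates = append(candidates, m.urls...)
for _, url := range candidates {
    if conn, err := m.createConnection(url); err == nil {
        return conn, nil
    }
}
return nil, errors.New("no reachable URL")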
if conn := m.getExistingConn(); conn != nil {
    return conn, nil
}
We're trying to get an existing connection here and then retrying again in the backoff retry function; should we remove this and only use the one in the retry?
- didn't review tests
- to be continued
// be behind before we don't accept it. block time is 6 seconds, so
// right now we only allow 2 blocks delay
acceptableDelay = 2 * 6 * time.Second
var (
Why were the comments removed?
definitely restore the comments in the PR
I wanted to clean up my own comments but accidentally deleted old ones as well, thanks to dumb code editor plugins :(
    return err == nil
}

func (pc *poolConn) close() {
Don't we need to update lastUsed and inUse here?
// so instead we just check the health when we need to and do the maintenance on demand
// HealthCheckInterval time.Duration
// Timeout for creating new connections
ConnectionTimeout time.Duration
I don't get it, why set a timeout for creating connections?
Typically, this is the behavior you want, mainly to avoid waiting indefinitely on a stuck connection instead of attempting to reconnect.
Also, this parameter is not used at all and should be cleaned up. The reason I removed the timeout logic (not completely, as you noticed) is that I found it already implemented in the inner substrate client out of the box, so it wasn't needed in our code.
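For reference, the usual way such a timeout is applied is by bounding the dial with a context; a sketch, where dialNode is a hypothetical stand-in for whatever dialing helper is used (the inner substrate client reportedly enforces its own timeout already):

// Bound the whole connection attempt so a stuck dial cannot block the
// caller indefinitely; the caller can then move on to another URL.
ctx, cancel := context.WithTimeout(context.Background(), config.ConnectionTimeout)
defer cancel()
conn, err := dialNode(ctx, url) // dialNode is a hypothetical helper
if err != nil {
    return nil, fmt.Errorf("connecting to %s: %w", url, err)
}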
// Minimum number of connections to maintain
MinPoolSize int
// Maximum time a connection can be idle before being closed (if the pool has more than MinPoolSize)
MaxIdleTime time.Duration
What happens when I reach it?
@rawdaGastan the connection should get closed, as far as I understand
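Roughly, the pruning one would expect when MaxIdleTime is exceeded, sketched against the fields quoted in this PR (pool, MinPoolSize, MaxIdleTime, lastUsed, inUse, close):

// Close connections that sat idle longer than MaxIdleTime, but never
// shrink the pool below MinPoolSize and never touch in-use connections.
// (Assumes the caller holds m.mu.)
now := time.Now()
kept := m.pool[:0]
for _, pc := range m.pool {
    idle := now.Sub(time.Unix(pc.lastUsed.Load(), 0))
    if len(kept) >= m.config.MinPoolSize && !pc.inUse.Load() && idle > m.config.MaxIdleTime {
        pc.close()
        continue
    }
    kept = append(kept, pc)
}
m.pool = kept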
type manager struct {
    urls []string
    pool []*poolConn
pools?
t, err := getTime(cl, meta)
if err != nil || time.Since(t) > AcceptableDelay {
    cl.Client.Close()
    return nil, nil, fmt.Errorf("node health check failed")
Why not wrap the error?
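For example, splitting the two failure cases so the underlying error can be wrapped with %w (a sketch over the quoted snippet):

t, err := getTime(cl, meta)
if err != nil {
    cl.Client.Close()
    return nil, nil, fmt.Errorf("node health check failed: %w", err)
}
if time.Since(t) > AcceptableDelay {
    cl.Client.Close()
    return nil, nil, fmt.Errorf("node health check failed: chain time is %s behind", time.Since(t))
}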
if s.conn != nil && s.conn.isHealthy() {
    conn := s.conn.conn
    meta := s.conn.meta
    s.conn.lastUsed.Store(time.Now().Unix())
Should we update the inUse field here?
    Msg("checking for healthy connections")

// Try getting existing connection first
if conn := m.getExistingConn(); conn != nil {
Why try to get a connection before using backoff.Retry?
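In other words, the existing-connection check could live inside the retried operation itself; a sketch, assuming the cenkalti/backoff package behind backoff.Retry and using createNewConn as a hypothetical stand-in for the PR's dialing path:

var conn *poolConn
op := func() error {
    // Reuse a healthy pooled connection if one exists, otherwise dial.
    if c := m.getExistingConn(); c != nil {
        conn = c
        return nil
    }
    c, err := m.createNewConn() // hypothetical helper
    if err != nil {
        return err // retried with backoff
    }
    conn = c
    return nil
}
if err := backoff.Retry(op, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)); err != nil {
    return nil, err
}
return conn, nil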
    return unused
}

func (m *manager) aquiredConnCount() int {
I think it is about the inUse count, not all connections?
log.Debug().Msg("ensuring minimum connections in the pool")
inUseCount := m.aquiredConnCount()
urls := shuffle(m.unusedURLs())
urls = append(urls, m.urls...)
Why append the shuffled unused URLs and all the manager URLs?
see my comment here
}

func (m *manager) getHealthyConn() (*poolConn, error) {
    log.Debug().Int("pool_size", len(m.pool)).Int("aquired_count", m.aquiredConnCount()).
btw, m.pool is accessed here without any locking, which leads to a data race. Try running the tests with the -race flag.
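e.g. any read of m.pool would need the read lock, something like:

// Take a snapshot under the read lock before logging or iterating.
m.mu.RLock()
poolSize := len(m.pool)
m.mu.RUnlock()
log.Debug().Int("pool_size", poolSize).Msg("checking for healthy connections")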
also, typo: aquired should be acquired
If it does any I/O, it should take a context.
mgrImpl.mu.RLock()
defer mgrImpl.mu.RUnlock()

assert.LessOrEqual(t, len(mgrImpl.pool), mgrImpl.config.MinPoolSize)
Shouldn't len(mgrImpl.pool) be greater than or equal to MinPoolSize?
// right now we only allow 2 blocks delay
acceptableDelay = 2 * 6 * time.Second
var (
    ErrInvalidVersion = fmt.Errorf("invalid version")
For static errors use errors.New; fmt.Errorf communicates the intention of either wrapping or formatting.
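Applied to the errors quoted here, that would look like:

var (
    // ErrInvalidVersion is a static sentinel, so errors.New is enough.
    ErrInvalidVersion = errors.New("invalid version")
    // ErrNotFound is returned if an object is not found.
    ErrNotFound = errors.New("object not found")
)

fmt.Errorf then stays reserved for call sites that actually format or wrap, e.g. fmt.Errorf("loading object: %w", ErrNotFound).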
// ErrNotFound is returned if an object is not found
ErrNotFound = fmt.Errorf("object not found")
const (
    AcceptableDelay = 2 * 6 * time.Second
Why exported?
and restore the comment please
Indeed, it shouldn't be exported since it's an internal implementation detail.
@@ -37,145 +36,437 @@ type Versioned struct {
type Conn = *gsrpc.SubstrateAPI
This alias is misleading.
What are your suggestions here?
@@ -37,145 +36,437 @@ type Versioned struct {
type Conn = *gsrpc.SubstrateAPI
type Meta = *types.Metadata
same
conn: conn,
meta: meta,
url: url,
lastUsed: atomic.Int64{},
Initialized to what? Use .Store with a Unix timestamp.
meta: meta,
url: url,
lastUsed: atomic.Int64{},
inUse: atomic.Bool{},
Use .Store(false)
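i.e. something like this when constructing the poolConn (a sketch; the atomic fields cannot be given a non-zero value inside the composite literal, so they are stored right after):

pc := &poolConn{
    conn: conn,
    meta: meta,
    url:  url,
}
// Make the initial state explicit instead of relying on zero values.
pc.lastUsed.Store(time.Now().Unix())
pc.inUse.Store(false)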
    url[i], url[j] = url[j], url[i]
})
// Validate and adjust configuration
if config.MaxPoolSize < 1 {
These validations should fail instead of silently swapping or overriding values provided by the user; we need to tell them they're wrong instead.
ctx: ctx,
cancel: cancel,
config: config,
checkChan: make(chan struct{}, 1),
Why buffered?
The buffer size of 1 is sufficient because we only need to know whether a check is pending. If a check is already pending, other goroutines are not blocked (non-blocking sends). Also, multiple check requests are coalesced into a single check operation, which reduces contention.
select {
case m.checkChan <- struct{}{}: // Non-blocking send
    log.Debug().Msg("triggered connection check")
default: // If channel is full, skip
    log.Debug().Msg("connection check already pending")
}
Note that initially I had the health check execute both on demand (immediately when a connection fails) and periodically; however, I later reconsidered the impact of the side effects on the load versus the actual benefits.
if len(url) == 0 {
    panic("at least one url is required")
func NewManagerWithConfig(config ManagerConfig, urls ...string) Manager {
    if len(urls) == 0 {
never panic on input validation!
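A sketch of failing fast instead, under the assumption that the constructor is allowed to grow an error return (a breaking signature change), with newManager as a hypothetical internal helper:

func NewManagerWithConfig(config ManagerConfig, urls ...string) (Manager, error) {
    if len(urls) == 0 {
        return nil, errors.New("at least one url is required")
    }
    if config.MaxPoolSize < 1 {
        return nil, fmt.Errorf("MaxPoolSize must be at least 1, got %d", config.MaxPoolSize)
    }
    if config.MinPoolSize < 0 || config.MinPoolSize > config.MaxPoolSize {
        return nil, fmt.Errorf("MinPoolSize must be between 0 and MaxPoolSize, got %d", config.MinPoolSize)
    }
    return newManager(config, urls...), nil // newManager is hypothetical
}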
m := &manager{
    urls: shuffle(urls),
    pool: make([]*poolConn, 0, config.MaxPoolSize),
    ctx: ctx,
Red flag to store a ctx on a struct.
urls []string
pool []*poolConn
mu sync.RWMutex
ctx context.Context
I doubt we need that; it may even introduce more problems than it fixes.
// the connection must be closed after you are done using it
func (p *mgrImpl) Substrate() (*Substrate, error) {
    cl, meta, err := p.Raw()
func (m *manager) GetConnection(ctx context.Context) (*Substrate, error) {
Where is that ctx used?
poolSize := len(m.pool)

if poolSize < m.config.MinPoolSize || (poolSize < m.config.MaxPoolSize && poolSize == inUseCount) {
    if conn, err := m.createConnection(url); err == nil {
And in case of an error?
func (s *Substrate) Close() {
    s.Release()
}

func getTime(cl Conn, meta Meta) (t time.Time, err error) {
Should take a context, I believe.
if s.conn != nil {
    s.conn.inUse.Store(false)
}
s.mu.Unlock()
There's an unlock in the branch at line 434, and then always an unlock at line 440?
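One way to keep a single unlock path is the usual defer pattern (a sketch over the quoted Release-style code):

s.mu.Lock()
defer s.mu.Unlock() // one unlock path, runs on every return
if s.conn != nil {
    s.conn.inUse.Store(false)
}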
meta Meta
func (m *manager) Close() error {
    m.cancel()
    m.wg.Wait()
This assumes we only created it with NewManagerWithConfig and may block forever.
    return conn, err
}

func (m *manager) healthChecker() {
for {
    m.mu.Lock()
    for len(m.pool) == 0 {
        // Wait until there is work to do or the context is canceled
        m.cond.Wait()
    }
    m.mu.Unlock()

    // Perform health checks
    select {
    case <-m.ctx.Done():
        return
    default:
        m.checkConnections()
    }
}
cancel context.CancelFunc
wg sync.WaitGroup
config ManagerConfig
checkChan chan struct{}
replace with a condition variable instead
}

// Default configuration
var DefaultConfig = ManagerConfig{
While it's common, I'd prefer to have SubstrateManagerDefaultConfig as a function instead.
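Something along these lines, where the concrete values are placeholders rather than the PR's actual defaults:

// SubstrateManagerDefaultConfig returns a fresh copy on every call, so
// callers can tweak fields without mutating shared package state.
func SubstrateManagerDefaultConfig() ManagerConfig {
    return ManagerConfig{
        MinPoolSize:       2,                // placeholder
        MaxPoolSize:       5,                // placeholder
        MaxIdleTime:       5 * time.Minute,  // placeholder
        ConnectionTimeout: 10 * time.Second, // placeholder
    }
}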
conn Conn
meta Meta
url string
lastUsed atomic.Int64 // Unix timestamp
I don't believe the poolConn is used anywhere without the mutex, so probably better to get rid of all of the atomics.
There are a few things that I noticed here:
- I did a quick read on the websocket, but I'm not sure about it. If we use http, the inner substrate client may already pool connections internally; please double check the substrate client internals.
OK, found out that we do use websocket. We indeed need to have a pool in that case, because the websocket client doesn't have an internal pool like the http one.
But the docs say the contrary: it is websocket that can do concurrent requests: https://docs.substrate.io/build/application-development/#connecting-to-a-node
Connection Pool Manager Implementation
Overview
This PR introduces a connection pool manager for the TFChain client to improve connection handling and reliability. It replaces the previous simple single-connection management with a pool-based implementation.
Key Features
Are these changes backward-compatible with old code?
The Manager interface has been updated with new methods. The changes are almost fully compatible with the old code, achieved by keeping aliases for deprecated/renamed methods and setting defaults for the pool configuration. However:
- you should call the mgr.Close() method when done with the manager.
- custom pool configuration requires the new NewManagerWithConfig function.
Deprecated Methods
No immediate action is required atm, but these methods may be removed in the future:
- mgr.Raw() is deprecated.
- mgr.Substrate() is deprecated and replaced with mgr.GetConnection().
- sub.Close() is deprecated and replaced with sub.Release().
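A rough before/after of the migration described above; the package qualifier (substrate) and variable names are assumptions, the rest follows the methods listed in this PR:

// Old usage (single connection):
// sub, err := mgr.Substrate()
// defer sub.Close()

// New usage (pooled connections):
mgr := substrate.NewManagerWithConfig(substrate.DefaultConfig, urls...)
defer mgr.Close() // now required when done with the manager

sub, err := mgr.GetConnection(ctx)
if err != nil {
    return err
}
defer sub.Release()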
Checklist: