Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update client pool logic #846

Merged
merged 5 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ test-local:
-v $${PWD}/docker/resources/replication.cnf:/etc/mysql/conf.d/replication.cnf \
mysql:$(MYSQL_VERSION)
docker/resources/waitfor.sh 127.0.0.1 3306 \
&& go test -race -v -timeout 2m ./... -gocheck.v
&& go test -race -v -timeout 2m ./client/...
docker stop go-mysql-server

fmt:
Expand Down
152 changes: 125 additions & 27 deletions client/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"log"
"math"
"math/rand"
"sync"
Expand Down Expand Up @@ -77,43 +78,46 @@ var (
MaxNewConnectionAtOnce = 5
)

// NewPool initializes new connection pool and uses params: addr, user, password, dbName and options.
// minAlive specifies the minimum number of open connections that the pool will try to maintain.
// maxAlive specifies the maximum number of open connections (for internal reasons,
// may be greater by 1 inside newConnectionProducer).
// maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout).
func NewPool(
logFunc LogFunc,
minAlive int,
maxAlive int,
maxIdle int,
// NewPoolWithOptions initializes new connection pool and uses params: addr, user, password, dbName and options.
func NewPoolWithOptions(
addr string,
user string,
password string,
dbName string,
options ...func(conn *Conn),
) *Pool {
if minAlive > maxAlive {
minAlive = maxAlive
options ...PoolOption,
) (*Pool, error) {
po := getDefaultPoolOptions()

po.addr = addr
po.user = user
po.password = password
po.dbName = dbName

for _, o := range options {
o(&po)
}

if po.minAlive > po.maxAlive {
po.minAlive = po.maxAlive
}
if maxIdle > maxAlive {
maxIdle = maxAlive
if po.maxIdle > po.maxAlive {
po.maxIdle = po.maxAlive
}
if maxIdle <= minAlive {
maxIdle = minAlive
if po.maxIdle <= po.minAlive {
po.maxIdle = po.minAlive
}

pool := &Pool{
logFunc: logFunc,
minAlive: minAlive,
maxAlive: maxAlive,
maxIdle: maxIdle,
logFunc: po.logFunc,
minAlive: po.minAlive,
maxAlive: po.maxAlive,
maxIdle: po.maxIdle,

idleCloseTimeout: Timestamp(math.Ceil(DefaultIdleTimeout.Seconds())),
idlePingTimeout: Timestamp(math.Ceil(MaxIdleTimeoutWithoutPing.Seconds())),

connect: func() (*Conn, error) {
return Connect(addr, user, password, dbName, options...)
return Connect(addr, user, password, dbName, po.connOptions...)
},

readyConnection: make(chan Connection),
Expand All @@ -127,13 +131,56 @@ func NewPool(
go pool.newConnectionProducer()

if pool.minAlive > 0 {
pool.logFunc(`Pool: Setup %d new connections (minimal pool size)...`, pool.minAlive)
pool.startNewConnections(pool.minAlive)
go pool.startNewConnections(pool.minAlive)
}

pool.wg.Add(1)
go pool.closeOldIdleConnections()

if po.newPoolPingTimeout > 0 {
ctx, cancel := context.WithTimeout(pool.ctx, po.newPoolPingTimeout)
err := pool.checkConnection(ctx)
cancel()
if err != nil {
pool.Close()
return nil, errors.Errorf("checkConnection: %s", err)
}
}

return pool, nil
}

// NewPool initializes new connection pool and uses params: addr, user, password, dbName and options.
// minAlive specifies the minimum number of open connections that the pool will try to maintain.
// maxAlive specifies the maximum number of open connections (for internal reasons,
// may be greater by 1 inside newConnectionProducer).
// maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout).
//
// Deprecated: use NewPoolWithOptions
func NewPool(
logFunc LogFunc,
minAlive int,
maxAlive int,
maxIdle int,
addr string,
user string,
password string,
dbName string,
options ...func(conn *Conn),
) *Pool {
pool, err := NewPoolWithOptions(
addr,
user,
password,
dbName,
WithLogFunc(logFunc),
WithPoolLimits(minAlive, maxAlive, maxIdle),
WithConnOptions(options...),
)
if err != nil {
pool.logFunc(`Pool: NewPool: %s`, err.Error())
}

return pool
}

Expand Down Expand Up @@ -235,6 +282,7 @@ func (pool *Pool) putConnectionUnsafe(connection Connection) {

func (pool *Pool) newConnectionProducer() {
defer pool.wg.Done()

var connection Connection
var err error

Expand Down Expand Up @@ -263,10 +311,24 @@ func (pool *Pool) newConnectionProducer() {
pool.synchro.stats.TotalCount-- // Bad luck, should try again
pool.synchro.Unlock()

time.Sleep(time.Duration(10+rand.Intn(90)) * time.Millisecond)
continue
pool.logFunc("Cannot establish new db connection: %s", err.Error())

timer := time.NewTimer(
time.Duration(10+rand.Intn(90)) * time.Millisecond,
)

select {
case <-timer.C:
continue
case <-pool.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
}
}
}

select {
case pool.readyConnection <- connection:
case <-pool.ctx.Done():
Expand Down Expand Up @@ -309,6 +371,7 @@ func (pool *Pool) getIdleConnectionUnsafe() Connection {

func (pool *Pool) closeOldIdleConnections() {
defer pool.wg.Done()

var toPing []Connection

ticker := time.NewTicker(5 * time.Second)
Expand Down Expand Up @@ -468,13 +531,17 @@ func (pool *Pool) closeConn(conn *Conn) {
}

func (pool *Pool) startNewConnections(count int) {
pool.logFunc(`Pool: Setup %d new connections (minimal pool size)...`, count)

connections := make([]Connection, 0, count)
for i := 0; i < count; i++ {
if conn, err := pool.createNewConnection(); err == nil {
pool.synchro.Lock()
pool.synchro.stats.TotalCount++
pool.synchro.Unlock()
connections = append(connections, conn)
} else {
pool.logFunc(`Pool: createNewConnection: %s`, err)
}
}

Expand Down Expand Up @@ -512,3 +579,34 @@ func (pool *Pool) Close() {
pool.synchro.idleConnections = nil
pool.synchro.Unlock()
}

// checkConnection tries to connect and ping DB server
func (pool *Pool) checkConnection(ctx context.Context) error {
errChan := make(chan error, 1)

go func() {
atercattus marked this conversation as resolved.
Show resolved Hide resolved
conn, err := pool.connect()
if err == nil {
err = conn.Ping()
_ = conn.Close()
}
errChan <- err
}()

select {
case err := <-errChan:
return err
case <-ctx.Done():
return ctx.Err()
}
}

// getDefaultPoolOptions returns pool config for low load services
func getDefaultPoolOptions() poolOptions {
return poolOptions{
logFunc: log.Printf,
minAlive: 1,
maxAlive: 10,
maxIdle: 2,
}
}
60 changes: 60 additions & 0 deletions client/pool_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package client

import (
"time"
)

type (
poolOptions struct {
logFunc LogFunc

minAlive int
maxAlive int
maxIdle int

addr string
user string
password string
dbName string

connOptions []func(conn *Conn)

newPoolPingTimeout time.Duration
}
)

type (
PoolOption func(o *poolOptions)
)

// WithPoolLimits sets pool limits:
// - minAlive specifies the minimum number of open connections that the pool will try to maintain.
// - maxAlive specifies the maximum number of open connections (for internal reasons,
// may be greater by 1 inside newConnectionProducer).
// - maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout).
func WithPoolLimits(minAlive, maxAlive, maxIdle int) PoolOption {
return func(o *poolOptions) {
o.minAlive = minAlive
o.maxAlive = maxAlive
o.maxIdle = maxIdle
}
}

func WithLogFunc(f LogFunc) PoolOption {
return func(o *poolOptions) {
o.logFunc = f
}
}

func WithConnOptions(options ...func(conn *Conn)) PoolOption {
return func(o *poolOptions) {
o.connOptions = append(o.connOptions, options...)
}
}

// WithNewPoolPingTimeout enables connect & ping to DB during the pool initialization
func WithNewPoolPingTimeout(timeout time.Duration) PoolOption {
return func(o *poolOptions) {
o.newPoolPingTimeout = timeout
}
}
41 changes: 39 additions & 2 deletions client/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package client
import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"

"github.com/go-mysql-org/go-mysql/test_util"
"github.com/siddontang/go-log/log"
Expand All @@ -26,7 +28,12 @@ func TestPoolSuite(t *testing.T) {

func (s *poolTestSuite) TestPool_Close() {
addr := fmt.Sprintf("%s:%s", *test_util.MysqlHost, s.port)
pool := NewPool(log.Debugf, 5, 10, 5, addr, *testUser, *testPassword, "")
pool, err := NewPoolWithOptions(addr, *testUser, *testPassword, "",
WithPoolLimits(5, 10, 5),
WithLogFunc(log.Debugf),
)
require.NoError(s.T(), err)

conn, err := pool.GetConn(context.Background())
require.NoError(s.T(), err)
err = conn.Ping()
Expand All @@ -35,8 +42,38 @@ func (s *poolTestSuite) TestPool_Close() {
pool.Close()
var poolStats ConnectionStats
pool.GetStats(&poolStats)
require.Equal(s.T(), 0, poolStats.TotalCount)
require.Equal(s.T(), 0, poolStats.IdleCount)
require.Len(s.T(), pool.readyConnection, 0)
_, err = pool.GetConn(context.Background())
require.Error(s.T(), err)
}

func (s *poolTestSuite) TestPool_WrongPassword() {
addr := fmt.Sprintf("%s:%s", *test_util.MysqlHost, s.port)

_, err := NewPoolWithOptions(addr, *testUser, "wrong-password", "",
WithPoolLimits(5, 10, 5),
WithLogFunc(log.Debugf),
WithNewPoolPingTimeout(time.Second),
)

require.ErrorContains(s.T(), err, "ERROR 1045 (28000): Access denied for user")
}

func (s *poolTestSuite) TestPool_WrongAddr() {
l, err := net.Listen("tcp4", "127.0.0.1:0")
require.NoError(s.T(), err)

laddr, ok := l.Addr().(*net.TCPAddr)
require.True(s.T(), ok)

_ = l.Close()

_, err = NewPoolWithOptions(laddr.String(), *testUser, *testPassword, "",
WithPoolLimits(5, 10, 5),
WithLogFunc(log.Debugf),
WithNewPoolPingTimeout(time.Second),
)

require.Error(s.T(), err)
}
Loading