From e9edb0d0ffb91e31f6ce3b152a1eecf4e37157ca Mon Sep 17 00:00:00 2001 From: msonghurst Date: Thu, 16 Mar 2017 21:10:36 +0000 Subject: [PATCH 1/8] hard close the connection, prevent it returning to a pool --- conn_sp.go | 4 ++-- conn_sp_test.go | 15 +++++---------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/conn_sp.go b/conn_sp.go index d58882e..5a621f4 100644 --- a/conn_sp.go +++ b/conn_sp.go @@ -49,7 +49,7 @@ func (conn *Conn) ExecSp(spName string, params ...interface{}) (*SpResult, error return nil, err } for i, spParam := range spParams { - //get datavalue for the suplied stored procedure parametar + //get datavalue for the suplied stored procedure parameter var datavalue *C.BYTE datalen := 0 if i < len(params) { @@ -57,7 +57,7 @@ func (conn *Conn) ExecSp(spName string, params ...interface{}) (*SpResult, error if param != nil { data, sqlDatalen, err := typeToSqlBuf(int(spParam.UserTypeId), param, conn.freetdsVersionGte095) if err != nil { - conn.Close() //close the connection + conn.close() //hard close the connection, if pooled don't return it. return nil, err } if len(data) > 0 { diff --git a/conn_sp_test.go b/conn_sp_test.go index 4eb1aeb..386ed76 100644 --- a/conn_sp_test.go +++ b/conn_sp_test.go @@ -417,25 +417,20 @@ func TestExecSpBadParameterDataType(t *testing.T) { select @p1 return`) assert.Nil(t, err) - + err = createProcedure(conn, "test_bad_parameter_data_type2", " as select 1 one; select 2 two; return 456") assert.Nil(t, err) - var intval int16 intval = 1 _, err = conn.ExecSp("test_bad_parameter_data_type", intval) expectedError := "Could not convert int16 to string." assert.Equal(t, expectedError, err.Error()) - + _, err = conn.ExecSp("test_bad_parameter_data_type", "test") - assert.Nil(t, err) + assert.Nil(t, err) _, err = conn.ExecSp("test_bad_parameter_data_type2") - assert.Nil(t, err) - -} - - - + assert.Nil(t, err) +} From 9849f4b672439a76d8a1e4deba9d495057b6fb55 Mon Sep 17 00:00:00 2001 From: msonghurst Date: Fri, 17 Mar 2017 09:45:38 +0000 Subject: [PATCH 2/8] Error number map, Remove pooled connection --- callbacks.go | 2 +- conn.go | 34 ++++++++++++++++++++++++++++++++-- conn_pool.go | 11 +++++++++++ 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/callbacks.go b/callbacks.go index 6d6b23b..4e622b4 100644 --- a/callbacks.go +++ b/callbacks.go @@ -36,7 +36,7 @@ func errHandler(dbprocAddr C.long, severity, dberr, oserr C.int, dberrstr, oserr conn := getConnection(int64(dbprocAddr)) if conn != nil { - conn.addError(err) + conn.addError(err, int(dberr)) } //fmt.Printf("err: %s", err) diff --git a/conn.go b/conn.go index e8cc296..ac18d7c 100644 --- a/conn.go +++ b/conn.go @@ -94,6 +94,9 @@ type Conn struct { messageNums map[int]int messageMutex sync.RWMutex + errors map[int]string + errorsMutex sync.RWMutex + currentResult *Result expiresFromPool time.Time belongsToPool *ConnPool @@ -120,7 +123,12 @@ func (conn *Conn) addMessage(msg string, msgno int) { conn.messageNums[msgno] = i + 1 } -func (conn *Conn) addError(err string) { +func (conn *Conn) addError(err string, errno int) { + conn.errorsMutex.Lock() + defer conn.errorsMutex.Unlock() + + conn.errors[errno] = err + if len(conn.Error) > 0 { conn.Error += "\n" } @@ -141,6 +149,7 @@ func connectWithCredentials(crd *credentials) (*Conn, error) { spParamsCache: NewParamsCache(), credentials: *crd, messageNums: make(map[int]int), + errors: make(map[int]string), } err := conn.reconnect() if err != nil { @@ -188,6 +197,14 @@ func (conn *Conn) close() { } } +// Remove a pooled connection from it's pool. +func RemoveFromPool(conn *Conn) *Conn { + if conn.belongsToPool != nil { + conn.belongsToPool.Remove(conn) + } + return conn +} + //ensure only one getDbProc at a time var getDbProcMutex = &sync.Mutex{} @@ -260,11 +277,15 @@ func (conn *Conn) DbUse() error { func (conn *Conn) clearMessages() { conn.messageMutex.Lock() - defer conn.messageMutex.Unlock() + conn.errorsMutex.Lock() conn.Error = "" + conn.errors = make(map[int]string) conn.Message = "" conn.messageNums = make(map[int]int) + + conn.errorsMutex.Lock() + conn.messageMutex.Unlock() } //Returns the number of occurances of a supplied FreeTDS message number. @@ -276,6 +297,15 @@ func (conn *Conn) HasMessageNumber(msgno int) int { return count } +//Returns the error string for a supplied FreeTDS error number. +//if the error has not occurred then an empty string and false is returned. +func (conn *Conn) HasErrorNumber(errno int) (string, bool) { + conn.errorsMutex.RLock() + err, found := conn.errors[errno] + conn.errorsMutex.RUnlock() + return err, found +} + //Execute sql query. func (conn *Conn) Exec(sql string) ([]*Result, error) { results, err := conn.exec(sql) diff --git a/conn_pool.go b/conn_pool.go index 2fd7c26..7f56f03 100644 --- a/conn_pool.go +++ b/conn_pool.go @@ -162,6 +162,17 @@ func (p *ConnPool) addToPool(conn *Conn) { } } +// Remove a pooled connection from the pool, +// forcing a new connection to take it's place. +func (p *ConnPool) Remove(conn *Conn) { + if conn.belongsToPool != p { + return + } + p.connCount-- + <-p.poolGuard //remove reservation + conn.belongsToPool = nil +} + //Release connection to the pool. func (p *ConnPool) Release(conn *Conn) { if conn.belongsToPool != p { From a2e1fddfc471d0ede8b6a2945d9a8a8886d18236 Mon Sep 17 00:00:00 2001 From: msonghurst Date: Fri, 17 Mar 2017 09:57:04 +0000 Subject: [PATCH 3/8] mutex was not being unlocked --- conn.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/conn.go b/conn.go index ac18d7c..7511377 100644 --- a/conn.go +++ b/conn.go @@ -277,15 +277,14 @@ func (conn *Conn) DbUse() error { func (conn *Conn) clearMessages() { conn.messageMutex.Lock() - conn.errorsMutex.Lock() - conn.Error = "" conn.errors = make(map[int]string) - conn.Message = "" - conn.messageNums = make(map[int]int) + conn.messageMutex.Unlock() conn.errorsMutex.Lock() - conn.messageMutex.Unlock() + conn.Message = "" + conn.messageNums = make(map[int]int) + conn.errorsMutex.Unlock() } //Returns the number of occurances of a supplied FreeTDS message number. From 043347113824369b6e76cb5c5b0755ef4f38e3fc Mon Sep 17 00:00:00 2001 From: msonghurst Date: Fri, 17 Mar 2017 10:25:18 +0000 Subject: [PATCH 4/8] Make RemoveFromPool a func on type conn --- conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conn.go b/conn.go index 7511377..631594a 100644 --- a/conn.go +++ b/conn.go @@ -198,7 +198,7 @@ func (conn *Conn) close() { } // Remove a pooled connection from it's pool. -func RemoveFromPool(conn *Conn) *Conn { +func (conn *Conn) RemoveFromPool() *Conn { if conn.belongsToPool != nil { conn.belongsToPool.Remove(conn) } From 9b9f8c54dd242be99e001ad229a37991d19bc421 Mon Sep 17 00:00:00 2001 From: Mark Songhurst Date: Fri, 17 Mar 2017 16:01:02 +0300 Subject: [PATCH 5/8] Added unit tests --- conn_pool_test.go | 104 ++++++++++++++++++++++++++++++++++++++++++++++ conn_test.go | 38 +++++++++++++++++ 2 files changed, 142 insertions(+) diff --git a/conn_pool_test.go b/conn_pool_test.go index eeda49b..2025255 100644 --- a/conn_pool_test.go +++ b/conn_pool_test.go @@ -3,6 +3,7 @@ package freetds import ( "fmt" "github.com/stretchr/testify/assert" + "sync" "testing" "time" ) @@ -168,3 +169,106 @@ func TestConnPoolDo(t *testing.T) { assert.Equal(t, 1, len(p.pool)) assert.Equal(t, 1, p.connCount) } + +func TestPoolRemove_TwoSizedPool(t *testing.T) { + p, _ := NewConnPool(testDbConnStr(2)) + assert.Equal(t, p.connCount, 1) + c1, _ := p.Get() + c2, _ := p.Get() + // The pool has used 2 connections (c1, c2) + assert.Equal(t, p.connCount, 2) + // ...and has no unused connections in it. + assert.Equal(t, len(p.pool), 0) + + // Remove c1 from the pool + p.Remove(c1) + assert.Nil(t, c1.belongsToPool) + // The pool has 1 used connection (c2) + assert.Equal(t, p.connCount, 1) + // ...and still has no unused connections in it. + assert.Equal(t, len(p.pool), 0) + + // Trying to release the removed conn is a safe noop + p.Release(c1) + c1.Close() + + // Get another connection from the pool. + // The pool will need to create this connection + // as it has no unused connections. + c3, _ := p.Get() + + // The pool has used 2 connections (c2, c3) + assert.Equal(t, p.connCount, 2) + // ...and has no unused connections in it. + assert.Equal(t, len(p.pool), 0) + + p.Release(c2) + p.Release(c3) +} + +func TestPoolRemove_OneSizedPool(t *testing.T) { + p, _ := NewConnPool(testDbConnStr(1)) + assert.Equal(t, p.connCount, 1) + c1, _ := p.Get() + // The pool has used 1 connection (c1) + assert.Equal(t, p.connCount, 1) + // ...and has no unused connections in it. + assert.Equal(t, len(p.pool), 0) + + var wg sync.WaitGroup + chGotConn := make(chan *Conn) + + // This goroutine attemps to Get another connection from the pool. + wg.Add(1) + go func() { + // The call to p.Get() will block until c1 is + // removed from the pool by the other goroutine. + c2, _ := p.Get() + assert.NotNil(t, c2) + chGotConn <- c2 + wg.Done() + }() + + wg.Add(1) + go func() { + // Sleep the goroutine for 2 seconds, + // keeping c1 in the pool. + time.Sleep(2 * time.Second) + // Now remove c1 from the pool. + // This will allow the call to p.Get() + // in the other goroutine to unblock. + // Note the use of the RemoveFromPool() func on the conn itself. + c1.RemoveFromPool().Close() + wg.Done() + }() + + // Wait to receive c2, or timeout. + select { + case c2 := <-chGotConn: + // The pool has used 1 connection (c2) + assert.Equal(t, p.connCount, 1) + // ...and has no unused connections in it. + assert.Equal(t, len(p.pool), 0) + p.Release(c2) + + case <-time.After(5 * time.Second): + assert.Fail(t, "timed out waiting for a pooled connection") + } + + wg.Wait() + close(chGotConn) +} + +func TestPoolRemove_OnConn(t *testing.T) { + p, _ := NewConnPool(testDbConnStr(2)) + assert.Equal(t, p.connCount, 1) + c1, _ := p.Get() + + c1.RemoveFromPool() + assert.Nil(t, c1.belongsToPool) + // Calling RemoveFromPool again is a safe noop + c1.RemoveFromPool() + // Trying to remove the already removed conn is a safe noop + p.Remove(c1) + c1.Close() +} diff --git a/conn_test.go b/conn_test.go index 8f365a5..0791608 100644 --- a/conn_test.go +++ b/conn_test.go @@ -495,3 +495,41 @@ func TestMessageNumbers(t *testing.T) { assert.Equal(t, c.HasMessageNumber(msgnumOne), 2) assert.Equal(t, c.HasMessageNumber(msgnumTwo), 1) } + +// Also run with "go test --race" for race condition checking. +func TestErrors(t *testing.T) { + const errnumOne = 11111 + const errnumOneMessage = "errnum-1, goroutine-" + const errnumTwo = 22222 + const errnumTwoMessage = "errnum-2, goroutine-" + + c := &Conn{ + errors: make(map[int]string), + } + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + c.addError((errnumOneMessage + "alpha"), errnumOne) + wg.Done() + }() + + wg.Add(1) + go func() { + c.addError((errnumOneMessage + "beta"), errnumOne) + c.addError((errnumTwoMessage + "beta"), errnumTwo) + wg.Done() + }() + + wg.Wait() + // The most recent error using errnumOne will overwrite the previous error. + str, found := c.HasErrorNumber(errnumOne) + assert.True(t, found) + assert.Contains(t, str, errnumOneMessage) + + // Only one error using errnumTwo was raised. + str, found = c.HasErrorNumber(errnumTwo) + assert.True(t, found) + assert.Contains(t, str, errnumTwoMessage) +} From d11316c21832159bfb0ad24be93305e66e10767d Mon Sep 17 00:00:00 2001 From: Mark Songhurst Date: Tue, 26 Sep 2017 17:10:45 +0300 Subject: [PATCH 6/8] added column name to error from convertAssign --- result.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/result.go b/result.go index 9ab997c..82769ef 100644 --- a/result.go +++ b/result.go @@ -131,7 +131,7 @@ func (r *Result) ScanColumn(name string, dest interface{}) error { err = convertAssign(dest, r.Rows[r.currentRow][i]) if err != nil { - return err + return fmt.Errorf("%s, column %s", err.Error(), name) } return nil From f3998c81b35761fe0781b113c458d83429a5419a Mon Sep 17 00:00:00 2001 From: Mark Songhurst Date: Tue, 26 Sep 2017 17:11:56 +0300 Subject: [PATCH 7/8] Added single quotes around column name --- result.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/result.go b/result.go index 82769ef..4fe891e 100644 --- a/result.go +++ b/result.go @@ -131,7 +131,7 @@ func (r *Result) ScanColumn(name string, dest interface{}) error { err = convertAssign(dest, r.Rows[r.currentRow][i]) if err != nil { - return fmt.Errorf("%s, column %s", err.Error(), name) + return fmt.Errorf("%s, column '%s'", err.Error(), name) } return nil From e79e30e83f5c7bee43908cc19f9a49983c126889 Mon Sep 17 00:00:00 2001 From: Mark Songhurst Date: Wed, 27 Sep 2017 11:48:16 +0300 Subject: [PATCH 8/8] Added another column name to error message --- result.go | 4 ++-- sp_result.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/result.go b/result.go index 4fe891e..b32063b 100644 --- a/result.go +++ b/result.go @@ -97,7 +97,7 @@ func (r *Result) MustScan(cnt int, dest ...interface{}) error { return err } if cnt != r.scanCount { - return errors.New(fmt.Sprintf("Worng scan count, expected %d, actual %d.", cnt, r.scanCount)) + return errors.New(fmt.Sprintf("Wrong scan count, expected %d, actual %d.", cnt, r.scanCount)) } return nil } @@ -145,7 +145,7 @@ func (r *Result) scanStruct(s *reflect.Value) error { if f.IsValid() { if f.CanSet() { if err := convertAssign(f.Addr().Interface(), r.Rows[r.currentRow][i]); err != nil { - return err + return fmt.Errorf("%s, column '%s'", err.Error(), col.Name) } r.scanCount++ } diff --git a/sp_result.go b/sp_result.go index 64c6c8f..8bfba8a 100644 --- a/sp_result.go +++ b/sp_result.go @@ -89,7 +89,7 @@ func (r *SpResult) Next() bool { return rst.Next() } -//Sacaning output parameters of stored procedure +//Scanning output parameters of stored procedure func (r *SpResult) ParamScan(values ...interface{}) error { outputValues := make([]interface{}, len(r.outputParams)) for i := 0; i < len(r.outputParams); i++ {