Skip to content

Commit

Permalink
1. Fixed mongo goroutine leak
Browse files Browse the repository at this point in the history
2. Fixed linters error
  • Loading branch information
maranqz committed Feb 4, 2024
1 parent c878cfb commit f2121c3
Show file tree
Hide file tree
Showing 28 changed files with 420 additions and 234 deletions.
3 changes: 2 additions & 1 deletion gorm/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
// Example demonstrates the implementation of the Repository pattern by trm.Manager.
func Example() {
db, err := gorm.Open(sqlite.Open("file:test.db?mode=memory"))

checkErr(err)

sqlDB, err := db.DB()
checkErr(err)

defer sqlDB.Close()

// Migrate the schema
Expand Down
2 changes: 1 addition & 1 deletion gorm/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewDefaultFactory(db *gorm.DB) trm.TrFactory {
s, _ := trms.(Settings)

// TODO do update TRM config by settings gorm nested transaction
//db.DisableNestedTransaction = true
// db.DisableNestedTransaction = true

return NewTransaction(ctx, s.TxOpts(), db)
}
Expand Down
47 changes: 24 additions & 23 deletions gorm/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (

// Transaction is trm.Transaction for sqlx.Tx.
type Transaction struct {
tx *gorm.DB
txMutex sync.Mutex
active *drivers.IsClose
activeClosure *drivers.IsClose
tx *gorm.DB
txMutex sync.Mutex
isClosed *drivers.IsClosed
isClosedClosure *drivers.IsClosed
}

// NewTransaction creates trm.Transaction for sqlx.Tx.
Expand All @@ -31,10 +31,10 @@ func NewTransaction(
db *gorm.DB,
) (context.Context, *Transaction, error) {
t := &Transaction{
tx: nil,
txMutex: sync.Mutex{},
active: drivers.NewIsClosed(),
activeClosure: drivers.NewIsClosed(),
tx: nil,
txMutex: sync.Mutex{},
isClosed: drivers.NewIsClosed(),
isClosedClosure: drivers.NewIsClosed(),
}

var err error
Expand All @@ -50,9 +50,9 @@ func NewTransaction(

wg.Done()

<-t.activeClosure.Closed()
<-t.isClosedClosure.Closed()

return t.activeClosure.Err()
return t.isClosedClosure.Err()
}, opts)

t.txMutex.Lock()
Expand All @@ -67,7 +67,7 @@ func NewTransaction(
err = t.tx.Error
}

t.active.CloseWithCause(err)
t.isClosed.CloseWithCause(err)
} else {
wg.Done()
}
Expand All @@ -92,8 +92,8 @@ func (t *Transaction) awaitDone(ctx context.Context) {
select {
case <-ctx.Done():
// Rollback will be called by context.Err()
t.activeClosure.Close()
case <-t.active.Closed():
t.isClosedClosure.Close()
case <-t.isClosed.Closed():
}
}

Expand All @@ -114,34 +114,34 @@ func (t *Transaction) Begin(ctx context.Context, s trm.Settings) (context.Contex
// Commit closes the trm.Transaction.
func (t *Transaction) Commit(_ context.Context) error {
select {
case <-t.active.Closed():
case <-t.isClosed.Closed():
t.txMutex.Lock()
defer t.txMutex.Unlock()

return t.tx.Commit().Error
default:
t.activeClosure.Close()
t.isClosedClosure.Close()

<-t.active.Closed()
<-t.isClosed.Closed()

return t.active.Err()
return t.isClosed.Err()
}
}

// Rollback the trm.Transaction.
func (t *Transaction) Rollback(_ context.Context) error {
select {
case <-t.active.Closed():
case <-t.isClosed.Closed():
t.txMutex.Lock()
defer t.txMutex.Unlock()

return t.tx.Rollback().Error
default:
t.activeClosure.CloseWithCause(drivers.ErrRollbackTr)
t.isClosedClosure.CloseWithCause(drivers.ErrRollbackTr)

<-t.active.Closed()
<-t.isClosed.Closed()

err := t.active.Err()
err := t.isClosed.Err()
if errors.Is(err, drivers.ErrRollbackTr) {
return nil
}
Expand All @@ -152,9 +152,10 @@ func (t *Transaction) Rollback(_ context.Context) error {

// IsActive returns true if the transaction started but not committed or rolled back.
func (t *Transaction) IsActive() bool {
return t.active.IsActive()
return t.isClosed.IsActive()
}

// Closed returns a channel that's closed when transaction committed or rolled back.
func (t *Transaction) Closed() <-chan struct{} {
return t.active.Closed()
return t.isClosed.Closed()
}
3 changes: 2 additions & 1 deletion gorm/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,11 @@ func TestTransaction_awaitDone_byRollback(t *testing.T) {
require.NoError(t, err)

f := NewDefaultFactory(dbgorm)
ctx, _ := context.WithCancel(context.Background())
ctx, _ := context.WithCancel(context.Background()) //nolint:govet

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
defer wg.Done()

Expand Down
6 changes: 6 additions & 0 deletions internal/test/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ import (
"github.com/stretchr/testify/require"
)

// NewDBMock returns a new sql.DB and sqlmock.
//
//nolint:ireturn
func NewDBMock() (*sql.DB, sqlmock.Sqlmock) {
db, dbmock, _ := sqlmock.New()

return db, dbmock
}

// NewDBMockWithClose returns a new sql.DB and sqlmock, and close it after test.
//
//nolint:ireturn
func NewDBMockWithClose(t *testing.T) (*sql.DB, sqlmock.Sqlmock) {
db, dbmock := NewDBMock()

Expand Down
26 changes: 15 additions & 11 deletions mongo/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package mongo

import (
"context"
"sync/atomic"

"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/avito-tech/go-transaction-manager/trm/drivers"
)

// Transaction is trm.Transaction for mongo.Client.
type Transaction struct {
session mongo.Session
isActive int64
isClosed *drivers.IsClosed
}

// NewTransaction creates trm.Transaction for mongo.Client.
Expand All @@ -33,7 +34,7 @@ func NewTransaction(
return ctx, nil, err
}

tr := &Transaction{session: s, isActive: 1}
tr := &Transaction{session: s, isClosed: drivers.NewIsClosed()}

go tr.awaitDone(ctx)

Expand All @@ -45,9 +46,11 @@ func (t *Transaction) awaitDone(ctx context.Context) {
return
}

<-ctx.Done()

t.deactivate()
select {
case <-ctx.Done():
t.isClosed.Close()
case <-t.isClosed.Closed():
}
}

// Transaction returns the real transaction mongo.Session.
Expand All @@ -57,7 +60,7 @@ func (t *Transaction) Transaction() interface{} {

// Commit the trm.Transaction.
func (t *Transaction) Commit(ctx context.Context) error {
defer t.deactivate()
defer t.isClosed.Close()

defer t.session.EndSession(ctx)

Expand All @@ -66,7 +69,7 @@ func (t *Transaction) Commit(ctx context.Context) error {

// Rollback the trm.Transaction.
func (t *Transaction) Rollback(ctx context.Context) error {
defer t.deactivate()
defer t.isClosed.Close()

defer t.session.EndSession(ctx)

Expand All @@ -75,9 +78,10 @@ func (t *Transaction) Rollback(ctx context.Context) error {

// IsActive returns true if the transaction started but not committed or rolled back.
func (t *Transaction) IsActive() bool {
return atomic.LoadInt64(&t.isActive) == 1
return t.isClosed.IsActive()
}

func (t *Transaction) deactivate() {
atomic.SwapInt64(&t.isActive, 0)
// Closed returns a channel that's closed when transaction committed or rolled back.
func (t *Transaction) Closed() <-chan struct{} {
return t.isClosed.Closed()
}
8 changes: 6 additions & 2 deletions mongo/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ func TestTransaction(t *testing.T) {
manager.WithSettings(f.settings),
)

var tr Transaction
var tr trm.Transaction
err := m.Do(tt.args.ctx, func(ctx context.Context) error {
tr = trmcontext.DefaultManager.Default(ctx)

var trNested trm.Transaction
err := m.Do(ctx, func(ctx context.Context) error {
trNested = trmcontext.DefaultManager.Default(ctx)
Expand All @@ -186,7 +188,9 @@ func TestTransaction(t *testing.T) {

return err
})
require.False(t, false, tr.IsActive())
if tr != nil {
require.False(t, tr.IsActive())
}

if !tt.wantErr(t, err) {
return
Expand Down
27 changes: 15 additions & 12 deletions pgxv4/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ package pgxv4

import (
"context"
"sync/atomic"

"github.com/jackc/pgx/v4"

"github.com/avito-tech/go-transaction-manager/trm"
"github.com/avito-tech/go-transaction-manager/trm/drivers"
)

// Transaction is trm.Transaction for pgx.Tx.
type Transaction struct {
tx pgx.Tx
isActive int64
isClosed *drivers.IsClosed
}

// NewTransaction creates trm.Transaction for pgx.Tx.
Expand All @@ -31,7 +31,7 @@ func NewTransaction(

tr := &Transaction{
tx: tx,
isActive: 1,
isClosed: drivers.NewIsClosed(),
}

go tr.awaitDone(ctx)
Expand All @@ -44,9 +44,11 @@ func (t *Transaction) awaitDone(ctx context.Context) {
return
}

<-ctx.Done()

t.deactivate()
select {
case <-ctx.Done():
t.isClosed.Close()
case <-t.isClosed.Closed():
}
}

// Transaction returns the real transaction pgx.Tx.
Expand All @@ -63,31 +65,32 @@ func (t *Transaction) Begin(ctx context.Context, _ trm.Settings) (context.Contex

tr := &Transaction{
tx: tx,
isActive: 1,
isClosed: drivers.NewIsClosed(),
}

return ctx, tr, nil
}

// Commit the trm.Transaction.
func (t *Transaction) Commit(ctx context.Context) error {
defer t.deactivate()
defer t.isClosed.Close()

return t.tx.Commit(ctx)
}

// Rollback the trm.Transaction.
func (t *Transaction) Rollback(ctx context.Context) error {
defer t.deactivate()
defer t.isClosed.Close()

return t.tx.Rollback(ctx)
}

// IsActive returns true if the transaction started but not committed or rolled back.
func (t *Transaction) IsActive() bool {
return atomic.LoadInt64(&t.isActive) == 1
return t.isClosed.IsActive()
}

func (t *Transaction) deactivate() {
atomic.SwapInt64(&t.isActive, 0)
// Closed returns a channel that's closed when transaction committed or rolled back.
func (t *Transaction) Closed() <-chan struct{} {
return t.isClosed.Closed()
}
Loading

0 comments on commit f2121c3

Please sign in to comment.