Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: recover from closed connection #17249

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions go/vt/binlog/binlogplayer/dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type DBClient interface {
Commit() error
Rollback() error
Close()
IsClosed() bool
ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
ExecuteFetchMulti(query string, maxrows int) (qrs []*sqltypes.Result, err error)
SupportsCapability(capability capabilities.FlavorCapability) (bool, error)
Expand Down Expand Up @@ -125,6 +126,10 @@ func (dc *dbClientImpl) Close() {
dc.dbConn.Close()
}

func (dc *dbClientImpl) IsClosed() bool {
return dc.dbConn.IsClosed()
}

func (dc *dbClientImpl) SupportsCapability(capability capabilities.FlavorCapability) (bool, error) {
return dc.dbConn.SupportsCapability(capability)
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/binlog/binlogplayer/fake_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (dc *fakeDBClient) Rollback() error {
func (dc *fakeDBClient) Close() {
}

func (dc *fakeDBClient) IsClosed() bool {
return false
}

func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) {
query = strings.ToLower(query)
switch {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func (dc *MockDBClient) Rollback() error {
func (dc *MockDBClient) Close() {
}

func (dc *MockDBClient) IsClosed() bool {
return false
}

// ExecuteFetch is part of the DBClient interface
func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) {
// Serialize ExecuteFetch to enforce a strict order on shared dbClients.
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ func (dbc *realDBClient) Close() {
dbc.conn.Close()
}

func (dbc *realDBClient) IsClosed() bool {
return dbc.conn.IsClosed()
}

func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
// Use Clone() because the contents of memory region referenced by
// string can change when clients (e.g. vcopier) use unsafe string methods.
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,10 @@ func (dbc *realDBClient) Close() {
dbc.conn.Close()
}

func (dbc *realDBClient) IsClosed() bool {
return dbc.conn.IsClosed()
}

func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
// Use Clone() because the contents of memory region referenced by
// string can change when clients (e.g. vcopier) use unsafe string methods.
Expand Down
13 changes: 10 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,18 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer
// code.
func (vr *vreplicator) Replicate(ctx context.Context) error {
err := vr.replicate(ctx)
if err != nil {
if err := vr.setMessage(err.Error()); err != nil {
binlogplayer.LogError("Failed to set error state", err)
if err == nil {
return nil
}
if vr.dbClient.IsClosed() {
// Connection was possible terminated by the server. We should renew it.
if cerr := vr.dbClient.Connect(); cerr != nil {
return vterrors.Wrapf(err, "failed to reconnect to the database: %v", cerr)
Comment on lines +192 to +195
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we check if the context was not canceled before trying to reconnect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good observation -- but not in this case. We absolutely must try and issue vr.setMessage(err.Error()) so as to persist the error and the state of the stream.

}
}
if err := vr.setMessage(err.Error()); err != nil {
binlogplayer.LogError("Failed to set error state", err)
}
return err
}

Expand Down
4 changes: 4 additions & 0 deletions go/vt/wrangler/fake_dbclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ func (dc *fakeDBClient) Rollback() error {
func (dc *fakeDBClient) Close() {
}

func (dc *fakeDBClient) IsClosed() bool {
return false
}

// ExecuteFetch is part of the DBClient interface
func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
dc.mu.Lock()
Expand Down
Loading