From 1f0f6ed0f089c303561b03f9bbe0a58547b34651 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 18 Nov 2024 12:41:40 +0200 Subject: [PATCH 1/2] DBClient.IsClosed() Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/binlog/binlogplayer/dbclient.go | 5 +++++ go/vt/binlog/binlogplayer/fake_dbclient.go | 4 ++++ go/vt/binlog/binlogplayer/mock_dbclient.go | 4 ++++ go/vt/vttablet/tabletmanager/vdiff/framework_test.go | 4 ++++ go/vt/vttablet/tabletmanager/vreplication/framework_test.go | 4 ++++ go/vt/wrangler/fake_dbclient_test.go | 4 ++++ 6 files changed, 25 insertions(+) diff --git a/go/vt/binlog/binlogplayer/dbclient.go b/go/vt/binlog/binlogplayer/dbclient.go index c3463b4cc2c..4cbfd962528 100644 --- a/go/vt/binlog/binlogplayer/dbclient.go +++ b/go/vt/binlog/binlogplayer/dbclient.go @@ -40,6 +40,7 @@ type DBClient interface { Commit() error Rollback() error Close() + IsClosed() bool ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) ExecuteFetchMulti(query string, maxrows int) (qrs []*sqltypes.Result, err error) SupportsCapability(capability capabilities.FlavorCapability) (bool, error) @@ -125,6 +126,10 @@ func (dc *dbClientImpl) Close() { dc.dbConn.Close() } +func (dc *dbClientImpl) IsClosed() bool { + return dc.dbConn.IsClosed() +} + func (dc *dbClientImpl) SupportsCapability(capability capabilities.FlavorCapability) (bool, error) { return dc.dbConn.SupportsCapability(capability) } diff --git a/go/vt/binlog/binlogplayer/fake_dbclient.go b/go/vt/binlog/binlogplayer/fake_dbclient.go index 234dfd528e0..69bbd06f7c6 100644 --- a/go/vt/binlog/binlogplayer/fake_dbclient.go +++ b/go/vt/binlog/binlogplayer/fake_dbclient.go @@ -56,6 +56,10 @@ func (dc *fakeDBClient) Rollback() error { func (dc *fakeDBClient) Close() { } +func (dc *fakeDBClient) IsClosed() bool { + return false +} + func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) { query = strings.ToLower(query) switch { diff --git a/go/vt/binlog/binlogplayer/mock_dbclient.go b/go/vt/binlog/binlogplayer/mock_dbclient.go index f0a811a30cf..12005a16c2e 100644 --- a/go/vt/binlog/binlogplayer/mock_dbclient.go +++ b/go/vt/binlog/binlogplayer/mock_dbclient.go @@ -181,6 +181,10 @@ func (dc *MockDBClient) Rollback() error { func (dc *MockDBClient) Close() { } +func (dc *MockDBClient) IsClosed() bool { + return false +} + // ExecuteFetch is part of the DBClient interface func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) { // Serialize ExecuteFetch to enforce a strict order on shared dbClients. diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index 33a0da8e23f..7d4cdb78c20 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -396,6 +396,10 @@ func (dbc *realDBClient) Close() { dbc.conn.Close() } +func (dbc *realDBClient) IsClosed() bool { + return dbc.conn.IsClosed() +} + func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) { // Use Clone() because the contents of memory region referenced by // string can change when clients (e.g. vcopier) use unsafe string methods. diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 12d20e3a867..fe8b62d3cef 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -479,6 +479,10 @@ func (dbc *realDBClient) Close() { dbc.conn.Close() } +func (dbc *realDBClient) IsClosed() bool { + return dbc.conn.IsClosed() +} + func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) { // Use Clone() because the contents of memory region referenced by // string can change when clients (e.g. vcopier) use unsafe string methods. diff --git a/go/vt/wrangler/fake_dbclient_test.go b/go/vt/wrangler/fake_dbclient_test.go index 14ef0913383..02ee79210d7 100644 --- a/go/vt/wrangler/fake_dbclient_test.go +++ b/go/vt/wrangler/fake_dbclient_test.go @@ -153,6 +153,10 @@ func (dc *fakeDBClient) Rollback() error { func (dc *fakeDBClient) Close() { } +func (dc *fakeDBClient) IsClosed() bool { + return false +} + // ExecuteFetch is part of the DBClient interface func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) { dc.mu.Lock() From 4c9eb0381616b806eaf28afa690e31327e8b7a68 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 18 Nov 2024 12:42:25 +0200 Subject: [PATCH 2/2] vreplicator: check connection after error; if connection is closed, reopen it Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletmanager/vreplication/vreplicator.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 0c5c0b5b334..9ec274ab0ea 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -186,11 +186,18 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer // code. func (vr *vreplicator) Replicate(ctx context.Context) error { err := vr.replicate(ctx) - if err != nil { - if err := vr.setMessage(err.Error()); err != nil { - binlogplayer.LogError("Failed to set error state", err) + if err == nil { + return nil + } + if vr.dbClient.IsClosed() { + // Connection was possible terminated by the server. We should renew it. + if cerr := vr.dbClient.Connect(); cerr != nil { + return vterrors.Wrapf(err, "failed to reconnect to the database: %v", cerr) } } + if err := vr.setMessage(err.Error()); err != nil { + binlogplayer.LogError("Failed to set error state", err) + } return err }