Skip to content

Commit

Permalink
branch-3.0: [fix](suites) Fix atomic restore alter suite with master_…
Browse files Browse the repository at this point in the history
…sql #46550 (#46652)

Cherry-picked from #46550

Co-authored-by: walter <[email protected]>
  • Loading branch information
github-actions[bot] and w41ter authored Jan 9, 2025
1 parent 008df5f commit 5afdf61
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ class Suite implements GroovyInterceptable {
}
}

List<List<Object>> master_sql(String sqlStr, boolean isOrder = false) {
return sql_impl(context.getMasterConnection(), sqlStr, isOrder)
}

List<List<Object>> multi_sql(String sqlStr, boolean isOrder = false) {
String[] sqls = sqlStr.split(";")
def result = new ArrayList<Object>();
Expand Down Expand Up @@ -513,6 +517,10 @@ class Suite implements GroovyInterceptable {
return sql_return_maparray_impl(sqlStr, context.getConnection())
}

def master_sql_return_maparray(String sqlStr) {
return sql_return_maparray_impl(sqlStr, context.getMasterConnection())
}

def arrow_flight_sql_return_maparray(String sqlStr) {
return sql_return_maparray_impl((String) ("USE ${context.dbName};" + sqlStr), context.getArrowFlightSqlConnection())
}
Expand Down Expand Up @@ -1238,6 +1246,8 @@ class Suite implements GroovyInterceptable {
tupleResult = JdbcUtils.executeToStringList(context.getArrowFlightSqlConnection(), (PreparedStatement) arg)
} else if (tag.contains("target_sql")) {
tupleResult = JdbcUtils.executeToStringList(context.getTargetConnection(this), (PreparedStatement) arg)
} else if (tag.contains("master_sql")) {
tupleResult = JdbcUtils.executeToStringList(context.getMasterConnection(), (PreparedStatement) arg)
} else {
tupleResult = JdbcUtils.executeToStringList(context.getConnection(), (PreparedStatement) arg)
}
Expand All @@ -1251,6 +1261,8 @@ class Suite implements GroovyInterceptable {
(String) ("USE ${context.dbName};" + (String) arg))
} else if (tag.contains("target_sql")) {
tupleResult = JdbcUtils.executeToStringList(context.getTargetConnection(this), (String) arg)
} else if (tag.contains("master_sql")) {
tupleResult = JdbcUtils.executeToStringList(context.getMasterConnection(), (PreparedStatement) arg)
} else {
tupleResult = JdbcUtils.executeToStringList(context.getConnection(), (String) arg)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class SuiteContext implements Closeable {
public final String group
public final String dbName
public final ThreadLocal<ConnectionInfo> threadLocalConn = new ThreadLocal<>()
public final ThreadLocal<ConnectionInfo> threadLocalMasterConn = new ThreadLocal<>()
public final ThreadLocal<ConnectionInfo> threadArrowFlightSqlConn = new ThreadLocal<>()
public final ThreadLocal<Connection> threadHive2DockerConn = new ThreadLocal<>()
public final ThreadLocal<Connection> threadHive3DockerConn = new ThreadLocal<>()
Expand Down Expand Up @@ -145,13 +146,26 @@ class SuiteContext implements Closeable {
if (threadConnInfo == null) {
threadConnInfo = new ConnectionInfo()
threadConnInfo.conn = config.getConnectionByDbName(dbName)
threadConnInfo.username = config.jdbcUser
threadConnInfo.username = config.jdbcUser
threadConnInfo.password = config.jdbcPassword
threadLocalConn.set(threadConnInfo)
}
return threadConnInfo.conn
}

// like getConnection, but connect to FE master
Connection getMasterConnection() {
def threadConnInfo = threadLocalMasterConn.get()
if (threadConnInfo == null) {
threadConnInfo = new ConnectionInfo()
threadConnInfo.conn = getMasterConnectionByDbName(dbName)
threadConnInfo.username = config.jdbcUser
threadConnInfo.password = config.jdbcPassword
threadLocalMasterConn.set(threadConnInfo)
}
return threadConnInfo.conn
}

Connection getArrowFlightSqlConnection() {
def threadConnInfo = threadArrowFlightSqlConn.get()
if (threadConnInfo == null) {
Expand Down Expand Up @@ -316,6 +330,27 @@ class SuiteContext implements Closeable {
}
}

Connection getMasterConnectionByDbName(String dbName) {
def result = JdbcUtils.executeToMapArray(getConnection(), "SHOW FRONTENDS")
def master = null
for (def row : result) {
if (row.IsMaster == "true") {
master = row
break
}
}
if (master) {
log.info("master found: ${master.Host}:${master.HttpPort}")
def url = Config.buildUrlWithDb(master.Host as String, master.QueryPort as Integer, dbName)
def username = config.jdbcUser
def password = config.jdbcPassword

return DriverManager.getConnection(url, username, password)
} else {
throw new Exception("No master found to reconnect")
}
}

def reconnectToMasterFe = { ->
log.info("Reconnecting to a new master frontend...")
def result = JdbcUtils.executeToMapArray(getConnection(), "SHOW FRONTENDS")
Expand Down Expand Up @@ -468,6 +503,16 @@ class SuiteContext implements Closeable {
}
}

ConnectionInfo master_conn = threadLocalMasterConn.get()
if (master_conn != null) {
threadLocalMasterConn.remove()
try {
master_conn.conn.close()
} catch (Throwable t) {
log.warn("Close master connection failed", t)
}
}

ConnectionInfo arrow_flight_sql_conn = threadArrowFlightSqlConn.get()
if (arrow_flight_sql_conn != null) {
threadArrowFlightSqlConn.remove()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ suite("test_backup_restore_atomic_with_alter", "backup_restore") {
sql "SYNC"

// 0. table_1 has in_atomic_restore property
def show_result = sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1 """
def show_result = master_sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1 """
logger.info("SHOW CREATE TABLE ${tableNamePrefix}_1: ${show_result}")
assertTrue(show_result[0][1].contains("in_atomic_restore"))

Expand Down Expand Up @@ -230,7 +230,7 @@ suite("test_backup_restore_atomic_with_alter", "backup_restore") {
sql "SYNC"

// 5. The restore job is cancelled, the in_atomic_restore property has been removed.
show_result = sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1 """
show_result = master_sql """ SHOW CREATE TABLE ${dbName}.${tableNamePrefix}_1 """
logger.info("SHOW CREATE TABLE ${tableNamePrefix}_1: ${show_result}")
assertFalse(show_result[0][1].contains("in_atomic_restore"))

Expand Down

0 comments on commit 5afdf61

Please sign in to comment.