diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index ffebf2ead1505f..8a27fc0be44614 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -83,1246 +83,1247 @@ import org.junit.Assert
 @Slf4j
 class Suite implements GroovyInterceptable {
-    final SuiteContext context
-    final SuiteCluster cluster
-    final DebugPoint debugPoint
-
-    final String name
-    final String group
-    final Logger logger = LoggerFactory.getLogger(this.class)
-    static final Logger staticLogger = LoggerFactory.getLogger(Suite.class)
-
-    // set this in suite to determine which hive docker to use
-    String hivePrefix = "hive2"
-
-    final List<Closure> successCallbacks = new Vector<>()
-    final List<Closure> failCallbacks = new Vector<>()
-    final List<Closure> finishCallbacks = new Vector<>()
-    final List<Throwable> lazyCheckExceptions = new Vector<>()
-    final List<Future> lazyCheckFutures = new Vector<>()
-    static Boolean isTrinoConnectorDownloaded = false
-
-    Suite(String name, String group, SuiteContext context, SuiteCluster cluster) {
-        this.name = name
-        this.group = group
-        this.context = context
-        this.cluster = cluster;
-        this.debugPoint = new DebugPoint(this)
-    }
+final SuiteContext context
+final SuiteCluster cluster
+final DebugPoint debugPoint
+
+final String name
+final String group
+final Logger logger = LoggerFactory.getLogger(this.class)
+static final Logger staticLogger = LoggerFactory.getLogger(Suite.class)
+
+// set this in suite to determine which hive docker to use
+String hivePrefix = "hive2"
+
+final List<Closure> successCallbacks = new Vector<>()
+final List<Closure> failCallbacks = new Vector<>()
+final List<Closure> finishCallbacks = new Vector<>()
+final List<Throwable> lazyCheckExceptions = new Vector<>()
+final List<Future> lazyCheckFutures = new Vector<>()
+static Boolean isTrinoConnectorDownloaded = false
+
+Suite(String name, String group, SuiteContext context, SuiteCluster cluster) {
+    this.name = name
+    this.group = group
+    this.context = context
+    this.cluster = cluster;
+    this.debugPoint = new DebugPoint(this)
+}

-    String getConf(String key, String defaultValue = null) {
-        String value = context.config.otherConfigs.get(key)
-        return value == null ? defaultValue : value
-    }
+String getConf(String key, String defaultValue = null) {
+    String value = context.config.otherConfigs.get(key)
+    return value == null ? defaultValue : value
+}

-    String getSuiteConf(String key, String defaultValue = null) {
-        return getConf("suites." + name + "." + key, defaultValue)
-    }
+String getSuiteConf(String key, String defaultValue = null) {
+    return getConf("suites." + name + "." + key, defaultValue)
+}

-    Properties getConfs(String prefix) {
-        Properties p = new Properties()
-        for (String name : context.config.otherConfigs.stringPropertyNames()) {
-            if (name.startsWith(prefix + ".")) {
-                p.put(name.substring(prefix.length() + 1), context.config.getProperty(name))
-            }
-        }
-        return p
-    }
+Properties getConfs(String prefix) {
+    Properties p = new Properties()
+    for (String name : context.config.otherConfigs.stringPropertyNames()) {
+        if (name.startsWith(prefix + ".")) {
+            p.put(name.substring(prefix.length() + 1), context.config.getProperty(name))
+        }
+    }
+    return p
+}

-    void onSuccess(Closure callback) {
-        successCallbacks.add(callback)
-    }
+void onSuccess(Closure callback) {
+    successCallbacks.add(callback)
+}

-    void onFail(Closure callback) {
-        failCallbacks.add(callback)
-    }
+void onFail(Closure callback) {
+    failCallbacks.add(callback)
+}

-    void onFinish(Closure callback) {
-        finishCallbacks.add(callback)
-    }
+void onFinish(Closure callback) {
+    finishCallbacks.add(callback)
+}

-    LongStream range(long startInclusive, long endExclusive) {
-        return LongStream.range(startInclusive, endExclusive)
-    }
+LongStream range(long startInclusive, long endExclusive) {
+    return LongStream.range(startInclusive, endExclusive)
+}

-    LongStream rangeClosed(long startInclusive, long endInclusive) {
-        return LongStream.rangeClosed(startInclusive, endInclusive)
-    }
+LongStream rangeClosed(long startInclusive, long endInclusive) {
+    return LongStream.rangeClosed(startInclusive, endInclusive)
+}

-    String toCsv(List<Object> rows) {
-        StringBuilder sb = new StringBuilder()
-        for (int i = 0; i < rows.size(); ++i) {
-            Object row = rows.get(i)
-            if (!(row instanceof List)) {
-                row = ImmutableList.of(row)
-            }
-            sb.append(OutputUtils.toCsvString(row as List)).append("\n")
-        }
-        sb.toString()
-    }
+String toCsv(List<Object> rows) {
+    StringBuilder sb = new StringBuilder()
+    for (int i = 0; i < rows.size(); ++i) {
+        Object row = rows.get(i)
+        if (!(row instanceof List)) {
+            row = ImmutableList.of(row)
+        }
+        sb.append(OutputUtils.toCsvString(row as List)).append("\n")
+    }
+    sb.toString()
+}

-    Object parseJson(String str) {
-        def jsonSlurper = new JsonSlurper()
-        return jsonSlurper.parseText(str)
-    }
+Object parseJson(String str) {
+    def jsonSlurper = new JsonSlurper()
+    return jsonSlurper.parseText(str)
+}

-    public <T> T lazyCheck(Closure<T> closure) {
-        try {
-            T result = closure.call()
-            if (result instanceof Future) {
-                lazyCheckFutures.add(result)
-            }
-            return result
-        } catch (Throwable t) {
-            lazyCheckExceptions.add(t)
-            return null
-        }
-    }
+public <T> T lazyCheck(Closure<T> closure) {
+    try {
+        T result = closure.call()
+        if (result instanceof Future) {
+            lazyCheckFutures.add(result)
+        }
+        return result
+    } catch (Throwable t) {
+        lazyCheckExceptions.add(t)
+        return null
+    }
+}

-    void doLazyCheck() {
-        if (!lazyCheckExceptions.isEmpty()) {
-            throw lazyCheckExceptions.get(0)
-        }
-        lazyCheckFutures.forEach { it.get() }
-    }
+void doLazyCheck() {
+    if (!lazyCheckExceptions.isEmpty()) {
+        throw lazyCheckExceptions.get(0)
+    }
+    lazyCheckFutures.forEach { it.get() }
+}

-    public <T> Tuple2<T, Long> timer(Closure<T> actionSupplier) {
-        return SuiteUtils.timer(actionSupplier)
-    }
+public <T> Tuple2<T, Long> timer(Closure<T> actionSupplier) {
+    return SuiteUtils.timer(actionSupplier)
+}

-    public <T> ListenableFuture<T> extraThread(
-            String threadName = null, boolean daemon = false, Closure<T> actionSupplier) {
-        def executorService = Executors.newFixedThreadPool(1, new ThreadFactory() {
-            @Override
-            Thread newThread(@NotNull Runnable r) {
-                def thread = new Thread(r, name)
-                thread.setDaemon(daemon)
-                return thread
-            }
-        })
-
-        try {
-            def connInfo = context.threadLocalConn.get()
-            return MoreExecutors.listeningDecorator(executorService).submit(
-                    buildThreadCallable(threadName, connInfo, actionSupplier)
-            )
-        } finally {
-            executorService.shutdown()
-        }
-    }
+public <T> ListenableFuture<T> extraThread(
+        String threadName = null, boolean daemon = false, Closure<T> actionSupplier) {
+    def executorService = Executors.newFixedThreadPool(1, new ThreadFactory() {
+        @Override
+        Thread newThread(@NotNull Runnable r) {
+            def thread = new Thread(r, name)
+            thread.setDaemon(daemon)
+            return thread
+        }
+    })
+
+    try {
+        def connInfo = context.threadLocalConn.get()
+        return MoreExecutors.listeningDecorator(executorService).submit(
+                buildThreadCallable(threadName, connInfo, actionSupplier)
+        )
+    } finally {
+        executorService.shutdown()
+    }
+}

-    public <T> ListenableFuture<T> thread(String threadName = null, Closure<T> actionSupplier) {
-        def connInfo = context.threadLocalConn.get()
-        return MoreExecutors.listeningDecorator(context.actionExecutors).submit(
-                buildThreadCallable(threadName, connInfo, actionSupplier)
-        )
-    }
+public <T> ListenableFuture<T> thread(String threadName = null, Closure<T> actionSupplier) {
+    def connInfo = context.threadLocalConn.get()
+    return MoreExecutors.listeningDecorator(context.actionExecutors).submit(
+            buildThreadCallable(threadName, connInfo, actionSupplier)
+    )
+}

-    private <T> Callable<T> buildThreadCallable(String threadName, ConnectionInfo connInfo, Closure<T> actionSupplier) {
-        return new Callable<T>() {
-            @Override
-            T call() throws Exception {
-                long startTime = System.currentTimeMillis()
-                def originThreadName = Thread.currentThread().name
-                try {
-                    Thread.currentThread().setName(threadName == null ? originThreadName : threadName)
-                    if (connInfo != null) {
-                        context.connectTo(connInfo.conn.getMetaData().getURL(), connInfo.username, connInfo.password);
-                    }
-                    context.scriptContext.eventListeners.each { it.onThreadStarted(context) }
-
-                    return actionSupplier.call()
-                } catch (Throwable t) {
-                    context.scriptContext.eventListeners.each { it.onThreadFailed(context, t) }
-                    throw t
-                } finally {
-                    try {
-                        context.closeThreadLocal()
-                    } catch (Throwable t) {
-                        logger.warn("Close thread local context failed", t)
-                    }
-                    long finishTime = System.currentTimeMillis()
-                    context.scriptContext.eventListeners.each { it.onThreadFinished(context, finishTime - startTime) }
-                    Thread.currentThread().setName(originThreadName)
-                }
-            }
-        };
-    }
+private <T> Callable<T> buildThreadCallable(String threadName, ConnectionInfo connInfo, Closure<T> actionSupplier) {
+    return new Callable<T>() {
+        @Override
+        T call() throws Exception {
+            long startTime = System.currentTimeMillis()
+            def originThreadName = Thread.currentThread().name
+            try {
+                Thread.currentThread().setName(threadName == null ? originThreadName : threadName)
+                if (connInfo != null) {
+                    context.connectTo(connInfo.conn.getMetaData().getURL(), connInfo.username, connInfo.password);
+                }
+                context.scriptContext.eventListeners.each { it.onThreadStarted(context) }

+                return actionSupplier.call()
+            } catch (Throwable t) {
+                context.scriptContext.eventListeners.each { it.onThreadFailed(context, t) }
+                throw t
+            } finally {
+                try {
+                    context.closeThreadLocal()
+                } catch (Throwable t) {
+                    logger.warn("Close thread local context failed", t)
+                }
+                long finishTime = System.currentTimeMillis()
+                context.scriptContext.eventListeners.each { it.onThreadFinished(context, finishTime - startTime) }
+                Thread.currentThread().setName(originThreadName)
+            }
+        }
+    };
+}

-    public <T> ListenableFuture<T> lazyCheckThread(String threadName = null, Closure<T> actionSupplier) {
-        return lazyCheck {
-            thread(threadName, actionSupplier)
-        }
-    }
+public <T> ListenableFuture<T> lazyCheckThread(String threadName = null, Closure<T> actionSupplier) {
+    return lazyCheck {
+        thread(threadName, actionSupplier)
+    }
+}

-    public <T> ListenableFuture<List<T>> combineFutures(ListenableFuture<T> ... futures) {
-        return Futures.allAsList(futures)
-    }
+public <T> ListenableFuture<List<T>> combineFutures(ListenableFuture<T> ... futures) {
+    return Futures.allAsList(futures)
+}

-    public <T> ListenableFuture<List<T>> combineFutures(Iterable<? extends ListenableFuture<? extends T>> futures) {
-        return Futures.allAsList(futures)
-    }
+public <T> ListenableFuture<List<T>> combineFutures(Iterable<? extends ListenableFuture<? extends T>> futures) {
+    return Futures.allAsList(futures)
+}

-    public <T> T connect(String user = context.config.jdbcUser, String password = context.config.jdbcPassword,
-                         String url = context.config.jdbcUrl, Closure<T> actionSupplier) {
-        return context.connect(user, password, url, actionSupplier)
-    }
+public <T> T connect(String user = context.config.jdbcUser, String password = context.config.jdbcPassword,
+                     String url = context.config.jdbcUrl, Closure<T> actionSupplier) {
+    return context.connect(user, password, url, actionSupplier)
+}

-    public <T> T connectInDocker(String user = context.config.jdbcUser, String password = context.config.jdbcPassword,
-                                 Closure<T> actionSupplier) {
-        def connInfo = context.threadLocalConn.get()
-        return context.connect(user, password, connInfo.conn.getMetaData().getURL(), actionSupplier)
-    }
+public <T> T connectInDocker(String user = context.config.jdbcUser, String password = context.config.jdbcPassword,
+                             Closure<T> actionSupplier) {
+    def connInfo = context.threadLocalConn.get()
+    return context.connect(user, password, connInfo.conn.getMetaData().getURL(), actionSupplier)
+}

-    public void dockerAwaitUntil(int atMostSeconds, int intervalSecond = 1, Closure actionSupplier) {
-        def connInfo = context.threadLocalConn.get()
-        Awaitility.await().atMost(atMostSeconds, SECONDS).pollInterval(intervalSecond, SECONDS).until(
-            {
-                connect(connInfo.username, connInfo.password, connInfo.conn.getMetaData().getURL(), actionSupplier)
-            }
-        )
-    }
+public void dockerAwaitUntil(int atMostSeconds, int intervalSecond = 1, Closure actionSupplier) {
+    def connInfo = context.threadLocalConn.get()
+    Awaitility.await().atMost(atMostSeconds, SECONDS).pollInterval(intervalSecond, SECONDS).until(
+        {
+            connect(connInfo.username, connInfo.password, connInfo.conn.getMetaData().getURL(), actionSupplier)
+        }
+    )
+}

-    // for more explanation, see the example file: demo_p0/docker_action.groovy
-    public void docker(ClusterOptions options = new ClusterOptions(), Closure actionSupplier) throws Exception {
-        if (context.config.excludeDockerTest) {
-            return
-        }
-
-        if (RegressionTest.getGroupExecType(group) != RegressionTest.GroupExecType.DOCKER) {
-            throw new Exception("Need to add 'docker' to docker suite's belong groups, "
-                    + "see example demo_p0/docker_action.groovy")
-        }
-
-        if (options.cloudMode == null) {
-            if (context.config.runMode == RunMode.UNKNOWN) {
-                dockerImpl(options, false, actionSupplier)
-                dockerImpl(options, true, actionSupplier)
-            } else {
-                dockerImpl(options, context.config.runMode == RunMode.CLOUD, actionSupplier)
-            }
-        } else {
-            if (options.cloudMode == true && context.config.runMode == RunMode.NOT_CLOUD) {
-                return
-            }
-            if (options.cloudMode == false && context.config.runMode == RunMode.CLOUD) {
-                return
-            }
-            dockerImpl(options, options.cloudMode, actionSupplier)
-        }
-    }
+// for more explanation, see the example file: demo_p0/docker_action.groovy
+public void docker(ClusterOptions options = new ClusterOptions(), Closure actionSupplier) throws Exception {
+    if (context.config.excludeDockerTest) {
+        return
+    }
+
+    if (RegressionTest.getGroupExecType(group) != RegressionTest.GroupExecType.DOCKER) {
+        throw new Exception("Need to add 'docker' to docker suite's belong groups, "
+                + "see example demo_p0/docker_action.groovy")
+    }
+
+    if (options.cloudMode == null) {
+        if (context.config.runMode == RunMode.UNKNOWN) {
+            dockerImpl(options, false, actionSupplier)
+            dockerImpl(options, true, actionSupplier)
+        } else {
+            dockerImpl(options, context.config.runMode == RunMode.CLOUD, actionSupplier)
+        }
+    } else {
+        if (options.cloudMode == true && context.config.runMode == RunMode.NOT_CLOUD) {
+            return
+        }
+        if (options.cloudMode == false && context.config.runMode == RunMode.CLOUD) {
+            return
+        }
+        dockerImpl(options, options.cloudMode, actionSupplier)
+    }
+}

-    private void dockerImpl(ClusterOptions options, boolean isCloud, Closure actionSupplier) throws Exception {
-        logger.info("=== start run suite {} in {} mode. ===", name, (isCloud ? "cloud" : "not_cloud"))
-        try {
-            cluster.destroy(true)
-            cluster.init(options, isCloud)
-
-            def user = context.config.jdbcUser
-            def password = context.config.jdbcPassword
-            Frontend fe = null
-            for (def i=0; fe == null && i<30; i++) {
-                if (options.connectToFollower) {
-                    fe = cluster.getOneFollowerFe()
-                } else {
-                    fe = cluster.getMasterFe()
-                }
-                Thread.sleep(1000)
-            }
-
-            logger.info("get fe {}", fe)
-            assertNotNull(fe)
-            if (!isCloud) {
-                for (def be : cluster.getAllBackends()) {
-                    be_report_disk(be.host, be.httpPort)
-                }
-            }
-
-            // wait be report
-            Thread.sleep(5000)
-            def url = String.format(
-                    "jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false",
-                    fe.host, fe.queryPort)
-            def conn = DriverManager.getConnection(url, user, password)
-            def sql = "CREATE DATABASE IF NOT EXISTS " + context.dbName
-            logger.info("try create database if not exists {}", context.dbName)
-            JdbcUtils.executeToList(conn, sql)
-
-            url = Config.buildUrlWithDb(url, context.dbName)
-            logger.info("connect to docker cluster: suite={}, url={}", name, url)
-            connect(user, password, url, actionSupplier)
-        } finally {
-            if (!context.config.dockerEndNoKill) {
-                cluster.destroy(context.config.dockerEndDeleteFiles)
-            }
-        }
-    }
+private void dockerImpl(ClusterOptions options, boolean isCloud, Closure actionSupplier) throws Exception {
+    logger.info("=== start run suite {} in {} mode. ===", name, (isCloud ? "cloud" : "not_cloud"))
+    try {
+        cluster.destroy(true)
+        cluster.init(options, isCloud)
+
+        def user = context.config.jdbcUser
+        def password = context.config.jdbcPassword
+        Frontend fe = null
+        for (def i=0; fe == null && i<30; i++) {
+            if (options.connectToFollower) {
+                fe = cluster.getOneFollowerFe()
+            } else {
+                fe = cluster.getMasterFe()
+            }
+            Thread.sleep(1000)
+        }
+
+        logger.info("get fe {}", fe)
+        assertNotNull(fe)
+        if (!isCloud) {
+            for (def be : cluster.getAllBackends()) {
+                be_report_disk(be.host, be.httpPort)
+            }
+        }
+
+        // wait be report
+        Thread.sleep(5000)
+        def url = String.format(
+                "jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false",
+                fe.host, fe.queryPort)
+        def conn = DriverManager.getConnection(url, user, password)
+        def sql = "CREATE DATABASE IF NOT EXISTS " + context.dbName
+        logger.info("try create database if not exists {}", context.dbName)
+        JdbcUtils.executeToList(conn, sql)
+
+        url = Config.buildUrlWithDb(url, context.dbName)
+        logger.info("connect to docker cluster: suite={}, url={}", name, url)
+        connect(user, password, url, actionSupplier)
+    } finally {
+        if (!context.config.dockerEndNoKill) {
+            cluster.destroy(context.config.dockerEndDeleteFiles)
+        }
+    }
+}

-    String get_ccr_body(String table, String db = null) {
-        if (db == null) {
-            db = context.dbName
-        }
-
-        Gson gson = new Gson()
-
-        Map<String, String> srcSpec = context.getSrcSpec(db)
-        srcSpec.put("table", table)
-
-        Map<String, String> destSpec = context.getDestSpec(db)
-        destSpec.put("table", table)
-
-        Map<String, Object> body = Maps.newHashMap()
-        body.put("name", context.suiteName + "_" + db + "_" + table)
-        body.put("src", srcSpec)
-        body.put("dest", destSpec)
-
-        return gson.toJson(body)
-    }
+String get_ccr_body(String table, String db = null) {
+    if (db == null) {
+        db = context.dbName
+    }
+
+    Gson gson = new Gson()
+
+    Map<String, String> srcSpec = context.getSrcSpec(db)
+    srcSpec.put("table", table)
+
+    Map<String, String> destSpec = context.getDestSpec(db)
+    destSpec.put("table", table)
+
+    Map<String, Object> body = Maps.newHashMap()
+    body.put("name", context.suiteName + "_" + db + "_" + table)
+    body.put("src", srcSpec)
+    body.put("dest", destSpec)
+
+    return gson.toJson(body)
+}

-    Syncer getSyncer() {
-        return context.getSyncer(this)
-    }
+Syncer getSyncer() {
+    return context.getSyncer(this)
+}

-    List<List<Object>> sql_impl(Connection conn, String sqlStr, boolean isOrder = false) {
-        logger.info("Execute ${isOrder ? "order_" : ""}sql: ${sqlStr}".toString())
-        def (result, meta) = JdbcUtils.executeToList(conn, sqlStr)
-        if (isOrder) {
-            result = DataUtils.sortByToString(result)
-        }
-        return result
-    }
+List<List<Object>> sql_impl(Connection conn, String sqlStr, boolean isOrder = false) {
+    logger.info("Execute ${isOrder ? "order_" : ""}sql: ${sqlStr}".toString())
+    def (result, meta) = JdbcUtils.executeToList(conn, sqlStr)
+    if (isOrder) {
+        result = DataUtils.sortByToString(result)
+    }
+    return result
+}
"order_" : ""}sql: ${sqlStr}".toString()) + def (result, meta) = JdbcUtils.executeToList(conn, sqlStr) + if (isOrder) { + result = DataUtils.sortByToString(result) } + return result +} - List> jdbc_sql(String sqlStr, boolean isOrder = false) { - return sql_impl(context.getConnection(), sqlStr, isOrder) - } +List> jdbc_sql(String sqlStr, boolean isOrder = false) { + return sql_impl(context.getConnection(), sqlStr, isOrder) +} - List> arrow_flight_sql(String sqlStr, boolean isOrder = false) { - return sql_impl(context.getArrowFlightSqlConnection(), (String) ("USE ${context.dbName};" + sqlStr), isOrder) - } +List> arrow_flight_sql(String sqlStr, boolean isOrder = false) { + return sql_impl(context.getArrowFlightSqlConnection(), (String) ("USE ${context.dbName};" + sqlStr), isOrder) +} - List> sql(String sqlStr, boolean isOrder = false) { - if (context.useArrowFlightSql()) { - return arrow_flight_sql(sqlStr, isOrder) - } else { - return jdbc_sql(sqlStr, isOrder) - } +List> sql(String sqlStr, boolean isOrder = false) { + if (context.useArrowFlightSql()) { + return arrow_flight_sql(sqlStr, isOrder) + } else { + return jdbc_sql(sqlStr, isOrder) } +} - List> multi_sql(String sqlStr, boolean isOrder = false) { - String[] sqls = sqlStr.split(";") - def result = new ArrayList(); - for (String query : sqls) { - if (!query.trim().isEmpty()) { - result.add(sql(query, isOrder)); - } +List> multi_sql(String sqlStr, boolean isOrder = false) { + String[] sqls = sqlStr.split(";") + def result = new ArrayList(); + for (String query : sqls) { + if (!query.trim().isEmpty()) { + result.add(sql(query, isOrder)); } - return result } + return result +} - List> arrow_flight_sql_no_prepared (String sqlStr, boolean isOrder = false){ - logger.info("Execute ${isOrder ? "order_" : ""}sql: ${sqlStr}".toString()) - def (result, meta) = JdbcUtils.executeQueryToList(context.getArrowFlightSqlConnection(), (String) ("USE ${context.dbName};" + sqlStr)) - if (isOrder) { - result = DataUtils.sortByToString(result) - } - return result +List> arrow_flight_sql_no_prepared (String sqlStr, boolean isOrder = false){ + logger.info("Execute ${isOrder ? 
"order_" : ""}sql: ${sqlStr}".toString()) + def (result, meta) = JdbcUtils.executeQueryToList(context.getArrowFlightSqlConnection(), (String) ("USE ${context.dbName};" + sqlStr)) + if (isOrder) { + result = DataUtils.sortByToString(result) } + return result +} - List> insert_into_sql_impl(Connection conn, String sqlStr, int num) { - logger.info("insert into " + num + " records") - def (result, meta) = JdbcUtils.executeToList(conn, sqlStr) - return result - } +List> insert_into_sql_impl(Connection conn, String sqlStr, int num) { + logger.info("insert into " + num + " records") + def (result, meta) = JdbcUtils.executeToList(conn, sqlStr) + return result +} - List> jdbc_insert_into_sql(String sqlStr, int num) { - return insert_into_sql_impl(context.getConnection(), sqlStr, num) - } +List> jdbc_insert_into_sql(String sqlStr, int num) { + return insert_into_sql_impl(context.getConnection(), sqlStr, num) +} - List> arrow_flight_insert_into_sql(String sqlStr, int num) { - return insert_into_sql_impl(context.getArrowFlightSqlConnection(), (String) ("USE ${context.dbName};" + sqlStr), num) - } +List> arrow_flight_insert_into_sql(String sqlStr, int num) { + return insert_into_sql_impl(context.getArrowFlightSqlConnection(), (String) ("USE ${context.dbName};" + sqlStr), num) +} - List> insert_into_sql(String sqlStr, int num) { - if (context.useArrowFlightSql()) { - return arrow_flight_insert_into_sql(sqlStr, num) - } else { - return jdbc_insert_into_sql(sqlStr, num) - } +List> insert_into_sql(String sqlStr, int num) { + if (context.useArrowFlightSql()) { + return arrow_flight_insert_into_sql(sqlStr, num) + } else { + return jdbc_insert_into_sql(sqlStr, num) } +} - def sql_return_maparray_impl(String sqlStr, Connection conn = null) { - logger.info("Execute sql: ${sqlStr}".toString()) - if (conn == null) { - conn = context.getConnection() - } - def (result, meta) = JdbcUtils.executeToList(conn, sqlStr) +def sql_return_maparray_impl(String sqlStr, Connection conn = null) { + logger.info("Execute sql: ${sqlStr}".toString()) + if (conn == null) { + conn = context.getConnection() + } + def (result, meta) = JdbcUtils.executeToList(conn, sqlStr) - // get all column names as list - List columnNames = new ArrayList<>() - for (int i = 0; i < meta.getColumnCount(); i++) { - columnNames.add(meta.getColumnName(i + 1)) - } + // get all column names as list + List columnNames = new ArrayList<>() + for (int i = 0; i < meta.getColumnCount(); i++) { + columnNames.add(meta.getColumnName(i + 1)) + } - // add result to res map list, each row is a map with key is column name - List> res = new ArrayList<>() - for (int i = 0; i < result.size(); i++) { - Map row = new HashMap<>() - for (int j = 0; j < columnNames.size(); j++) { - row.put(columnNames.get(j), result.get(i).get(j)) - } - res.add(row) + // add result to res map list, each row is a map with key is column name + List> res = new ArrayList<>() + for (int i = 0; i < result.size(); i++) { + Map row = new HashMap<>() + for (int j = 0; j < columnNames.size(); j++) { + row.put(columnNames.get(j), result.get(i).get(j)) } - return res; + res.add(row) } + return res; +} - String getMasterIp(Connection conn = null) { - def result = sql_return_maparray_impl("select Host, QueryPort, IsMaster from frontends();", conn) - logger.info("get master fe: ${result}") +String getMasterIp(Connection conn = null) { + def result = sql_return_maparray_impl("select Host, QueryPort, IsMaster from frontends();", conn) + logger.info("get master fe: ${result}") - def masterHost = "" - for (def row 

-    String getMasterIp(Connection conn = null) {
-        def result = sql_return_maparray_impl("select Host, QueryPort, IsMaster from frontends();", conn)
-        logger.info("get master fe: ${result}")
-
-        def masterHost = ""
-        for (def row : result) {
-            if (row.IsMaster == "true") {
-                masterHost = row.Host
-                break
-            }
-        }
-
-        if (masterHost == "") {
-            throw new Exception("can not find master fe")
-        }
-        return masterHost;
-    }
+String getMasterIp(Connection conn = null) {
+    def result = sql_return_maparray_impl("select Host, QueryPort, IsMaster from frontends();", conn)
+    logger.info("get master fe: ${result}")
+
+    def masterHost = ""
+    for (def row : result) {
+        if (row.IsMaster == "true") {
+            masterHost = row.Host
+            break
+        }
+    }
+
+    if (masterHost == "") {
+        throw new Exception("can not find master fe")
+    }
+    return masterHost;
+}

-    int getMasterPort(String type = "http") {
-        def result = sql_return_maparray_impl("select EditLogPort,HttpPort,QueryPort,RpcPort,ArrowFlightSqlPort from frontends() where IsMaster = 'true';")
-        if (result.size() != 1) {
-            throw new RuntimeException("could not find Master in this Cluster")
-        }
-        type = type.toLowerCase()
-        switch (type) {
-            case "editlog":
-                return result[0].EditLogPort as int
-            case "http":
-                return result[0].HttpPort as int
-            case ["query", "jdbc", "mysql"]:
-                return result[0].QueryPort as int
-            case ["rpc", "thrift"]:
-                return result[0].RpcPort as int
-            case ["arrow", "arrowflight"]:
-                return result[0].ArrowFlightSqlPort as int
-            default:
-                throw new RuntimeException("Unknown type: '${type}', you should select one of this type:[editlog, http, mysql, thrift, arrowflight]")
-        }
-    }
+int getMasterPort(String type = "http") {
+    def result = sql_return_maparray_impl("select EditLogPort,HttpPort,QueryPort,RpcPort,ArrowFlightSqlPort from frontends() where IsMaster = 'true';")
+    if (result.size() != 1) {
+        throw new RuntimeException("could not find Master in this Cluster")
+    }
+    type = type.toLowerCase()
+    switch (type) {
+        case "editlog":
+            return result[0].EditLogPort as int
+        case "http":
+            return result[0].HttpPort as int
+        case ["query", "jdbc", "mysql"]:
+            return result[0].QueryPort as int
+        case ["rpc", "thrift"]:
+            return result[0].RpcPort as int
+        case ["arrow", "arrowflight"]:
+            return result[0].ArrowFlightSqlPort as int
+        default:
+            throw new RuntimeException("Unknown type: '${type}', you should select one of this type:[editlog, http, mysql, thrift, arrowflight]")
+    }
+}

-    def jdbc_sql_return_maparray(String sqlStr) {
-        return sql_return_maparray_impl(sqlStr, context.getConnection())
-    }
+def jdbc_sql_return_maparray(String sqlStr) {
+    return sql_return_maparray_impl(sqlStr, context.getConnection())
+}

-    def arrow_flight_sql_return_maparray(String sqlStr) {
-        return sql_return_maparray_impl((String) ("USE ${context.dbName};" + sqlStr), context.getArrowFlightSqlConnection())
-    }
+def arrow_flight_sql_return_maparray(String sqlStr) {
+    return sql_return_maparray_impl((String) ("USE ${context.dbName};" + sqlStr), context.getArrowFlightSqlConnection())
+}

-    def sql_return_maparray(String sqlStr) {
-        if (context.useArrowFlightSql()) {
-            return arrow_flight_sql_return_maparray(sqlStr)
-        } else {
-            return jdbc_sql_return_maparray(sqlStr)
-        }
-    }
+def sql_return_maparray(String sqlStr) {
+    if (context.useArrowFlightSql()) {
+        return arrow_flight_sql_return_maparray(sqlStr)
+    } else {
+        return jdbc_sql_return_maparray(sqlStr)
+    }
+}

-    List<List<Object>> target_sql(String sqlStr, boolean isOrder = false) {
-        logger.info("Execute ${isOrder ? "order_" : ""}target_sql: ${sqlStr}".toString())
-        def (result, meta) = JdbcUtils.executeToList(context.getTargetConnection(this), sqlStr)
-        if (isOrder) {
-            result = DataUtils.sortByToString(result)
-        }
-        return result
-    }
+List<List<Object>> target_sql(String sqlStr, boolean isOrder = false) {
+    logger.info("Execute ${isOrder ? "order_" : ""}target_sql: ${sqlStr}".toString())
+    def (result, meta) = JdbcUtils.executeToList(context.getTargetConnection(this), sqlStr)
+    if (isOrder) {
+        result = DataUtils.sortByToString(result)
+    }
+    return result
+}

-    def target_sql_return_maparray(String sqlStr, boolean isOrder = false) {
-        logger.info("Execute ${isOrder ? "order_" : ""}target_sql: ${sqlStr}".toString())
-        def (result, meta) = JdbcUtils.executeToList(context.getTargetConnection(this), sqlStr)
-        if (isOrder) {
-            result = DataUtils.sortByToString(result)
-        }
-
-        // get all column names as list
-        List<String> columnNames = new ArrayList<>()
-        for (int i = 0; i < meta.getColumnCount(); i++) {
-            columnNames.add(meta.getColumnName(i + 1))
-        }
-
-        // add result to res map list, each row is a map with key is column name
-        List<Map<String, Object>> res = new ArrayList<>()
-        for (int i = 0; i < result.size(); i++) {
-            Map<String, Object> row = new HashMap<>()
-            for (int j = 0; j < columnNames.size(); j++) {
-                row.put(columnNames.get(j), result.get(i).get(j))
-            }
-            res.add(row)
-        }
-        return res
-    }
+def target_sql_return_maparray(String sqlStr, boolean isOrder = false) {
+    logger.info("Execute ${isOrder ? "order_" : ""}target_sql: ${sqlStr}".toString())
+    def (result, meta) = JdbcUtils.executeToList(context.getTargetConnection(this), sqlStr)
+    if (isOrder) {
+        result = DataUtils.sortByToString(result)
+    }
+
+    // get all column names as list
+    List<String> columnNames = new ArrayList<>()
+    for (int i = 0; i < meta.getColumnCount(); i++) {
+        columnNames.add(meta.getColumnName(i + 1))
+    }
+
+    // add result to res map list, each row is a map with key is column name
+    List<Map<String, Object>> res = new ArrayList<>()
+    for (int i = 0; i < result.size(); i++) {
+        Map<String, Object> row = new HashMap<>()
+        for (int j = 0; j < columnNames.size(); j++) {
+            row.put(columnNames.get(j), result.get(i).get(j))
+        }
+        res.add(row)
+    }
+    return res
+}

-    List<List<String>> sql_meta(String sqlStr, boolean isOrder = false) {
-        logger.info("Execute ${isOrder ? "order_" : ""}sql: ${sqlStr}".toString())
-        def (tmp, rsmd) = JdbcUtils.executeToList(context.getConnection(), sqlStr)
-        int count = rsmd.getColumnCount();
-        List<List<String>> result = new ArrayList<>()
-        for (int i = 0; i < count; i++) {
-            List<String> item = new ArrayList<>()
-            String columnName = rsmd.getColumnName(i + 1);
-            int columnType = rsmd.getColumnType(i+1);
-            String columnTypeName = rsmd.getColumnTypeName(i+1);
-            item.add(columnName);
-            item.add(columnTypeName);
-            result.add(item);
-        }
-        return result;
-    }
+List<List<String>> sql_meta(String sqlStr, boolean isOrder = false) {
+    logger.info("Execute ${isOrder ? "order_" : ""}sql: ${sqlStr}".toString())
+    def (tmp, rsmd) = JdbcUtils.executeToList(context.getConnection(), sqlStr)
+    int count = rsmd.getColumnCount();
+    List<List<String>> result = new ArrayList<>()
+    for (int i = 0; i < count; i++) {
+        List<String> item = new ArrayList<>()
+        String columnName = rsmd.getColumnName(i + 1);
+        int columnType = rsmd.getColumnType(i+1);
+        String columnTypeName = rsmd.getColumnTypeName(i+1);
+        item.add(columnName);
+        item.add(columnTypeName);
+        result.add(item);
+    }
+    return result;
+}
+

-    long getTableId(String tableName) {
-        def dbInfo = sql "show proc '/dbs'"
-        for(List row : dbInfo) {
-            if (row[1].equals(context.dbName)) {
-                def tbInfo = sql "show proc '/dbs/${row[0]}' "
-                for (List tb : tbInfo) {
-                    if (tb[1].equals(tableName)) {
-                        println(tb[0])
-                        return tb[0].toLong()
-                    }
-                }
-            }
-        }
-    }
+long getTableId(String tableName) {
+    def dbInfo = sql "show proc '/dbs'"
+    for(List row : dbInfo) {
+        if (row[1].equals(context.dbName)) {
+            def tbInfo = sql "show proc '/dbs/${row[0]}' "
+            for (List tb : tbInfo) {
+                if (tb[1].equals(tableName)) {
+                    println(tb[0])
+                    return tb[0].toLong()
+                }
+            }
+        }
+    }
+}

-    long getTableId(String dbName, String tableName) {
-        def dbInfo = sql "show proc '/dbs'"
-        for(List row : dbInfo) {
-            if (row[1].equals(dbName)) {
-                def tbInfo = sql "show proc '/dbs/${row[0]}' "
-                for (List tb : tbInfo) {
-                    if (tb[1].equals(tableName)) {
-                        return tb[0].toLong()
-                    }
-                }
-            }
-        }
-    }
+long getTableId(String dbName, String tableName) {
+    def dbInfo = sql "show proc '/dbs'"
+    for(List row : dbInfo) {
+        if (row[1].equals(dbName)) {
+            def tbInfo = sql "show proc '/dbs/${row[0]}' "
+            for (List tb : tbInfo) {
+                if (tb[1].equals(tableName)) {
+                    return tb[0].toLong()
+                }
+            }
+        }
+    }
+}

-    String getCurDbName() {
-        return context.dbName
-    }
+String getCurDbName() {
+    return context.dbName
+}

-    String getCurDbConnectUrl() {
-        return context.config.getConnectionUrlByDbName(getCurDbName())
-    }
+String getCurDbConnectUrl() {
+    return context.config.getConnectionUrlByDbName(getCurDbName())
+}

-    long getDbId() {
-        def dbInfo = sql "show proc '/dbs'"
-        for(List row : dbInfo) {
-            if (row[1].equals(context.dbName)) {
-                println(row[0])
-                return row[0].toLong()
-            }
-        }
-    }
+long getDbId() {
+    def dbInfo = sql "show proc '/dbs'"
+    for(List row : dbInfo) {
+        if (row[1].equals(context.dbName)) {
+            println(row[0])
+            return row[0].toLong()
+        }
+    }
+}

-    long getDbId(String dbName) {
-        def dbInfo = sql "show proc '/dbs'"
-        for (List row : dbInfo) {
-            if (row[1].equals(dbName)) {
-                return row[0].toLong()
-            }
-        }
-    }
+long getDbId(String dbName) {
+    def dbInfo = sql "show proc '/dbs'"
+    for (List row : dbInfo) {
+        if (row[1].equals(dbName)) {
+            return row[0].toLong()
+        }
+    }
+}

-    long getTableVersion(long dbId, String tableName) {
-        def result = sql_return_maparray """show proc '/dbs/${dbId}'"""
-        for (def res : result) {
-            if(res.TableName.equals(tableName)) {
-                log.info(res.toString())
-                return res.VisibleVersion.toLong()
-            }
-        }
-    }
+long getTableVersion(long dbId, String tableName) {
+    def result = sql_return_maparray """show proc '/dbs/${dbId}'"""
+    for (def res : result) {
+        if(res.TableName.equals(tableName)) {
+            log.info(res.toString())
+            return res.VisibleVersion.toLong()
+        }
+    }
+}

-    List<List<Object>> order_sql(String sqlStr) {
-        return sql(sqlStr, true)
-    }
+List<List<Object>> order_sql(String sqlStr) {
+    return sql(sqlStr, true)
+}

-    List<List<Object>> sortRows(List<List<Object>> result) {
-        if (result == null) {
-            return null
-        }
-        return DataUtils.sortByToString(result)
-    }
+List<List<Object>> sortRows(List<List<Object>> result) {
+    if (result == null) {
+        return null
+    }
+    return DataUtils.sortByToString(result)
+}

-    String selectUnionAll(List list) {
-        def toSelectString = { Object value ->
-            if (value == null) {
-                return "null"
-            } else if (value instanceof Number) {
-                return value.toString()
-            } else {
-                return "'${value.toString()}'".toString()
-            }
-        }
-        AtomicBoolean isFirst = new AtomicBoolean(true)
-        String sql = list.stream()
-            .map({ row ->
-                StringBuilder sb = new StringBuilder("SELECT ")
-                if (row instanceof List) {
-                    if (isFirst.get()) {
-                        String columns = row.withIndex().collect({ column, index ->
-                            "${toSelectString(column)} AS c${index + 1}"
-                        }).join(", ")
-                        sb.append(columns)
-                        isFirst.set(false)
-                    } else {
-                        String columns = row.collect({ column ->
-                            "${toSelectString(column)}"
-                        }).join(", ")
-                        sb.append(columns)
-                    }
-                } else {
-                    if (isFirst.get()) {
-                        sb.append(toSelectString(row)).append(" AS c1")
-                        isFirst.set(false)
-                    } else {
-                        sb.append(toSelectString(row))
-                    }
-                }
-                return sb.toString()
-            }).collect(Collectors.joining("\nUNION ALL\n"))
-        return sql
-    }
+String selectUnionAll(List list) {
+    def toSelectString = { Object value ->
+        if (value == null) {
+            return "null"
+        } else if (value instanceof Number) {
+            return value.toString()
+        } else {
+            return "'${value.toString()}'".toString()
+        }
+    }
+    AtomicBoolean isFirst = new AtomicBoolean(true)
+    String sql = list.stream()
+        .map({ row ->
+            StringBuilder sb = new StringBuilder("SELECT ")
+            if (row instanceof List) {
+                if (isFirst.get()) {
+                    String columns = row.withIndex().collect({ column, index ->
+                        "${toSelectString(column)} AS c${index + 1}"
+                    }).join(", ")
+                    sb.append(columns)
+                    isFirst.set(false)
+                } else {
+                    String columns = row.collect({ column ->
+                        "${toSelectString(column)}"
+                    }).join(", ")
+                    sb.append(columns)
+                }
+            } else {
+                if (isFirst.get()) {
+                    sb.append(toSelectString(row)).append(" AS c1")
+                    isFirst.set(false)
+                } else {
+                    sb.append(toSelectString(row))
+                }
+            }
+            return sb.toString()
+        }).collect(Collectors.joining("\nUNION ALL\n"))
+    return sql
+}

-    void explain(Closure actionSupplier) {
-        if (context.useArrowFlightSql()) {
-            runAction(new ExplainAction(context, "ARROW_FLIGHT_SQL"), actionSupplier)
-        } else {
-            runAction(new ExplainAction(context), actionSupplier)
-        }
-    }
+void explain(Closure actionSupplier) {
+    if (context.useArrowFlightSql()) {
+        runAction(new ExplainAction(context, "ARROW_FLIGHT_SQL"), actionSupplier)
+    } else {
+        runAction(new ExplainAction(context), actionSupplier)
+    }
+}

-    void profile(String tag, Closure actionSupplier) {
-        runAction(new ProfileAction(context, tag), actionSupplier)
-    }
+void profile(String tag, Closure actionSupplier) {
+    runAction(new ProfileAction(context, tag), actionSupplier)
+}

-    void checkNereidsExecute(String sqlString) {
-        sql (sqlString)
-    }
+void checkNereidsExecute(String sqlString) {
+    sql (sqlString)
+}

-    String checkNereidsExecuteWithResult(String sqlString) {
-        String result = sql (sqlString);
-        return result
-    }
+String checkNereidsExecuteWithResult(String sqlString) {
+    String result = sql (sqlString);
+    return result
+}

-    void createMV(String sql) {
-        (new CreateMVAction(context, sql)).run()
-    }
+void createMV(String sql) {
+    (new CreateMVAction(context, sql)).run()
+}

-    void createMV(String sql, String expection) {
-        (new CreateMVAction(context, sql, expection)).run()
-    }
+void createMV(String sql, String expection) {
+    (new CreateMVAction(context, sql, expection)).run()
+}

-    void test(Closure actionSupplier) {
-        runAction(new TestAction(context), actionSupplier)
-    }
+void test(Closure actionSupplier) {
+    runAction(new TestAction(context), actionSupplier)
+}

-    void benchmark(Closure actionSupplier) {
-        runAction(new BenchmarkAction(context), actionSupplier)
-    }
+void benchmark(Closure actionSupplier) {
+    runAction(new BenchmarkAction(context), actionSupplier)
+}

-    void waitForSchemaChangeDone(Closure actionSupplier, String insertSql = null, boolean cleanOperator = false,String tbName=null) {
-        runAction(new WaitForAction(context), actionSupplier)
-        if (ObjectUtils.isNotEmpty(insertSql)){
-            sql insertSql
-        } else {
-            sql "SYNC"
-        }
-        if (cleanOperator==true){
-            if (ObjectUtils.isEmpty(tbName)) throw new RuntimeException("tbName cloud not be null")
-            quickTest("", """ SELECT * FROM ${tbName} """, true)
-            sql """ DROP TABLE ${tbName} """
-        }
-    }
+void waitForSchemaChangeDone(Closure actionSupplier, String insertSql = null, boolean cleanOperator = false,String tbName=null) {
+    runAction(new WaitForAction(context), actionSupplier)
+    if (ObjectUtils.isNotEmpty(insertSql)){
+        sql insertSql
+    } else {
+        sql "SYNC"
+    }
+    if (cleanOperator==true){
+        if (ObjectUtils.isEmpty(tbName)) throw new RuntimeException("tbName cloud not be null")
+        quickTest("", """ SELECT * FROM ${tbName} """, true)
+        sql """ DROP TABLE ${tbName} """
+    }
+}

-    void waitForBrokerLoadDone(String label, int timeoutInSecond = 60) {
-        if (timeoutInSecond < 0 || label == null) {
-            return
-        }
-        var start = System.currentTimeMillis()
-        var timeout = timeoutInSecond * 1000
-        while (System.currentTimeMillis() - start < timeout) {
-            def lists = sql "show load where label = '${label}'"
-            if (lists.isEmpty()) {
-                return
-            }
-            def state = lists[0][2]
-            if ("FINISHED".equals(state) || "CANCELLED".equals(state)) {
-                return
-            }
-            sleep(300)
-        }
-        logger.warn("broker load with label `${label}` didn't finish in ${timeoutInSecond} second, please check it!")
-    }
+void waitForBrokerLoadDone(String label, int timeoutInSecond = 60) {
+    if (timeoutInSecond < 0 || label == null) {
+        return
+    }
+    var start = System.currentTimeMillis()
+    var timeout = timeoutInSecond * 1000
+    while (System.currentTimeMillis() - start < timeout) {
+        def lists = sql "show load where label = '${label}'"
+        if (lists.isEmpty()) {
+            return
+        }
+        def state = lists[0][2]
+        if ("FINISHED".equals(state) || "CANCELLED".equals(state)) {
+            return
+        }
+        sleep(300)
+    }
+    logger.warn("broker load with label `${label}` didn't finish in ${timeoutInSecond} second, please check it!")
+}

-    void expectException(Closure userFunction, String errorMessage = null) {
-        try {
-            userFunction()
-        } catch (Exception | Error e) {
-            if (e.getMessage()!= errorMessage) {
-                throw e
-            }
-        }
-    }
+void expectException(Closure userFunction, String errorMessage = null) {
+    try {
+        userFunction()
+    } catch (Exception | Error e) {
+        if (e.getMessage()!= errorMessage) {
+            throw e
+        }
+    }
+}

-    void checkTableData(String tbName1 = null, String tbName2 = null, String fieldName = null, String orderByFieldName = null) {
-        String orderByName = ""
-        if (ObjectUtils.isEmpty(orderByFieldName)){
-            orderByName = fieldName;
-        }else {
-            orderByName = orderByFieldName;
-        }
-        def tb1Result = sql "select ${fieldName} FROM ${tbName1} order by ${orderByName}"
-        def tb2Result = sql "select ${fieldName} FROM ${tbName2} order by ${orderByName}"
-        List tbData1 = new ArrayList();
-        for (List items:tb1Result){
-            tbData1.add(items.get(0))
-        }
-        List tbData2 = new ArrayList();
-        for (List items:tb2Result){
-            tbData2.add(items.get(0))
-        }
-        for (int i =0; i < tbData1.size(); i++) {
-            Assert.assertEquals(tbData1.get(i), tbData2.get(i))
-        }
-    }
+void checkTableData(String tbName1 = null, String tbName2 = null, String fieldName = null, String orderByFieldName = null) {
+    String orderByName = ""
+    if (ObjectUtils.isEmpty(orderByFieldName)){
+        orderByName = fieldName;
+    }else {
+        orderByName = orderByFieldName;
+    }
+    def tb1Result = sql "select ${fieldName} FROM ${tbName1} order by ${orderByName}"
+    def tb2Result = sql "select ${fieldName} FROM ${tbName2} order by ${orderByName}"
+    List tbData1 = new ArrayList();
+    for (List items:tb1Result){
+        tbData1.add(items.get(0))
+    }
+    List tbData2 = new ArrayList();
+    for (List items:tb2Result){
+        tbData2.add(items.get(0))
+    }
+    for (int i =0; i < tbData1.size(); i++) {
+        Assert.assertEquals(tbData1.get(i), tbData2.get(i))
+    }
+}

-    void dispatchTrinoConnectors(ArrayList host_ips)
-    {
-        def dir_download = context.config.otherConfigs.get("trinoPluginsPath")
-        def s3_url = getS3Url()
-        def url = "${s3_url}/regression/trino-connectors.tar.gz"
-        dispatchTrinoConnectors_impl(host_ips, dir_download, url)
-    }
+void dispatchTrinoConnectors(ArrayList host_ips)
+{
+    def dir_download = context.config.otherConfigs.get("trinoPluginsPath")
+    def s3_url = getS3Url()
+    def url = "${s3_url}/regression/trino-connectors.tar.gz"
+    dispatchTrinoConnectors_impl(host_ips, dir_download, url)
+}

-    /*
-     * Downloads the trino connectors and sends them to every fe and be.
-     * There are 3 configs to support this: trino_connectors in regression-conf.groovy, and trino_connector_plugin_dir in be and fe.
-     * fe and be's config must satisfy regression-conf.groovy's config.
-     * e.g. in regression-conf.groovy, trino_connectors = "/tmp/trino_connector", then in be.conf and fe.conf, must set trino_connector_plugin_dir="/tmp/trino_connector/connectors"
-     *
-     * This function must not be reentrant.
-     *
-     * If failed, will call assertTrue(false).
-     */
-    static synchronized void dispatchTrinoConnectors_impl(ArrayList host_ips, String dir_download, String url) {
-        if (isTrinoConnectorDownloaded == true) {
-            staticLogger.info("trino connector downloaded")
-            return
-        }
-
-        Assert.assertTrue(!dir_download.isEmpty())
-        def path_tar = "${dir_download}/trino-connectors.tar.gz"
-        // extract to a tmp directory, and then scp to every host_ips, including self.
-        def dir_connector_tmp = "${dir_download}/connectors_tmp"
-        def path_connector_tmp = "${dir_connector_tmp}/connectors"
-        def path_connector = "${dir_download}/connectors"
-
-        def executeCommand = { String cmd, Boolean mustSuc ->
-            try {
-                staticLogger.info("execute ${cmd}")
-                def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start()
-                int exitcode = proc.waitFor()
-                if (exitcode != 0) {
-                    staticLogger.info("exit code: ${exitcode}, output\n: ${proc.text}")
-                    if (mustSuc == true) {
-                        Assert.assertEquals(0, exitcode)
-                    }
-                }
-            } catch (IOException e) {
-                Assert.assertTrue(false, "execute timeout")
-            }
-        }
-
-        executeCommand("mkdir -p ${dir_download}", false)
-        executeCommand("rm -rf ${path_tar}", false)
-        executeCommand("rm -rf ${dir_connector_tmp}", false)
-        executeCommand("mkdir -p ${dir_connector_tmp}", false)
-        executeCommand("/usr/bin/curl --max-time 600 ${url} --output ${path_tar}", true)
-        executeCommand("tar -zxvf ${path_tar} -C ${dir_connector_tmp}", true)
-
-        host_ips = host_ips.unique()
-        for (def ip in host_ips) {
-            staticLogger.info("scp to ${ip}")
-            executeCommand("ssh -o StrictHostKeyChecking=no root@${ip} \"mkdir -p ${dir_download}\"", false)
-            executeCommand("ssh -o StrictHostKeyChecking=no root@${ip} \"rm -rf ${path_connector}\"", false)
-            scpFiles("root", ip, path_connector_tmp, path_connector, false) // if failed, assertTrue(false) is executed.
-        }
-
-        isTrinoConnectorDownloaded = true
-        staticLogger.info("dispatch trino connector to ${dir_download} succeed")
-    }
+/*
+ * Downloads the trino connectors and sends them to every fe and be.
+ * There are 3 configs to support this: trino_connectors in regression-conf.groovy, and trino_connector_plugin_dir in be and fe.
+ * fe and be's config must satisfy regression-conf.groovy's config.
+ * e.g. in regression-conf.groovy, trino_connectors = "/tmp/trino_connector", then in be.conf and fe.conf, must set trino_connector_plugin_dir="/tmp/trino_connector/connectors"
+ *
+ * This function must not be reentrant.
+ *
+ * If failed, will call assertTrue(false).
+ */
+static synchronized void dispatchTrinoConnectors_impl(ArrayList host_ips, String dir_download, String url) {
+    if (isTrinoConnectorDownloaded == true) {
+        staticLogger.info("trino connector downloaded")
+        return
+    }
+
+    Assert.assertTrue(!dir_download.isEmpty())
+    def path_tar = "${dir_download}/trino-connectors.tar.gz"
+    // extract to a tmp directory, and then scp to every host_ips, including self.
+    def dir_connector_tmp = "${dir_download}/connectors_tmp"
+    def path_connector_tmp = "${dir_connector_tmp}/connectors"
+    def path_connector = "${dir_download}/connectors"
+
+    def executeCommand = { String cmd, Boolean mustSuc ->
+        try {
+            staticLogger.info("execute ${cmd}")
+            def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start()
+            int exitcode = proc.waitFor()
+            if (exitcode != 0) {
+                staticLogger.info("exit code: ${exitcode}, output\n: ${proc.text}")
+                if (mustSuc == true) {
+                    Assert.assertEquals(0, exitcode)
+                }
+            }
+        } catch (IOException e) {
+            Assert.assertTrue(false, "execute timeout")
+        }
+    }
+
+    executeCommand("mkdir -p ${dir_download}", false)
+    executeCommand("rm -rf ${path_tar}", false)
+    executeCommand("rm -rf ${dir_connector_tmp}", false)
+    executeCommand("mkdir -p ${dir_connector_tmp}", false)
+    executeCommand("/usr/bin/curl --max-time 600 ${url} --output ${path_tar}", true)
+    executeCommand("tar -zxvf ${path_tar} -C ${dir_connector_tmp}", true)
+
+    host_ips = host_ips.unique()
+    for (def ip in host_ips) {
+        staticLogger.info("scp to ${ip}")
+        executeCommand("ssh -o StrictHostKeyChecking=no root@${ip} \"mkdir -p ${dir_download}\"", false)
+        executeCommand("ssh -o StrictHostKeyChecking=no root@${ip} \"rm -rf ${path_connector}\"", false)
+        scpFiles("root", ip, path_connector_tmp, path_connector, false) // if failed, assertTrue(false) is executed.
+    }
+
+    isTrinoConnectorDownloaded = true
+    staticLogger.info("dispatch trino connector to ${dir_download} succeed")
+}

-    void mkdirRemote(String username, String host, String path) {
-        String cmd = "ssh ${username}@${host} 'mkdir -p ${path}'"
-        logger.info("Execute: ${cmd}".toString())
-        Process process = cmd.execute()
-        def code = process.waitFor()
-        Assert.assertEquals(0, code)
-    }
+void mkdirRemote(String username, String host, String path) {
+    String cmd = "ssh ${username}@${host} 'mkdir -p ${path}'"
+    logger.info("Execute: ${cmd}".toString())
+    Process process = cmd.execute()
+    def code = process.waitFor()
+    Assert.assertEquals(0, code)
+}

-    String cmd(String cmd, int timeoutSecond = 0) {
-        var processBuilder = new ProcessBuilder()
-        processBuilder.command("/bin/bash", "-c", cmd)
-        var process = processBuilder.start()
-        def outBuf = new StringBuilder()
-        def errBuf = new StringBuilder()
-        process.consumeProcessOutput(outBuf, errBuf)
-        var reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
-        String line
-        while ((line = reader.readLine()) != null) {
-            System.out.println(line)
-        }
-        // wait until cmd finish
-        if (timeoutSecond > 0) {
-            process.waitForOrKill(timeoutSecond * 1000)
-        } else {
-            process.waitFor()
-        }
-        if (process.exitValue() != 0) {
-            println outBuf
-            throw new RuntimeException(errBuf.toString())
-        }
-        return outBuf.toString()
-    }
+String cmd(String cmd, int timeoutSecond = 0) {
+    var processBuilder = new ProcessBuilder()
+    processBuilder.command("/bin/bash", "-c", cmd)
+    var process = processBuilder.start()
+    def outBuf = new StringBuilder()
+    def errBuf = new StringBuilder()
+    process.consumeProcessOutput(outBuf, errBuf)
+    var reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+    String line
+    while ((line = reader.readLine()) != null) {
+        System.out.println(line)
+    }
+    // wait until cmd finish
+    if (timeoutSecond > 0) {
+        process.waitForOrKill(timeoutSecond * 1000)
+    } else {
+        process.waitFor()
+    }
+    if (process.exitValue() != 0) {
+        println outBuf
+        throw new RuntimeException(errBuf.toString())
+    }
+    return outBuf.toString()
+}

-    void sshExec(String username, String host, String cmd, boolean alert=true) {
-        String command = "ssh ${username}@${host} '${cmd}'"
-        def cmds = ["/bin/bash", "-c", command]
-        logger.info("Execute: ${cmds}".toString())
-        Process p = cmds.execute()
-        def errMsg = new StringBuilder()
-        def msg = new StringBuilder()
-        p.waitForProcessOutput(msg, errMsg)
-        if (alert) {
-            assert errMsg.length() == 0: "error occurred!\n" + errMsg
-            assert p.exitValue() == 0
-        }
-    }
+void sshExec(String username, String host, String cmd, boolean alert=true) {
+    String command = "ssh ${username}@${host} '${cmd}'"
+    def cmds = ["/bin/bash", "-c", command]
+    logger.info("Execute: ${cmds}".toString())
+    Process p = cmds.execute()
+    def errMsg = new StringBuilder()
+    def msg = new StringBuilder()
+    p.waitForProcessOutput(msg, errMsg)
+    if (alert) {
+        assert errMsg.length() == 0: "error occurred!\n" + errMsg
+        assert p.exitValue() == 0
+    }
+}

-    List<String> getFrontendIpHttpPort() {
-        return sql_return_maparray("show frontends").collect { it.Host + ":" + it.HttpPort };
-    }
+List<String> getFrontendIpHttpPort() {
+    return sql_return_maparray("show frontends").collect { it.Host + ":" + it.HttpPort };
+}

-    List<String> getFrontendIpEditlogPort() {
-        return sql_return_maparray("show frontends").collect { it.Host + ":" + it.EditLogPort };
-    }
+List<String> getFrontendIpEditlogPort() {
+    return sql_return_maparray("show frontends").collect { it.Host + ":" + it.EditLogPort };
+}

-    void getBackendIpHttpPort(Map<String, String> backendId_to_backendIP, Map<String, String> backendId_to_backendHttpPort) {
-        List<List<Object>> backends = sql("show backends");
-        logger.info("Content of backends: ${backends}")
-        for (List<Object> backend : backends) {
-            backendId_to_backendIP.put(String.valueOf(backend[0]), String.valueOf(backend[1]));
-            backendId_to_backendHttpPort.put(String.valueOf(backend[0]), String.valueOf(backend[4]));
-        }
-        return;
-    }
+void getBackendIpHttpPort(Map<String, String> backendId_to_backendIP, Map<String, String> backendId_to_backendHttpPort) {
+    List<List<Object>> backends = sql("show backends");
+    logger.info("Content of backends: ${backends}")
+    for (List<Object> backend : backends) {
+        backendId_to_backendIP.put(String.valueOf(backend[0]), String.valueOf(backend[1]));
+        backendId_to_backendHttpPort.put(String.valueOf(backend[0]), String.valueOf(backend[4]));
+    }
+    return;
+}

-    void getBackendIpHeartbeatPort(Map<String, String> backendId_to_backendIP,
-            Map<String, String> backendId_to_backendHeartbeatPort) {
-        List<List<Object>> backends = sql("show backends");
-        logger.info("Content of backends: ${backends}")
-        for (List<Object> backend : backends) {
-            backendId_to_backendIP.put(String.valueOf(backend[0]), String.valueOf(backend[1]));
-            backendId_to_backendHeartbeatPort.put(String.valueOf(backend[0]), String.valueOf(backend[2]));
-        }
-        return;
-    }
+void getBackendIpHeartbeatPort(Map<String, String> backendId_to_backendIP,
+        Map<String, String> backendId_to_backendHeartbeatPort) {
+    List<List<Object>> backends = sql("show backends");
+    logger.info("Content of backends: ${backends}")
+    for (List<Object> backend : backends) {
+        backendId_to_backendIP.put(String.valueOf(backend[0]), String.valueOf(backend[1]));
+        backendId_to_backendHeartbeatPort.put(String.valueOf(backend[0]), String.valueOf(backend[2]));
+    }
+    return;
+}

-    void getBackendIpHttpAndBrpcPort(Map<String, String> backendId_to_backendIP,
-            Map<String, String> backendId_to_backendHttpPort, Map<String, String> backendId_to_backendBrpcPort) {
-
-        List<List<Object>> backends = sql("show backends");
-        for (List<Object> backend : backends) {
-            backendId_to_backendIP.put(String.valueOf(backend[0]), String.valueOf(backend[1]));
-            backendId_to_backendHttpPort.put(String.valueOf(backend[0]), String.valueOf(backend[4]));
-            backendId_to_backendBrpcPort.put(String.valueOf(backend[0]), String.valueOf(backend[5]));
-        }
-        return;
-    }
+void getBackendIpHttpAndBrpcPort(Map<String, String> backendId_to_backendIP,
+        Map<String, String> backendId_to_backendHttpPort, Map<String, String> backendId_to_backendBrpcPort) {
+
+    List<List<Object>> backends = sql("show backends");
+    for (List<Object> backend : backends) {
+        backendId_to_backendIP.put(String.valueOf(backend[0]), String.valueOf(backend[1]));
+        backendId_to_backendHttpPort.put(String.valueOf(backend[0]), String.valueOf(backend[4]));
+        backendId_to_backendBrpcPort.put(String.valueOf(backend[0]), String.valueOf(backend[5]));
+    }
+    return;
+}
+} - int getTotalLine(String filePath) { - def file = new File(filePath) - int lines = 0; - file.eachLine { - lines++; - } - return lines; +int getTotalLine(String filePath) { + def file = new File(filePath) + int lines = 0; + file.eachLine { + lines++; } + return lines; +} - Connection getTargetConnection() { - return context.getTargetConnection(this) - } - - boolean deleteFile(String filePath) { - def file = new File(filePath) - file.delete() - } +Connection getTargetConnection() { + return context.getTargetConnection(this) +} - List downloadExportFromHdfs(String label) { - String dataDir = context.config.dataPath + "/" + group + "/" - String hdfsFs = context.config.otherConfigs.get("hdfsFs") - String hdfsUser = context.config.otherConfigs.get("hdfsUser") - Hdfs hdfs = new Hdfs(hdfsFs, hdfsUser, dataDir) - return hdfs.downLoad(label) - } +boolean deleteFile(String filePath) { + def file = new File(filePath) + file.delete() +} - void runStreamLoadExample(String tableName, String coordidateBeHostPort = "") { - def backends = sql_return_maparray "show backends" - sql """ - CREATE TABLE IF NOT EXISTS ${tableName} ( - id int, - name varchar(255) - ) - DISTRIBUTED BY HASH(id) BUCKETS 1 - PROPERTIES ( - "replication_num" = "${backends.size()}" - ) - """ +List downloadExportFromHdfs(String label) { + String dataDir = context.config.dataPath + "/" + group + "/" + String hdfsFs = context.config.otherConfigs.get("hdfsFs") + String hdfsUser = context.config.otherConfigs.get("hdfsUser") + Hdfs hdfs = new Hdfs(hdfsFs, hdfsUser, dataDir) + return hdfs.downLoad(label) +} - streamLoad { - table tableName - set 'column_separator', ',' - file context.config.dataPath + "/demo_p0/streamload_input.csv" - time 10000 - if (!coordidateBeHostPort.equals("")) { - def pos = coordidateBeHostPort.indexOf(':') - def host = coordidateBeHostPort.substring(0, pos) - def httpPort = coordidateBeHostPort.substring(pos + 1).toInteger() - directToBe host, httpPort - } +void runStreamLoadExample(String tableName, String coordidateBeHostPort = "") { + def backends = sql_return_maparray "show backends" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + id int, + name varchar(255) + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "${backends.size()}" + ) + """ + + streamLoad { + table tableName + set 'column_separator', ',' + file context.config.dataPath + "/demo_p0/streamload_input.csv" + time 10000 + if (!coordidateBeHostPort.equals("")) { + def pos = coordidateBeHostPort.indexOf(':') + def host = coordidateBeHostPort.substring(0, pos) + def httpPort = coordidateBeHostPort.substring(pos + 1).toInteger() + directToBe host, httpPort } } +} - void streamLoad(Closure actionSupplier) { - runAction(new StreamLoadAction(context), actionSupplier) - } +void streamLoad(Closure actionSupplier) { + runAction(new StreamLoadAction(context), actionSupplier) +} - void restore(Closure actionSupplier) { - runAction(new RestoreAction(context), actionSupplier) - } +void restore(Closure actionSupplier) { + runAction(new RestoreAction(context), actionSupplier) +} - void httpTest(Closure actionSupplier) { - runAction(new HttpCliAction(context), actionSupplier) - } +void httpTest(Closure actionSupplier) { + runAction(new HttpCliAction(context), actionSupplier) +} - void runAction(SuiteAction action, Closure actionSupplier) { - actionSupplier.setDelegate(action) - actionSupplier.setResolveStrategy(Closure.DELEGATE_FIRST) - actionSupplier.call(action) - action.run() - } +void runAction(SuiteAction action, Closure 
actionSupplier) { + actionSupplier.setDelegate(action) + actionSupplier.setResolveStrategy(Closure.DELEGATE_FIRST) + actionSupplier.call(action) + action.run() +} - PreparedStatement prepareStatement(String sql) { - logger.info("Execute sql: ${sql}".toString()) - return JdbcUtils.prepareStatement(context.getConnection(), sql) - } +PreparedStatement prepareStatement(String sql) { + logger.info("Execute sql: ${sql}".toString()) + return JdbcUtils.prepareStatement(context.getConnection(), sql) +} - void setHivePrefix(String hivePrefix) { - this.hivePrefix = hivePrefix - } +void setHivePrefix(String hivePrefix) { + this.hivePrefix = hivePrefix +} - List> hive_docker(String sqlStr, boolean isOrder = false) { - logger.info("Execute hive ql: ${sqlStr}".toString()) - String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "") - def (result, meta) = JdbcUtils.executeToList(context.getHiveDockerConnection(hivePrefix), cleanedSqlStr) - if (isOrder) { - result = DataUtils.sortByToString(result) - } - return result +List> hive_docker(String sqlStr, boolean isOrder = false) { + logger.info("Execute hive ql: ${sqlStr}".toString()) + String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "") + def (result, meta) = JdbcUtils.executeToList(context.getHiveDockerConnection(hivePrefix), cleanedSqlStr) + if (isOrder) { + result = DataUtils.sortByToString(result) } + return result +} - List> hive_remote(String sqlStr, boolean isOrder = false) { - String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "") - def (result, meta) = JdbcUtils.executeToList(context.getHiveRemoteConnection(), cleanedSqlStr) - if (isOrder) { - result = DataUtils.sortByToString(result) - } - return result +List> hive_remote(String sqlStr, boolean isOrder = false) { + String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "") + def (result, meta) = JdbcUtils.executeToList(context.getHiveRemoteConnection(), cleanedSqlStr) + if (isOrder) { + result = DataUtils.sortByToString(result) } + return result +} - List> db2_docker(String sqlStr, boolean isOrder = false) { - String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "") - def (result, meta) = JdbcUtils.executeToList(context.getDB2DockerConnection(), cleanedSqlStr) - if (isOrder) { - result = DataUtils.sortByToString(result) - } - return result - } - List> exec(Object stmt) { - logger.info("Execute sql: ${stmt}".toString()) - def (result, meta )= JdbcUtils.executeToList(context.getConnection(), (PreparedStatement) stmt) - return result +List> db2_docker(String sqlStr, boolean isOrder = false) { + String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "") + def (result, meta) = JdbcUtils.executeToList(context.getDB2DockerConnection(), cleanedSqlStr) + if (isOrder) { + result = DataUtils.sortByToString(result) } + return result +} +List> exec(Object stmt) { + logger.info("Execute sql: ${stmt}".toString()) + def (result, meta )= JdbcUtils.executeToList(context.getConnection(), (PreparedStatement) stmt) + return result +} - void quickRunTest(String tag, Object arg, boolean isOrder = false) { - if (context.config.generateOutputFile || context.config.forceGenerateOutputFile) { - Tuple2>, ResultSetMetaData> tupleResult = null - if (arg instanceof PreparedStatement) { - if (tag.contains("hive_docker")) { - tupleResult = JdbcUtils.executeToStringList(context.getHiveDockerConnection(hivePrefix), (PreparedStatement) arg) - } else if (tag.contains("hive_remote")) { - tupleResult = JdbcUtils.executeToStringList(context.getHiveRemoteConnection(), (PreparedStatement) arg) - } else if 
(tag.contains("arrow_flight_sql") || context.useArrowFlightSql()) { - tupleResult = JdbcUtils.executeToStringList(context.getArrowFlightSqlConnection(), (PreparedStatement) arg) - } else if (tag.contains("target_sql")) { - tupleResult = JdbcUtils.executeToStringList(context.getTargetConnection(this), (PreparedStatement) arg) - } else { - tupleResult = JdbcUtils.executeToStringList(context.getConnection(), (PreparedStatement) arg) - } +void quickRunTest(String tag, Object arg, boolean isOrder = false) { + if (context.config.generateOutputFile || context.config.forceGenerateOutputFile) { + Tuple2>, ResultSetMetaData> tupleResult = null + if (arg instanceof PreparedStatement) { + if (tag.contains("hive_docker")) { + tupleResult = JdbcUtils.executeToStringList(context.getHiveDockerConnection(hivePrefix), (PreparedStatement) arg) + } else if (tag.contains("hive_remote")) { + tupleResult = JdbcUtils.executeToStringList(context.getHiveRemoteConnection(), (PreparedStatement) arg) + } else if (tag.contains("arrow_flight_sql") || context.useArrowFlightSql()) { + tupleResult = JdbcUtils.executeToStringList(context.getArrowFlightSqlConnection(), (PreparedStatement) arg) + } else if (tag.contains("target_sql")) { + tupleResult = JdbcUtils.executeToStringList(context.getTargetConnection(this), (PreparedStatement) arg) } else { - if (tag.contains("hive_docker")) { - tupleResult = JdbcUtils.executeToStringList(context.getHiveDockerConnection(hivePrefix), (String) arg) - } else if (tag.contains("hive_remote")) { - tupleResult = JdbcUtils.executeToStringList(context.getHiveRemoteConnection(), (String) arg) - } else if (tag.contains("arrow_flight_sql") || context.useArrowFlightSql()) { - tupleResult = JdbcUtils.executeToStringList(context.getArrowFlightSqlConnection(), - (String) ("USE ${context.dbName};" + (String) arg)) - } else if (tag.contains("target_sql")) { - tupleResult = JdbcUtils.executeToStringList(context.getTargetConnection(this), (String) arg) - } else { - tupleResult = JdbcUtils.executeToStringList(context.getConnection(), (String) arg) - } - } - def (result, meta) = tupleResult - if (isOrder) { - result = sortByToString(result) + tupleResult = JdbcUtils.executeToStringList(context.getConnection(), (PreparedStatement) arg) } - Iterator> realResults = result.iterator() - // generate and save to .out file - def writer = context.getOutputWriter(context.config.forceGenerateOutputFile) - writer.write(realResults, tag) } else { - if (!context.outputFile.exists()) { - throw new IllegalStateException("Missing outputFile: ${context.outputFile.getAbsolutePath()}") - } - - if (!context.getOutputIterator().hasNextTagBlock(tag)) { - throw new IllegalStateException("Missing output block for tag '${tag}': ${context.outputFile.getAbsolutePath()}") + if (tag.contains("hive_docker")) { + tupleResult = JdbcUtils.executeToStringList(context.getHiveDockerConnection(hivePrefix), (String) arg) + } else if (tag.contains("hive_remote")) { + tupleResult = JdbcUtils.executeToStringList(context.getHiveRemoteConnection(), (String) arg) + } else if (tag.contains("arrow_flight_sql") || context.useArrowFlightSql()) { + tupleResult = JdbcUtils.executeToStringList(context.getArrowFlightSqlConnection(), + (String) ("USE ${context.dbName};" + (String) arg)) + } else if (tag.contains("target_sql")) { + tupleResult = JdbcUtils.executeToStringList(context.getTargetConnection(this), (String) arg) + } else { + tupleResult = JdbcUtils.executeToStringList(context.getConnection(), (String) arg) } - - OutputUtils.TagBlockIterator 
expectCsvResults = context.getOutputIterator().next() - Tuple2>, ResultSetMetaData> tupleResult = null - if (arg instanceof PreparedStatement) { - if (tag.contains("hive_docker")) { - tupleResult = JdbcUtils.executeToStringList(context.getHiveDockerConnection(hivePrefix), (PreparedStatement) arg) - } else if (tag.contains("hive_remote")) { - tupleResult = JdbcUtils.executeToStringList(context.getHiveRemoteConnection(), (PreparedStatement) arg) - } else if (tag.contains("arrow_flight_sql") || context.useArrowFlightSql()) { - tupleResult = JdbcUtils.executeToStringList(context.getArrowFlightSqlConnection(), (PreparedStatement) arg) - } else if (tag.contains("target_sql")) { - tupleResult = JdbcUtils.executeToStringList(context.getTargetConnection(this), (PreparedStatement) arg) - } else { - tupleResult = JdbcUtils.executeToStringList(context.getConnection(), (PreparedStatement) arg) - } + } + def (result, meta) = tupleResult + if (isOrder) { + result = sortByToString(result) + } + Iterator> realResults = result.iterator() + // generate and save to .out file + def writer = context.getOutputWriter(context.config.forceGenerateOutputFile) + writer.write(realResults, tag) + } else { + if (!context.outputFile.exists()) { + throw new IllegalStateException("Missing outputFile: ${context.outputFile.getAbsolutePath()}") + } + + if (!context.getOutputIterator().hasNextTagBlock(tag)) { + throw new IllegalStateException("Missing output block for tag '${tag}': ${context.outputFile.getAbsolutePath()}") + } + + OutputUtils.TagBlockIterator expectCsvResults = context.getOutputIterator().next() + Tuple2>, ResultSetMetaData> tupleResult = null + if (arg instanceof PreparedStatement) { + if (tag.contains("hive_docker")) { + tupleResult = JdbcUtils.executeToStringList(context.getHiveDockerConnection(hivePrefix), (PreparedStatement) arg) + } else if (tag.contains("hive_remote")) { + tupleResult = JdbcUtils.executeToStringList(context.getHiveRemoteConnection(), (PreparedStatement) arg) + } else if (tag.contains("arrow_flight_sql") || context.useArrowFlightSql()) { + tupleResult = JdbcUtils.executeToStringList(context.getArrowFlightSqlConnection(), (PreparedStatement) arg) + } else if (tag.contains("target_sql")) { + tupleResult = JdbcUtils.executeToStringList(context.getTargetConnection(this), (PreparedStatement) arg) } else { - if (tag.contains("hive_docker")) { - tupleResult = JdbcUtils.executeToStringList(context.getHiveDockerConnection(hivePrefix), (String) arg) - } else if (tag.contains("hive_remote")) { - tupleResult = JdbcUtils.executeToStringList(context.getHiveRemoteConnection(), (String) arg) - } else if (tag.contains("arrow_flight_sql") || context.useArrowFlightSql()) { - tupleResult = JdbcUtils.executeToStringList(context.getArrowFlightSqlConnection(), - (String) ("USE ${context.dbName};" + (String) arg)) - } else if (tag.contains("target_sql")) { - tupleResult = JdbcUtils.executeToStringList(context.getTargetConnection(this), (String) arg) - } else { - tupleResult = JdbcUtils.executeToStringList(context.getConnection(), (String) arg) - } + tupleResult = JdbcUtils.executeToStringList(context.getConnection(), (PreparedStatement) arg) } - def (realResults, meta) = tupleResult - if (isOrder) { - realResults = sortByToString(realResults) + } else { + if (tag.contains("hive_docker")) { + tupleResult = JdbcUtils.executeToStringList(context.getHiveDockerConnection(hivePrefix), (String) arg) + } else if (tag.contains("hive_remote")) { + tupleResult = 
JdbcUtils.executeToStringList(context.getHiveRemoteConnection(), (String) arg) + } else if (tag.contains("arrow_flight_sql") || context.useArrowFlightSql()) { + tupleResult = JdbcUtils.executeToStringList(context.getArrowFlightSqlConnection(), + (String) ("USE ${context.dbName};" + (String) arg)) + } else if (tag.contains("target_sql")) { + tupleResult = JdbcUtils.executeToStringList(context.getTargetConnection(this), (String) arg) + } else { + tupleResult = JdbcUtils.executeToStringList(context.getConnection(), (String) arg) } + } + def (realResults, meta) = tupleResult + if (isOrder) { + realResults = sortByToString(realResults) + } - Iterator> realResultsIter = realResults.iterator() - def realWriter = context.getRealOutputWriter(true) - realWriter.write(realResultsIter, tag) + Iterator> realResultsIter = realResults.iterator() + def realWriter = context.getRealOutputWriter(true) + realWriter.write(realResultsIter, tag) - String errorMsg = null - try { - errorMsg = OutputUtils.checkOutput(expectCsvResults, realResults.iterator(), - { row -> OutputUtils.toCsvString(row as List) }, - { row -> OutputUtils.toCsvString(row) }, - "Check tag '${tag}' failed", meta) - } catch (Throwable t) { - throw new IllegalStateException("Check tag '${tag}' failed, sql:\n${arg}", t) - } - if (errorMsg != null) { - def allPlan = "" - if (arg instanceof String) { - def query = (String) arg; - def pattern = Pattern.compile("^\\s*explain\\s+shape\\s*plan\\s*", Pattern.MULTILINE) - if (query =~ pattern) { - def physical = query.replaceAll(pattern, "explain all plan ") + String errorMsg = null + try { + errorMsg = OutputUtils.checkOutput(expectCsvResults, realResults.iterator(), + { row -> OutputUtils.toCsvString(row as List) }, + { row -> OutputUtils.toCsvString(row) }, + "Check tag '${tag}' failed", meta) + } catch (Throwable t) { + throw new IllegalStateException("Check tag '${tag}' failed, sql:\n${arg}", t) + } + if (errorMsg != null) { + def allPlan = "" + if (arg instanceof String) { + def query = (String) arg; + def pattern = Pattern.compile("^\\s*explain\\s+shape\\s*plan\\s*", Pattern.MULTILINE) + if (query =~ pattern) { + def physical = query.replaceAll(pattern, "explain all plan ") + try { allPlan = JdbcUtils.executeToStringList(context.getConnection(), physical)[0].join('\n') - } + } catch (Throwable ignore) {} } String csvRealResult = realResults.stream()