Skip to content

Commit

Permalink
Multi stage int tests (apache#11404)
Browse files Browse the repository at this point in the history
* Run OfflineClusterIntegrationTest test with both query engines.

* Improved some tests in V2

* Improve some tests in OfflineClusterIntegrationTest

* Run more integration test with multi stage engine

* Run OfflineClusterIntegrationTest test with both query engines.

* Improved some tests in V2

* Fix MultiStageEngineIntegrationTest

* Apply some changes in BaseClusterIntegrationTestSet

* Extra improvements in OfflineClusterIntegrationTest

* Adapted error code expected on a specific query

* Fix or skip all tests in OfflineClusterIntegrationTest

* Skip all tests that fail in BaseRealtimeClusterIntegrationTest

* Skip tests that doesn't work in V2 using a specific function

* Fix MultiStageEngineIntegrationTest to run with the correct transport

* Add a TODO

* Fix several error codes

* Enable one now supported V2 tests

---------

Co-authored-by: Xiang Fu <[email protected]>
  • Loading branch information
gortiz and xiangfu0 authored Sep 7, 2023
1 parent 61a81ab commit be496bc
Show file tree
Hide file tree
Showing 19 changed files with 691 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ public static Connection fromHostList(Properties properties, List<String> broker
}

private static PinotClientTransport getDefault(Properties connectionProperties) {
// TODO: This code incorrectly assumes that connection properties are always the same
if (_defaultTransport == null) {
synchronized (ConnectionFactory.class) {
if (_defaultTransport == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
Expand All @@ -32,9 +33,11 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
Expand Down Expand Up @@ -509,17 +512,24 @@ protected TableConfig getRealtimeTableConfig() {
* @return Pinot connection
*/
protected org.apache.pinot.client.Connection getPinotConnection() {
// TODO: This code is assuming getPinotConnectionProperties() will always return the same values
if (useMultiStageQueryEngine()) {
if (_pinotConnectionV2 == null) {
Properties properties = getPinotConnectionProperties();
properties.put("useMultiStageEngine", "true");
_pinotConnectionV2 = ConnectionFactory.fromZookeeper(properties, getZkUrl() + "/" + getHelixClusterName());
_pinotConnectionV2 = ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(),
new JsonAsyncHttpPinotClientTransportFactory()
.withConnectionProperties(properties)
.buildTransport());
}
return _pinotConnectionV2;
}
if (_pinotConnection == null) {
_pinotConnection =
ConnectionFactory.fromZookeeper(getPinotConnectionProperties(), getZkUrl() + "/" + getHelixClusterName());
ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(),
new JsonAsyncHttpPinotClientTransportFactory()
.withConnectionProperties(getPinotConnectionProperties())
.buildTransport());
}
return _pinotConnection;
}
Expand Down Expand Up @@ -789,4 +799,21 @@ protected void testQueryWithMatchingRowCount(String pinotQuery, String h2Query)
ClusterIntegrationTestUtils.testQueryWithMatchingRowCount(pinotQuery, getBrokerBaseApiUrl(), getPinotConnection(),
h2Query, getH2Connection(), null, getExtraQueryProperties(), useMultiStageQueryEngine());
}

protected String getType(JsonNode jsonNode, int colIndex) {
return jsonNode.get("resultTable")
.get("dataSchema")
.get("columnDataTypes")
.get(colIndex)
.asText();
}

protected <T> T getCellValue(JsonNode jsonNode, int colIndex, int rowIndex, Function<JsonNode, T> extract) {
JsonNode cellResult = jsonNode.get("resultTable").get("rows").get(rowIndex).get(colIndex);
return extract.apply(cellResult);
}

protected long getLongCellValue(JsonNode jsonNode, int colIndex, int rowIndex) {
return getCellValue(jsonNode, colIndex, rowIndex, JsonNode::asLong).longValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;

import static org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl;
Expand Down Expand Up @@ -572,4 +573,10 @@ public Object[][] useV2QueryEngine() {
{true}
};
}

protected void notSupportedInV2() {
if (useMultiStageQueryEngine()) {
throw new SkipException("Some queries fail when using multi-stage engine");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ protected void waitForAllDocsLoaded(long timeoutMs) {
}, 100L, timeoutMs, "Failed to load all documents");
}

@Test
public void testQueries()
@Test(dataProvider = "useBothQueryEngines")
public void testQueries(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
String query = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
testQuery(query);
query = "SELECT SUM(AirTime), DaysSinceEpoch FROM mytable GROUP BY DaysSinceEpoch ORDER BY SUM(AirTime) DESC";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeMethod;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand All @@ -67,6 +68,11 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
"On_Time_On_Time_Performance_2014_100k_subset.test_queries_200.sql";
private static final int DEFAULT_NUM_QUERIES_TO_GENERATE = 100;

@BeforeMethod
public void resetMultiStage() {
setUseMultiStageQueryEngine(false);
}

/**
* Can be overridden to change default setting
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ protected List<String> getNoDictionaryColumns() {
* to ensure the right result is computed, wherein dictionary is not read if it is mutable
* @throws Exception
*/
@Test
public void testDictionaryBasedQueries()
@Test(dataProvider = "useBothQueryEngines")
public void testDictionaryBasedQueries(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);

// Dictionary columns
// int
Expand Down Expand Up @@ -142,30 +143,35 @@ private void testDictionaryBasedFunctions(String column)
String.format("SELECT MAX(%s)-MIN(%s) FROM %s", column, column, getTableName()));
}

@Test
public void testHardcodedQueries()
@Test(dataProvider = "useBothQueryEngines")
public void testHardcodedQueries(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
notSupportedInV2();
super.testHardcodedQueries();
}

@Test
@Override
public void testQueriesFromQueryFile()
@Test(dataProvider = "useBothQueryEngines")
public void testQueriesFromQueryFile(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
notSupportedInV2();
super.testQueriesFromQueryFile();
}

@Test
@Override
public void testGeneratedQueries()
@Test(dataProvider = "useBothQueryEngines")
public void testGeneratedQueries(boolean useMultiStageQueryEngine)
throws Exception {
testGeneratedQueries(true, false);
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
notSupportedInV2();
testGeneratedQueries(true, useMultiStageQueryEngine);
}

@Test
@Override
public void testQueryExceptions()
@Test(dataProvider = "useBothQueryEngines")
public void testQueryExceptions(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
notSupportedInV2();
super.testQueryExceptions();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ protected void startHybridCluster()
createServerTenant(TENANT_NAME, 1, 1);
}

@Test
public void testGrpcBrokerRequestHandlerOnSelectionOnlyQuery()
@Test(dataProvider = "useBothQueryEngines")
public void testGrpcBrokerRequestHandlerOnSelectionOnlyQuery(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
String query = "SELECT * FROM mytable LIMIT 1000000";
testQuery(query);
query = "SELECT * FROM mytable WHERE DaysSinceEpoch > 16312 LIMIT 10000000";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,10 @@ public void testBrokerDebugOutput()
Assert.assertNotNull(getDebugInfo("debug/routingTable/" + TableNameBuilder.REALTIME.tableNameWithType(tableName)));
}

@Test
public void testBrokerDebugRoutingTableSQL()
@Test(dataProvider = "useBothQueryEngines")
public void testBrokerDebugRoutingTableSQL(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
String tableName = getTableName();
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
Expand All @@ -198,9 +199,11 @@ public void testBrokerDebugRoutingTableSQL()
Assert.assertNotNull(getDebugInfo("debug/routingTable/sql?query=" + encodedSQL));
}

@Test
public void testQueryTracing()
@Test(dataProvider = "useBothQueryEngines")
public void testQueryTracing(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
notSupportedInV2();
JsonNode jsonNode = postQuery("SET trace = true; SELECT COUNT(*) FROM " + getTableName());
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(), getCountStarResult());
Assert.assertTrue(jsonNode.get("exceptions").isEmpty());
Expand All @@ -210,9 +213,11 @@ public void testQueryTracing()
Assert.assertTrue(traceInfo.has("localhost_R"));
}

@Test
public void testQueryTracingWithLiteral()
@Test(dataProvider = "useBothQueryEngines")
public void testQueryTracingWithLiteral(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
notSupportedInV2();
JsonNode jsonNode =
postQuery("SET trace = true; SELECT 1, \'test\', ArrDelay FROM " + getTableName() + " LIMIT 10");
long countStarResult = 10;
Expand All @@ -228,9 +233,10 @@ public void testQueryTracingWithLiteral()
Assert.assertTrue(traceInfo.has("localhost_R"));
}

@Test
public void testDropResults()
@Test(dataProvider = "useBothQueryEngines")
public void testDropResults(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
final String query = String.format("SELECT * FROM %s limit 10", getTableName());
final String resultTag = "resultTable";

Expand All @@ -244,31 +250,35 @@ public void testDropResults()
Assert.assertTrue(postQueryWithOptions(query, "dropResults=truee").has(resultTag));
}

@Test
@Override
public void testHardcodedQueries()
@Test(dataProvider = "useBothQueryEngines")
public void testHardcodedQueries(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
notSupportedInV2();
super.testHardcodedQueries();
}

@Test
@Override
public void testQueriesFromQueryFile()
@Test(dataProvider = "useBothQueryEngines")
public void testQueriesFromQueryFile(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
notSupportedInV2();
super.testQueriesFromQueryFile();
}

@Test
@Override
public void testGeneratedQueries()
@Test(dataProvider = "useBothQueryEngines")
public void testGeneratedQueries(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
notSupportedInV2();
super.testGeneratedQueries();
}

@Test
@Override
public void testQueryExceptions()
@Test(dataProvider = "useBothQueryEngines")
public void testQueryExceptions(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
notSupportedInV2();
super.testQueryExceptions();
}

Expand All @@ -286,9 +296,9 @@ public void testBrokerResponseMetadata()
super.testBrokerResponseMetadata();
}

@Test
@Override
public void testVirtualColumnQueries() {
@Test(dataProvider = "useBothQueryEngines")
public void testVirtualColumnQueries(boolean useMultiStageQueryEngine)
throws Exception {
super.testVirtualColumnQueries();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ public void setUp()
waitForAllDocsLoaded(600_000L);
}

@Test
public void testQueries()
@Test(dataProvider = "useBothQueryEngines")
public void testQueries(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
// Select column created with transform function
String sqlQuery = "Select millisSinceEpoch from " + DEFAULT_TABLE_NAME;
JsonNode response = postQuery(sqlQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,11 @@ public void testReload()
testReload(false);
}

@Test
public void testAddRemoveDictionaryAndInvertedIndex()
@Test(dataProvider = "useBothQueryEngines")
public void testAddRemoveDictionaryAndInvertedIndex(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
notSupportedInV2();
String query = "SELECT COUNT(*) FROM myTable WHERE ActualElapsedTime = -9999";
long numTotalDocs = getCountStarResult();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,29 +194,28 @@ private void testCountStarQuery(int expectedNumServersQueried, boolean exception

// Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
@Test(enabled = false)
@Override
public void testStarTreeTriggering() {
public void testStarTreeTriggering(boolean useMultiStageQueryEngine) {
// Ignored
}

// Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
@Test(enabled = false)
@Override
public void testDefaultColumns() {
public void testDefaultColumns(boolean useMultiStageQueryEngineg) {
// Ignored
}

// Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
@Test(enabled = false)
@Override
public void testBloomFilterTriggering() {
// Ignored
}

// Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
@Test(enabled = false)
@Override
public void testRangeIndexTriggering() {
public void testRangeIndexTriggering(boolean useMultiStageQueryEngine)
throws Exception {
// Ignored
}

Expand Down
Loading

0 comments on commit be496bc

Please sign in to comment.