Skip to content

Commit

Permalink
SNOW-1728000 Create abstractions for e2e tests p.2
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Oct 17, 2024
1 parent d96603a commit d99d44b
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 63 deletions.
32 changes: 17 additions & 15 deletions test/test_executor.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
from datetime import datetime
import sys
import traceback

# TestExecutor is responsible for running a given subset of tests
class TestExecutor:
def execution(testSuitList, testSuitEnableList, driver, nameSalt, round=1):

def execute(self, testSuitList, driver, nameSalt, round=1):
try:
for i, test in enumerate(testSuitList):
if testSuitEnableList[i]:
driver.createConnector(test.getConfigFileName(), nameSalt)
for test in testSuitList:
driver.createConnector(test.getConfigFileName(), nameSalt)

driver.startConnectorWaitTime()

for r in range(round):
print(datetime.now().strftime("\n%H:%M:%S "), "=== round {} ===".format(r))
for i, test in enumerate(testSuitList):
if testSuitEnableList[i]:
print(datetime.now().strftime("\n%H:%M:%S "),
"=== Sending " + test.__class__.__name__ + " data ===")
test.send()
print(datetime.now().strftime("%H:%M:%S "), "=== Done " + test.__class__.__name__ + " ===", flush=True)
for test in testSuitList:
print(datetime.now().strftime("\n%H:%M:%S "),
"=== Sending " + test.__class__.__name__ + " data ===")
test.send()
print(datetime.now().strftime("%H:%M:%S "), "=== Done " + test.__class__.__name__ + " ===", flush=True)


driver.verifyWaitTime()

for i, test in enumerate(testSuitList):
if testSuitEnableList[i]:
print(datetime.now().strftime("\n%H:%M:%S "), "=== Verify " + test.__class__.__name__ + " ===")
driver.verifyWithRetry(test.verify, r, test.getConfigFileName())
print(datetime.now().strftime("%H:%M:%S "), "=== Passed " + test.__class__.__name__ + " ===", flush=True)
for test in testSuitList:
print(datetime.now().strftime("\n%H:%M:%S "), "=== Verify " + test.__class__.__name__ + " ===")
driver.verifyWithRetry(test.verify, r, test.getConfigFileName())
print(datetime.now().strftime("%H:%M:%S "), "=== Passed " + test.__class__.__name__ + " ===", flush=True)

print(datetime.now().strftime("\n%H:%M:%S "), "=== All test passed ===")
except Exception as e:
Expand Down
18 changes: 18 additions & 0 deletions test/test_selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from test_suites import create_end_to_end_test_suites
import test_suit

# TestSelector is responsible for selecting a subset of tests to be run
# It is meant to filter tests by platform, cloud vendor or any other predicate needed
class TestSelector:
    """Selects the subset of end-to-end tests to be run.

    Meant to filter tests by platform, cloud vendor or any other
    predicate needed.
    """

    def selectTestsToBeRun(self, driver, nameSalt, schemaRegistryAddress, testPlatform, allowedTestsCsv):
        """Return the list of test instances enabled for testPlatform.

        testPlatform must be one of "apache", "confluent" or "clean";
        anything else raises test_suit.test_utils.NonRetryableError.
        """
        test_suites = create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testPlatform, allowedTestsCsv)

        # Use the flags directly as booleans rather than comparing `== True` (PEP 8).
        if testPlatform == "apache":
            return [suite.test_instance for suite in test_suites.values() if suite.run_in_apache]
        elif testPlatform == "confluent":
            return [suite.test_instance for suite in test_suites.values() if suite.run_in_confluent]
        elif testPlatform == "clean":
            # Cleanup runs every known test regardless of platform flags.
            return [suite.test_instance for suite in test_suites.values()]
        else:
            raise test_suit.test_utils.NonRetryableError("unknown testPlatform={}".format(testPlatform))
58 changes: 10 additions & 48 deletions test/test_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from confluent_kafka.avro import AvroProducer
from test_suites import create_end_to_end_test_suites
from test_executor import TestExecutor
from test_selector import TestSelector
import time

import test_suit
Expand Down Expand Up @@ -458,56 +459,26 @@ def runStressTests(driver, testSet, nameSalt):
############################ Stress Tests Round 1 ############################
# TestPressure and TestPressureRestart will only run when Running StressTests
print(datetime.now().strftime("\n%H:%M:%S "), "=== Stress Tests Round 1 ===")
testSuitList = [testPressureRestart]

testSuitEnableList = []
if testSet == "confluent":
testSuitEnableList = [True]
elif testSet == "apache":
testSuitEnableList = [True]
elif testSet != "clean":
errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))

execution(testSet, testSuitList, testSuitEnableList, driver, nameSalt, round=1)
execution(testSet, [testPressureRestart], driver, nameSalt, round=1)
############################ Stress Tests Round 1 ############################

############################ Stress Tests Round 2 ############################
print(datetime.now().strftime("\n%H:%M:%S "), "=== Stress Tests Round 2 ===")
testSuitList = [testPressure]

testSuitEnableList = []
if testSet == "confluent":
testSuitEnableList = [True]
elif testSet == "apache":
testSuitEnableList = [True]
elif testSet != "clean":
errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))

execution(testSet, testSuitList, testSuitEnableList, driver, nameSalt, round=1)
execution(testSet, [testPressure], driver, nameSalt, round=1)
############################ Stress Tests Round 2 ############################


def runTestSet(driver, testSet, nameSalt, enable_stress_test, skipProxy, allowedTestsCsv):
if enable_stress_test:
runStressTests(driver, testSet, nameSalt)
else:
test_suites = create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testSet, allowedTestsCsv)

############################ round 1 ############################
print(datetime.now().strftime("\n%H:%M:%S "), "=== Round 1 ===")

end_to_end_tests_suite = [single_end_to_end_test.test_instance for single_end_to_end_test in test_suites.values()]
testSelector = TestSelector()
end_to_end_tests_suite = testSelector.selectTestsToBeRun(driver, nameSalt, schemaRegistryAddress, testSet, allowedTestsCsv)

end_to_end_tests_suite_runner = []

if testSet == "confluent":
end_to_end_tests_suite_runner = [single_end_to_end_test.run_in_confluent for single_end_to_end_test in test_suites.values()]
elif testSet == "apache":
end_to_end_tests_suite_runner = [single_end_to_end_test.run_in_apache for single_end_to_end_test in test_suites.values()]
elif testSet != "clean":
errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))

execution(testSet, end_to_end_tests_suite, end_to_end_tests_suite_runner, driver, nameSalt)
execution(testSet, end_to_end_tests_suite, driver, nameSalt)

############################ Always run Proxy tests in the end ############################

Expand All @@ -530,27 +501,18 @@ def runTestSet(driver, testSet, nameSalt, enable_stress_test, skipProxy, allowed

end_to_end_proxy_tests_suite = [single_end_to_end_test.test_instance for single_end_to_end_test in proxy_tests_suite]

proxy_suite_runner = []

if testSet == "confluent":
proxy_suite_runner = [single_end_to_end_test.run_in_confluent for single_end_to_end_test in proxy_tests_suite]
elif testSet == "apache":
proxy_suite_runner = [single_end_to_end_test.run_in_apache for single_end_to_end_test in proxy_tests_suite]
elif testSet != "clean":
errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))

execution(testSet, end_to_end_proxy_tests_suite, proxy_suite_runner, driver, nameSalt)
execution(testSet, end_to_end_proxy_tests_suite, driver, nameSalt)
############################ Proxy End To End Test End ############################


def execution(testSet, testSuitList, testSuitEnableList, driver, nameSalt, round=1):
def execution(testSet, testSuitList, driver, nameSalt, round=1):
    """Run (or clean up) the given test suites.

    For testSet == "clean" only each test's cleanup hook is invoked;
    for any other test set the suites are delegated to TestExecutor.

    The pasted span contained a stale pre-change loop line
    (`for i, test in enumerate(...)`) left over from the diff; only the
    post-change loop is kept here.
    """
    if testSet == "clean":
        for test in testSuitList:
            test.clean()
        print(datetime.now().strftime("\n%H:%M:%S "), "=== All clean done ===")
    else:
        testExecutor = TestExecutor()
        testExecutor.execute(testSuitList, driver, nameSalt, round)


def run_test_set_with_parameters(kafka_test: KafkaTest, testSet, nameSalt, pressure, skipProxy, allowedTestsCsv):
Expand Down

0 comments on commit d99d44b

Please sign in to comment.