smoke framework support external stream (#300)
jhao0117 authored Nov 20, 2023
1 parent aaf5d03 commit 1b233cd
Showing 6 changed files with 234 additions and 39 deletions.
132 changes: 99 additions & 33 deletions tests/stream/helpers/rockets.py
@@ -1,6 +1,6 @@
#!/usr/bin/python3
# _*_ coding: utf-8 _*_
import datetime, yaml, json, getopt, logging, logging.config, math, os, platform, random, requests,signal,subprocess, sys, threading, time, traceback, uuid
import datetime, yaml, json, getopt, logging, logging.config, math, os, platform, random, requests, signal, subprocess, sys, threading, time, traceback, uuid
import multiprocessing as mp
from proton_driver import Client, errors
from timeplus import Stream, Environment
@@ -128,7 +128,8 @@ def timeout_flag(
f"{str(datetime.datetime.now())}, {hit_context_info}, timeout_flag exception, error = {error}"
)

def scan_tests_file_path(tests_file_path):

def scan_tests_file_path(tests_file_path, proton_setting):
test_suites_selected = []
test_suite_names_selected = []
all_test_suites_json = []
@@ -143,7 +144,12 @@ def scan_tests_file_path(tests_file_path):
files = os.listdir(tests_file_path)
logger.debug(f"files = {files}")
for file_name in files:
if file_name.endswith(".json") or file_name.endswith(".yaml") or file_name.endswith(".yml"):
test_suite_settings_2_run = None
if (
file_name.endswith(".json")
or file_name.endswith(".yaml")
or file_name.endswith(".yml")
):
file_abs_path = f"{tests_file_path}/{file_name}"
logger.debug(f"file_abs_path = {file_abs_path}")
with open(file_abs_path) as test_suite_file:
@@ -156,27 +162,38 @@
)
test_suite_name = test_suite.get("test_suite_name")
test_suite_tag = test_suite.get("tag")
test_suite_config = test_suite.get("test_suite_config")
if test_suite_config is not None:
test_suite_settings_2_run = test_suite_config.get("settings_to_run")
else:
test_suite_settings_2_run = None
if test_suite_name == None or test_suite_tag == "skip":
logger.debug(
f"test_suite_name is vacant or test_suite_tag == skip and ignore this json file"
)
pass
else:
logger.debug(
f"check if test_sute_name = {test_suite_name} in test_suites_set_list = {test_suites_set_list}"
f"check if test_sute_name = {test_suite_name}, test_suite_settings_2_run = {test_suite_settings_2_run}, in test_suites_set_list = {test_suites_set_list}"
)
if test_suites_set_list == None:
test_suites_selected.append(test_suite)
test_suite_names_selected.append(test_suite_name)
elif len(test_suites_set_list) == 0:
test_suites_selected.append(test_suite)
test_suite_names_selected.append(test_suite_name)
elif test_suite_name in test_suites_set_list:
test_suites_selected.append(test_suite)
test_suite_names_selected.append(test_suite_name)
else:
pass
logger.info(f"test_suite_names_selected = {test_suite_names_selected}")
if test_suite_settings_2_run is None or (
test_suite_settings_2_run is not None
and proton_setting in test_suite_settings_2_run
):
if test_suites_set_list == None:
test_suites_selected.append(test_suite)
test_suite_names_selected.append(test_suite_name)
elif len(test_suites_set_list) == 0:
test_suites_selected.append(test_suite)
test_suite_names_selected.append(test_suite_name)
elif test_suite_name in test_suites_set_list:
test_suites_selected.append(test_suite)
test_suite_names_selected.append(test_suite_name)
else:
pass
logger.info(
f"proton_setting = {proton_setting}, test_suite_settings_2_run = {test_suite_settings_2_run}, test_suite_names_selected = {test_suite_names_selected}"
)
return {
"test_suite_names_selected": test_suite_names_selected,
"test_suites_selected": test_suites_selected,
@@ -229,7 +246,9 @@ def rockets_context(config_file, tests_file_path, docker_compose_file):
configs = json.load(f)
elif config_file.endswith(".yaml") or config_file.endswith(".yml"):
configs = yaml.safe_load(f)
timeplus_event_stream = configs.get("timeplus_event_stream") #todo: distribute global configs into configs
timeplus_event_stream = configs.get(
"timeplus_event_stream"
) # todo: distribute global configs into configs
timeplus_event_version = configs.get("timeplus_event_version")
config = configs.get(proton_setting)
logger.debug(f"setting = {proton_setting},config = {config}")
@@ -265,7 +284,7 @@ def rockets_context(config_file, tests_file_path, docker_compose_file):
] = proton_cluster_query_node # put proton_cluster_query_node into config
config["proton_cluster_query_route_mode"] = proton_cluster_query_route_mode

res_scan_tests_file_path = scan_tests_file_path(tests_file_path)
res_scan_tests_file_path = scan_tests_file_path(tests_file_path, proton_setting)
test_suite_names_selected = res_scan_tests_file_path.get(
"test_suite_names_selected"
)
@@ -571,11 +590,11 @@ def run_test_suites(
logger.debug(
f"proton_setting = {proton_setting}, total {len(test_suites_selected_sets)} test suties are launched: {test_suite_names}"
)
test_run_list_len_total = 0
test_suite_result_summary_list = []
test_run_list_len_total = 0
test_suite_result_summary_list = []
try:
test_suite_run_ctl_queue.join()
test_suite_result_collect_done = 0
test_suite_result_collect_done = 0
logger.debug(
f"test_suite_result_collect_done = {test_suite_result_collect_done},len(test_suites_selected_sets) = {len(test_suites_selected_sets)} "
)
@@ -2175,10 +2194,10 @@ def create_table_rest(cls, config, table_schema, retry=3):
exception_retry = retry # set the retry times of exception catching
table_name = table_schema.get("name")
type = table_schema.get("type")
create_start_time = datetime.datetime.now()
res = None
create_start_time = datetime.datetime.now()
res = None
while retry > 0 and exception_retry > 0:
query_parameters = table_schema.get("query_parameters")
query_parameters = table_schema.get("query_parameters")
try:
logger.debug(f"create_table_rest starts...")
if query_parameters != None:
@@ -2237,7 +2256,7 @@ def create_table_rest(cls, config, table_schema, retry=3):
time.sleep(1)
exception_retry = retry # reset exception_retry for the table_exist check
create_table_time_out = 200 # set how many times to wait and list tables to check if table creation has completed.
while create_table_time_out > 0 and exception_retry > 0:
while create_table_time_out > 0 and exception_retry > 0:
try:
if QueryClientRest.table_exist(table_ddl_url, table_name):
logger.info(f"table {table_name} is created successfully.")
@@ -2553,12 +2572,19 @@ def run(self, statement_2_run, config):
logger.debug(
f"local running: handler of logger = {logger.handlers}, logger.level = {logger.level}"
)
proton_setting = config.get("proton_setting")
rest_setting = config.get("rest_setting")
query_url = rest_setting.get("query_url")
query_results = {}
query_id = str(statement_2_run.get("query_id"))
query_id_type = statement_2_run.get("query_id_type")
query_type = statement_2_run.get("query_type")
echo = statement_2_run.get("echo")
query = statement_2_run.get("query")
test_suite_name = statement_2_run.get("test_suite_name")
test_id = statement_2_run.get("test_id")
query_client = statement_2_run.get("client")
depends_on = statement_2_run.get("depends_on")
depends_on_stream = statement_2_run.get("depends_on_stream")
query_start_time_str = str(datetime.datetime.now())
query_end_time_str = str(datetime.datetime.now())
@@ -2574,7 +2600,14 @@ def run(self, statement_2_run, config):
table_ddl_url = rest_setting.get("table_ddl_url")
commands = []
for proton_server_container_name in proton_server_container_name_list:
command = f'docker exec {proton_server_container_name} proton-client --host 127.0.0.1 -u {user} --password {password} --query="{query}"'
if query_type is not None and query_type == "docker":
if echo is not None:
echo_str = json.dumps(echo)
command = f"echo '{echo_str}' | {query}"
else:
command = f"{query}"
else:
command = f'docker exec {proton_server_container_name} proton-client --host 127.0.0.1 -u {user} --password {password} --query="{query}"'
commands.append(command)
logger.debug(f"commands = {commands}")
try:
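
The new "docker" query type turns the statement into a raw shell command instead of a proton-client invocation, and an optional echo payload is JSON-serialized and piped into the command's stdin; this is how the sample suite below feeds an event into rpk topic produce. A rough standalone sketch, assuming exec_command is a thin subprocess wrapper (the framework's actual helper may differ):

import json, subprocess

def build_command(statement, default_command):
    # Hypothetical stand-in for the inline branch above.
    if statement.get("query_type") == "docker":
        echo = statement.get("echo")
        if echo is not None:
            # Pipe the JSON payload into the command's stdin, e.g.
            # echo '{"process": ...}' | docker exec -i redpanda-1 rpk topic produce ...
            return f"echo '{json.dumps(echo)}' | {statement['query']}"
        return statement["query"]
    return default_command

def exec_command(command):
    # Simplified sketch of a shell runner; the real exec_command may differ.
    return subprocess.run(command, shell=True, capture_output=True, text=True).stdout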
@@ -2584,8 +2617,26 @@ def run(self, statement_2_run, config):
depends_on_stream_info_list = QueryClientRest.depends_on_stream_exist(
table_ddl_url, depends_on_stream_list, query_id
)
if depends_on != None: # todo: support depends_on multiple query_id
depends_on_exists = False
depends_on_exists = QueryClientRest.query_exists(depends_on, query_url)
if not depends_on_exists: # todo: error handling logic and error code
error_msg = f"QUERY_DEPENDS_ON_FAILED_TO_START FATAL exception: proton_setting = {proton_setting}, proton_server_container_name = {proton_server_container_name},test_suite_name = {test_suite_name}, test_id = {test_id}, query_id = {query_id}, depends_on = {depends_on} of query_id = {query_id} does not be found during 30s after {query_id} was started, query_states_dict = {query_states_dict}, raise Fatal Error, the depends_on query may failed to start in 30s or exits/ends unexpectedly."
logger.error(error_msg)
query_depends_on_exception = TestException(
error_msg, ErrorCodes.QUERY_DEPENDS_ON_FAILED_TO_START
)
raise query_depends_on_exception
else:
logger.info(
f"proton_setting = {proton_setting},test_suite_name = {test_suite_name}, test_id = {test_id}, proton_server_container_name = {proton_server_container_name}, query_id = {query_id}, depends_on = {depends_on} of query_id = {query_id} exists"
)
time.sleep(1) # wait for the depends_on query to be ready.
query_result_str = ""
for command in commands:
# if query_type is not None and query_type == 'rpk' and msg is not None:
# msg_str = json.dumps(msg)
# command = f'echo {msg_str} | {command}'
query_result_str += str(self.exec_command(command))
query_end_time_str = str(datetime.datetime.now())
query_results = {
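
The depends_on guard blocks a producer statement until the streaming query it feeds (for example query_id 9701 below) is actually registered, so no events are produced into a topic before the consumer is listening. QueryClientRest.query_exists is not part of this diff; a plausible sketch of such a check, with the REST response shape, endpoint, and 30s budget all assumed:

import time, requests

def query_exists(query_id, query_url, timeout=30):
    # Hypothetical sketch only: poll a REST query-list endpoint until the
    # query shows up, giving up after `timeout` seconds. The framework's real
    # QueryClientRest.query_exists is defined elsewhere and may differ.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        resp = requests.get(query_url)
        if resp.ok and any(q.get("id") == query_id for q in resp.json()):
            return True
        time.sleep(1)
    return False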
@@ -2690,7 +2741,7 @@ def get_proton_client_config(cls, config):
proton_setting = config.get("proton_setting")
proton_cluster_query_node = config.get("proton_cluster_query_node")
proton_server = None
proton_server_native_port = ''
proton_server_native_port = ""
if proton_setting is None or "cluster" not in proton_setting:
proton_server = config.get("proton_server")
proton_server_native_ports = config.get("proton_server_native_port")
@@ -2832,7 +2883,15 @@ def run(self):
) # create python client
i = 0 # query
auto_terminate_queries = []
test_id, query_id, query_type, query, query_start_time_str, query_end_time_str, message_recv = None, None, None, None, None, None, None
(
test_id,
query_id,
query_type,
query,
query_start_time_str,
query_end_time_str,
message_recv,
) = (None, None, None, None, None, None, None)
while (not tear_down) and query_run_count > 0 and self.alive.value:
try:
query_proc = None
@@ -3348,11 +3407,18 @@ def case_result_check(cls, test_set, order_check=False, logging_level="INFO"):
"float"
in query_result_column_types[i][1]
):
if (math.isclose(
float(expected_result_field),
float(query_result_field),
rel_tol=1e-2,)
or math.isnan(float(expected_result_field)) and math.isnan(float(query_result_field))
if (
math.isclose(
float(expected_result_field),
float(query_result_field),
rel_tol=1e-2,
)
or math.isnan(
float(expected_result_field)
)
and math.isnan(
float(query_result_field)
)
):
expected_result_row_field_check_arry[
i
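
The reformatted condition treats two float fields as equal when they are within 1% relative tolerance, or when both are NaN (math.isclose alone would reject the NaN case, since NaN != NaN; and binds tighter than or, giving exactly that grouping). A compact equivalent:

import math

def floats_match(expected, actual, rel_tol=1e-2):
    # Equivalent to the inline check: close within tolerance, or both NaN.
    e, a = float(expected), float(actual)
    return math.isclose(e, a, rel_tol=rel_tol) or (math.isnan(e) and math.isnan(a))

assert floats_match("1.00", "1.005")   # within 1% relative tolerance
assert floats_match("nan", "nan")      # both-NaN fields count as a match here
assert not floats_match("1.0", "1.5")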
36 changes: 36 additions & 0 deletions tests/stream/test_stream_smoke/0097_external_stream_sample.json
@@ -0,0 +1,36 @@
{
"test_suite_name": "external_stream_sample",
"tag": "smoke",
"test_suite_config":{
"settings_to_run":["redp"],
"tests_2_run": {"ids_2_run": ["all"], "tags_2_run":[], "tags_2_skip":{"default":["todo", "to_support", "change", "bug", "sample"]}}
},
"comments":
"Test example for external stream case.",
"tests": [

{
"id": 0,
"tags": ["external_stream"],
"name": "json string parse from external stream",
"description": "create external stream external1, and run stream query to parse json field from field raw of external1 and insert data into redpanda topic test and verify result",
"steps":[
{"statements": [
{"client":"exec", "query_type":"docker", "query":"docker exec redpanda-1 rpk topic delete external_stream_test"},
{"client":"exec", "query_type":"docker","wait":2, "query":"docker exec redpanda-1 rpk topic create external_stream_test"},
{"client":"python", "query_type": "table","wait":1, "query":"drop stream if exists external1"},
{"client":"python", "query_type": "table","query_id":"9700", "wait":2, "query":"create external stream if not exists external1 (raw string) settings type='kafka', brokers='redpanda-1:9092', topic = 'external_stream_test'"},
{"client":"python","query_id":"9701", "depends_on_stream":"external1", "wait":1, "terminate":"manual","query_type": "stream", "query":"select raw:process from external1"},
{"client":"exec", "query_type": "docker", "depends_on": "9701", "wait":3, "query":"docker exec -i redpanda-1 rpk topic produce external_stream_test", "echo":{"process": "powershell.exe","entity_id": "{42FC7E13-CB3E-5C05-0000-0010A0125101}"}},
{"client":"python","query_type":"table", "query_id":"9702", "wati":3, "query":"kill query where query_id='9701'"}
]}
],

"expected_results": [
{"query_id":"9701", "expected_results":[
["powershell.exe"]]}
]
}

]
}
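
Read end to end, the sample test recreates the redpanda topic with rpk, creates a kafka external stream over it, starts streaming query 9701 to extract raw:process, then (gated by depends_on: "9701") produces one JSON event, and finally kills the query; the expected result asserts that 9701 emitted powershell.exe. A minimal sketch of how a runner might walk such a descriptor sequentially (dispatch is a hypothetical stand-in for the framework's per-client execution):

import json, time

def run_statements(descriptor_path, dispatch):
    # Simplified walk of one descriptor; the real framework fans statements
    # out to python/exec clients concurrently and collects results per query_id.
    with open(descriptor_path) as f:
        suite = json.load(f)
    for test in suite["tests"]:
        for step in test["steps"]:
            for stmt in step["statements"]:
                if stmt.get("wait"):
                    time.sleep(stmt["wait"])  # settle time before this statement
                dispatch(stmt)  # e.g. proton python client, or a docker exec shell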
2 changes: 1 addition & 1 deletion tests/stream/test_stream_smoke/configs/.env
@@ -1 +1 @@
PROTON_VERSION=testing-address-x64-620285ccce69d669f813d9701df583950a2e663b
PROTON_VERSION=