diff --git a/tests/stream/helpers/rockets.py b/tests/stream/helpers/rockets.py
index 4cffb210e9f..cb43f601235 100755
--- a/tests/stream/helpers/rockets.py
+++ b/tests/stream/helpers/rockets.py
@@ -1,6 +1,6 @@
 #!/usr/bin/python3
 # _*_ coding: utf-8 _*_
-import datetime, yaml, json, getopt, logging, logging.config, math, os, platform, random, requests,signal,subprocess, sys, threading, time, traceback, uuid
+import datetime, yaml, json, getopt, logging, logging.config, math, os, platform, random, requests, signal, subprocess, sys, threading, time, traceback, uuid
 import multiprocessing as mp
 from proton_driver import Client, errors
 from timeplus import Stream, Environment
@@ -128,7 +128,8 @@ def timeout_flag(
         f"{str(datetime.datetime.now())}, {hit_context_info}, timeout_flag exception, error = {error}"
     )

-def scan_tests_file_path(tests_file_path):
+
+def scan_tests_file_path(tests_file_path, proton_setting):
     test_suites_selected = []
     test_suite_names_selected = []
     all_test_suites_json = []
@@ -143,7 +144,12 @@ def scan_tests_file_path(tests_file_path):
     files = os.listdir(tests_file_path)
     logger.debug(f"files = {files}")
     for file_name in files:
-        if file_name.endswith(".json") or file_name.endswith(".yaml") or file_name.endswith(".yml"):
+        test_suite_settings_2_run = None
+        if (
+            file_name.endswith(".json")
+            or file_name.endswith(".yaml")
+            or file_name.endswith(".yml")
+        ):
             file_abs_path = f"{tests_file_path}/{file_name}"
             logger.debug(f"file_abs_path = {file_abs_path}")
             with open(file_abs_path) as test_suite_file:
@@ -156,6 +162,11 @@ def scan_tests_file_path(tests_file_path):
                 )
                 test_suite_name = test_suite.get("test_suite_name")
                 test_suite_tag = test_suite.get("tag")
+                test_suite_config = test_suite.get("test_suite_config")
+                if test_suite_config is not None:
+                    test_suite_settings_2_run = test_suite_config.get("settings_to_run")
+                else:
+                    test_suite_settings_2_run = None
                 if test_suite_name == None or test_suite_tag == "skip":
                     logger.debug(
                         f"test_suite_name is vacant or test_suite_tag == skip and ignore this json file"
@@ -163,20 +174,26 @@ def scan_tests_file_path(tests_file_path):
                     pass
                 else:
                     logger.debug(
-                        f"check if test_sute_name = {test_suite_name} in test_suites_set_list = {test_suites_set_list}"
+                        f"check if test_suite_name = {test_suite_name}, test_suite_settings_2_run = {test_suite_settings_2_run}, in test_suites_set_list = {test_suites_set_list}"
                     )
-                    if test_suites_set_list == None:
-                        test_suites_selected.append(test_suite)
-                        test_suite_names_selected.append(test_suite_name)
-                    elif len(test_suites_set_list) == 0:
-                        test_suites_selected.append(test_suite)
-                        test_suite_names_selected.append(test_suite_name)
-                    elif test_suite_name in test_suites_set_list:
-                        test_suites_selected.append(test_suite)
-                        test_suite_names_selected.append(test_suite_name)
-                    else:
-                        pass
-    logger.info(f"test_suite_names_selected = {test_suite_names_selected}")
+                    if test_suite_settings_2_run is None or (
+                        test_suite_settings_2_run is not None
+                        and proton_setting in test_suite_settings_2_run
+                    ):
+                        if test_suites_set_list == None:
+                            test_suites_selected.append(test_suite)
+                            test_suite_names_selected.append(test_suite_name)
+                        elif len(test_suites_set_list) == 0:
+                            test_suites_selected.append(test_suite)
+                            test_suite_names_selected.append(test_suite_name)
+                        elif test_suite_name in test_suites_set_list:
+                            test_suites_selected.append(test_suite)
+                            test_suite_names_selected.append(test_suite_name)
+                        else:
+                            pass
+    logger.info(
+        f"proton_setting = {proton_setting}, test_suite_settings_2_run = {test_suite_settings_2_run}, test_suite_names_selected = {test_suite_names_selected}"
+    )
     return {
         "test_suite_names_selected": test_suite_names_selected,
         "test_suites_selected": test_suites_selected,
@@ -229,7 +246,9 @@ def rockets_context(config_file, tests_file_path, docker_compose_file):
             configs = json.load(f)
         elif config_file.endswith(".yaml") or config_file.endswith(".yml"):
             configs = yaml.safe_load(f)
-        timeplus_event_stream = configs.get("timeplus_event_stream") #todo: distribute global configs into configs
+        timeplus_event_stream = configs.get(
+            "timeplus_event_stream"
+        )  # todo: distribute global configs into configs
         timeplus_event_version = configs.get("timeplus_event_version")
         config = configs.get(proton_setting)
         logger.debug(f"setting = {proton_setting},config = {config}")
@@ -265,7 +284,7 @@ def rockets_context(config_file, tests_file_path, docker_compose_file):
         ] = proton_cluster_query_node  # put proton_cluster_query_node into config
         config["proton_cluster_query_route_mode"] = proton_cluster_query_route_mode

-    res_scan_tests_file_path = scan_tests_file_path(tests_file_path)
+    res_scan_tests_file_path = scan_tests_file_path(tests_file_path, proton_setting)
     test_suite_names_selected = res_scan_tests_file_path.get(
         "test_suite_names_selected"
     )
@@ -571,11 +590,11 @@ def run_test_suites(
     logger.debug(
         f"proton_setting = {proton_setting}, total {len(test_suites_selected_sets)} test suties are launched: {test_suite_names}"
     )
-    test_run_list_len_total = 0 
-    test_suite_result_summary_list = [] 
+    test_run_list_len_total = 0
+    test_suite_result_summary_list = []
     try:
         test_suite_run_ctl_queue.join()
-        test_suite_result_collect_done = 0 
+        test_suite_result_collect_done = 0
         logger.debug(
             f"test_suite_result_collect_done = {test_suite_result_collect_done},len(test_suites_selected_sets) = {len(test_suites_selected_sets)} "
         )
@@ -2175,10 +2194,10 @@ def create_table_rest(cls, config, table_schema, retry=3):
         exception_retry = retry  # set the retry times of exception catching
         table_name = table_schema.get("name")
         type = table_schema.get("type")
-        create_start_time = datetime.datetime.now() 
-        res = None 
+        create_start_time = datetime.datetime.now()
+        res = None
         while retry > 0 and exception_retry > 0:
-            query_parameters = table_schema.get("query_parameters") 
+            query_parameters = table_schema.get("query_parameters")
             try:
                 logger.debug(f"create_table_rest starts...")
                 if query_parameters != None:
@@ -2237,7 +2256,7 @@ def create_table_rest(cls, config, table_schema, retry=3):
                 time.sleep(1)
         exception_retry = retry  # reset exception_retry for table_exit check
         create_table_time_out = 200  # set how many times wait and list table to check if table creation completed.
-        while create_table_time_out > 0 and exception_retry > 0: 
+        while create_table_time_out > 0 and exception_retry > 0:
             try:
                 if QueryClientRest.table_exist(table_ddl_url, table_name):
                     logger.info(f"table {table_name} is created successfully.")
@@ -2553,12 +2572,19 @@ def run(self, statement_2_run, config):
         logger.debug(
             f"local running: handler of logger = {logger.handlers}, logger.level = {logger.level}"
         )
+        proton_setting = config.get("proton_setting")
+        rest_setting = config.get("rest_setting")
+        query_url = rest_setting.get("query_url")
         query_results = {}
         query_id = str(statement_2_run.get("query_id"))
         query_id_type = statement_2_run.get("query_id_type")
         query_type = statement_2_run.get("query_type")
+        echo = statement_2_run.get("echo")
         query = statement_2_run.get("query")
+        test_suite_name = statement_2_run.get("test_suite_name")
+        test_id = statement_2_run.get("test_id")
         query_client = statement_2_run.get("client")
+        depends_on = statement_2_run.get("depends_on")
         depends_on_stream = statement_2_run.get("depends_on_stream")
         query_start_time_str = str(datetime.datetime.now())
         query_end_time_str = str(datetime.datetime.now())
@@ -2574,7 +2600,14 @@ def run(self, statement_2_run, config):
         table_ddl_url = rest_setting.get("table_ddl_url")
         commands = []
         for proton_server_container_name in proton_server_container_name_list:
-            command = f'docker exec {proton_server_container_name} proton-client --host 127.0.0.1 -u {user} --password {password} --query="{query}"'
+            if query_type is not None and query_type == "docker":
+                if echo is not None:
+                    echo_str = json.dumps(echo)
+                    command = f"echo '{echo_str}' | {query}"
+                else:
+                    command = f"{query}"
+            else:
+                command = f'docker exec {proton_server_container_name} proton-client --host 127.0.0.1 -u {user} --password {password} --query="{query}"'
             commands.append(command)
         logger.debug(f"commands = {commands}")
         try:
@@ -2584,8 +2617,26 @@ def run(self, statement_2_run, config):
                 depends_on_stream_info_list = QueryClientRest.depends_on_stream_exist(
                     table_ddl_url, depends_on_stream_list, query_id
                 )
+            if depends_on is not None:  # todo: support depends_on multiple query_id
+                depends_on_exists = False
+                depends_on_exists = QueryClientRest.query_exists(depends_on, query_url)
+                if not depends_on_exists:  # todo: error handling logic and error code
+                    error_msg = f"QUERY_DEPENDS_ON_FAILED_TO_START FATAL exception: proton_setting = {proton_setting}, proton_server_container_name = {proton_server_container_name}, test_suite_name = {test_suite_name}, test_id = {test_id}, query_id = {query_id}, depends_on = {depends_on} of query_id = {query_id} was not found within 30s after {query_id} was started, query_states_dict = {query_states_dict}; the depends_on query may have failed to start within 30s or may have exited/ended unexpectedly."
+                    logger.error(error_msg)
+                    query_depends_on_exception = TestException(
+                        error_msg, ErrorCodes.QUERY_DEPENDS_ON_FAILED_TO_START
+                    )
+                    raise query_depends_on_exception
+                else:
+                    logger.info(
+                        f"proton_setting = {proton_setting}, test_suite_name = {test_suite_name}, test_id = {test_id}, proton_server_container_name = {proton_server_container_name}, query_id = {query_id}, depends_on = {depends_on} of query_id = {query_id} exists"
+                    )
+                time.sleep(1)  # wait for the depends_on query to be ready
query_result_str = "" for command in commands: + # if query_type is not None and query_type == 'rpk' and msg is not None: + # msg_str = json.dumps(msg) + # command = f'echo {msg_str} | {command}' query_result_str += str(self.exec_command(command)) query_end_time_str = str(datetime.datetime.now()) query_results = { @@ -2690,7 +2741,7 @@ def get_proton_client_config(cls, config): proton_setting = config.get("proton_setting") proton_cluster_query_node = config.get("proton_cluster_query_node") proton_server = None - proton_server_native_port = '' + proton_server_native_port = "" if proton_setting is None or "cluster" not in proton_setting: proton_server = config.get("proton_server") proton_server_native_ports = config.get("proton_server_native_port") @@ -2832,7 +2883,15 @@ def run(self): ) # create python client i = 0 # query auto_terminate_queries = [] - test_id, query_id, query_type, query, query_start_time_str, query_end_time_str, message_recv = None, None, None, None, None, None, None + ( + test_id, + query_id, + query_type, + query, + query_start_time_str, + query_end_time_str, + message_recv, + ) = (None, None, None, None, None, None, None) while (not tear_down) and query_run_count > 0 and self.alive.value: try: query_proc = None @@ -3348,11 +3407,18 @@ def case_result_check(cls, test_set, order_check=False, logging_level="INFO"): "float" in query_result_column_types[i][1] ): - if (math.isclose( - float(expected_result_field), - float(query_result_field), - rel_tol=1e-2,) - or math.isnan(float(expected_result_field)) and math.isnan(float(query_result_field)) + if ( + math.isclose( + float(expected_result_field), + float(query_result_field), + rel_tol=1e-2, + ) + or math.isnan( + float(expected_result_field) + ) + and math.isnan( + float(query_result_field) + ) ): expected_result_row_field_check_arry[ i diff --git a/tests/stream/test_stream_smoke/0097_external_stream_sample.json b/tests/stream/test_stream_smoke/0097_external_stream_sample.json new file mode 100644 index 00000000000..28773ee6146 --- /dev/null +++ b/tests/stream/test_stream_smoke/0097_external_stream_sample.json @@ -0,0 +1,36 @@ +{ + "test_suite_name": "external_stream_sample", + "tag": "smoke", + "test_suite_config":{ + "settings_to_run":["redp"], + "tests_2_run": {"ids_2_run": ["all"], "tags_2_run":[], "tags_2_skip":{"default":["todo", "to_support", "change", "bug", "sample"]}} + }, + "comments": + "Test example for external stream case.", + "tests": [ + + { + "id": 0, + "tags": ["external_stream"], + "name": "json string parse from external stream", + "description": "create external stream external1, and run stream query to parse json field from field raw of external1 and insert data into redpanda topic test and verify result", + "steps":[ + {"statements": [ + {"client":"exec", "query_type":"docker", "query":"docker exec redpanda-1 rpk topic delete external_stream_test"}, + {"client":"exec", "query_type":"docker","wait":2, "query":"docker exec redpanda-1 rpk topic create external_stream_test"}, + {"client":"python", "query_type": "table","wait":1, "query":"drop stream if exists external1"}, + {"client":"python", "query_type": "table","query_id":"9700", "wait":2, "query":"create external stream if not exists external1 (raw string) settings type='kafka', brokers='redpanda-1:9092', topic = 'external_stream_test'"}, + {"client":"python","query_id":"9701", "depends_on_stream":"external1", "wait":1, "terminate":"manual","query_type": "stream", "query":"select raw:process from external1"}, + {"client":"exec", "query_type": 
"docker", "depends_on": "9701", "wait":3, "query":"docker exec -i redpanda-1 rpk topic produce external_stream_test", "echo":{"process": "powershell.exe","entity_id": "{42FC7E13-CB3E-5C05-0000-0010A0125101}"}}, + {"client":"python","query_type":"table", "query_id":"9702", "wati":3, "query":"kill query where query_id='9701'"} + ]} + ], + + "expected_results": [ + {"query_id":"9701", "expected_results":[ + ["powershell.exe"]]} + ] + } + + ] +} diff --git a/tests/stream/test_stream_smoke/configs/.env b/tests/stream/test_stream_smoke/configs/.env index 5f452434219..f21eb26da0a 100755 --- a/tests/stream/test_stream_smoke/configs/.env +++ b/tests/stream/test_stream_smoke/configs/.env @@ -1 +1 @@ -PROTON_VERSION=testing-address-x64-620285ccce69d669f813d9701df583950a2e663b +PROTON_VERSION= diff --git a/tests/stream/test_stream_smoke/configs/config.json b/tests/stream/test_stream_smoke/configs/config.json index 5d460e1e6c2..b1d006cfb30 100644 --- a/tests/stream/test_stream_smoke/configs/config.json +++ b/tests/stream/test_stream_smoke/configs/config.json @@ -45,6 +45,30 @@ }, "proton_server_container_name": "proton-server" }, + "redpanda_single": { + "rest_setting": { + "host_url": "http://localhost:13218", + "http_snapshot_url": "http://localhost:18123", + "table_ddl_url": "http://localhost:13218/proton/v1/ddl/streams", + "ingest_url": "http://localhost:13218/proton/v1/ingest/streams", + "query_url": "http://localhost:13218/proton/v1/search", + "health_check_url": "http://localhost:13218/proton/ping", + "info_url": "http://localhost:13218/proton/info", + "params": { + "headers": { + "Content-Type": "application/json" + } + } + }, + "proton_server": "localhost", + "proton_server_native_port": "18463", + "proton_admin": { + "name": "proton", + "password": "proton@t+" + }, + "proton_server_container_name": "proton-redp", + "redp_server_container_name": "redpanda-1" + }, "new_setting_sample": { "description": "sample of a new setting", "comment": "setting is for ci_runner to learn how to connect to proton, while how to start the env mapping to settings need to be configured in test_stream_smoke/configs/docker-compose.yaml, so far all setting env is all to be set in this unified docker compose file, #todo: split docker-compose for different settings", @@ -232,6 +256,7 @@ "password": "proton@t+" }, "proton_server_container_name": "proton-redp", + "redp_server_container_name": "redpanda-1", "ci_runner_params": [ { "PROTON_TEST_SUITES": "Not_spport_in_this_level_yet" @@ -260,6 +285,7 @@ "password": "proton@t+" }, "proton_server_container_name": "proton-redp1", + "redp_server_container_name": "redpanda-2", "ci_runner_params": [ { "PROTON_TEST_SUITES": "Not_spport_in_this_level_yet" @@ -288,6 +314,7 @@ "password": "proton@t+" }, "proton_server_container_name": "proton-redp2", + "redp_server_container_name": "redpanda-3", "ci_runner_params": [ { "PROTON_TEST_SUITES": "Not_spport_in_this_level_yet" @@ -316,6 +343,7 @@ "password": "proton@t+" }, "proton_server_container_name": "proton-redp3", + "redp_server_container_name": "redpanda-4", "ci_runner_params": [ { "PROTON_TEST_SUITES": "Not_spport_in_this_level_yet" @@ -344,6 +372,7 @@ "password": "proton@t+" }, "proton_server_container_name": "proton-redp4", + "redp_server_container_name": "redpanda-5", "ci_runner_params": [ { "PROTON_TEST_SUITES": "Not_spport_in_this_level_yet" diff --git a/tests/stream/test_stream_smoke/configs/docker-compose-redp-single.yaml b/tests/stream/test_stream_smoke/configs/docker-compose-redp-single.yaml new file mode 100644 index 
00000000000..a90ad3cc9e0 --- /dev/null +++ b/tests/stream/test_stream_smoke/configs/docker-compose-redp-single.yaml @@ -0,0 +1,64 @@ +version: "3.7" +services: + proton-redp: + image: ghcr.io/timeplus-io/proton:${PROTON_VERSION:-latest} + pull_policy: always + container_name: proton-redp + volumes: + - ../../proton-redp/datas:/var/lib/proton + - ../../proton-redp/log:/var/log/proton-server + ports: + - "13218:3218" # HTTP Streaming + - "18123:8123" # HTTP Snapshot + - "18463:8463" # TCP Streaming + - "15432:5432" # Postgres Snapshot + - "17587:7587" # TCP Snapshot + deploy: + replicas: 1 + restart_policy: + condition: on-failure + # `proton` depends on STREAM_STORAGE_BROKERS env variable + # to discover stream store + environment: + - STREAM_STORAGE_BROKERS=stream-store:9092 + - MAX_CONCURRENT_QUERIES=100 # Default: 100 + - MAX_CONCURRENT_SELECT_QUERIES=100 # Default: 100 + - MAX_CONCURRENT_INSERT_QUERIES=100 # Default: 100 + - MAX_CONCURRENT_STREAMING_QUERIES=100 # Default: 100 + - MAX_SERVER_MEMORY_USAGE_TO_RAM_RATIO=0.9 # Default: 0.9 + - MAX_SERVER_MEMORY_CACHE_TO_RAM_RATIO=0.5 # Default: 0.5 + - STREAM_STORAGE_TYPE=kafka + + command: > + /bin/bash -c "echo sleeping; sleep 2; /entrypoint.sh" + depends_on: + - stream-store + + + stream-store: + image: vectorized/redpanda:v21.11.19 + ports: + - "9092:9092" + - "29092:29092" + command: + - redpanda + - start + - --smp + - '1' + - --memory + - 1G + - --reserve-memory + - 0M + - --overprovisioned + - --node-id + - '0' + - --kafka-addr + - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 + - --advertise-kafka-addr + - PLAINTEXT://stream-store:29092,OUTSIDE://stream-store:9092 + - --set + - "redpanda.auto_create_topics_enabled=false" + - --set + - "redpanda.enable_idempotence=true" + - --check=false + container_name: redpanda-1 diff --git a/tests/stream/test_stream_smoke/configs/docker-compose-redp.yaml b/tests/stream/test_stream_smoke/configs/docker-compose-redp.yaml index 5f49d6bea63..134fa030151 100644 --- a/tests/stream/test_stream_smoke/configs/docker-compose-redp.yaml +++ b/tests/stream/test_stream_smoke/configs/docker-compose-redp.yaml @@ -1,7 +1,7 @@ version: "3.7" services: proton-redp: - image: ghcr.io/timeplus-io/proton:${PROTON_VERSION:-develop} + image: ghcr.io/timeplus-io/proton:${PROTON_VERSION:-latest} pull_policy: always container_name: proton-redp volumes: @@ -36,7 +36,7 @@ services: - stream-store proton-redp1: - image: ghcr.io/timeplus-io/proton:${PROTON_VERSION:-develop} + image: ghcr.io/timeplus-io/proton:${PROTON_VERSION:-latest} pull_policy: always container_name: proton-redp1 volumes: @@ -72,7 +72,7 @@ services: proton-redp2: - image: ghcr.io/timeplus-io/proton:${PROTON_VERSION:-develop} + image: ghcr.io/timeplus-io/proton:${PROTON_VERSION:-latest} pull_policy: always container_name: proton-redp2 volumes: @@ -107,7 +107,7 @@ services: - stream-store2 proton-redp3: - image: ghcr.io/timeplus-io/proton:${PROTON_VERSION:-develop} + image: ghcr.io/timeplus-io/proton:${PROTON_VERSION:-latest} pull_policy: always container_name: proton-redp3 volumes: @@ -142,7 +142,7 @@ services: - stream-store3 proton-redp4: - image: ghcr.io/timeplus-io/proton:${PROTON_VERSION:-develop} + image: ghcr.io/timeplus-io/proton:${PROTON_VERSION:-latest} pull_policy: always container_name: proton-redp4 volumes:
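
Note on the scan_tests_file_path() change: a suite now runs only if it declares no "settings_to_run", or if the current proton_setting appears in that list. A minimal runnable sketch of the rule; the helper name suite_selected and the inline sample dict are illustrative only, not part of the patch:

import json

def suite_selected(test_suite, proton_setting, test_suites_set_list=None):
    # a suite with no settings_to_run runs under every setting; otherwise the
    # current proton_setting must be listed in it (e.g. ["redp"])
    test_suite_config = test_suite.get("test_suite_config") or {}
    settings_2_run = test_suite_config.get("settings_to_run")
    if settings_2_run is not None and proton_setting not in settings_2_run:
        return False
    # None or an empty selection list means "select every suite"
    if not test_suites_set_list:
        return True
    return test_suite.get("test_suite_name") in test_suites_set_list

if __name__ == "__main__":
    suite = {
        "test_suite_name": "external_stream_sample",
        "test_suite_config": {"settings_to_run": ["redp"]},
    }
    assert suite_selected(suite, "redp")
    assert not suite_selected(suite, "default")

With 0097_external_stream_sample.json declaring "settings_to_run": ["redp"], the suite is picked up under the redp setting and skipped everywhere else.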
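
Note on the exec-client change in run(): for a step with "query_type": "docker", the step's query is executed as-is, and an optional "echo" payload is JSON-serialized and piped into it; this is how the step depending on query 9701 feeds a message to Redpanda via rpk. A self-contained sketch of the added branch (build_command is a hypothetical helper mirroring it):

import json

def build_command(statement_2_run):
    query = statement_2_run.get("query")
    echo = statement_2_run.get("echo")
    if echo is not None:
        # serialize the payload so rpk publishes a single JSON message
        return f"echo '{json.dumps(echo)}' | {query}"
    return query

step = {
    "query": "docker exec -i redpanda-1 rpk topic produce external_stream_test",
    "echo": {"process": "powershell.exe"},
}
print(build_command(step))
# echo '{"process": "powershell.exe"}' | docker exec -i redpanda-1 rpk topic produce external_stream_test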
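
Note on the depends_on gate: the diff calls QueryClientRest.query_exists(depends_on, query_url) once before running the step's commands, and the error text suggests query_exists itself allows up to 30s for the upstream query to appear; its internals are not shown in this patch. Assuming that reading, the pattern reduces to poll-then-fail-fast, sketched generically below (wait_for is not part of the patch):

import time

def wait_for(predicate, timeout_s=30.0, interval_s=1.0):
    # poll the predicate until it holds or the deadline passes
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval_s)
    return False  # caller raises, e.g. TestException(QUERY_DEPENDS_ON_FAILED_TO_START)

if __name__ == "__main__":
    assert wait_for(lambda: True, timeout_s=1)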