diff --git a/migtests/scripts/functions.sh b/migtests/scripts/functions.sh index 18f3ff9a28..b79c859f59 100644 --- a/migtests/scripts/functions.sh +++ b/migtests/scripts/functions.sh @@ -389,7 +389,6 @@ import_data() { --target-db-name ${TARGET_DB_NAME} --disable-pb true --send-diagnostics=false - --truncate-splits true --max-retries 1 " diff --git a/migtests/scripts/resumption.py b/migtests/scripts/resumption.py new file mode 100755 index 0000000000..68ccc452b2 --- /dev/null +++ b/migtests/scripts/resumption.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python3 + +import os +import subprocess +import signal +import time +import random +import sys +import select +import yaml +sys.path.append(os.path.join(os.getcwd(), 'migtests/lib')) +import yb +import argparse + +def parse_arguments(): + parser = argparse.ArgumentParser(description="YB Voyager Resumption Test") + parser.add_argument('config_file', metavar='config.yaml', type=str, + help="Path to the YAML configuration file") + return parser.parse_args() + +def load_config(config_file): + """Load the configuration from the provided YAML file.""" + if not os.path.exists(config_file): + raise FileNotFoundError(f"Config file not found: {config_file}") + with open(config_file, 'r') as file: + config = yaml.safe_load(file) + return config + +def prepare_import_data_file_command(config): + """ + Prepares the yb-voyager import data file command based on the given configuration. 
+ """ + file_table_map = config['file_table_map'] + additional_flags = config.get('additional_flags', {}) + + args = [ + 'yb-voyager', 'import', 'data', 'file', + '--export-dir', os.getenv('EXPORT_DIR', ''), + '--target-db-host', os.getenv('TARGET_DB_HOST', ''), + '--target-db-port', os.getenv('TARGET_DB_PORT', ''), + '--target-db-user', os.getenv('TARGET_DB_USER', ''), + '--target-db-password', os.getenv('TARGET_DB_PASSWORD', ''), + '--target-db-schema', os.getenv('TARGET_DB_SCHEMA', ''), + '--target-db-name', os.getenv('TARGET_DB_NAME', ''), + '--disable-pb', 'true', + '--send-diagnostics', 'false', + '--data-dir', os.getenv('DATA_DIR', ''), + '--file-table-map', file_table_map + ] + + if os.getenv('RUN_WITHOUT_ADAPTIVE_PARALLELISM') == 'true': + args.extend(['--enable-adaptive-parallelism', 'false']) + + for flag, value in additional_flags.items(): + args.append(flag) + args.append(value) + + return args + + +def prepare_import_data_command(config): + """ + Prepares the yb-voyager import data command based on the given configuration. 
+ """ + + additional_flags = config.get('additional_flags', {}) + + args = [ + 'yb-voyager', 'import', 'data', + '--export-dir', os.getenv('EXPORT_DIR', ''), + '--target-db-host', os.getenv('TARGET_DB_HOST', ''), + '--target-db-port', os.getenv('TARGET_DB_PORT', ''), + '--target-db-user', os.getenv('TARGET_DB_USER', ''), + '--target-db-password', os.getenv('TARGET_DB_PASSWORD', ''), + '--target-db-name', os.getenv('TARGET_DB_NAME', ''), + '--disable-pb', 'true', + '--send-diagnostics', 'false', + ] + + if os.getenv('SOURCE_DB_TYPE') != 'postgresql': + args.extend(['--target-db-schema', os.getenv('TARGET_DB_SCHEMA', '')]) + + if os.getenv('RUN_WITHOUT_ADAPTIVE_PARALLELISM') == 'true': + args.extend(['--enable-adaptive-parallelism', 'false']) + + for flag, value in additional_flags.items(): + args.append(flag) + args.append(value) + + return args + + +def run_and_resume_voyager(command, resumption): + """ + Runs the yb-voyager command with support for resumption testing. + """ + for attempt in range(1, resumption['max_restarts'] + 1): + print(f"\n--- Attempt {attempt} of {resumption['max_restarts']} ---") + try: + process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + print("Running command:", ' '.join(command), flush=True) + + start_time = time.time() + full_output = '' + + while True: + rlist, _, _ = select.select([process.stdout, process.stderr], [], [], 5) + for ready in rlist: + output = ready.readline() + if not output: # Exit if output is empty (end of process output) + break + full_output += output + if time.time() - start_time > 5: + break + + if full_output: + print(full_output.strip(), flush=True) + + while True: + if process.poll() is not None: + break # Process has ended, exit loop + + interrupt_interval_seconds = random.randint( + resumption['min_interrupt_seconds'], + resumption['max_interrupt_seconds'] + ) + print(f"\nProcess will be interrupted in {interrupt_interval_seconds // 60}m 
{interrupt_interval_seconds % 60}s") + time.sleep(interrupt_interval_seconds) + print(f"\nInterrupting the import process (PID: {process.pid})") + process.send_signal(signal.SIGINT) + + restart_wait_time_seconds = random.randint( + resumption['min_restart_wait_seconds'], + resumption['max_restart_wait_seconds'] + ) + print(f"\nWaiting for {restart_wait_time_seconds // 60}m {restart_wait_time_seconds % 60}s before resuming...") + time.sleep(restart_wait_time_seconds) + + except Exception as e: + print(f"Error occurred during import: {e}") + if process: + process.kill() + raise e + + finally: + if process and process.poll() is None: + print(f"Terminating process (PID: {process.pid})") + process.kill() + process.wait(timeout=30) + + # Final import retry logic + print("\n--- Final attempt to complete the import ---") + + for _ in range(2): + try: + print("\nVoyager command output:") + + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + + # Capture and print output + for line in iter(process.stdout.readline, ''): + print(line.strip()) + sys.stdout.flush() + + process.wait() + + if process.returncode != 0: + raise subprocess.CalledProcessError(process.returncode, command) + + break + except subprocess.CalledProcessError as e: + print("\nVoyager command error:") + for line in iter(process.stderr.readline, ''): + print(line.strip()) + sys.stdout.flush() + time.sleep(30) + else: + print("Final import failed after 2 attempts.") + sys.exit(1) + +def validate_row_counts(row_count, export_dir): + """ + Validates the row counts of the target tables after import. + If the row count validation fails, it logs details and exits. + """ + failed_validations = [] + + for table_identifier, expected_row_count in row_count.items(): + print(f"\nValidating row count for table '{table_identifier}'...") + + if '.' 
in table_identifier: + schema, table_name = table_identifier.split('.', 1) + else: + schema = "public" + table_name = table_identifier + + tgt = None + try: + tgt = yb.new_target_db() + tgt.connect() + print(f"Connected to target database. Using schema: {schema}") + actual_row_count = tgt.get_row_count(table_name, schema) + + if actual_row_count == expected_row_count: + print(f"\u2714 Validation successful: {table_identifier} - Expected: {expected_row_count}, Actual: {actual_row_count}") + else: + print(f"\u274C Validation failed: {table_identifier} - Expected: {expected_row_count}, Actual: {actual_row_count}") + failed_validations.append((table_identifier, expected_row_count, actual_row_count)) + except Exception as e: + print(f"Error during validation for table '{table_identifier}': {e}") + failed_validations.append((table_identifier, expected_row_count, "Error")) + finally: + if tgt: + tgt.close() + print("Disconnected from target database.") + + if failed_validations: + print("\nValidation failed for the following tables:") + for table, expected, actual in failed_validations: + print(f" Table: {table}, Expected: {expected}, Actual: {actual}") + print(f"\nFor more details, check {export_dir}/logs") + sys.exit(1) + else: + print("\nAll table row counts validated successfully.") + + + +def run_import_with_resumption(config): + + import_type = config.get('import_type', 'file') # Default to 'file' if not specified + + if import_type == 'file': + command = prepare_import_data_file_command(config) + elif import_type == 'offline': + command = prepare_import_data_command(config) + else: + raise ValueError(f"Unsupported import_type: {import_type}") + + run_and_resume_voyager(command, config['resumption']) + + validate_row_counts(config['row_count'], os.getenv('EXPORT_DIR', '')) + + +if __name__ == "__main__": + try: + args = parse_arguments() + config = load_config(args.config_file) + + print(f"Loaded configuration from {args.config_file}") + + 
run_import_with_resumption(config)
+
+    except Exception as e:
+        print(f"Test failed: {e}")
+        sys.exit(1)
diff --git a/migtests/scripts/resumption.sh b/migtests/scripts/resumption.sh
new file mode 100755
index 0000000000..77aa5d01d4
--- /dev/null
+++ b/migtests/scripts/resumption.sh
@@ -0,0 +1,107 @@
+#!/usr/bin/env bash
+
+set -e
+
+if [ $# -gt 2 ]
+then
+    echo "Usage: $0 TEST_NAME [env.sh]"
+    exit 1
+fi
+
+set -x
+
+export YB_VOYAGER_SEND_DIAGNOSTICS=false
+export TEST_NAME=$1
+
+export REPO_ROOT="${PWD}"
+export SCRIPTS="${REPO_ROOT}/migtests/scripts"
+export TESTS_DIR="${REPO_ROOT}/migtests/tests"
+export TEST_DIR="${TESTS_DIR}/${TEST_NAME}"
+export EXPORT_DIR=${EXPORT_DIR:-"${TEST_DIR}/export-dir"}
+
+export PYTHONPATH="${REPO_ROOT}/migtests/lib"
+
+# Order of env.sh import matters.
+if [ "$2" != "" ] #if env.sh is passed as an argument, source it
+then
+    if [ ! -f "${TEST_DIR}/$2" ]
+    then
+        echo "$2 file not found in the test directory"
+        exit 1
+    fi
+    source ${TEST_DIR}/$2
+else
+    source ${TEST_DIR}/env.sh
+fi
+
+if [ "${SOURCE_DB_TYPE}" != "" ]; then
+    source ${SCRIPTS}/${SOURCE_DB_TYPE}/env.sh
+fi
+
+source ${SCRIPTS}/yugabytedb/env.sh
+source ${SCRIPTS}/functions.sh
+
+main() {
+    echo "Deleting the parent export-dir present in the test directory"
+    rm -rf ${EXPORT_DIR}
+    echo "Creating export-dir in the parent test directory"
+    mkdir -p ${EXPORT_DIR}
+    echo "Assigning permissions to the export-dir to execute init-db script"
+
+    for script in init-db init-target-db generate_config.py; do
+        if [ -f "${TEST_DIR}/${script}" ]; then
+            chmod +x "${TEST_DIR}/${script}"
+        fi
+    done
+
+    step "START: ${TEST_NAME}"
+    print_env
+
+    pushd ${TEST_DIR}
+
+    step "Check the Voyager version installed"
+    yb-voyager version
+
+    step "Initialise databases"
+
+    for script in init-db init-target-db; do
+        if [ -f "${TEST_DIR}/${script}" ]; then
+            "${TEST_DIR}/${script}"
+        fi
+    done
+
+    step "Run additional steps in case of offline"
+    if [ "${SOURCE_DB_TYPE}" != "" ]; then
+        step 
"Grant source database user permissions" + grant_permissions ${SOURCE_DB_NAME} ${SOURCE_DB_TYPE} ${SOURCE_DB_SCHEMA} + + step "Export data." + # false if exit code of export_data is non-zero + export_data || { + cat_log_file "yb-voyager-export-data.log" + cat_log_file "debezium-source_db_exporter.log" + exit 1 + } + fi + + step "Generate the YAML file" + if [ -f "${TEST_DIR}/generate_config.py" ]; then + ./generate_config.py + fi + + step "Run import with resumptions" + + ${SCRIPTS}/resumption.py config.yaml + + step "Clean up" + rm -rf "${EXPORT_DIR}" + if [ -f "${TEST_DIR}/generate_config.py" ]; then + rm config.yaml + fi + if [ -n "${SOURCE_DB_NAME}" ]; then + run_psql postgres "DROP DATABASE ${SOURCE_DB_NAME};" + fi + run_ysql yugabyte "DROP DATABASE IF EXISTS ${TARGET_DB_NAME};" +} + +main diff --git a/migtests/tests/pg/partitions/init-db b/migtests/tests/pg/partitions/init-db index 2aa9fc94d6..83d93aebc4 100755 --- a/migtests/tests/pg/partitions/init-db +++ b/migtests/tests/pg/partitions/init-db @@ -12,7 +12,9 @@ run_psql postgres "CREATE DATABASE ${SOURCE_DB_NAME};" echo "Initialising source database." 
run_psql ${SOURCE_DB_NAME} "\i schema.sql;" -run_psql ${SOURCE_DB_NAME} "\i snapshot.sql;" + +chmod +x ./snapshot.sh +./snapshot.sh 1000 if [ -n "${SOURCE_REPLICA_DB_NAME}" ] && [ "${SOURCE_REPLICA_DB_NAME}" != "${SOURCE_DB_NAME}" ]; then diff --git a/migtests/tests/pg/partitions/snapshot.sh b/migtests/tests/pg/partitions/snapshot.sh new file mode 100755 index 0000000000..59fb52ea32 --- /dev/null +++ b/migtests/tests/pg/partitions/snapshot.sh @@ -0,0 +1,133 @@ +#!/bin/bash + +set -e +set -x + +source ${SCRIPTS}/functions.sh + +# Set default row count (can be overridden by user input) +ROW_COUNT=${1:-1000} # Default to 1000 if no argument is provided + +REGIONS=('London' 'Boston' 'Sydney') +AMOUNTS=(1000 2000 5000) + +# Insert into sales_region table +sql_sales_region=" +WITH region_list AS ( + SELECT ARRAY['${REGIONS[0]}', '${REGIONS[1]}', '${REGIONS[2]}']::TEXT[] region +), amount_list AS ( + SELECT ARRAY[${AMOUNTS[0]}, ${AMOUNTS[1]}, ${AMOUNTS[2]}]::INT[] amount +) +INSERT INTO sales_region +(id, amount, branch, region) +SELECT + n, + amount[1 + mod(n, array_length(amount, 1))], + 'Branch ' || n as branch, + region[1 + mod(n, array_length(region, 1))] +FROM amount_list, region_list, generate_series(1, $ROW_COUNT) as n; +" +run_psql "${SOURCE_DB_NAME}" "$sql_sales_region" + +# Insert into test_partitions_sequences table +sql_test_partitions_sequences=" +WITH region_list AS ( + SELECT ARRAY['${REGIONS[0]}', '${REGIONS[1]}', '${REGIONS[2]}']::TEXT[] region +), amount_list AS ( + SELECT ARRAY[${AMOUNTS[0]}, ${AMOUNTS[1]}, ${AMOUNTS[2]}]::INT[] amount +) +INSERT INTO test_partitions_sequences +(amount, branch, region) +SELECT + amount[1 + mod(n, array_length(amount, 1))], + 'Branch ' || n as branch, + region[1 + mod(n, array_length(region, 1))] +FROM amount_list, region_list, generate_series(1, $ROW_COUNT) as n; +" +run_psql "${SOURCE_DB_NAME}" "$sql_test_partitions_sequences" + +# Insert into p1.sales_region table +sql_p1_sales_region=" +WITH region_list AS ( + 
SELECT ARRAY['${REGIONS[0]}', '${REGIONS[1]}', '${REGIONS[2]}']::TEXT[] region +), amount_list AS ( + SELECT ARRAY[${AMOUNTS[0]}, ${AMOUNTS[1]}, ${AMOUNTS[2]}]::INT[] amount +) +INSERT INTO p1.sales_region +(id, amount, branch, region) +SELECT + n, + amount[1 + mod(n, array_length(amount, 1))], + 'Branch ' || n as branch, + region[1 + mod(n, array_length(region, 1))] +FROM amount_list, region_list, generate_series(1, $ROW_COUNT) as n; +" +run_psql "${SOURCE_DB_NAME}" "$sql_p1_sales_region" + +# Insert into sales table +sql_sales=" +WITH amount_list AS ( + SELECT ARRAY[${AMOUNTS[0]}, ${AMOUNTS[1]}, ${AMOUNTS[2]}]::INT[] amount +), date_list AS ( + SELECT ARRAY['2019-11-01'::TIMESTAMP, '2020-02-01'::TIMESTAMP, '2020-05-01'::TIMESTAMP] sale_date +) +INSERT INTO sales +(id, p_name, amount, sale_date) +SELECT + n, + 'Person ' || n as p_name, + amount[1 + mod(n, array_length(amount, 1))], + sale_date[1 + mod(n, array_length(amount, 1))] +FROM +amount_list, +date_list, +generate_series(1, $ROW_COUNT) as n; +" +run_psql "${SOURCE_DB_NAME}" "$sql_sales" + +# Insert into range_columns_partition_test table +sql_range_columns_partition_test=" +INSERT INTO range_columns_partition_test +VALUES + (5, 5), + (3, 4), + (5, 11), + (5, 12), + (4, 3), + (3, 1); +" +run_psql "${SOURCE_DB_NAME}" "$sql_range_columns_partition_test" + +sql_select_range_columns_partition_test=" +SELECT + tableoid :: regclass, + * +FROM + range_columns_partition_test; +" +run_psql "${SOURCE_DB_NAME}" "$sql_select_range_columns_partition_test" + +# Insert into emp table +sql_emp=" +INSERT INTO emp +SELECT num, 'user_' || num , (RANDOM()*50)::INTEGER +FROM generate_series(1, $ROW_COUNT) AS num; +" +run_psql "${SOURCE_DB_NAME}" "$sql_emp" + +# Insert into customers table +sql_customers=" +WITH status_list AS ( + SELECT '{"ACTIVE", "RECURRING", "REACTIVATED", "EXPIRED"}'::TEXT[] statuses + ), arr_list AS ( + SELECT '{100, 200, 50, 250}'::INT[] arr + ) + INSERT INTO customers + (id, statuses, arr) + SELECT n, + 
statuses[1 + mod(n, array_length(statuses, 1))], + arr[1 + mod(n, array_length(arr, 1))] + FROM arr_list, generate_series(1,$ROW_COUNT) AS n, status_list; +" +run_psql "${SOURCE_DB_NAME}" "$sql_customers" + diff --git a/migtests/tests/pg/partitions/snapshot.sql b/migtests/tests/pg/partitions/snapshot.sql deleted file mode 100755 index c34bc8edbf..0000000000 --- a/migtests/tests/pg/partitions/snapshot.sql +++ /dev/null @@ -1,92 +0,0 @@ - -WITH region_list AS ( - SELECT '{"London", "Boston", "Sydney"}'::TEXT[] region - ), amount_list AS ( - SELECT '{1000, 2000, 5000}'::INT[] amount - ) - INSERT INTO sales_region - (id, amount, branch, region) - SELECT - n, - amount[1 + mod(n, array_length(amount, 1))], - 'Branch ' || n as branch, - region[1 + mod(n, array_length(region, 1))] - FROM amount_list, region_list, generate_series(1,1000) as n; - - -WITH region_list AS ( - SELECT '{"London", "Boston", "Sydney"}'::TEXT[] region - ), amount_list AS ( - SELECT '{1000, 2000, 5000}'::INT[] amount - ) - INSERT INTO test_partitions_sequences - (amount, branch, region) - SELECT - amount[1 + mod(n, array_length(amount, 1))], - 'Branch ' || n as branch, - region[1 + mod(n, array_length(region, 1))] - FROM amount_list, region_list, generate_series(1,1000) as n; - -WITH region_list AS ( - SELECT '{"London", "Boston", "Sydney"}'::TEXT[] region - ), amount_list AS ( - SELECT '{1000, 2000, 5000}'::INT[] amount - ) - INSERT INTO p1.sales_region - (id, amount, branch, region) - SELECT - n, - amount[1 + mod(n, array_length(amount, 1))], - 'Branch ' || n as branch, - region[1 + mod(n, array_length(region, 1))] - FROM amount_list, region_list, generate_series(1,1000) as n; - -WITH amount_list AS ( - SELECT '{1000, 2000, 5000}'::INT[] amount - ), date_list AS ( - SELECT '{"2019-11-01", "2020-02-01", "2020-05-01"}'::TIMESTAMP[] sale_date - ) - INSERT INTO sales - (id, p_name, amount, sale_date) - SELECT - n, - 'Person ' || n as p_name, - amount[1 + mod(n, array_length(amount, 1))], - sale_date[1 
+ mod(n, array_length(amount, 1))] - FROM - amount_list, - date_list, - generate_series(1,1000) as n; - -INSERT INTO - range_columns_partition_test -VALUES - (5, 5), - (3, 4), - (5, 11), - (5, 12), - (4, 3), - (3, 1); - -\ d + range_columns_partition_test -SELECT - tableoid :: regclass, - * -FROM - range_columns_partition_test; - -INSERT INTO emp SELECT num, 'user_' || num , (RANDOM()*50)::INTEGER FROM generate_series(1,1000) AS num; - - -WITH status_list AS ( - SELECT '{"ACTIVE", "RECURRING", "REACTIVATED", "EXPIRED"}'::TEXT[] statuses - ), arr_list AS ( - SELECT '{100, 200, 50, 250}'::INT[] arr - ) - INSERT INTO customers - (id, statuses, arr) - SELECT n, - statuses[1 + mod(n, array_length(statuses, 1))], - arr[1 + mod(n, array_length(arr, 1))] - FROM arr_list, generate_series(1,1000) AS n, status_list; - diff --git a/migtests/tests/resumption/import-file/large-number-of-tables/env.sh b/migtests/tests/resumption/import-file/large-number-of-tables/env.sh new file mode 100644 index 0000000000..daab474c33 --- /dev/null +++ b/migtests/tests/resumption/import-file/large-number-of-tables/env.sh @@ -0,0 +1,3 @@ +export TARGET_DB_NAME=${TARGET_DB_NAME:-"resumption_target"} +export TARGET_DB_SCHEMA="public" +export DATA_DIR=${TESTS_DIR}/import-file \ No newline at end of file diff --git a/migtests/tests/resumption/import-file/large-number-of-tables/generate_config.py b/migtests/tests/resumption/import-file/large-number-of-tables/generate_config.py new file mode 100755 index 0000000000..ce645f2e94 --- /dev/null +++ b/migtests/tests/resumption/import-file/large-number-of-tables/generate_config.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +import yaml + +def generate_yaml(num_tables=1250): + config = { + "file_table_map": "", + "additional_flags": { + "--delimiter": ",", + "--format": "csv", + "--has-header": "true" + }, + "row_count": {}, + "resumption": { + "max_restarts": 50, + "min_interrupt_seconds": 15, + "max_interrupt_seconds": 30, + "min_restart_wait_seconds": 15, 
+ "max_restart_wait_seconds": 30 + } + } + + # Generate table mappings + table_mappings = [] + for i in range(1, num_tables + 1): + table_name = f"survey{i}" + table_mappings.append(f"FY2021_Survey.csv:{table_name}") + config["row_count"][table_name] = 41715 # Default row count for each table + + config["file_table_map"] = ",".join(table_mappings) + + # Generate YAML content + yaml_content = yaml.dump(config, default_flow_style=False) + + with open("config.yaml", "w") as yaml_file: + yaml_file.write(yaml_content) + + print("YAML file generated successfully.") + +# Generate for 1250 tables +generate_yaml(1250) diff --git a/migtests/tests/resumption/import-file/large-number-of-tables/init-target-db b/migtests/tests/resumption/import-file/large-number-of-tables/init-target-db new file mode 100755 index 0000000000..f9a830ab66 --- /dev/null +++ b/migtests/tests/resumption/import-file/large-number-of-tables/init-target-db @@ -0,0 +1,45 @@ +#!/usr/bin/env bash + +set -e +set -x + +source "${SCRIPTS}/functions.sh" + +function create_tables_sql_function() { + local db_name=$1 + local table_count=$2 + + run_ysql ${db_name} " + DO \$\$ + BEGIN + FOR i IN 1..${table_count} LOOP + EXECUTE format( + 'CREATE TABLE survey%1\$s ( + Industry_Year INT, + Industry_Aggregation_Level VARCHAR(100), + Industry_Code VARCHAR(10), + Industry_Type TEXT, + Dollar_Percentage TEXT, + Industry_name CHAR(10), + Variable_Sub_Category VARCHAR(100), + Variable_Category VARCHAR(100), + Industry_Valuation TEXT, + Industry_Class TEXT + );', + i + ); + END LOOP; + END + \$\$; + " + echo "All ${table_count} tables created successfully." +} + +step "Create target database." 
+run_ysql yugabyte "DROP DATABASE IF EXISTS ${TARGET_DB_NAME};" +run_ysql yugabyte "CREATE DATABASE ${TARGET_DB_NAME} with COLOCATION=TRUE;" + +# Step: Initialize Target Database with 1250 tables +create_tables_sql_function ${TARGET_DB_NAME} 1250 + +echo "End of init-db script" diff --git a/migtests/tests/resumption/import-file/single-large-table/config.yaml b/migtests/tests/resumption/import-file/single-large-table/config.yaml new file mode 100644 index 0000000000..025686e1a9 --- /dev/null +++ b/migtests/tests/resumption/import-file/single-large-table/config.yaml @@ -0,0 +1,21 @@ +# File Table Map +file_table_map: "accounts_350m_data.sql:accounts_large" + +# Additional Flags +# Uncomment the below to add any additional flags to the command + +additional_flags: + --delimiter: "\\t" + --format: "text" + +# Row Count Validation +row_count: + accounts_large: 350000000 + +# Resumption Settings +resumption: + max_restarts: 30 + min_interrupt_seconds: 300 + max_interrupt_seconds: 720 + min_restart_wait_seconds: 30 + max_restart_wait_seconds: 60 diff --git a/migtests/tests/resumption/import-file/single-large-table/env.sh b/migtests/tests/resumption/import-file/single-large-table/env.sh new file mode 100644 index 0000000000..0dda12d032 --- /dev/null +++ b/migtests/tests/resumption/import-file/single-large-table/env.sh @@ -0,0 +1,4 @@ +export TARGET_DB_NAME=${TARGET_DB_NAME:-"resumption_target"} +export TARGET_DB_SCHEMA="public" +export DATA_DIR="s3://yb-voyager-test-data" +export AWS_DEFAULT_REGION="us-west-2" \ No newline at end of file diff --git a/migtests/tests/resumption/import-file/single-large-table/init-target-db b/migtests/tests/resumption/import-file/single-large-table/init-target-db new file mode 100755 index 0000000000..16e60ae083 --- /dev/null +++ b/migtests/tests/resumption/import-file/single-large-table/init-target-db @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +set -e +set -x + +source ${SCRIPTS}/functions.sh + +step "Create target database." 
+run_ysql yugabyte "DROP DATABASE IF EXISTS ${TARGET_DB_NAME};" +run_ysql yugabyte "CREATE DATABASE ${TARGET_DB_NAME};" + +echo "Initialising target database." + +run_ysql ${TARGET_DB_NAME} " +CREATE TABLE public.accounts_large ( + block bigint NOT NULL, + address text NOT NULL, + dc_balance bigint DEFAULT 0 NOT NULL, + dc_nonce bigint DEFAULT 0 NOT NULL, + security_balance bigint DEFAULT 0 NOT NULL, + security_nonce bigint DEFAULT 0 NOT NULL, + balance bigint DEFAULT 0 NOT NULL, + nonce bigint DEFAULT 0 NOT NULL, + staked_balance bigint, + PRIMARY KEY (block, address) +); +" + +echo "End of init-db script" \ No newline at end of file diff --git a/migtests/tests/resumption/pg/resumption/config.yaml b/migtests/tests/resumption/pg/resumption/config.yaml new file mode 100644 index 0000000000..5b197b4b53 --- /dev/null +++ b/migtests/tests/resumption/pg/resumption/config.yaml @@ -0,0 +1,57 @@ +# Additional Flags +# Uncomment the below to add any additional flags to the command + +# additional_flags: +# --delimiter: "\\t" + +# Import Type +import_type: offline + +# Row Count Validation +row_count: + Case_Sensitive_Table: 5000000 + case: 5000000 + Table: 5000000 + schema2.Case_Sensitive_Table: 5000000 + schema2.case: 5000000 + schema2.Table: 5000000 + public.boston: 2500000 + public.cust_active: 5625000 + public.cust_arr_small: 3750000 + public.cust_arr_large: 1875000 + public.cust_other: 1875000 + public.cust_part11: 1873299 + public.cust_part12: 1876701 + public.cust_part21: 937509 + public.cust_part22: 937491 + public.customers: 7500000 + public.emp: 7500000 + public.emp_0: 2500997 + public.emp_1: 2499183 + public.emp_2: 2499820 + public.london: 2500000 + public.sales: 7500000 + public.sales_2019_q4: 2500000 + public.sales_2020_q1: 2500000 + public.sales_2020_q2: 2500000 + public.sales_region: 7500000 + public.sydney: 2500000 + p1.sales_region: 7500000 + p2.sydney: 2500000 + p2.london: 2500000 + p2.boston: 2500000 + public.range_columns_partition_test: 6 + 
public.range_columns_partition_test_p0: 3 + public.range_columns_partition_test_p1: 3 + public.test_partitions_sequences: 7500000 + public.test_partitions_sequences_l: 2500000 + public.test_partitions_sequences_s: 2500000 + public.test_partitions_sequences_b: 2500000 + +# Resumption Settings +resumption: + max_restarts: 15 + min_interrupt_seconds: 20 + max_interrupt_seconds: 45 + min_restart_wait_seconds: 15 + max_restart_wait_seconds: 30 diff --git a/migtests/tests/resumption/pg/resumption/env.sh b/migtests/tests/resumption/pg/resumption/env.sh new file mode 100644 index 0000000000..d749b02c7d --- /dev/null +++ b/migtests/tests/resumption/pg/resumption/env.sh @@ -0,0 +1,4 @@ +export SOURCE_DB_TYPE="postgresql" +export SOURCE_DB_NAME=${SOURCE_DB_NAME:-"pg_resumption_source"} +export SOURCE_DB_SCHEMA="public,p1,p2,schema2" +export TARGET_DB_NAME=${TARGET_DB_NAME:-"pg_resumption_target"} \ No newline at end of file diff --git a/migtests/tests/resumption/pg/resumption/init-db b/migtests/tests/resumption/pg/resumption/init-db new file mode 100755 index 0000000000..8836e0b822 --- /dev/null +++ b/migtests/tests/resumption/pg/resumption/init-db @@ -0,0 +1,67 @@ +#!/usr/bin/env bash + +set -e +set -x + +source "${SCRIPTS}/functions.sh" + +echo "Creating Source and Target databases." +run_psql postgres "DROP DATABASE IF EXISTS ${SOURCE_DB_NAME};" +run_psql postgres "CREATE DATABASE ${SOURCE_DB_NAME};" +run_ysql yugabyte "DROP DATABASE IF EXISTS ${TARGET_DB_NAME};" +run_ysql yugabyte "CREATE DATABASE ${TARGET_DB_NAME} with COLOCATION=TRUE;" + +TEMP_SQL=$(mktemp "${TEST_DIR}/temp_sql.XXXXXX") + +if [ ! 
-f "$TEMP_SQL" ]; then
+    echo "Failed to create temporary file in ${TEST_DIR}"
+    exit 1
+fi
+
+# Writing temporary SQL script to create DB objects for the test
+cat > "$TEMP_SQL" <<EOF
+\i schema.sql;
+\i snapshot.sql;
+\i ${TESTS_DIR}/pg/partitions/schema.sql;
+
+CREATE SCHEMA IF NOT EXISTS schema2;
+SET SEARCH_PATH TO schema2;
+\i schema.sql;
+\i snapshot.sql;
+EOF
+
+# Run the temporary SQL script
+run_psql "${SOURCE_DB_NAME}" "\i ${TEMP_SQL}"
+
+# Clean up the temporary SQL script
+rm "$TEMP_SQL"
+
+chmod +x ${TESTS_DIR}/pg/partitions/snapshot.sh
+${TESTS_DIR}/pg/partitions/snapshot.sh 7500000
+
+
+# Create tables in the target database
+TEMP_SQL=$(mktemp "${TEST_DIR}/temp_sql.XXXXXX")
+
+if [ ! -f "$TEMP_SQL" ]; then
+    echo "Failed to create temporary file in ${TEST_DIR}"
+    exit 1
+fi
+
+# Writing temporary SQL script to create DB objects for the test
+cat > "$TEMP_SQL" <<EOF
+\i schema.sql;
+\i ${TESTS_DIR}/pg/partitions/schema.sql;
+
+CREATE SCHEMA IF NOT EXISTS schema2;
+SET SEARCH_PATH TO schema2;
+\i schema.sql;
+EOF
+
+# Run the temporary SQL script
+run_ysql ${TARGET_DB_NAME} "\i ${TEMP_SQL}"
+
+# Clean up the temporary SQL script
+rm "$TEMP_SQL"
+
+echo "End of init-db script"
diff --git a/migtests/tests/resumption/pg/resumption/schema.sql b/migtests/tests/resumption/pg/resumption/schema.sql
new file mode 100644
index 0000000000..77919b2b98
--- /dev/null
+++ b/migtests/tests/resumption/pg/resumption/schema.sql
@@ -0,0 +1,63 @@
+-- Numeric and Decimal Types Table
+-- Case Sensitive Name
+
+CREATE TABLE "Case_Sensitive_Table" (
+    id SERIAL PRIMARY KEY,
+    v1 SMALLINT,
+    v2 INTEGER,
+    v3 BIGINT,
+    v4 DECIMAL(6,3),
+    v5 NUMERIC,
+    v6 MONEY,
+    n1 NUMERIC(108,9),
+    n2 NUMERIC(19,2)
+);
+
+-- Unique Index on v1
+CREATE UNIQUE INDEX idx_Case_Sensitive_Table_id_unique ON "Case_Sensitive_Table" (id);
+
+-- Expression Index on v4 (DECIMAL)
+CREATE INDEX idx_Case_Sensitive_Table_v4_expr ON "Case_Sensitive_Table" ((v4 > 10));
+
+
+-- String and Enum Types Table
+-- Reserved 
Word Name + +CREATE TYPE week AS ENUM ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'); + +CREATE TABLE "case" ( + id SERIAL PRIMARY KEY, + bool_type BOOLEAN, + char_type1 CHAR(1), + varchar_type VARCHAR(100), + byte_type BYTEA, + enum_type WEEK +); + +-- Partial Index on bool_type (Only TRUE values) +CREATE INDEX idx_case_bool_type_true_partial ON "case" (bool_type) WHERE bool_type = TRUE; + +-- Multi-column Index on char_type1 and enum_type +CREATE INDEX idx_case_char_type1_enum_type ON "case" (char_type1, enum_type); + +-- Datetime and Complex Types Table +-- Case Sensitive Reserved Word Name + +CREATE TABLE "Table" ( + id SERIAL PRIMARY KEY, + v1 DATE, + v2 TIME, + v3 TIMESTAMP, + v4 TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP(0), + v5 JSON, + json_data JSON, + bit_data BIT(10), + int_array INT ARRAY[4], + text_matrix TEXT[][], + bit_varying_data BIT VARYING, + default_bool BOOLEAN DEFAULT FALSE, + default_int INTEGER DEFAULT 10, + default_varchar VARCHAR DEFAULT 'testdefault' +); + +CREATE INDEX idx_Table_int_array_gin ON "Table" USING GIN (int_array); diff --git a/migtests/tests/resumption/pg/resumption/snapshot.sql b/migtests/tests/resumption/pg/resumption/snapshot.sql new file mode 100644 index 0000000000..c1d7b20b34 --- /dev/null +++ b/migtests/tests/resumption/pg/resumption/snapshot.sql @@ -0,0 +1,68 @@ +CREATE OR REPLACE FUNCTION insert_numeric_and_decimal_types(row_count INT) +RETURNS void AS $$ +BEGIN + FOR i IN 1..row_count LOOP + INSERT INTO "Case_Sensitive_Table" (v1, v2, v3, v4, v5, v6, n1, n2) + VALUES ( + FLOOR(RANDOM() * 65535 - 32768)::SMALLINT, -- Smallint range + FLOOR(RANDOM() * 2147483647)::INTEGER, -- Integer range + FLOOR(RANDOM() * 9223372036854775807)::BIGINT, -- Bigint range + ROUND((RANDOM() * 999.999)::NUMERIC, 3)::DECIMAL(6,3), -- Decimal + RANDOM() * 1e6::NUMERIC, -- Numeric for v5 + ((RANDOM() * 1000)::NUMERIC)::MONEY, -- Money + RANDOM() * 1e90::NUMERIC(108,9), -- Numeric(108,9) + RANDOM() * 1e15::NUMERIC(19,2) -- 
Numeric(19,2), within 10^15 + ); + END LOOP; +END; +$$ LANGUAGE plpgsql; + + +CREATE OR REPLACE FUNCTION insert_string_and_enum_types(row_count INT) +RETURNS void AS $$ +BEGIN + FOR i IN 1..row_count LOOP + INSERT INTO "case" (bool_type, char_type1, varchar_type, byte_type, enum_type) + VALUES ( + RANDOM() > 0.5, -- Boolean + CHR(FLOOR(RANDOM() * 26 + 65)::INT), -- Char(1) (A-Z) + CHR(FLOOR(RANDOM() * 26 + 97)::INT) || '_data', -- Varchar + ('random_bytes_' || i)::BYTEA, -- Bytea + (ARRAY['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'])[FLOOR(RANDOM() * 7 + 1)::INT]::week -- Enum + ); + END LOOP; +END; +$$ LANGUAGE plpgsql; + + +CREATE OR REPLACE FUNCTION insert_datetime_and_complex_types(row_count INT) +RETURNS void AS $$ +BEGIN + FOR i IN 1..row_count LOOP + INSERT INTO "Table" ( + v1, v2, v3, v4, v5, json_data, bit_data, int_array, text_matrix, bit_varying_data, default_bool, default_int, default_varchar + ) + VALUES ( + CURRENT_DATE + (FLOOR(RANDOM() * 365) || ' days')::INTERVAL, -- Random Date within a year + CURRENT_TIME + INTERVAL '1 second' * FLOOR(RANDOM() * 86400), -- Random Time within a day + CURRENT_TIMESTAMP + INTERVAL '1 second' * FLOOR(RANDOM() * 31536000), -- Random Timestamp within a year + CURRENT_TIMESTAMP + INTERVAL '1 second' * FLOOR(RANDOM() * 31536000), -- Timestamp without Time Zone + '{"key":"value"}'::json, -- JSON for v5 + '{"name":"example"}'::json, -- JSON for json_data + (SELECT STRING_AGG(CASE WHEN RANDOM() > 0.5 THEN '1' ELSE '0' END, '') + FROM generate_series(1, 10))::BIT(10), -- Generate 10 random bits (0 or 1) + ARRAY[FLOOR(RANDOM() * 100), FLOOR(RANDOM() * 100), FLOOR(RANDOM() * 100), FLOOR(RANDOM() * 100)], -- Integer array + ARRAY[ARRAY['text1', 'text2'], ARRAY['text3', 'text4']], -- Text matrix (2x2) + (SELECT STRING_AGG(CASE WHEN RANDOM() > 0.5 THEN '1' ELSE '0' END, '') + FROM generate_series(1, 10))::BIT VARYING(10), -- Random valid BIT VARYING (10 bits) + RANDOM() > 0.5, -- Random Boolean for default_bool + 
FLOOR(RANDOM() * 100), -- Random Integer for default_int + 'test' || i::TEXT -- Random String for default_varchar + ); + END LOOP; +END; +$$ LANGUAGE plpgsql; + +SELECT insert_numeric_and_decimal_types(5000000); +SELECT insert_string_and_enum_types(5000000); +SELECT insert_datetime_and_complex_types(5000000);