Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the test scripts for resumption #2117

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion migtests/scripts/functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ import_data() {
--target-db-name ${TARGET_DB_NAME}
--disable-pb true
--send-diagnostics=false
--truncate-splits true
--max-retries 1
"

Expand Down
260 changes: 260 additions & 0 deletions migtests/scripts/resumption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
#!/usr/bin/env python3

import os
import subprocess
import signal
import time
import random
import sys
import select
import yaml
sys.path.append(os.path.join(os.getcwd(), 'migtests/lib'))
import yb
import argparse

def parse_arguments():
parser = argparse.ArgumentParser(description="YB Voyager Resumption Test")
parser.add_argument('config_file', metavar='config.yaml', type=str,
help="Path to the YAML configuration file")
return parser.parse_args()

def load_config(config_file):
"""Load the configuration from the provided YAML file."""
if not os.path.exists(config_file):
raise FileNotFoundError(f"Config file not found: {config_file}")
with open(config_file, 'r') as file:
config = yaml.safe_load(file)
return config

def prepare_import_data_file_command(config):
"""
Prepares the yb-voyager import data file command based on the given configuration.
"""
file_table_map = config['file_table_map']
additional_flags = config.get('additional_flags', {})

args = [
'yb-voyager', 'import', 'data', 'file',
'--export-dir', os.getenv('EXPORT_DIR', ''),
'--target-db-host', os.getenv('TARGET_DB_HOST', ''),
'--target-db-port', os.getenv('TARGET_DB_PORT', ''),
'--target-db-user', os.getenv('TARGET_DB_USER', ''),
'--target-db-password', os.getenv('TARGET_DB_PASSWORD', ''),
'--target-db-schema', os.getenv('TARGET_DB_SCHEMA', ''),
'--target-db-name', os.getenv('TARGET_DB_NAME', ''),
'--disable-pb', 'true',
'--send-diagnostics', 'false',
'--data-dir', os.getenv('DATA_DIR', ''),
'--file-table-map', file_table_map
]

if os.getenv('RUN_WITHOUT_ADAPTIVE_PARALLELISM') == 'true':
args.extend(['--enable-adaptive-parallelism', 'false'])

for flag, value in additional_flags.items():
args.append(flag)
args.append(value)

return args


def prepare_import_data_command(config):
"""
Prepares the yb-voyager import data command based on the given configuration.
"""

additional_flags = config.get('additional_flags', {})

args = [
'yb-voyager', 'import', 'data',
'--export-dir', os.getenv('EXPORT_DIR', ''),
'--target-db-host', os.getenv('TARGET_DB_HOST', ''),
'--target-db-port', os.getenv('TARGET_DB_PORT', ''),
'--target-db-user', os.getenv('TARGET_DB_USER', ''),
'--target-db-password', os.getenv('TARGET_DB_PASSWORD', ''),
'--target-db-name', os.getenv('TARGET_DB_NAME', ''),
'--disable-pb', 'true',
'--send-diagnostics', 'false',
]

if os.getenv('SOURCE_DB_TYPE') != 'postgresql':
args.extend(['--target-db-schema', os.getenv('TARGET_DB_SCHEMA', '')])

if os.getenv('RUN_WITHOUT_ADAPTIVE_PARALLELISM') == 'true':
args.extend(['--enable-adaptive-parallelism', 'false'])

for flag, value in additional_flags.items():
args.append(flag)
args.append(value)

return args


def run_and_resume_voyager(command, resumption):
"""
Runs the yb-voyager command with support for resumption testing.
"""
for attempt in range(1, resumption['max_restarts'] + 1):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's get/define all the configs in the beginning. It will make it easier to understand what all configuration options are involved.

max_restarts = resumption['max_restarts']
min_interrupt_seconds = resumption['min_interrupt_seconds']
... 

print(f"\n--- Attempt {attempt} of {resumption['max_restarts']} ---")
try:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
print("Running command:", ' '.join(command), flush=True)

start_time = time.time()
full_output = ''

while True:
rlist, _, _ = select.select([process.stdout, process.stderr], [], [], 5)
for ready in rlist:
output = ready.readline()
if not output: # Exit if output is empty (end of process output)
break
full_output += output
if time.time() - start_time > 5:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why break ? what is 5? seconds? minutes?

break

if full_output:
print(full_output.strip(), flush=True)

while True:
if process.poll() is not None:
break # Process has ended, exit loop

interrupt_interval_seconds = random.randint(
resumption['min_interrupt_seconds'],
resumption['max_interrupt_seconds']
)
print(f"\nProcess will be interrupted in {interrupt_interval_seconds // 60}m {interrupt_interval_seconds % 60}s")
time.sleep(interrupt_interval_seconds)
print(f"\nInterrupting the import process (PID: {process.pid})")
process.send_signal(signal.SIGINT)

restart_wait_time_seconds = random.randint(
resumption['min_restart_wait_seconds'],
resumption['max_restart_wait_seconds']
)
print(f"\nWaiting for {restart_wait_time_seconds // 60}m {restart_wait_time_seconds % 60}s before resuming...")
time.sleep(restart_wait_time_seconds)

except Exception as e:
print(f"Error occurred during import: {e}")
if process:
process.kill()
raise e

finally:
if process and process.poll() is None:
print(f"Terminating process (PID: {process.pid})")
process.kill()
process.wait(timeout=30)

# Final import retry logic
print("\n--- Final attempt to complete the import ---")

for _ in range(2):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 2 attempts finally?

try:
print("\nVoyager command output:")

process = subprocess.Popen(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: separate function for starting command (can be called in above for-loop as well)

command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)

# Capture and print output
for line in iter(process.stdout.readline, ''):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the above for-loop, we're reading both stderr and stdout, here we're only reading stdout. Any particular reason? Would be good to be consistent here (call a common function that captures stdout/stderr)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also till when will you keep reading? How long will the loop run?

print(line.strip())
sys.stdout.flush()

process.wait()

if process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode, command)

break
except subprocess.CalledProcessError as e:
print("\nVoyager command error:")
for line in iter(process.stderr.readline, ''):
print(line.strip())
sys.stdout.flush()
time.sleep(30)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why sleep?

else:
print("Final import failed after 2 attempts.")
sys.exit(1)

def validate_row_counts(row_count, export_dir):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note for future: you can create a common python file that has such helper
functions.

"""
Validates the row counts of the target tables after import.
If the row count validation fails, it logs details and exits.
"""
failed_validations = []

for table_identifier, expected_row_count in row_count.items():
print(f"\nValidating row count for table '{table_identifier}'...")

if '.' in table_identifier:
schema, table_name = table_identifier.split('.', 1)
else:
schema = "public"
table_name = table_identifier

tgt = None
try:
tgt = yb.new_target_db()
tgt.connect()
print(f"Connected to target database. Using schema: {schema}")
actual_row_count = tgt.get_row_count(table_name, schema)

if actual_row_count == expected_row_count:
print(f"\u2714 Validation successful: {table_identifier} - Expected: {expected_row_count}, Actual: {actual_row_count}")
else:
print(f"\u274C Validation failed: {table_identifier} - Expected: {expected_row_count}, Actual: {actual_row_count}")
failed_validations.append((table_identifier, expected_row_count, actual_row_count))
except Exception as e:
print(f"Error during validation for table '{table_identifier}': {e}")
failed_validations.append((table_identifier, expected_row_count, "Error"))
finally:
if tgt:
tgt.close()
print("Disconnected from target database.")

if failed_validations:
print("\nValidation failed for the following tables:")
for table, expected, actual in failed_validations:
print(f" Table: {table}, Expected: {expected}, Actual: {actual}")
print(f"\nFor more details, check {export_dir}/logs")
sys.exit(1)
else:
print("\nAll table row counts validated successfully.")



def run_import_with_resumption(config):

import_type = config.get('import_type', 'file') # Default to 'file' if not specified

if import_type == 'file':
command = prepare_import_data_file_command(config)
elif import_type == 'offline':
command = prepare_import_data_command(config)
else:
raise ValueError(f"Unsupported import_type: {import_type}")

run_and_resume_voyager(command, config['resumption'])

validate_row_counts(config['row_count'], os.getenv('EXPORT_DIR', ''))


if __name__ == "__main__":
try:
args = parse_arguments()
config = load_config(args.config_file)

print(f"Loaded configuration from {args.config_file}")

run_import_with_resumption(config)

except Exception as e:
print(f"Test failed: {e}")
sys.exit(1)
107 changes: 107 additions & 0 deletions migtests/scripts/resumption.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#!/usr/bin/env bash

set -e

if [ $# -gt 2 ]
then
echo "Usage: $0 TEST_NAME [env.sh]"
exit 1
fi

set -x

export YB_VOYAGER_SEND_DIAGNOSTICS=false
export TEST_NAME=$1

export REPO_ROOT="${PWD}"
export SCRIPTS="${REPO_ROOT}/migtests/scripts"
export TESTS_DIR="${REPO_ROOT}/migtests/tests"
export TEST_DIR="${TESTS_DIR}/${TEST_NAME}"
export EXPORT_DIR=${EXPORT_DIR:-"${TEST_DIR}/export-dir"}

export PYTHONPATH="${REPO_ROOT}/migtests/lib"

# Order of env.sh import matters.
if [ $2 != "" ] #if env.sh is passed as an argument, source it
then
if [ ! -f "${TEST_DIR}/$2" ]
then
echo "$2 file not found in the test directory"
exit 1
fi
source ${TEST_DIR}/$2
else
source ${TEST_DIR}/env.sh
fi

if [ "${SOURCE_DB_TYPE}" != "" ]; then
source ${SCRIPTS}/${SOURCE_DB_TYPE}/env.sh
fi

source ${SCRIPTS}/yugabytedb/env.sh
source ${SCRIPTS}/functions.sh

main() {
echo "Deleting the parent export-dir present in the test directory"
rm -rf ${EXPORT_DIR}
echo "Creating export-dir in the parent test directory"
mkdir -p ${EXPORT_DIR}
echo "Assigning permissions to the export-dir to execute init-db script"

for script in init-db init-target-db generate_config.py; do
if [ -f "${TEST_DIR}/${script}" ]; then
chmod +x "${TEST_DIR}/${script}"
fi
done

step "START: ${TEST_NAME}"
print_env

pushd ${TEST_DIR}

step "Check the Voyager version installed"
yb-voyager version

step "Initialise databases"

for script in init-db init-target-db; do
if [ -f "${TEST_DIR}/${script}" ]; then
"${TEST_DIR}/${script}"
fi
done

step "Run additional steps in case of offline"
if [ "${SOURCE_DB_TYPE}" != "" ]; then
step "Grant source database user permissions"
grant_permissions ${SOURCE_DB_NAME} ${SOURCE_DB_TYPE} ${SOURCE_DB_SCHEMA}

step "Export data."
# false if exit code of export_data is non-zero
export_data || {
cat_log_file "yb-voyager-export-data.log"
cat_log_file "debezium-source_db_exporter.log"
exit 1
}
fi

step "Generate the YAML file"
if [ -f "${TEST_DIR}/generate_config.py" ]; then
./generate_config.py
fi

step "Run import with resumptions"

${SCRIPTS}/resumption.py config.yaml

step "Clean up"
rm -rf "${EXPORT_DIR}"
if [ -f "${TEST_DIR}/generate_config.py" ]; then
rm config.yaml
fi
if [ -n "${SOURCE_DB_NAME}" ]; then
run_psql postgres "DROP DATABASE ${SOURCE_DB_NAME};"
fi
run_ysql yugabyte "DROP DATABASE IF EXISTS ${TARGET_DB_NAME};"
}

main
4 changes: 3 additions & 1 deletion migtests/tests/pg/partitions/init-db
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ run_psql postgres "CREATE DATABASE ${SOURCE_DB_NAME};"
echo "Initialising source database."

run_psql ${SOURCE_DB_NAME} "\i schema.sql;"
run_psql ${SOURCE_DB_NAME} "\i snapshot.sql;"

chmod +x ./snapshot.sh
./snapshot.sh 1000

if [ -n "${SOURCE_REPLICA_DB_NAME}" ] && [ "${SOURCE_REPLICA_DB_NAME}" != "${SOURCE_DB_NAME}" ];
then
Expand Down
Loading