diff --git a/suites/squid/cephfs/tier-3_cephfs_system_test.yaml b/suites/squid/cephfs/tier-3_cephfs_system_test.yaml index a828132275..efa563f0de 100644 --- a/suites/squid/cephfs/tier-3_cephfs_system_test.yaml +++ b/suites/squid/cephfs/tier-3_cephfs_system_test.yaml @@ -216,6 +216,16 @@ tests: ENABLE_LOGS : 1 daemon_list : ['mds','client','osd','mgr','mon'] daemon_dbg_level : {'mds':20,'client':20,'osd':10,'mgr':10,'mon':10} + - + test: + abort-on-fail: false + desc: "Setup Crash configuration" + module: cephfs_crash_util.py + name: cephfs-crash-setup + config: + crash_setup : 1 + daemon_list : ['mds','osd','mgr','mon'] + - test: name: CephFS_System_test module: test_parallel.py @@ -242,6 +252,15 @@ tests: name: "CephFS System Test Client IO 7" config: test_name : io_test_workflow_7 + - + test: + abort-on-fail: false + desc: "Check for Crash" + module: cephfs_crash_util.py + name: cephfs-crash-check + config: + crash_check : 1 + daemon_list : ['mds','osd','mgr','mon'] - test: abort-on-fail: false diff --git a/tests/cephfs/cephfs_crash_util.py b/tests/cephfs/cephfs_crash_util.py new file mode 100644 index 0000000000..860906a403 --- /dev/null +++ b/tests/cephfs/cephfs_crash_util.py @@ -0,0 +1,81 @@ +import os +import traceback + +from tests.cephfs.cephfs_system.cephfs_system_utils import CephFSSystemUtils +from utility.log import Log + +log = Log(__name__) + + +def run(ceph_cluster, **kw): + """ + This script is a wrapper to Logs enablement and Logs collection module available in cephfs_uitlsV1 + It can be included prior to test case execution to enable debug logs and post testcase execution to collect logs, + PRETEST: To enable logs + ----------------------- + - test: + name: Enable system debug logs + module: cephfs_logs_util.py + config: + ENABLE_LOGS : 1 + daemon_dbg_level : {'mds':5} + POSTTEST: To collect logs + ------------------------- + - test: + name: Collect and upload system logs + module: cephfs_logs_util.py + config: + UPLOAD_LOGS : 1 + daemon_list : ['mds'] + POSTTEST: To disable logs + ------------------------- + - test: + name: Disable debug logs + module: cephfs_logs_util.py + config: + DISABLE_LOGS : 1 + daemon_list : ['mds'] + + This script will read input params ENABLE_LOGS,UPLOAD_LOGS and DISABLE_LOGS and invoke corresponding + cephfs_utilsV1 module to perform the task. If UPLOAD_LOGS, script will print the path were logs are uploadded. + + """ + try: + fs_system_utils = CephFSSystemUtils(ceph_cluster) + config = kw.get("config") + clients = ceph_cluster.get_ceph_objects("client") + client = clients[1] + log.info("checking Pre-requisites") + + if not clients: + log.info( + f"This test requires minimum 1 client nodes.This has only {len(clients)} clients" + ) + return 1 + + daemon_list = config.get("daemon_list", ["mds"]) + crash_setup = config.get("crash_setup", 0) + crash_check = config.get("crash_check", 0) + crash_copy = config.get("crash_copy", 1) + log_str = ( + f"Test Params : Crash Setup : {crash_setup}, Crash check:{crash_check}" + ) + log_str += f", daemon_list : {daemon_list}" + log.info(log_str) + if crash_setup == 1: + log.info(f"Setup Crash configuration for : {daemon_list}") + fs_system_utils.crash_setup(client, daemon_list=daemon_list) + + if crash_check == 1: + log_dir = os.path.dirname(log.logger.handlers[0].baseFilename) + log.info(f"log path:{log_dir}") + log.info(f"Check for crash from : {daemon_list}") + fs_system_utils.crash_check( + client, crash_copy=crash_copy, daemon_list=daemon_list + ) + return 0 + + except Exception as e: + log.info(e) + log.info(traceback.format_exc()) + return 1 diff --git a/tests/cephfs/cephfs_system/cephfs_system_utils.py b/tests/cephfs/cephfs_system/cephfs_system_utils.py index 17b4fc8f3d..c33439ce3c 100644 --- a/tests/cephfs/cephfs_system/cephfs_system_utils.py +++ b/tests/cephfs/cephfs_system/cephfs_system_utils.py @@ -3,6 +3,7 @@ import os import random import threading +import time from tests.cephfs.cephfs_utilsV1 import FsUtils from utility.log import Log @@ -26,7 +27,7 @@ def __init__(self, ceph_cluster): """ self.mons = ceph_cluster.get_ceph_objects("mon") self.mgrs = ceph_cluster.get_ceph_objects("mgr") - self._mdss = ceph_cluster.get_ceph_objects("mds") + self.mdss = ceph_cluster.get_ceph_objects("mds") self.osds = ceph_cluster.get_ceph_objects("osd") self.clients = ceph_cluster.get_ceph_objects("client") self.fs_util = FsUtils(ceph_cluster) @@ -45,16 +46,17 @@ def get_test_object(self, cephfs_config, req_type="shared"): """ sv_objs = [] for i in cephfs_config: - for j in cephfs_config[i]["group"]: - sv_info = cephfs_config[i]["group"][j][req_type] - for k in sv_info: - if k not in ["sv_prefix", "sv_cnt"]: - sv_obj = {} - sv_obj.update({k: sv_info[k]}) - sv_obj[k].update({"fs_name": i}) - if "default" not in j: - sv_obj[k].update({"group_name": j}) - sv_objs.append(sv_obj) + if "CLUS_MONITOR" not in i: + for j in cephfs_config[i]["group"]: + sv_info = cephfs_config[i]["group"][j][req_type] + for k in sv_info: + if k not in ["sv_prefix", "sv_cnt"]: + sv_obj = {} + sv_obj.update({k: sv_info[k]}) + sv_obj[k].update({"fs_name": i}) + if "default" not in j: + sv_obj[k].update({"group_name": j}) + sv_objs.append(sv_obj) sv_obj = random.choice(sv_objs) if req_type == "unique": @@ -122,3 +124,109 @@ def get_mds_requests(self, fs_name, client): return max(mds_reqs) else: return 0 + + def crash_setup(self, client, daemon_list=["mds"]): + """ + Enable crash module, create crash user and copy keyring file to cluster nodes + """ + cmd = "ceph mgr module enable crash" + client.exec_command(sudo=True, cmd=cmd) + daemon_nodes = { + "mds": self.mdss, + "mgr": self.mgrs, + "mon": self.mons, + "osd": self.osds, + } + log_base_dir = os.path.dirname(log.logger.handlers[0].baseFilename) + + for file_name in ["ceph.conf", "ceph.client.admin.keyring"]: + dst_path = f"{log_base_dir}/{file_name}" + src_path = f"/etc/ceph/{file_name}" + client.download_file(src=src_path, dst=dst_path, sudo=True) + crash_ready_nodes = [] + for daemon in daemon_list: + nodes = daemon_nodes[daemon] + for node in nodes: + if node.node.hostname not in crash_ready_nodes: + cmd = "ls /etc/ceph/ceph.client.crash.keyring" + try: + node.exec_command(sudo=True, cmd=cmd) + crash_ready_nodes.append(node.node.hostname) + except BaseException as ex: + if "No such file" in str(ex): + for file_name in ["ceph.conf", "ceph.client.admin.keyring"]: + src_path = f"{log_base_dir}/{file_name}" + dst_path = f"/etc/ceph/{file_name}" + node.upload_file(src=src_path, dst=dst_path, sudo=True) + node.exec_command( + sudo=True, + cmd="yum install -y --nogpgcheck ceph-common", + ) + cmd = "ceph auth get-or-create client.crash mon 'profile crash' mgr 'profile crash'" + cmd += " > /etc/ceph/ceph.client.crash.keyring" + node.exec_command(sudo=True, cmd=cmd) + crash_ready_nodes.append(node.node.hostname) + return 0 + + def crash_check(self, client, crash_copy=1, daemon_list=["mds"]): + """ + Check if Crash dir exists in all daemon hosting nodes, save meta file if crash exists + """ + daemon_nodes = { + "mds": self.mdss, + "mgr": self.mgrs, + "mon": self.mons, + "osd": self.osds, + } + + out, _ = client.exec_command(sudo=True, cmd="ceph fsid") + fsid = out.strip() + crash_dir = f"/var/lib/ceph/{fsid}/crash" + crash_data = {} + crash_checked_nodes = [] + for daemon in daemon_list: + nodes = daemon_nodes[daemon] + for node in nodes: + if node.node.hostname not in crash_checked_nodes: + crash_list = [] + cmd = f"ls {crash_dir}" + out, _ = node.exec_command(sudo=True, cmd=cmd) + crash_items = out.split() + crash_items.remove("posted") + if len(crash_items) > 0: + for crash_item in crash_items: + crash_path = f"{crash_dir}/{crash_item}" + node.exec_command( + sudo=True, cmd=f"ceph crash post -i {crash_path}/meta" + ) + crash_list.append(crash_item) + crash_data.update({node: crash_list}) + crash_checked_nodes.append(node.node.hostname) + + log_base_dir = os.path.dirname(log.logger.handlers[0].baseFilename) + crash_log_path = f"{log_base_dir}/crash_info/" + try: + os.mkdir(crash_log_path) + except BaseException as ex: + log.info(ex) + log.info(f"crash_data:{crash_data}") + + if crash_copy == 1: + for crash_node in crash_data: + crash_list = crash_data[crash_node] + node_name = crash_node.node.hostname + tmp_path = f"{crash_log_path}/{node_name}" + os.mkdir(tmp_path) + for crash_item in crash_list: + crash_dst_path = f"{crash_log_path}/{node_name}/{crash_item}" + os.mkdir(crash_dst_path) + crash_path = f"{crash_dir}/{crash_item}" + + out, _ = crash_node.exec_command(sudo=True, cmd=f"ls {crash_path}") + crash_files = out.split() + for crash_file in crash_files: + src_path = f"{crash_path}/{crash_file}" + dst_path = f"{crash_dst_path}/{crash_file}" + crash_node.download_file(src=src_path, dst=dst_path, sudo=True) + log.info(f"Copied {crash_path} to {crash_dst_path}") + return 0 diff --git a/tests/cephfs/cephfs_system/cephfs_systest_client_io.py b/tests/cephfs/cephfs_system/cephfs_systest_client_io.py index 9dd3dbb3bc..154ccfcc09 100644 --- a/tests/cephfs/cephfs_system/cephfs_systest_client_io.py +++ b/tests/cephfs/cephfs_system/cephfs_systest_client_io.py @@ -77,6 +77,7 @@ def run(ceph_cluster, **kw): "io_test_workflow_11": "Download large file to cephfs mount that does read/write locks", } test_case_name = config.get("test_name", "all_tests") + cleanup = config.get("cleanup", 0) if test_case_name in io_tests: test_list = [test_case_name] else: @@ -112,6 +113,7 @@ def run(ceph_cluster, **kw): io_test, run_time, log_path, + cleanup, clients, fs_system_utils, cephfs_config, @@ -142,6 +144,7 @@ def io_test_runner( io_test, run_time, log_path, + cleanup, clients, fs_system_utils, cephfs_config, @@ -162,7 +165,7 @@ def io_test_runner( if io_test == "io_test_workflow_9" or io_test == "io_test_workflow_11": sv_info = fs_system_utils.get_test_object(cephfs_config, "shared") io_proc_check_status = io_tests[io_test]( - run_time, log_path, clients, fs_system_utils, sv_info + run_time, log_path, cleanup, clients, fs_system_utils, sv_info ) else: sv_info_list = [] @@ -181,7 +184,7 @@ def io_test_runner( sv_name_list.append(sv_name) k += 1 io_proc_check_status = io_tests[io_test]( - run_time, log_path, clients, fs_system_utils, sv_info_list + run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list ) if io_test == "io_test_workflow_7": log.info("Setup Ephemeral Random pinning") @@ -193,10 +196,13 @@ def io_test_runner( return io_proc_check_status -def io_test_workflow_1(run_time, log_path, clients, fs_system_utils, sv_info_list): +def io_test_workflow_1( + run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list +): # Read, write and getattr to same file from different client sessions log_name = "parallel_read_write_getattr" log1 = fs_system_utils.configure_logger(log_path, log_name) + log1.info(f"Start {log_name} on {sv_info_list}") attr_list = [ "ceph.file.layout", "ceph.file.layout.pool", @@ -207,20 +213,39 @@ def io_test_workflow_1(run_time, log_path, clients, fs_system_utils, sv_info_lis for sv_info in sv_info_list: for i in sv_info: sv_name = i - client_name = sv_info[sv_name]["mnt_client"] + client_name1 = sv_info[sv_name]["mnt_client1"] + client_name2 = sv_info[sv_name]["mnt_client2"] for i in clients: - if client_name in i.node.hostname: - client = i - break - mnt_pt = sv_info[sv_name]["mnt_pt"] - dir_path = f"{mnt_pt}/client_io" - cmd = f"mkdir -p {dir_path}" + if client_name1 == i.node.hostname: + client1 = i + if client_name2 == i.node.hostname: + client2 = i + + mnt_pt1 = sv_info[sv_name]["mnt_pt1"] + dir_path1 = f"{mnt_pt1}client_io" + cmd = f"mkdir -p {dir_path1}" log1.info(f"Executing cmd {cmd}") try: - client.exec_command(sudo=True, cmd=cmd) + client1.exec_command(sudo=True, cmd=cmd) except BaseException as ex: log1.info(ex) - sv_info_objs.update({sv_name: {"dir_path": dir_path, "client": client}}) + mnt_pt2 = sv_info[sv_name]["mnt_pt2"] + dir_path2 = f"{mnt_pt2}client_io" + cmd = f"mkdir -p {dir_path2}" + try: + client2.exec_command(sudo=True, cmd=cmd) + except BaseException as ex: + log1.info(ex) + sv_info_objs.update( + { + sv_name: { + "dir_path1": dir_path1, + "client1": client1, + "dir_path2": dir_path2, + "client2": client2, + } + } + ) # Run Read,Write and getattr in parallel log1.info( f"Run write,read and getattr in parallel, Repeat test until {run_time}secs" @@ -228,102 +253,163 @@ def io_test_workflow_1(run_time, log_path, clients, fs_system_utils, sv_info_lis end_time = datetime.datetime.now() + datetime.timedelta(seconds=run_time) cluster_healthy = 1 while datetime.datetime.now() < end_time and cluster_healthy: + rand_str = "".join( + random.choice(string.ascii_lowercase + string.digits) + for _ in list(range(3)) + ) with parallel() as p: for sv_name in sv_info_objs: - client = sv_info_objs[sv_name]["client"] - dir_path = sv_info_objs[sv_name]["dir_path"] + client1 = sv_info_objs[sv_name]["client1"] + dir_path1 = sv_info_objs[sv_name]["dir_path1"] + client2 = sv_info_objs[sv_name]["client2"] + dir_path2 = sv_info_objs[sv_name]["dir_path2"] - file_path = f"{dir_path}/io_test_workflow_1" + file_path1 = f"{dir_path1}/io_test_workflow_1_{rand_str}" + file_path2 = f"{dir_path2}/io_test_workflow_1_{rand_str}" file_ops = { - "write": f"fio --name={file_path} --ioengine=libaio --size 1Gb --rw=write --direct=0", - "read": f"fio --name={file_path} --ioengine=libaio --size 1Gb --rw=read --direct=0 --startdelay=1", + "write": f"fio --name={file_path1} --ioengine=libaio --size 10M --rw=write --direct=0", + "read": f"fio --name={file_path2} --ioengine=libaio --size 10M --rw=read --direct=0 --startdelay=1", } - get_attr_cmds = "sleep 2;" + get_attr_cmds1 = "sleep 2;" + get_attr_cmds2 = "sleep 2;" for attr in attr_list: - get_attr_cmds += f"getfattr -n {attr} {file_path}*;" - if "nfs" not in mnt_pt: - file_ops.update({"getattr": f"{get_attr_cmds}"}) + get_attr_cmds1 += f"getfattr -n {attr} {file_path1}*;" + get_attr_cmds2 += f"getfattr -n {attr} {file_path2}*;" + if "nfs" not in mnt_pt1: + file_ops.update({"getattr1": f"{get_attr_cmds1}"}) + if "nfs" not in mnt_pt2: + file_ops.update({"getattr2": f"{get_attr_cmds2}"}) + client_op = { + "write": client1, + "read": client2, + "getattr1": client1, + "getattr2": client2, + } for io_type in file_ops: cmd = file_ops[io_type] + client = client_op[io_type] p.spawn(run_cmd, cmd, client, log1) - with parallel() as p: - for sv_name in sv_info_objs: - client = sv_info_objs[sv_name]["client"] - dir_path = sv_info_objs[sv_name]["dir_path"] - file_path = f"{dir_path}/io_test_workflow_1" - cmd = f"rm -f {file_path}*" - log1.info(f"Running cmd {cmd}") - p.spawn(run_cmd, cmd, client, log1) + + if cleanup == 1: + with parallel() as p: + for sv_name in sv_info_objs: + client1 = sv_info_objs[sv_name]["client1"] + dir_path1 = sv_info_objs[sv_name]["dir_path1"] + file_path1 = f"{dir_path1}/io_test_workflow_1_{rand_str}" + cmd = f"rm -f {file_path1}*" + log1.info(f"Running cmd {cmd}") + p.spawn(run_cmd, cmd, client1, log1) + client2 = sv_info_objs[sv_name]["client2"] + dir_path2 = sv_info_objs[sv_name]["dir_path2"] + file_path2 = f"{dir_path2}/io_test_workflow_1_{rand_str}" + cmd = f"rm -f {file_path2}*" + log1.info(f"Running cmd {cmd}") + p.spawn(run_cmd, cmd, client2, log1) + time.sleep(30) cluster_healthy = is_cluster_healthy(clients[0]) + log1.info(f"cluster_health{cluster_healthy}") + log1.info("io_test_workflow_1 completed") return 0 -def io_test_workflow_2(run_time, log_path, clients, fs_system_utils, sv_info_list): +def io_test_workflow_2( + run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list +): # Overwrite and Read to same file from different client sessions log_name = "parallel_overwrite_read" log1 = fs_system_utils.configure_logger(log_path, log_name) smallfile_cmd = "python3 /home/cephuser/smallfile/smallfile_cli.py" sv_info_objs = {} - with parallel() as p: - for sv_info in sv_info_list: - for i in sv_info: - sv_name = i - - client_name = sv_info[sv_name]["mnt_client"] - for i in clients: - if client_name in i.node.hostname: - client = i - break - mnt_pt = sv_info[sv_name]["mnt_pt"] - dir_path = f"{mnt_pt}/client_io/io_test_workflow_2" - - cmd = f"mkdir -p {dir_path}" - log1.info(f"Executing cmd {cmd}") - client.exec_command(sudo=True, cmd=cmd) - sv_info_objs.update({sv_name: {"dir_path": dir_path, "client": client}}) - cmd = f"{smallfile_cmd} --operation create --threads 1 --file-size 10240 --files 1 --top {dir_path}" - log1.info(f"Executing cmd {cmd}") - p.spawn(run_cmd, cmd, client, log1) - end_time = datetime.datetime.now() + datetime.timedelta(seconds=run_time) log1.info(f"Run overwrite and read in parallel, Repeat test until {run_time}secs") cluster_healthy = 1 + log1.info(f"Start {log_name} on {sv_info_list}") while datetime.datetime.now() < end_time and cluster_healthy: rand_str = "".join( random.choice(string.ascii_lowercase + string.digits) for _ in list(range(3)) ) with parallel() as p: - for sv_name in sv_info_objs: - client = sv_info_objs[sv_name]["client"] - dir_path = sv_info_objs[sv_name]["dir_path"] - client.exec_command( - sudo=True, cmd=f"mkdir -p /var/tmp/smallfile_dir_{rand_str}" + for sv_info in sv_info_list: + for i in sv_info: + sv_name = i + client_name1 = sv_info[sv_name]["mnt_client1"] + client_name2 = sv_info[sv_name]["mnt_client2"] + for i in clients: + if client_name1 == i.node.hostname: + client1 = i + if client_name2 == i.node.hostname: + client2 = i + mnt_pt1 = sv_info[sv_name]["mnt_pt1"] + dir_path1 = f"{mnt_pt1}/client_io/io_test_workflow_2_{rand_str}" + cmd = f"mkdir -p {dir_path1}" + log1.info(f"Executing cmd {cmd}") + client1.exec_command(sudo=True, cmd=cmd) + mnt_pt2 = sv_info[sv_name]["mnt_pt2"] + dir_path2 = f"{mnt_pt2}/client_io/io_test_workflow_2_{rand_str}" + + sv_info_objs.update( + { + sv_name: { + "dir_path1": dir_path1, + "client1": client1, + "dir_path2": dir_path2, + "client2": client2, + } + } ) + cmd = f"{smallfile_cmd} --operation create --threads 1 --file-size 10240 --files 1 --top {dir_path1}" + log1.info(f"Executing cmd {cmd}") + p.spawn(run_cmd, cmd, client1, log1) + + with parallel() as p: + for sv_name in sv_info_objs: + client1 = sv_info_objs[sv_name]["client1"] + dir_path1 = sv_info_objs[sv_name]["dir_path1"] + client2 = sv_info_objs[sv_name]["client2"] + dir_path2 = sv_info_objs[sv_name]["dir_path2"] + for client in [client1, client2]: + client.exec_command( + sudo=True, cmd=f"mkdir -p /var/tmp/smallfile_dir_{rand_str}" + ) for io_type in ["overwrite", "read"]: + if io_type == "overwrite": + client = client1 + dir_path = dir_path1 + else: + client = client2 + dir_path = dir_path2 cmd = f"{smallfile_cmd} --operation {io_type} --threads 1 --file-size 10240 --files 1 " cmd += f"--top {dir_path} --network-sync-dir /var/tmp/smallfile_dir_{rand_str}" - p.spawn(run_cmd, cmd, client, log1) for sv_name in sv_info_objs: - client = sv_info_objs[sv_name]["client"] - client.exec_command( - sudo=True, cmd=f"rm -rf /var/tmp/smallfile_dir_{rand_str}" - ) + client1 = sv_info_objs[sv_name]["client1"] + client2 = sv_info_objs[sv_name]["client2"] + for client in [client1, client2]: + client.exec_command( + sudo=True, cmd=f"rm -rf /var/tmp/smallfile_dir_{rand_str}" + ) + log1.info("Completed test iteration") cluster_healthy = is_cluster_healthy(clients[0]) - - with parallel() as p: - for sv_name in sv_info_objs: - client = sv_info_objs[sv_name]["client"] - dir_path = sv_info_objs[sv_name]["dir_path"] - cmd = f"rm -rf {dir_path}" - p.spawn(run_cmd, cmd, client, log1) + if cleanup == 1: + with parallel() as p: + for sv_name in sv_info_objs: + client1 = sv_info_objs[sv_name]["client1"] + dir_path1 = sv_info_objs[sv_name]["dir_path1"] + cmd = f"rm -rf {dir_path1}" + p.spawn(run_cmd, cmd, client1, log1) + client2 = sv_info_objs[sv_name]["client2"] + dir_path2 = sv_info_objs[sv_name]["dir_path2"] + cmd = f"rm -rf {dir_path2}" + p.spawn(run_cmd, cmd, client2, log1) log1.info("io_test_workflow_2 completed") return 0 -def io_test_workflow_3(run_time, log_path, clients, fs_system_utils, sv_info_list): +def io_test_workflow_3( + run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list +): # Truncate & Read to same file from different client sessions log_name = "parallel_truncate_read" log1 = fs_system_utils.configure_logger(log_path, log_name) @@ -342,49 +428,77 @@ def io_test_workflow_3(run_time, log_path, clients, fs_system_utils, sv_info_lis for sv_info in sv_info_list: for i in sv_info: sv_name = i - client_name = sv_info[sv_name]["mnt_client"] + client_name1 = sv_info[sv_name]["mnt_client1"] + client_name2 = sv_info[sv_name]["mnt_client2"] for i in clients: - if client_name in i.node.hostname: - client = i - break - mnt_pt = sv_info[sv_name]["mnt_pt"] - dir_path = f"{mnt_pt}/client_io/io_test_workflow_3" - cmd = f"mkdir -p {dir_path}" + if client_name1 == i.node.hostname: + client1 = i + if client_name2 == i.node.hostname: + client2 = i + + mnt_pt1 = sv_info[sv_name]["mnt_pt1"] + dir_path1 = f"{mnt_pt1}/client_io/io_test_workflow_3" + cmd = f"mkdir -p {dir_path1}" log1.info(f"Executing cmd {cmd}") - client.exec_command(sudo=True, cmd=cmd) - sv_info_objs.update({sv_name: {"dir_path": dir_path, "client": client}}) - cmd = f"{smallfile_cmd} --operation create --threads 1 --file-size 10240 --files 1 --top {dir_path}" + client1.exec_command(sudo=True, cmd=cmd) + mnt_pt2 = sv_info[sv_name]["mnt_pt2"] + dir_path2 = f"{mnt_pt2}/client_io/io_test_workflow_3" + sv_info_objs.update( + { + sv_name: { + "dir_path1": dir_path1, + "client1": client1, + "dir_path2": dir_path2, + "client2": client2, + } + } + ) + cmd = f"{smallfile_cmd} --operation create --threads 1 --file-size 10240 --files 1 --top {dir_path1}" log1.info(f"Executing cmd {cmd}") - p.spawn(run_cmd, cmd, client, log1) + p.spawn(run_cmd, cmd, client1, log1) with parallel() as p: for sv_name in sv_info_objs: - client = sv_info_objs[sv_name]["client"] - dir_path = sv_info_objs[sv_name]["dir_path"] - - client.exec_command( - sudo=True, cmd=f"mkdir -p /var/tmp/smallfile_dir_{rand_str}" - ) + client1 = sv_info_objs[sv_name]["client1"] + dir_path1 = sv_info_objs[sv_name]["dir_path1"] + client2 = sv_info_objs[sv_name]["client2"] + dir_path2 = sv_info_objs[sv_name]["dir_path2"] + for client in [client1, client2]: + client.exec_command( + sudo=True, cmd=f"mkdir -p /var/tmp/smallfile_dir_{rand_str}" + ) for io_type in ["read", "truncate-overwrite"]: + if io_type == "read": + client = client1 + dir_path = dir_path1 + else: + client = client2 + dir_path = dir_path2 cmd = f"{smallfile_cmd} --operation {io_type} --threads 1 --file-size 10240 --files 1 " cmd += f"--top {dir_path} --network-sync-dir /var/tmp/smallfile_dir_{rand_str}" p.spawn(run_cmd, cmd, client, log1) with parallel() as p: for sv_name in sv_info_objs: - client = sv_info_objs[sv_name]["client"] - dir_path = sv_info_objs[sv_name]["dir_path"] - client.exec_command( - sudo=True, cmd=f"rm -rf /var/tmp/smallfile_dir_{rand_str}" - ) - cmd = f"rm -rf {dir_path}" - p.spawn(run_cmd, cmd, client, log1) + client1 = sv_info_objs[sv_name]["client1"] + dir_path1 = sv_info_objs[sv_name]["dir_path1"] + client2 = sv_info_objs[sv_name]["client2"] + dir_path2 = sv_info_objs[sv_name]["dir_path2"] + for client in [client1, client2]: + client.exec_command( + sudo=True, cmd=f"rm -rf /var/tmp/smallfile_dir_{rand_str}" + ) + if cleanup == 1: + cmd = f"rm -rf {dir_path1}" + p.spawn(run_cmd, cmd, client1, log1) cluster_healthy = is_cluster_healthy(clients[0]) log1.info("io_test_workflow_3 completed") return 0 -def io_test_workflow_4(run_time, log_path, clients, fs_system_utils, sv_info_list): +def io_test_workflow_4( + run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list +): # Random Read to same file from different client sessions log_name = "parallel_random_reads" log1 = fs_system_utils.configure_logger(log_path, log_name) @@ -393,24 +507,37 @@ def io_test_workflow_4(run_time, log_path, clients, fs_system_utils, sv_info_lis for sv_info in sv_info_list: for i in sv_info: sv_name = i - client_name = sv_info[sv_name]["mnt_client"] + client_name1 = sv_info[sv_name]["mnt_client1"] + client_name2 = sv_info[sv_name]["mnt_client2"] for i in clients: - if client_name in i.node.hostname: - client = i - break - mnt_pt = sv_info[sv_name]["mnt_pt"] - dir_path = f"{mnt_pt}/client_io" - sv_info_objs.update({sv_name: {"dir_path": dir_path, "client": client}}) - cmd = f"mkdir -p {dir_path}" + if client_name1 == i.node.hostname: + client1 = i + if client_name2 == i.node.hostname: + client2 = i + mnt_pt1 = sv_info[sv_name]["mnt_pt1"] + dir_path1 = f"{mnt_pt1}/client_io" + + cmd = f"mkdir -p {dir_path1}" log1.info(f"Executing cmd {cmd}") try: - client.exec_command(sudo=True, cmd=cmd) + client1.exec_command(sudo=True, cmd=cmd) except BaseException as ex: log1.info(ex) - - cmd = f"fio --name={dir_path}/io_test_workflow_4 --ioengine=libaio --size 1Gb --rw=write --direct=0" + mnt_pt2 = sv_info[sv_name]["mnt_pt2"] + dir_path2 = f"{mnt_pt2}/client_io" + sv_info_objs.update( + { + sv_name: { + "dir_path1": dir_path1, + "client1": client1, + "dir_path2": dir_path2, + "client2": client2, + } + } + ) + cmd = f"fio --name={dir_path1}/io_test_workflow_4 --ioengine=libaio --size 100M --rw=write --direct=0" log1.info(f"Create FIO file for random read test,executing cmd {cmd}") - p.spawn(run_cmd, cmd, client, log1) + p.spawn(run_cmd, cmd, client1, log1) log1.info(f"Run random reads in parallel, Repeat test until {run_time}secs") end_time = datetime.datetime.now() + datetime.timedelta(seconds=run_time) @@ -418,28 +545,40 @@ def io_test_workflow_4(run_time, log_path, clients, fs_system_utils, sv_info_lis while datetime.datetime.now() < end_time and cluster_healthy: with parallel() as p: for sv_name in sv_info_objs: - client = sv_info_objs[sv_name]["client"] - dir_path = sv_info_objs[sv_name]["dir_path"] - cmd = f"fio --name={dir_path}/io_test_workflow_4 --ioengine=libaio --size 1Gb --rw=randread --direct=0" + client1 = sv_info_objs[sv_name]["client1"] + dir_path1 = sv_info_objs[sv_name]["dir_path1"] + client2 = sv_info_objs[sv_name]["client2"] + dir_path2 = sv_info_objs[sv_name]["dir_path2"] + for i in range(0, 5): + client = random.choice([client1, client2]) + dir_path = ( + dir_path1 + if client.node.hostname == client1.node.hostname + else dir_path2 + ) + cmd = f"fio --name={dir_path}/io_test_workflow_4 --ioengine=libaio --size 100M --rw=randread" + cmd += " --direct=0" log1.info( - f"Running cmd Iteration {i} on {sv_name} on {client_name}" + f"Running cmd Iteration {i} on {sv_name} on {client.node.hostname}" ) p.spawn(run_cmd, cmd, client, log1) - - with parallel() as p: - for sv_name in sv_info_objs: - dir_path = sv_info_objs[sv_name]["dir_path"] - client = sv_info_objs[sv_name]["client"] - cmd = f"rm -f {dir_path}/io_test_workflow_4*" - log1.info(f"Executing cmd {cmd}") - p.spawn(run_cmd, cmd, client, log1) + if cleanup == 1: + with parallel() as p: + for sv_name in sv_info_objs: + dir_path = sv_info_objs[sv_name]["dir_path1"] + client = sv_info_objs[sv_name]["client1"] + cmd = f"rm -f {dir_path}/io_test_workflow_4*" + log1.info(f"Executing cmd {cmd}") + p.spawn(run_cmd, cmd, client, log1) cluster_healthy = is_cluster_healthy(clients[0]) log1.info("io_test_workflow_4 completed") return 0 -def io_test_workflow_5(run_time, log_path, clients, fs_system_utils, sv_info_list): +def io_test_workflow_5( + run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list +): # Perform continuous overwrites on large files to generate Fragmented data log_name = "continuous_overwrites_large_file" log1 = fs_system_utils.configure_logger(log_path, log_name) @@ -449,19 +588,28 @@ def io_test_workflow_5(run_time, log_path, clients, fs_system_utils, sv_info_lis for sv_info in sv_info_list: for i in sv_info: sv_name = i - client_name = sv_info[sv_name]["mnt_client"] + client_name = sv_info[sv_name]["mnt_client1"] + for i in clients: - if client_name in i.node.hostname: + if client_name == i.node.hostname: client = i break - mnt_pt = sv_info[sv_name]["mnt_pt"] - - dir_path = f"{mnt_pt}/client_io/io_test_workflow_5" + mnt_pt = sv_info[sv_name]["mnt_pt1"] + rand_str = "".join( + random.choice(string.ascii_lowercase + string.digits) + for _ in list(range(3)) + ) + dir_path = f"{mnt_pt}/client_io" cmd = f"mkdir -p {dir_path}" log1.info(f"Executing cmd {cmd}") - client.exec_command(sudo=True, cmd=cmd) + try: + client.exec_command(sudo=True, cmd=cmd) + except BaseException as ex: + log1.info(ex) + dir_path = f"{mnt_pt}/client_io/io_test_workflow_5_{rand_str}" sv_info_objs.update({sv_name: {"dir_path": dir_path, "client": client}}) - cmd = f"{smallfile_cmd} --operation create --threads 1 --file-size 1024000 --files 1 --top {dir_path}" + cmd = f"fio --name={dir_path} --ioengine=libaio --size 1Gb --rw=write" + cmd += " --direct=0" log1.info(f"Executing cmd {cmd}") p.spawn(run_cmd, cmd, client, log1) @@ -475,8 +623,10 @@ def io_test_workflow_5(run_time, log_path, clients, fs_system_utils, sv_info_lis for sv_name in sv_info_objs: client = sv_info_objs[sv_name]["client"] dir_path = sv_info_objs[sv_name]["dir_path"] - cmd = f"{smallfile_cmd} --operation overwrite --threads 1 --file-size 1024000 --files 1 " - cmd += f"--top {dir_path}" + cmd = ( + f"fio --name={dir_path} --ioengine=libaio --size 1Gb --rw=randwrite" + ) + cmd += " --bs=10M --direct=0" log1.info(f"Executing cmd {cmd}") p.spawn(run_cmd, cmd, client, log1) @@ -494,19 +644,21 @@ def io_test_workflow_5(run_time, log_path, clients, fs_system_utils, sv_info_lis log1.info(f"Maximum Fragmentation seen across OSDs : {max(frag_list)}") time.sleep(10) cluster_healthy = is_cluster_healthy(clients[0]) - - with parallel() as p: - for sv_name in sv_info_objs: - dir_path = sv_info_objs[sv_name]["dir_path"] - client = sv_info_objs[sv_name]["client"] - cmd = f"rm -rf {dir_path}" - log1.info(f"Executing cmd {cmd}") - p.spawn(run_cmd, cmd, client, log1) + if cleanup == 1: + with parallel() as p: + for sv_name in sv_info_objs: + dir_path = sv_info_objs[sv_name]["dir_path"] + client = sv_info_objs[sv_name]["client"] + cmd = f"rm -f {dir_path}" + log1.info(f"Executing cmd {cmd}") + p.spawn(run_cmd, cmd, client, log1) log1.info("io_test_workflow_5 completed") return 0 -def io_test_workflow_6(run_time, log_path, clients, fs_system_utils, sv_info_list): +def io_test_workflow_6( + run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list +): # Read(find) and delete(rm) in parallel to same file and concurrently to many files log_name = "Parallel_find_delete_many_files" log1 = fs_system_utils.configure_logger(log_path, log_name) @@ -518,21 +670,31 @@ def io_test_workflow_6(run_time, log_path, clients, fs_system_utils, sv_info_lis ) cluster_healthy = 1 while datetime.datetime.now() < end_time and cluster_healthy: + rand_str = "".join( + random.choice(string.ascii_lowercase + string.digits) + for _ in list(range(3)) + ) with parallel() as p: for sv_info in sv_info_list: for i in sv_info: sv_name = i - client_name = sv_info[sv_name]["mnt_client"] + client_name = sv_info[sv_name]["mnt_client1"] + for i in clients: - if client_name in i.node.hostname: + if client_name == i.node.hostname: client = i break - mnt_pt = sv_info[sv_name]["mnt_pt"] - dir_path = f"{mnt_pt}/client_io/io_test_workflow_6" - sv_info_objs.update({sv_name: {"dir_path": dir_path, "client": client}}) + mnt_pt = sv_info[sv_name]["mnt_pt1"] + dir_path = f"{mnt_pt}/client_io/io_test_workflow_6_{rand_str}" + cmd = f"mkdir -p {dir_path}" log1.info(f"Executing cmd {cmd}") - client.exec_command(sudo=True, cmd=cmd) + try: + client.exec_command(sudo=True, cmd=cmd) + except BaseException as ex: + log1.info(ex) + + sv_info_objs.update({sv_name: {"dir_path": dir_path, "client": client}}) cmd = f"{smallfile_cmd} --operation create --threads 5 --file-size 1024 --files 1000 --top {dir_path}" log1.info(f"Executing cmd {cmd}") p.spawn(run_cmd, cmd, client, log1) @@ -541,24 +703,27 @@ def io_test_workflow_6(run_time, log_path, clients, fs_system_utils, sv_info_lis for sv_name in sv_info_objs: client = sv_info_objs[sv_name]["client"] dir_path = sv_info_objs[sv_name]["dir_path"] - cmd = f"find {dir_path} -name *{client_name}* > {dir_path}/tmp_file" - p.spawn(run_cmd, cmd, client, log1) - cmd = f"{smallfile_cmd} --operation delete --threads 5 --file-size 1024 --files 1000 --top {dir_path}" + cmd = f"find {dir_path} -name *{client.node.hostname}* > {dir_path}/tmp_file" p.spawn(run_cmd, cmd, client, log1) - with parallel() as p: - for sv_name in sv_info_objs: - dir_path = sv_info_objs[sv_name]["dir_path"] - client = sv_info_objs[sv_name]["client"] - cmd = f"rm -rf {dir_path}" - log1.info(f"Executing cmd {cmd}") + cmd = f"{smallfile_cmd} --operation delete --threads 5 --file-size 1024 --files 1000 --top {dir_path}" p.spawn(run_cmd, cmd, client, log1) + if cleanup == 1: + with parallel() as p: + for sv_name in sv_info_objs: + dir_path = sv_info_objs[sv_name]["dir_path"] + client = sv_info_objs[sv_name]["client"] + cmd = f"rm -rf {dir_path}" + log1.info(f"Executing cmd {cmd}") + p.spawn(run_cmd, cmd, client, log1) cluster_healthy = is_cluster_healthy(clients[0]) log1.info("io_test_workflow_6 completed") return 0 -def io_test_workflow_7(run_time, log_path, clients, fs_system_utils, sv_info_list): +def io_test_workflow_7( + run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list +): # Scale number of requests/sec to an MDS until 6k log_name = "Scale_MDS_requests_to_6k" log1 = fs_system_utils.configure_logger(log_path, log_name) @@ -567,13 +732,13 @@ def io_test_workflow_7(run_time, log_path, clients, fs_system_utils, sv_info_lis for sv_info in sv_info_list: for i in sv_info: sv_name = i - client_name = sv_info[sv_name]["mnt_client"] + client_name = sv_info[sv_name]["mnt_client1"] for i in clients: - if client_name in i.node.hostname: + if client_name == i.node.hostname: client = i break - mnt_pt = sv_info[sv_name]["mnt_pt"] + mnt_pt = sv_info[sv_name]["mnt_pt1"] fs_name = sv_info[sv_name]["fs_name"] dir_path = f"{mnt_pt}/client_io/io_test_workflow_7" @@ -581,9 +746,11 @@ def io_test_workflow_7(run_time, log_path, clients, fs_system_utils, sv_info_lis cmd = f"mkdir -p {dir_path}" log1.info(f"Executing cmd {cmd}") - client.exec_command(sudo=True, cmd=cmd) - - cmd = f"{smallfile_cmd} --operation create --threads 10 --file-size 10 --files 1000 --top {dir_path}" + try: + client.exec_command(sudo=True, cmd=cmd) + except BaseException as ex: + log1.info(ex) + cmd = f"{smallfile_cmd} --operation create --threads 10 --file-size 10 --files 10000 --top {dir_path}" log1.info(f"Executing cmd {cmd}") client.exec_command( sudo=True, @@ -607,7 +774,7 @@ def io_test_workflow_7(run_time, log_path, clients, fs_system_utils, sv_info_lis for _ in list(range(3)) ) for sv_name in sv_info_objs: - client = sv_info_objs[sv_name]["client"] + client = sv_info_objs[sv_name]["client1"] try: client.exec_command( sudo=True, cmd=f"mkdir -p /var/tmp/smallfile_dir_{rand_str}" @@ -630,7 +797,7 @@ def io_test_workflow_7(run_time, log_path, clients, fs_system_utils, sv_info_lis dir_path = sv_info_objs[sv_name]["dir_path"] client = sv_info_objs[sv_name]["client"] for op in file_ops: - cmd = f"{smallfile_cmd} --operation {op} --threads 10 --file-size 10 --files 1000 " + cmd = f"{smallfile_cmd} --operation {op} --threads 10 --file-size 10 --files 10000 " cmd += f"--top {dir_path} --network-sync-dir /var/tmp/smallfile_dir_{rand_str}" p.spawn(run_cmd, cmd, client, log1) @@ -639,18 +806,21 @@ def io_test_workflow_7(run_time, log_path, clients, fs_system_utils, sv_info_lis if mds_req_limit.value == 0: log1.error("MDS requests count not reach 6k") - with parallel() as p: - for sv_name in sv_info_objs: - dir_path = sv_info_objs[sv_name]["dir_path"] - client = sv_info_objs[sv_name]["client"] - cmd = f"rm -rf {dir_path}" - log1.info(f"Executing cmd {cmd}") - p.spawn(run_cmd, cmd, client, log1) + if cleanup == 1: + with parallel() as p: + for sv_name in sv_info_objs: + dir_path = sv_info_objs[sv_name]["dir_path"] + client = sv_info_objs[sv_name]["client"] + cmd = f"rm -rf {dir_path}" + log1.info(f"Executing cmd {cmd}") + p.spawn(run_cmd, cmd, client, log1) log1.info("io_test_workflow_7 completed") return 0 -def io_test_workflow_8(run_time, log_path, clients, fs_system_utils, sv_info_list): +def io_test_workflow_8( + run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list +): # unlink and rename to same file in parallel log_name = "parallel_unlink_rename" @@ -659,71 +829,92 @@ def io_test_workflow_8(run_time, log_path, clients, fs_system_utils, sv_info_lis for sv_info in sv_info_list: for i in sv_info: sv_name = i - client_name = sv_info[sv_name]["mnt_client"] + client_name1 = sv_info[sv_name]["mnt_client1"] + client_name2 = sv_info[sv_name]["mnt_client2"] for i in clients: - if client_name in i.node.hostname: - client = i - break - mnt_pt = sv_info[sv_name]["mnt_pt"] - - dir_path = f"{mnt_pt}/client_io/io_test_workflow_8" - cmd = f"mkdir -p {dir_path}" + if client_name1 == i.node.hostname: + client1 = i + if client_name2 == i.node.hostname: + client2 = i + mnt_pt1 = sv_info[sv_name]["mnt_pt1"] + dir_path1 = f"{mnt_pt1}/client_io/io_test_workflow_8" + cmd = f"mkdir -p {dir_path1}" log1.info(f"Executing cmd {cmd}") - client.exec_command(sudo=True, cmd=cmd) - sv_info_objs.update({sv_name: {"dir_path": dir_path, "client": client}}) + try: + client1.exec_command(sudo=True, cmd=cmd) + except BaseException as ex: + log1.info(ex) + mnt_pt2 = sv_info[sv_name]["mnt_pt2"] + dir_path2 = f"{mnt_pt2}/client_io/io_test_workflow_8" + sv_info_objs.update( + { + sv_name: { + "dir_path1": dir_path1, + "client1": client1, + "dir_path2": dir_path2, + "client2": client2, + } + } + ) log1.info(f"Run unlink and rename in parallel, Repeat test until {run_time}secs") end_time = datetime.datetime.now() + datetime.timedelta(seconds=run_time) cluster_healthy = 1 while datetime.datetime.now() < end_time and cluster_healthy: for sv_name in sv_info_objs: - dir_path = sv_info_objs[sv_name]["dir_path"] - client = sv_info_objs[sv_name]["client"] - cmd = f"fio --name={dir_path}/symlink_parent --ioengine=libaio --size 10MB --rw=randwrite --direct=0" + dir_path1 = sv_info_objs[sv_name]["dir_path1"] + client1 = sv_info_objs[sv_name]["client1"] + dir_path2 = sv_info_objs[sv_name]["dir_path2"] + client2 = sv_info_objs[sv_name]["client2"] + cmd = f"fio --name={dir_path1}/symlink_parent --ioengine=libaio --size 10MB --rw=randwrite --direct=0" log1.info(f"Running cmd {cmd}") - out, _ = client.exec_command(sudo=True, cmd=cmd) + out, _ = client1.exec_command(sudo=True, cmd=cmd) log1.info(out) - testfile = f"{dir_path}/symlink_parent.0.0" - cmd = f"mkdir {dir_path}/symlink_dir" + testfile = f"{dir_path1}/symlink_parent.0.0" + cmd = f"mkdir {dir_path1}/symlink_dir" log1.info(f"Running cmd {cmd}") try: - out, _ = client.exec_command(sudo=True, cmd=cmd) + out, _ = client1.exec_command(sudo=True, cmd=cmd) except BaseException as ex: log1.info(ex) - symlink_file = f"{dir_path}/symlink_dir/symlink_file" - cmd = f"ln -s {testfile} {symlink_file}" + symlink_file1 = f"{dir_path1}/symlink_dir/symlink_file" + symlink_file2 = f"{dir_path2}/symlink_dir/symlink_file" + cmd = f"ln -s {testfile} {symlink_file1}" log1.info(f"Running cmd {cmd}") try: - out, _ = client.exec_command(sudo=True, cmd=cmd) + out, _ = client1.exec_command(sudo=True, cmd=cmd) except BaseException as ex: log1.info(ex) - symlink_file1 = f"{dir_path}/symlink_dir/symlink_file1" + symlink_file_new = f"{dir_path2}/symlink_dir/symlink_file_new" file_ops = { - "unlink": f"unlink {symlink_file}", - "rename": f"mv {symlink_file} {symlink_file1}", + "unlink": f"unlink {symlink_file1}", + "rename": f"mv {symlink_file2} {symlink_file_new}", } sv_info_objs[sv_name].update({"file_ops": file_ops}) - cmd = f"ls {symlink_file}" + cmd = f"ls {symlink_file1}" log1.info(f"Executing cmd {cmd}") try: - out, _ = client.exec_command(sudo=True, cmd=cmd) + out, _ = client1.exec_command(sudo=True, cmd=cmd) log1.info(out) except BaseException as ex: log1.info(ex) with parallel() as p: for sv_name in sv_info_objs: file_ops = sv_info_objs[sv_name]["file_ops"] + client1 = sv_info_objs[sv_name]["client1"] + client2 = sv_info_objs[sv_name]["client2"] for op_type in file_ops: cmd = file_ops[op_type] + client = client1 if op_type == "unlink" else client2 try: p.spawn(run_cmd, cmd, client, log1) except BaseException as ex: log.info(ex) time.sleep(10) for sv_name in sv_info_objs: - dir_path = sv_info_objs[sv_name]["dir_path"] - client = sv_info_objs[sv_name]["client"] + dir_path = sv_info_objs[sv_name]["dir_path1"] + client = sv_info_objs[sv_name]["client1"] cmd = f"rm -rf {dir_path}/symlink_*" log1.info(f"Executing cmd {cmd}") try: @@ -731,26 +922,27 @@ def io_test_workflow_8(run_time, log_path, clients, fs_system_utils, sv_info_lis except BaseException as ex: log.info(ex) cluster_healthy = is_cluster_healthy(clients[0]) - with parallel() as p: - for sv_name in sv_info_objs: - dir_path = sv_info_objs[sv_name]["dir_path"] - client = sv_info_objs[sv_name]["client"] - cmd = f"rm -rf {dir_path}" - log1.info(f"Executing cmd {cmd}") - p.spawn(run_cmd, cmd, client, log1) + if cleanup == 1: + with parallel() as p: + for sv_name in sv_info_objs: + dir_path = sv_info_objs[sv_name]["dir_path1"] + client = sv_info_objs[sv_name]["client1"] + cmd = f"rm -rf {dir_path}" + log1.info(f"Executing cmd {cmd}") + p.spawn(run_cmd, cmd, client, log1) log1.info("io_test_workflow_8 completed") return 0 -def io_test_workflow_9(run_time, log_path, clients, fs_system_utils, sv_info): +def io_test_workflow_9(run_time, log_path, cleanup, clients, fs_system_utils, sv_info): # Client umount and mount in parallel log_name = "parallel_unmount_mount" log1 = fs_system_utils.configure_logger(log_path, log_name) for i in sv_info: sv_name = i - client_name = sv_info[sv_name]["mnt_client"] + client_name = sv_info[sv_name]["mnt_client1"] for i in clients: - if client_name in i.node.hostname: + if client_name == i.node.hostname: client = i break @@ -834,11 +1026,14 @@ def io_test_workflow_9(run_time, log_path, clients, fs_system_utils, sv_info): log1.info(ex) time.sleep(10) cluster_healthy = is_cluster_healthy(clients[0]) + log1.info(f"cleanup:{cleanup}") log1.info("io_test_workflow_9 completed") return 0 -def io_test_workflow_10(run_time, log_path, clients, fs_system_utils, sv_info_list): +def io_test_workflow_10( + run_time, log_path, cleanup, clients, fs_system_utils, sv_info_list +): # Continuous IO for given run time such that request seq_num can overflow log_name = "Continuous_io_check_seq_num" log1 = fs_system_utils.configure_logger(log_path, log_name) @@ -846,12 +1041,12 @@ def io_test_workflow_10(run_time, log_path, clients, fs_system_utils, sv_info_li for sv_info in sv_info_list: for i in sv_info: sv_name = i - client_name = sv_info[sv_name]["mnt_client"] + client_name = sv_info[sv_name]["mnt_client1"] for i in clients: - if client_name in i.node.hostname: + if client_name == i.node.hostname: client = i break - mnt_pt = sv_info[sv_name]["mnt_pt"] + mnt_pt = sv_info[sv_name]["mnt_pt1"] dir_path = f"{mnt_pt}/io_test_workflow_10" sv_info_objs.update({sv_name: {"dir_path": dir_path, "client": client}}) log1.info( @@ -861,9 +1056,14 @@ def io_test_workflow_10(run_time, log_path, clients, fs_system_utils, sv_info_li end_time = datetime.datetime.now() + datetime.timedelta(seconds=run_time) cluster_healthy = 1 while datetime.datetime.now() < end_time and cluster_healthy: + rand_str = "".join( + random.choice(string.ascii_lowercase + string.digits) + for _ in list(range(3)) + ) with parallel() as p: for sv_name in sv_info_objs: dir_path = sv_info_objs[sv_name]["dir_path"] + dir_path += f"_{rand_str}" client = sv_info_objs[sv_name]["client"] cmd = f"mkdir {dir_path};" cmd += "for i in create read append read delete create overwrite rename delete-renamed mkdir rmdir " @@ -871,36 +1071,43 @@ def io_test_workflow_10(run_time, log_path, clients, fs_system_utils, sv_info_li cmd += f"--file-size 2048 --files 10 --top {dir_path}; done" log1.info(f"Executing cmd {cmd}") p.spawn(run_cmd, cmd, client, log1) - with parallel() as p: - for sv_name in sv_info_objs: - dir_path = sv_info_objs[sv_name]["dir_path"] - client = sv_info_objs[sv_name]["client"] - cmd = f"rm -rf {dir_path}" - p.spawn(run_cmd, cmd, client, log1) + if cleanup == 1: + with parallel() as p: + for sv_name in sv_info_objs: + dir_path = sv_info_objs[sv_name]["dir_path"] + dir_path += f"_{rand_str}" + client = sv_info_objs[sv_name]["client"] + cmd = f"rm -rf {dir_path}" + p.spawn(run_cmd, cmd, client, log1) cluster_healthy = is_cluster_healthy(clients[0]) log1.info("io_test_workflow_10 completed") return 0 -def io_test_workflow_11(run_time, log_path, clients, fs_system_utils, sv_info): +def io_test_workflow_11(run_time, log_path, cleanup, clients, fs_system_utils, sv_info): # 11.Download large file to cephfs mount that does read/write locks log_name = "Download_large_file_with_rw_locks" log1 = fs_system_utils.configure_logger(log_path, log_name) for i in sv_info: sv_name = i - client_name = sv_info[sv_name]["mnt_client"] + client_name = sv_info[sv_name]["mnt_client1"] for i in clients: if client_name in i.node.hostname: client = i break - mnt_pt = sv_info[sv_name]["mnt_pt"] + mnt_pt = sv_info[sv_name]["mnt_pt1"] log1.info( f"Run large file download that does read/write locks, Repeat test until {run_time}secs" ) + run_time = 300 end_time = datetime.datetime.now() + datetime.timedelta(seconds=run_time) cluster_healthy = 1 while datetime.datetime.now() < end_time and cluster_healthy: - dir_path = f"{mnt_pt}/io_test_workflow_11" + rand_str = "".join( + random.choice(string.ascii_lowercase + string.digits) + for _ in list(range(3)) + ) + dir_path = f"{mnt_pt}/io_test_workflow_11_{rand_str}" cmd = f"mkdir {dir_path};touch {dir_path}/tmp_workflow11_data" client.exec_command(sudo=True, cmd=cmd) cmd = f"cd {dir_path};wget -O linux.tar.xz http://download.ceph.com/qa/linux-5.4.tar.gz > tmp_workflow11_data" @@ -911,8 +1118,9 @@ def io_test_workflow_11(run_time, log_path, clients, fs_system_utils, sv_info): long_running=True, timeout=3600, ) - cmd = f"rm -rf {dir_path}" - client.exec_command(sudo=True, cmd=cmd) + if cleanup == 1: + cmd = f"rm -rf {dir_path}" + client.exec_command(sudo=True, cmd=cmd) cluster_healthy = is_cluster_healthy(clients[0]) log1.info("io_test_workflow_11 completed") return 0 diff --git a/tests/cephfs/cephfs_system/cephfs_systest_setup.py b/tests/cephfs/cephfs_system/cephfs_systest_setup.py index 982f528a42..998bda622d 100644 --- a/tests/cephfs/cephfs_system/cephfs_systest_setup.py +++ b/tests/cephfs/cephfs_system/cephfs_systest_setup.py @@ -41,7 +41,7 @@ def run(ceph_cluster, **kw): file = "cephfs_systest_data.json" mnt_type_list = ["kernel", "fuse", "nfs"] client1 = clients[0] - ephemeral_pin = config.get("ephemeral_pin", 0) + ephemeral_pin = config.get("ephemeral_pin", 1) client1.upload_file( sudo=True, src=f"tests/cephfs/cephfs_system/{file}", @@ -74,6 +74,7 @@ def run(ceph_cluster, **kw): client1.exec_command(sudo=True, cmd=cmd) for i in cephfs_config: + mds_pin_cnt = 1 fs_details = fs_util_v1.get_fs_info(client1, fs_name=i) if not fs_details: log.info(f"Creating FileSystem {i}") @@ -115,7 +116,8 @@ def run(ceph_cluster, **kw): ) fs_util_v1.create_subvolume(client1, **sv_iter) cephfs_config[i]["group"][j][type].update({sv_name: {}}) - mnt_client = random.choice(clients) + mnt_client1 = random.choice(clients) + mnt_client2 = random.choice(clients) cmd = f"ceph fs subvolume getpath {i} {sv_name}" if "default" not in j: cmd += f" {j}" @@ -126,7 +128,6 @@ def run(ceph_cluster, **kw): mnt_path = subvol_path.strip() mount_params = { "fs_util": fs_util_v1, - "client": mnt_client, "mnt_path": mnt_path, "fs_name": i, "export_created": 0, @@ -143,11 +144,16 @@ def run(ceph_cluster, **kw): } ) log.info(f"Perform {mnt_type} mount of {sv_name}") - mounting_dir, _ = fs_util_v1.mount_ceph(mnt_type, mount_params) + mount_params.update({"client": mnt_client1}) + mounting_dir1, _ = fs_util_v1.mount_ceph(mnt_type, mount_params) + mount_params.update({"client": mnt_client2}) + mounting_dir2, _ = fs_util_v1.mount_ceph(mnt_type, mount_params) cephfs_config[i]["group"][j][type][sv_name].update( { - "mnt_pt": mounting_dir, - "mnt_client": mnt_client.node.hostname, + "mnt_pt1": mounting_dir1, + "mnt_client1": mnt_client1.node.hostname, + "mnt_pt2": mounting_dir2, + "mnt_client2": mnt_client2.node.hostname, "mnt_type": mnt_type, } ) @@ -159,13 +165,19 @@ def run(ceph_cluster, **kw): "nfs_server": nfs_server, } ) - elif ephemeral_pin == 1: - log.info("Configure MDS pinning") - cmd = f"setfattr -n ceph.dir.pin.random -v 0.75 {mounting_dir}" - mnt_client.exec_command( + elif (ephemeral_pin == 1) and (mds_pin_cnt < 5): + log.info(f"Configure MDS pinning : {mds_pin_cnt}") + cmd = f"setfattr -n ceph.dir.pin.random -v 0.75 {mounting_dir1}" + mnt_client1.exec_command( sudo=True, cmd=cmd, ) + cmd = f"setfattr -n ceph.dir.pin.random -v 0.75 {mounting_dir2}" + mnt_client2.exec_command( + sudo=True, + cmd=cmd, + ) + mds_pin_cnt += 1 log.info(f"CephFS System Test config : {cephfs_config}") f = clients[0].remote_file(