[Integ-test] test slurm with custom partitions
This new test does the following checks:
1. pcluster nodes in a custom partition are not brought down when the partition is set to inactive
2. the cluster recovers without over-scaling after terminating the EC2 instances of static nodes belonging to multiple partitions
3. protected mode only manages pcluster partitions
4. pcluster stop/start only manages pcluster partitions
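
For reference, checks 1 and 4 hinge on flipping a Slurm partition's state, which comes down to an scontrol update. A minimal sketch of such a helper, assuming a generic remote-command runner (run_remote is a hypothetical stand-in, not the test framework's real API):

    # Hypothetical helper: flip a Slurm partition between UP and INACTIVE.
    def set_partition_state(run_remote, partition: str, state: str) -> None:
        assert state in {"UP", "DOWN", "DRAIN", "INACTIVE"}
        # scontrol keys are case-insensitive; sudo because partition updates
        # require root/SlurmUser privileges
        run_remote(f"sudo scontrol update partitionname={partition} state={state}")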

Signed-off-by: Hanwen <[email protected]>
hanwen-cluster committed Jul 6, 2023
1 parent e9d1ac3 commit 5acfd02
Showing 4 changed files with 195 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ CHANGELOG

**CHANGES**
- Assign Slurm dynamic nodes a priority (weight) of 1000 by default. This allows Slurm to prioritize idle static nodes over idle dynamic ones.
- Make `aws-parallelcluster-node` daemons handle only ParallelCluster-managed Slurm partitions.

**BUG FIXES**
- Fix cluster creation failure when using CloudFormation custom resource with `ElasticIp` set to `True`.
137 changes: 121 additions & 16 deletions tests/integration-tests/tests/schedulers/test_slurm.py
@@ -222,6 +222,87 @@ def test_slurm_scaling(
assert_no_errors_in_logs(remote_command_executor, scheduler)


@pytest.mark.usefixtures("os", "instance", "scheduler")
@pytest.mark.slurm_scaling
def test_slurm_custom_partitions(
region, pcluster_config_reader, s3_bucket_factory, clusters_factory, test_datadir, scheduler_commands_factory
):
"""Test ParallelCluster node deamons manage only Slurm partitions specified in cluster configuration file."""
bucket_name = s3_bucket_factory()
bucket = boto3.resource("s3", region_name=region).Bucket(bucket_name)
bucket.upload_file(str(test_datadir / "preinstall.sh"), "scripts/preinstall.sh")
custom_partitions = ["CustomerPartition1", "CustomerPartition2"]
cluster_config = pcluster_config_reader(bucket=bucket_name)
cluster = clusters_factory(cluster_config)
remote_command_executor = RemoteCommandExecutor(cluster)
scheduler_commands = scheduler_commands_factory(remote_command_executor)

logging.info("Checking number of instances...")
static_nodes = list(set(scheduler_commands.get_compute_nodes()))
assert_that(static_nodes).is_length(3)
assert_num_instances_in_cluster(cluster.name, region, len(static_nodes))
logging.info(
f"Setting {custom_partitions[0]} to inactive to verify pcluster nodes in the partition are not brought down..."
)
scheduler_commands.set_partition_state(custom_partitions[0], "INACTIVE")
logging.info("Terminating cluster EC2 instances to check cluster can recover the nodes without overscaling...")
_terminate_nodes_manually(get_compute_nodes_instance_ids(cluster.name, region), region)
# Assert that the cluster replaced the static nodes and reset any dynamic nodes
_wait_for_node_reset(scheduler_commands, static_nodes, [])
assert_num_instances_in_cluster(cluster.name, region, len(static_nodes))
logging.info(f"Setting {custom_partitions[0]} to active...")
scheduler_commands.set_partition_state(custom_partitions[0], "UP")

logging.info("Decreasing protected failure count for quicker enter protected mode...")
clustermgtd_conf_path = _retrieve_clustermgtd_conf_path(remote_command_executor)
_set_protected_failure_count(remote_command_executor, 2, clustermgtd_conf_path)
failing_partition = "ondemand1"
logging.info("Testing protected mode is skipped while job running and activated when no jobs are in the queue...")
pending_job_id = _test_active_job_running(
scheduler_commands,
remote_command_executor,
running_partition=custom_partitions[0],
failing_partition=failing_partition,
)
_check_protected_mode_message_in_log(remote_command_executor)
check_status(cluster, compute_fleet_status="PROTECTED")
_wait_for_partition_state_changed(scheduler_commands, failing_partition, "INACTIVE")
logging.info(
"Checking paritition other than the failing partition is active. "
"i.e. custom partitions are not managed by protected mode..."
)
all_partitions = scheduler_commands.get_partitions()
for partition in all_partitions:
if partition != failing_partition:
assert_that(scheduler_commands.get_partition_state(partition=partition)).is_equal_to("UP")
scheduler_commands.cancel_job(pending_job_id)

logging.info("Checking pcluster stop...")
cluster.stop()
logging.info("Checking all pcluster cluster EC2 instances are terminated...")
wait_for_num_instances_in_cluster(cluster.name, region, 0)
logging.info("Checking pcluster stop does not manage custom partitions...")
for partition in all_partitions:
if partition in custom_partitions:
expected_state = "UP"
else:
expected_state = "INACTIVE"
assert_that(scheduler_commands.get_partition_state(partition=partition)).is_equal_to(expected_state)

logging.info("Checking pcluster start...")
for partition in custom_partitions:
scheduler_commands.set_partition_state(partition, "INACTIVE")
cluster.start()
wait_for_num_instances_in_cluster(cluster.name, region, len(static_nodes))
logging.info("Checking pcluster start does not manage custom partitions...")
for partition in all_partitions:
if partition in custom_partitions:
expected_state = "INACTIVE"
else:
expected_state = "UP"
assert_that(scheduler_commands.get_partition_state(partition=partition)).is_equal_to(expected_state)
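
# A minimal sketch (an assumption, not code from this commit) of the
# _wait_for_partition_state_changed helper used above: poll with the retrying
# helpers this module already uses until the partition reports the expected state.
@retry(wait_fixed=seconds(20), stop_max_delay=minutes(7))
def _wait_for_partition_state_changed(scheduler_commands, partition, expected_state):
    assert_that(scheduler_commands.get_partition_state(partition=partition)).is_equal_to(expected_state)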


@pytest.mark.usefixtures("region", "os", "instance", "scheduler")
@pytest.mark.slurm_error_handling
def test_error_handling(
@@ -294,7 +375,16 @@ def test_slurm_protected_mode(
_test_disable_protected_mode(
remote_command_executor, cluster, bucket_name, pcluster_config_reader, clustermgtd_conf_path
)
pending_job_id = _test_active_job_running(scheduler_commands, remote_command_executor, clustermgtd_conf_path)

# Re-enable protected mode
_enable_protected_mode(remote_command_executor, clustermgtd_conf_path)
# Decrease protected failure count so the cluster enters protected mode more quickly.
_set_protected_failure_count(remote_command_executor, 2, clustermgtd_conf_path)

partition = "half-broken"
pending_job_id = _test_active_job_running(
scheduler_commands, remote_command_executor, running_partition=partition, failing_partition=partition
)
_test_protected_mode(scheduler_commands, remote_command_executor, cluster)
test_cluster_health_metric(["NoCorrespondingInstanceErrors", "OnNodeStartRunErrors"], cluster.cfn_name, region)
_test_job_run_in_working_queue(scheduler_commands)
@@ -1724,24 +1814,35 @@ def _test_disable_protected_mode(
)


def _test_active_job_running(scheduler_commands, remote_command_executor, clustermgtd_conf_path):
"""Test cluster is not placed into protected mode when there is an active job running even reach threshold."""
def _test_active_job_running(scheduler_commands, remote_command_executor, running_partition, failing_partition):
"""
Test that the cluster is not placed into protected mode when there is an active job running, even after the
failure threshold is reached. running_partition and failing_partition should usually be the same. When Slurm
partitions are customized, they can differ, as long as the running job is on nodes belonging to both partitions.
"""
# Submit a job to the queue that contains broken nodes and a normal node; submit it to the normal node to test
# that the queue will not be disabled while there's an active job running.
cancel_job_id = scheduler_commands.submit_command_and_assert_job_accepted(
submit_command_args={"command": "sleep 3000", "nodes": 1, "partition": "half-broken", "constraint": "c5.xlarge"}
submit_command_args={
"command": "sleep 3000",
"nodes": 1,
"partition": running_partition,
"constraint": "c5.xlarge",
}
)
# Wait for the job to run
scheduler_commands.wait_job_running(cancel_job_id)

# Re-enable protected mode
_enable_protected_mode(remote_command_executor, clustermgtd_conf_path)
# Decrease protected failure count for quicker enter protected mode.
_set_protected_failure_count(remote_command_executor, 2, clustermgtd_conf_path)

# Submit a job to the problematic compute resource, so the protected_failure count will increase
job_id_pending = scheduler_commands.submit_command_and_assert_job_accepted(
submit_command_args={"command": "sleep 60", "nodes": 2, "partition": "half-broken", "constraint": "c5.large"}
submit_command_args={
"command": "sleep 60",
"nodes": 2,
"partition": failing_partition,
"constraint": "c5.large",
}
)
# Check that the threshold is reached but the partition stays UP since there's an active job running
retry(wait_fixed=seconds(20), stop_max_delay=minutes(7))(assert_lines_in_logs)(
@@ -1751,7 +1852,7 @@ def _test_active_job_running(scheduler_commands, remote_command_executor, cluste
"currently have jobs running, not disabling them",
],
)
assert_that(scheduler_commands.get_partition_state(partition="half-broken")).is_equal_to("UP")
assert_that(scheduler_commands.get_partition_state(partition=failing_partition)).is_equal_to("UP")
# Cancel the job
scheduler_commands.cancel_job(cancel_job_id)
return job_id_pending
@@ -1760,6 +1861,15 @@ def _test_active_job_running(scheduler_commands, remote_command_executor, cluste
def _test_protected_mode(scheduler_commands, remote_command_executor, cluster):
"""Test cluster will be placed into protected mode when protected count reach threshold and no job running."""
# See if the cluster can be put into protected mode when there's no job running after reaching threshold
_check_protected_mode_message_in_log(remote_command_executor)
# Assert bootstrap failure queues are inactive and compute fleet status is PROTECTED
check_status(cluster, compute_fleet_status="PROTECTED")
assert_that(scheduler_commands.get_partition_state(partition="normal")).is_equal_to("UP")
_wait_for_partition_state_changed(scheduler_commands, "broken", "INACTIVE")
_wait_for_partition_state_changed(scheduler_commands, "half-broken", "INACTIVE")


def _check_protected_mode_message_in_log(remote_command_executor):
retry(wait_fixed=seconds(20), stop_max_delay=minutes(7))(assert_lines_in_logs)(
remote_command_executor,
["/var/log/parallelcluster/clustermgtd"],
@@ -1770,11 +1880,6 @@ def _test_protected_mode(scheduler_commands, remote_command_executor, cluster):
"is in power up state without valid backing instance",
],
)
# Assert bootstrap failure queues are inactive and compute fleet status is PROTECTED
check_status(cluster, compute_fleet_status="PROTECTED")
assert_that(scheduler_commands.get_partition_state(partition="normal")).is_equal_to("UP")
_wait_for_partition_state_changed(scheduler_commands, "broken", "INACTIVE")
_wait_for_partition_state_changed(scheduler_commands, "half-broken", "INACTIVE")


def _test_job_run_in_working_queue(scheduler_commands):
@@ -0,0 +1,50 @@
Image:
Os: {{ os }}
HeadNode:
InstanceType: {{ instance }}
Networking:
SubnetId: {{ public_subnet_id }}
Ssh:
KeyName: {{ key_name }}
Scheduling:
Scheduler: {{ scheduler }}
SlurmSettings:
CustomSlurmSettings:
- NodeSet: nodeset
Nodes: "ondemand1-st-ondemand1-i1-[1-2],ondemand2-dy-ondemand2-c5large-[1-10]"
- PartitionName: CustomerPartition1
Nodes: nodeset
- PartitionName: CustomerPartition2
Nodes: nodeset
SlurmQueues:
- Name: ondemand1
Networking:
SubnetIds:
- {{ private_subnet_id }}
ComputeResources:
- Name: ondemand1-c5large
Instances:
- InstanceType: c5.large
- Name: ondemand1-i1
Instances:
- InstanceType: {{ instance }}
MinCount: 2
Iam:
S3Access:
- BucketName: {{ bucket }}
CustomActions:
OnNodeStart:
# pre-install script to make instances of the c5.large instance type fail with a bootstrap error
Script: s3://{{ bucket }}/scripts/preinstall.sh
- Name: ondemand2
Networking:
SubnetIds:
- {{ private_subnet_id }}
ComputeResources:
- Name: ondemand2-c5large
Instances:
- InstanceType: c5.large
- Name: ondemand2-i1
Instances:
- InstanceType: {{ instance }}
MinCount: 1
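
These CustomSlurmSettings presumably render into the generated slurm.conf roughly verbatim, something like the following (an assumption based on the YAML above, not output captured from a cluster):

    NodeSet=nodeset Nodes=ondemand1-st-ondemand1-i1-[1-2],ondemand2-dy-ondemand2-c5large-[1-10]
    PartitionName=CustomerPartition1 Nodes=nodeset
    PartitionName=CustomerPartition2 Nodes=nodeset

so the two customer partitions overlap the nodes of the pcluster-managed ondemand1/ondemand2 queues, which is what lets the test exercise custom and managed partitions on the same EC2 instances.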
@@ -0,0 +1,23 @@
#!/bin/bash
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file.
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
# See the License for the specific language governing permissions and limitations under the License.
function get_instance_type() {
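# Fetch a short-lived IMDSv2 session token, then use it to query the instance
# type from the EC2 instance metadata service.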
token=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 300")
instance_type_url="http://169.254.169.254/latest/meta-data/instance-type"
instance_type=$(curl --retry 3 --retry-delay 0 --silent --fail -H "X-aws-ec2-metadata-token: ${token}" "${instance_type_url}")
}
get_instance_type
if [ "${instance_type}" == "c5.large" ]; then
echo "Test Bootstrap error that causes instance to self terminate."
exit 1
fi
