Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Fix worker #1935

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions chroma_core/models/client_mount.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def get_deps(self, state=None):
state = self.state

deps = []

if self.host.immutable_state:
return DependAll(deps)

if state == "mounted":
# Depend on this mount's host having LNet up. If LNet is stopped
# on the host, this filesystem will be unmounted first.
Expand Down Expand Up @@ -293,7 +297,16 @@ def description(self):
def get_steps(self):
search = lambda cm: (cm.host == self.host and cm.state == "unmounted")
unmounted = ObjectCache.get(LustreClientMount, search)
args = dict(host=self.host, filesystems=[(m.filesystem.mount_path(), m.mountpoint) for m in unmounted])
args = dict(
host=self.host,
filesystems=[
(
ObjectCache.get_one(ManagedFilesystem, lambda mf, mtd=m: mf.name == mtd.filesystem).mount_path(),
m.mountpoint,
)
for m in unmounted
],
)
return [(MountLustreFilesystemsStep, args)]


Expand Down Expand Up @@ -353,5 +366,14 @@ def description(self):
def get_steps(self):
search = lambda cm: (cm.host == self.host and cm.state == "mounted")
mounted = ObjectCache.get(LustreClientMount, search)
args = dict(host=self.host, filesystems=[(m.filesystem.mount_path(), m.mountpoint) for m in mounted])
args = dict(
host=self.host,
filesystems=[
(
ObjectCache.get_one(ManagedFilesystem, lambda mf, mtd=m: mf.name == mtd.filesystem).mount_path(),
m.mountpoint,
)
for m in mounted
],
)
return [(UnmountLustreFilesystemsStep, args)]
39 changes: 21 additions & 18 deletions chroma_core/models/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ class BaseSetupHostJob(NullStateChangeJob):
class Meta:
abstract = True

def _common_deps(self, lnet_state_required, lnet_acceptable_states, lnet_unacceptable_states):
def _common_deps(self):
# It really does not feel right that this is in here, but it does sort of work. These are the things
# it is dependent on so create them. Also I can't work out with today's state machine anywhere else to
# put them that works.
Expand Down Expand Up @@ -826,23 +826,13 @@ def _common_deps(self, lnet_state_required, lnet_acceptable_states, lnet_unaccep

deps = []

if self.target_object.lnet_configuration:
deps.append(
DependOn(
self.target_object.lnet_configuration,
lnet_state_required,
lnet_acceptable_states,
lnet_unacceptable_states,
)
)

if self.target_object.pacemaker_configuration:
deps.append(DependOn(self.target_object.pacemaker_configuration, "started"))

if self.target_object.ntp_configuration:
deps.append(DependOn(self.target_object.ntp_configuration, "configured"))

return DependAll(deps)
return deps


class InitialiseBlockDeviceDriversStep(Step):
Expand Down Expand Up @@ -871,7 +861,12 @@ def description(self):
return help_text["setup_managed_host_on"] % self.target_object

def get_deps(self):
return self._common_deps("lnet_up", None, None)
deps = self._common_deps()

if self.target_object.lnet_configuration:
deps.append(DependOn(self.target_object.lnet_configuration, "lnet_up"))

return DependAll(deps)

def get_steps(self):
return [(InitialiseBlockDeviceDriversStep, {"host": self.target_object})]
Expand All @@ -891,9 +886,9 @@ class Meta:
ordering = ["id"]

def get_deps(self):
# Moving out of unconfigured into lnet_unloaded will mean that lnet will start monitoring and responding to
# the state. Once we start monitoring any state other than unconfigured is acceptable.
return self._common_deps("lnet_unloaded", None, ["unconfigured"])
deps = self._common_deps()

return DependAll(deps)

def description(self):
return help_text["setup_monitored_host_on"] % self.target_object
Expand All @@ -913,14 +908,19 @@ class Meta:
ordering = ["id"]

def get_deps(self):
return self._common_deps("lnet_up", None, None)
deps = self._common_deps()

if self.target_object.lnet_configuration and not self.target_object.immutable_state:
deps.append(DependOn(self.target_object.lnet_configuration, "lnet_up"))

return DependAll(deps)

def description(self):
return help_text["setup_worker_host_on"] % self.target_object

@classmethod
def can_run(cls, host):
return host.is_managed and host.is_worker and (host.state != "unconfigured")
return host.is_worker and (host.state != "unconfigured")


class DetectTargetsStep(Step):
Expand Down Expand Up @@ -1174,6 +1174,9 @@ def description(self):
def get_deps(self):
deps = []

if self.host.immutable_state:
return DependAll(deps)

if self.host.lnet_configuration:
deps.append(DependOn(self.host.lnet_configuration, "unconfigured"))

Expand Down
2 changes: 1 addition & 1 deletion chroma_core/models/stratagem.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ def get_steps(self):
client_host = ManagedHost.objects.get(
Q(server_profile_id="stratagem_client") | Q(server_profile_id="stratagem_existing_client")
)
client_mount = LustreClientMount.objects.get(host_id=client_host.id, filesystem=self.filesystem.name)
client_mount = LustreClientMount.objects.get(host_id=client_host.id, filesystem=filesystem.name)

return [
(
Expand Down
13 changes: 9 additions & 4 deletions chroma_core/services/job_scheduler/job_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1106,14 +1106,16 @@ def key(td):
# if we have an entry with 'root'=true then move it to the front of the list before returning the result
return sorted(sorted_list, key=lambda entry: entry.get("root", False), reverse=True)

def create_client_mount(self, host_id, filesystem_name, mountpoint):
def create_client_mount(self, host_id, filesystem_name, mountpoint, existing):
# RPC-callable
host = ObjectCache.get_one(ManagedHost, lambda mh: mh.id == host_id)
mount = self._create_client_mount(host, filesystem_name, mountpoint)

mount = self._create_client_mount(host, filesystem_name, mountpoint, existing)

self.progress.advance()
return mount.id

def _create_client_mount(self, host, filesystem_name, mountpoint):
def _create_client_mount(self, host, filesystem_name, mountpoint, existing=False):
# Used for intra-JobScheduler calls
log.debug("Creating client mount for %s as %s:%s" % (filesystem_name, host, mountpoint))

Expand All @@ -1122,6 +1124,9 @@ def _create_client_mount(self, host, filesystem_name, mountpoint):

with transaction.atomic():
mount, created = LustreClientMount.objects.get_or_create(host=host, filesystem=filesystem_name)
if existing:
mount.state = "mounted"

mount.mountpoint = mountpoint
mount.save()

Expand Down Expand Up @@ -1889,7 +1894,7 @@ def run_stratagem(self, mdts, fs_id, stratagem_data):

mountpoint = "/mnt/{}".format(filesystem.name)
if not client_mount_exists:
self._create_client_mount(client_host, filesystem, mountpoint)
self._create_client_mount(client_host, filesystem.name, mountpoint)

client_mount = ObjectCache.get_one(
LustreClientMount, lambda mnt: mnt.host_id == client_host.id and mnt.filesystem == filesystem.name
Expand Down
4 changes: 2 additions & 2 deletions chroma_core/services/job_scheduler/job_scheduler_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,10 @@ def create_targets(cls, targets_data):
return (list(ManagedTarget.objects.filter(id__in=target_ids)), Command.objects.get(pk=command_id))

@classmethod
def create_client_mount(cls, host, filesystem_name, mountpoint):
def create_client_mount(cls, host, filesystem_name, mountpoint, existing):
from chroma_core.models import LustreClientMount

client_mount_id = JobSchedulerRpc().create_client_mount(host.id, filesystem_name, mountpoint)
client_mount_id = JobSchedulerRpc().create_client_mount(host.id, filesystem_name, mountpoint, existing)
return LustreClientMount.objects.get(id=client_mount_id)

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion chroma_core/services/lustre_audit/update_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def update_client_mounts(self):
log.info("updated mount %s on %s -> active" % (actual_mount["mountpoint"], self.host))
except IndexError:
log.info("creating new mount %s on %s" % (actual_mount["mountpoint"], self.host))
JobSchedulerClient.create_client_mount(self.host, fsname, actual_mount["mountpoint"])
JobSchedulerClient.create_client_mount(self.host, fsname, actual_mount["mountpoint"], True)

def update_target_mounts(self):
# If mounts is None then nothing changed since the last update and so we can just return.
Expand Down
12 changes: 5 additions & 7 deletions iml-system-test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,20 @@ pub const STRATAGEM_SERVER_PROFILE: &str = r#"{

pub const STRATAGEM_CLIENT_PROFILE: &str = r#"{
"ui_name": "Stratagem Client Node",
"managed": true,
"managed": false,
"worker": true,
"name": "stratagem_client",
"initial_state": "managed",
"ntp": true,
"initial_state": "monitored",
"ntp": false,
"corosync": false,
"corosync2": false,
"pacemaker": false,
"ui_description": "A client that can receive stratagem data",
"packages": [
"python2-iml-agent-management",
"lustre-client"
"python2-iml-agent-management"
],
"repolist": [
"base",
"lustre-client"
"base"
]
}
"#;
Expand Down