Skip to content

Commit

Permalink
refactor how jobs are logged in the project manager
Browse files Browse the repository at this point in the history
  • Loading branch information
johrstrom committed Aug 20, 2024
1 parent 4b09edb commit d2dfb73
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 59 deletions.
2 changes: 1 addition & 1 deletion apps/dashboard/app/controllers/projects_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def job_details
cluster = OodAppkit.clusters[cluster_str.to_sym]
render(:status => 404) if cluster.nil?

hpc_job = project.job_from_id(job_details_params[:jobid].to_s, cluster_str)
hpc_job = project.job(job_details_params[:jobid].to_s, cluster_str)

render(partial: 'job_details', locals: { job: hpc_job })
end
Expand Down
48 changes: 48 additions & 0 deletions apps/dashboard/app/models/concerns/job_logger.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@

module JobLogger

def upsert_job!(directory, job)
existing_jobs = jobs(directory)
stored_job = existing_jobs.detect { |j| j.id == job.id && j.cluster == job.cluster }

if stored_job.nil?
new_jobs = (existing_jobs + [job.to_h]).map(&:to_h)
else
new_jobs = existing_jobs.map(&:to_h)
idx = existing_jobs.index(stored_job)
new_jobs[idx].merge!(job.to_h) { |_key, old_val, new_val| new_val.nil? ? old_val : new_val }
end

JobLoggerHelper.write_log(directory, new_jobs)
end

# def write_job_log!(directory, jobs)
# JobLoggerHelper.write_log!(directory, jobs)
# endd

def jobs(directory)
file = JobLoggerHelper.log_file(directory)
begin
data = YAML.safe_load(File.read(file.to_s), permitted_classes: [Time]).to_a
data.map { |job_data| HpcJob.new(**job_data) }
rescue StandardError => e
Rails.logger.error("Cannot read job log file '#{file}' due to error: #{e}")
[]
end
end

# helper methods here are located here so that they don't
# bleed into the class that uses this module
class JobLoggerHelper
class << self
def log_file(directory)
Pathname.new("#{directory}/.ondemand/job_log.yml")
end

def write_log(directory, jobs)
file = log_file(directory)
File.write(file.to_s, jobs.to_yaml)
end
end
end
end
50 changes: 2 additions & 48 deletions apps/dashboard/app/models/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

class Launcher
include ActiveModel::Model
include JobLogger

class ClusterNotFound < StandardError; end

Expand Down Expand Up @@ -196,28 +197,6 @@ def submit(options)
nil
end

def active_jobs
@active_jobs ||= jobs.reject { |job| job.completed? }
end

def jobs
@jobs ||= begin
data = YAML.safe_load(File.read(job_log_file.to_s), permitted_classes: [Time]).to_a
data.map do |job_data|
HpcJob.new(**job_data)
end
end
end

# When a job is requested, update before returning
def job_from_id(job_id, cluster)
job = stored_job_from_id(job_id, cluster)
unless job.nil? || job.status.to_s == 'completed'
update_job_log(job_id, cluster)
end
stored_job_from_id(job_id, cluster)
end

def create_default_script
return false if Launcher.scripts?(project_dir) || default_script_path.exist?

Expand Down Expand Up @@ -310,37 +289,12 @@ def cached_values
end
end

def most_recent_job
jobs.sort_by do |data|
data['submit_time']
end.reverse.first.to_h
end

def stored_job_from_id(job_id, cluster)
jobs.detect { |j| j.id == job_id && j.cluster == cluster }
end

def update_job_log(job_id, cluster)
adapter = adapter(cluster).job_adapter
info = adapter.info(job_id)
job = HpcJob.from_core_info(info: info, cluster: cluster)
existing_jobs = jobs
stored_job = stored_job_from_id(job_id, cluster)
if stored_job.nil?
new_jobs = (jobs + [job.to_h]).map(&:to_h)
else
new_jobs = existing_jobs.map(&:to_h)
idx = existing_jobs.index(stored_job)
new_jobs[idx].merge!(job.to_h) { |key, old_val, new_val| new_val.nil? ? old_val : new_val }
end

File.write(job_log_file.to_s, new_jobs.to_yaml)
end

def job_log_file
@job_log_file ||= Pathname.new(File.join(Launcher.script_path(project_dir, id), "job_history.log")).tap do |path|
FileUtils.touch(path.to_s)
end
upsert_job!(project_dir, job)
end

def submit_opts(options, render_format)
Expand Down
25 changes: 15 additions & 10 deletions apps/dashboard/app/models/project.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class Project
include ActiveModel::Validations
include ActiveModel::Validations::Callbacks
include IconWithUri
extend JobLogger

class << self
def lookup_file
Expand Down Expand Up @@ -197,18 +198,22 @@ def size
end

def jobs
launchers = Launcher.all(directory)
launchers.map do |launcher|
launcher.jobs
end.flatten
Project.jobs(directory)
end

def job_from_id(job_id, cluster)
launchers = Launcher.all(directory)
launchers.each do |launcher|
job = launcher.job_from_id(job_id, cluster)
return job unless job.nil?
end
def job(job_id, cluster)
cached_job = jobs.detect { |j| j.id == job_id && j.cluster == cluster }
return cached_job if cached_job.completed?

active_job = adapter(cluster).info(job_id)
active_job = HpcJob.new(**active_job.to_h)
Project.upsert_job!(directory, active_job)
active_job
end

def adapter(cluster_id)
cluster = OodAppkit.clusters[cluster_id] || raise(StandardError, "Job specifies nonexistent '#{cluster_id}' cluster id.")
cluster.job_adapter
end

def readme_path
Expand Down

0 comments on commit d2dfb73

Please sign in to comment.