Skip to content

Commit

Permalink
Merge pull request #1 from gvallee/topic/launcher
Browse files Browse the repository at this point in the history
Add the launcher
  • Loading branch information
gvallee authored Feb 1, 2021
2 parents 09ceaac + 57be4e1 commit 9dea2cd
Show file tree
Hide file tree
Showing 11 changed files with 503 additions and 106 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ module github.com/gvallee/go_hpc_jobmgr
go 1.13

require (
github.com/gvallee/go_exec v0.0.1
github.com/gvallee/go_util v1.0.1
github.com/gvallee/go_exec v0.0.2
github.com/gvallee/go_util v1.1.0
)
11 changes: 9 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
github.com/gvallee/go_exec v0.0.1 h1:yNP5/fTWnnym4wT17JIEBiRAxOEhclOePdwxCxWsEZ8=
github.com/gvallee/go_exec v0.0.1/go.mod h1:4AwegK9oPhkgwkd0rjlTwxRw//8cW4pPcCSFLZ6+LZg=
github.com/gvallee/go_exec v0.0.2 h1:qxTlfCI5aiPmElCn+kCip++mBgKx6TNxmVOfh7lP/Ok=
github.com/gvallee/go_exec v0.0.2/go.mod h1:b8ZYmmYCaHo/g92ZHvYK8LnTp06sdTU3u2CjisIyA9w=
github.com/gvallee/go_software_build v0.0.8 h1:exJ+3V8V6oQHL+gdKYGUTXWotj9tSpRUBnIBTP4Se/s=
github.com/gvallee/go_software_build v0.0.8/go.mod h1:HaMNclnl0SI0qd/vY4zNiAhKH/U3ojXOBqYf3IEMa1A=
github.com/gvallee/go_util v1.0.0/go.mod h1:fTexpwdH/n05Ziu0TXJIQsr7E+46QpBxNdeOOsyC0/s=
github.com/gvallee/go_util v1.0.1 h1:Ch/PpAlHrHNmL2Upaxif/Nt4CqtaazDyTXh5fIhutJo=
github.com/gvallee/go_util v1.0.1/go.mod h1:fTexpwdH/n05Ziu0TXJIQsr7E+46QpBxNdeOOsyC0/s=
github.com/gvallee/go_util v1.1.0 h1:qSwjCPTejJ8zbhFmpJ3WQtzpNOHWQwc4ijFsTbCJK7s=
github.com/gvallee/go_util v1.1.0/go.mod h1:fTexpwdH/n05Ziu0TXJIQsr7E+46QpBxNdeOOsyC0/s=
github.com/gvallee/kv v1.0.0/go.mod h1:sfSclfFfLV+Y+9e9FayIbBUOtvbt1779S6q52bSSU5E=
github.com/sylabs/singularity-mpi v1.2.2/go.mod h1:drvCxAHw6GUmtBLG4luqkjHYmABUTlWbCk4Ff1GoiFc=
28 changes: 24 additions & 4 deletions internal/pkg/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ type Job struct {
// ErrBuffer is a buffer with the stderr of the job
ErrBuffer bytes.Buffer

// GetOutput is the function to call to gather the output of the application based on the use of a given job manager
GetOutput GetOutputFn
// internalGetOutput is the function to call to gather the output of the application based on the use of a given job manager
internalGetOutput GetOutputFn

// GetError is the function to call to gather stderr of the application based on the use of a given job manager
GetError GetErrorFn
// internalGetError is the function to call to gather stderr of the application based on the use of a given job manager
internalGetError GetErrorFn

// Args is a set of arguments to be used for launching the job
Args []string
Expand All @@ -68,3 +68,23 @@ type Job struct {
// Partition is the name of the partition to use with the jobmgr (optional)
Partition string
}

// GetOutput is the function to call to gather the output (stdout) of the application after execution of the job
func (j *Job) GetOutput(sysCfg *sys.Config) string {
return j.internalGetOutput(j, sysCfg)
}

// GetError is the function to call to gather stderr of the application after execution of the job
func (j *Job) GetError(sysCfg *sys.Config) string {
return j.internalGetError(j, sysCfg)
}

// SetOutputFn sets the internal function specific to the job manager to get the output of a job
func (j *Job) SetOutputFn(fn GetOutputFn) {
j.internalGetOutput = fn
}

// SetErrorFn sets the internal function specific to the job manager to get stderr of a job
func (j *Job) SetErrorFn(fn GetErrorFn) {
j.internalGetError = fn
}
6 changes: 6 additions & 0 deletions internal/pkg/sys/sys.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@

package sys

// Config represents the system configuration
type Config struct {
// ScratchDir is the path to a scratch directory on the system (most HPC systems have one)
ScratchDir string

// Persistent is the path to the directory where to installed software packages in the context of a persistent execution
Persistent string

// CurPath is the path to the current directory
CurPath string
}
31 changes: 21 additions & 10 deletions pkg/jm/jm.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2019, Sylabs Inc. All rights reserved.
// Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
// This software is licensed under a 3-clause BSD license. Please consult the
// LICENSE.md file distributed with the sources of this project regarding your
// rights to use or distribute this software.
Expand Down Expand Up @@ -29,7 +30,9 @@ const (
PrunID = "prun"
)

// Environment represents the job's environment to use
type Environment struct {
// InstallDir is where software packages needed for the job are installed
InstallDir string

mpiBin string
Expand All @@ -41,20 +44,23 @@ type Loader interface {
}

// LoadFn loads a specific job manager once detected
type LoadFn func(*JM, *sys.Config) error
type LoadFn func(jobmgr *JM, sysCfg *sys.Config) error

// SubmitFn is a "function pointer" that lets us job a new job
type SubmitFn func(*job.Job, *sys.Config) (advexec.Advcmd, error)
type SubmitFn func(j *job.Job, jobmgr *JM, sysCfg *sys.Config) advexec.Result

// JM is the structure representing a specific JM
type JM struct {
// ID identifies which job manager has been detected on the system
ID string

Load LoadFn
loadJM LoadFn

// Submit is the function to submit a job through the current job manager
Submit SubmitFn
submitJM SubmitFn

BinPath string

CmdArgs []string
}

// Detect figures out which job manager must be used on the system and return a
Expand All @@ -80,11 +86,6 @@ func Detect() JM {
return comp
}

// Load is the function to use to load the JM component
func Load(jm *JM) error {
return nil
}

// TempFile creates a temporary file that is used to store a batch script
func TempFile(j *job.Job, sysCfg *sys.Config) error {
filePrefix := "sbash-" + j.Name
Expand Down Expand Up @@ -116,3 +117,13 @@ func TempFile(j *job.Job, sysCfg *sys.Config) error {

return nil
}

// Load sets data specific to the job managers that was previously detected
func (jobmgr *JM) Load(sysCfg *sys.Config) error {
return jobmgr.loadJM(jobmgr, sysCfg)
}

// Submit executes a job with a job manager that was previously detected and loaded
func (jobmgr *JM) Submit(j *job.Job, sysCfg *sys.Config) advexec.Result {
return jobmgr.submitJM(j, jobmgr, sysCfg)
}
32 changes: 20 additions & 12 deletions pkg/jm/jobmgr_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
type Native struct {
}

// NativeGetOutput retrieves the application's output after the completion of a job
func NativeGetOutput(j *job.Job, sysCfg *sys.Config) string {
// nativeGetOutput retrieves the application's output after the completion of a job
func nativeGetOutput(j *job.Job, sysCfg *sys.Config) string {
return j.OutBuffer.String()
}

// NativeGetError retrieves the error messages from an application after the completion of a job
func NativeGetError(j *job.Job, sysCfg *sys.Config) string {
// nativeGetError retrieves the error messages from an application after the completion of a job
func nativeGetError(j *job.Job, sysCfg *sys.Config) string {
return j.ErrBuffer.String()
}

Expand Down Expand Up @@ -65,23 +65,30 @@ func prepareStdSubmit(cmd *advexec.Advcmd, j *job.Job, env *Environment, sysCfg
return nil
}

// NativeSubmit is the function to call to submit a job through the native job manager
func NativeSubmit(j *job.Job, sysCfg *sys.Config) (advexec.Advcmd, error) {
// nativeSubmit is the function to call to submit a job through the native job manager
func nativeSubmit(j *job.Job, jobmgr *JM, sysCfg *sys.Config) advexec.Result {
var cmd advexec.Advcmd
var res advexec.Result

if j.App.BinPath == "" {
return cmd, fmt.Errorf("application binary is undefined")
res.Err = fmt.Errorf("application binary is undefined")
return res
}

err := prepareMPISubmit(&cmd, j, sysCfg)
if err != nil {
return cmd, fmt.Errorf("unable to prepare MPI job: %s", err)
res.Err = fmt.Errorf("unable to prepare MPI job: %s", err)
return res
}

j.GetOutput = NativeGetOutput
j.GetError = NativeGetError
j.SetOutputFn(nativeGetOutput)
j.SetErrorFn(nativeGetError)

return cmd, nil
return cmd.Run()
}

func nativeLoad(jobmgr *JM, sysCfg *sys.Config) error {
return nil
}

// NativeDetect is the function used by our job management framework to figure out if mpirun should be used directly.
Expand All @@ -90,7 +97,8 @@ func NativeSubmit(j *job.Job, sysCfg *sys.Config) (advexec.Advcmd, error) {
func NativeDetect() (bool, JM) {
var jm JM
jm.ID = NativeID
jm.Submit = NativeSubmit
jm.submitJM = nativeSubmit
jm.loadJM = nativeLoad

// This is the default job manager, i.e., mpirun so we do not check anything, just return this component.
// If the component is selected and mpirun not correctly installed, the framework will pick it up later.
Expand Down
26 changes: 14 additions & 12 deletions pkg/jm/jobmgr_prun.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,31 @@ import (
type Prun struct {
}

// PrunGetOutput retrieves the application's output after the completion of a job
func PrunGetOutput(j *job.Job, sysCfg *sys.Config) string {
// prunGetOutput retrieves the application's output after the completion of a job
func prunGetOutput(j *job.Job, sysCfg *sys.Config) string {
return j.OutBuffer.String()
}

// PrunGetError retrieves the error messages from an application after the completion of a job
func PrunGetError(j *job.Job, sysCfg *sys.Config) string {
// prunGetError retrieves the error messages from an application after the completion of a job
func prunGetError(j *job.Job, sysCfg *sys.Config) string {
return j.ErrBuffer.String()
}

// PrunSubmit is the function to call to submit a job through the native job manager
func PrunSubmit(j *job.Job, sysCfg *sys.Config) (advexec.Advcmd, error) {
func PrunSubmit(j *job.Job, jobmgr *JM, sysCfg *sys.Config) advexec.Result {
var cmd advexec.Advcmd
var res advexec.Result
var err error

if j.App.BinPath == "" {
return cmd, fmt.Errorf("application binary is undefined")
res.Err = fmt.Errorf("application binary is undefined")
return res
}

cmd.BinPath, err = exec.LookPath("prun")
if err != nil {
return cmd, fmt.Errorf("prun not found")
res.Err = fmt.Errorf("prun not found")
return res
}

for _, a := range j.Args {
Expand All @@ -60,10 +63,9 @@ func PrunSubmit(j *job.Job, sysCfg *sys.Config) (advexec.Advcmd, error) {
//cmd.Env = append([]string{"LD_LIBRARY_PATH=" + newLDPath}, os.Environ()...)
//cmd.Env = append([]string{"PATH=" + newPath}, sycmd.Env...)

j.GetOutput = PrunGetOutput
j.GetError = PrunGetError

return cmd, nil
j.SetOutputFn(prunGetOutput)
j.SetErrorFn(prunGetError)
return cmd.Run()
}

// PrunDetect is the function used by our job management framework to figure out if mpirun should be used directly.
Expand All @@ -79,7 +81,7 @@ func PrunDetect() (bool, JM) {
}

jm.ID = PrunID
jm.Submit = PrunSubmit
jm.submitJM = PrunSubmit

// This is the default job manager, i.e., mpirun so we do not check anything, just return this component.
// If the component is selected and mpirun not correctly installed, the framework will pick it up later.
Expand Down
Loading

0 comments on commit 9dea2cd

Please sign in to comment.