import subprocess
from vcstools.config import load_config_file
import logging
import socket
logger = logging.getLogger(__name__)

# Skeleton of every batch script written by submit_slurm(). It is filled in
# with str.format(); the placeholders are: shebag, export, outfile, account,
# cluster, partition, threads, mem, nice, header, switches, module_dir,
# modules and script. Each list item below becomes one line of the script.
SLURM_TMPL = "\n".join([
    "{shebag}",
    "#SBATCH --export={export}",
    "#SBATCH --output={outfile}",
    "{account}",
    "#SBATCH --clusters={cluster}",
    "#SBATCH --partition={partition}",
    "#",
    "#SBATCH --cpus-per-task=1",
    "#SBATCH --ntasks-per-node={threads}",
    "#SBATCH --mem-per-cpu={mem}MB",
    "#SBATCH --nice={nice}",
    "{header}",
    "ncpus={threads}",
    "export OMP_NUM_THREADS={threads}",
    "{switches}",
    "module use {module_dir}",
    "{modules}",
    "{script}",
    "",  # empty final item keeps the trailing newline after the script body
])
# NOTE: --gid option removed after discussion in helpdesk ticket GS-9370
def _queue_cluster_partition(comp_config, queue):
    """Look up the SLURM cluster and partition configured for ``queue``.

    Parameters
    ----------
    comp_config : `dict`
        The supercomputer configuration returned by ``load_config_file()``.
    queue : `str`
        One of "cpuq", "gpuq", "copyq" or "zcpuq".

    Returns
    -------
    cluster, partition : `tuple`
        The ``<queue>_cluster`` and ``<queue>_partition`` config entries.

    Raises
    ------
    ValueError
        If ``queue`` is not one of the supported queue names.
    """
    # "zcpuq": downloads and checks should be done on Zeus's cpuq. This will
    # only work on Galaxy as the OzStar workflow is different.
    if queue not in ("cpuq", "gpuq", "copyq", "zcpuq"):
        logger.error("No queue found, please use cpuq, gpuq, copyq or zcpuq")
        # Fail here instead of later with an obscure unbound-variable error.
        raise ValueError("Unknown queue {0!r}; expected cpuq, gpuq, copyq "
                         "or zcpuq".format(queue))
    return comp_config[queue + "_cluster"], comp_config[queue + "_partition"]


def _dependency_header(depend, depend_type):
    """Build the ``#SBATCH --dependency`` header line for ``depend``.

    Parameters
    ----------
    depend : `int`, `str`, iterable or `None`
        A single job ID, an already ':'-joined string of job IDs (a leading
        ':' is tolerated), or an iterable of job IDs.
    depend_type : `str`
        The SLURM dependency type, e.g. "afterok" or "afterany".

    Returns
    -------
    line : `str` or `None`
        The header line, or `None` when there is nothing to depend on.
        Empty inputs return `None` because a bare ``--dependency=afterok``
        is not a valid SLURM dependency.
    """
    if depend is None:
        return None
    if isinstance(depend, str):
        ids = depend[1:] if depend.startswith(":") else depend
    elif isinstance(depend, int):
        ids = str(depend)
    else:
        try:
            ids = ":".join(str(job_id) for job_id in depend)
        except TypeError:
            # Not iterable: treat it as a single job ID of some other type.
            ids = str(depend)
    if not ids:
        return None
    return "#SBATCH --dependency={0}:{1}".format(depend_type, ids)


def submit_slurm(name, commands, tmpl=SLURM_TMPL, slurm_kwargs=None,
                 module_list=None, vcstools_version="master",
                 batch_dir="batch/", depend=None, depend_type='afterok',
                 submit=True, outfile=None, queue="cpuq", export="NONE",
                 gpu_res=None, mem=1024, cpu_threads=1, temp_mem=None,
                 nice=0, shebag='#!/bin/bash -l',
                 module_dir=None, load_vcstools=True):
    """Write a SLURM batch script from a simple template and optionally submit it.

    Parameters
    ----------
    name : `str`
        The base name that is used to create the "`name`.batch" and "`name`.out" files.
    commands : `list`
        Each item in the list is a line of the bash script commands you want to run.
    tmpl : `str`, optional
        A template string with format placeholders: shebag, export, outfile,
        account, cluster, partition, threads, mem, nice, header, switches,
        module_dir, modules and script.
        This is used to create the final string to be written to the job script.
        For this function, it is required to be SLURM compliant.
        |br| Default: `SLURM_TMPL`
    slurm_kwargs : `dict`, optional
        A dictionary of SLURM keyword, value pairs to fill in whatever is not
        in the template supplied to `tmpl`. Single-letter keys become
        "#SBATCH -k value"; longer keys become "#SBATCH --key=value".
        |br| Default: `None` (no additional header parameters).
    module_list : `list`, optional
        A list of module names (including versions if applicable) that will
        be included in the header for the batch scripts.
        For example, ["mwa-voltage/master", "presto/master"] would append
        |br| module load mwa-voltage/master
        |br| module load presto/master
        |br| to the header of the batch script. Entries that already contain
        "module" (e.g. "module use ...") are included verbatim. Entries
        containing "module switch" are placed before the "module use" line.
        The entry "vcstools" is ignored.
        |br| Default: `None` (no extra modules).
    vcstools_version : `str`, optional
        The version of vcstools to load. |br| Default: master.
    batch_dir : `str`, optional
        The directory where you want to write the batch scripts.
        |br| Default: "batch/". (i.e. it will write to `$PWD/batch_dir`).
    depend : `int`, `str` or `list`, optional
        The SLURM job ID(s) that you would like this job to depend on.
        If `None` (or empty), then there is no dependency on any other job.
        |br| Default: `None`.
    depend_type : `str`, optional
        The type of SLURM dependency required. For example, if you wanted the
        job to run after the jobs have been terminated, use 'afterany'.
        |br| Default: "afterok".
    submit : `boolean`, optional
        Whether to write and submit the job scripts (`True`) or only write the scripts (`False`).
        |br| Default: `True`.
    outfile : `str`, optional
        The output file name if "`name`.out" is not desirable.
        |br| Default: `None` (i.e. "`batch_dir`/`name`.out")
    queue : `str`, optional
        The type of queue you require (cpuq, gpuq, copyq or zcpuq). The
        cluster and partition are then taken from the config file.
        |br| Default: "cpuq"
    export : `str`, optional
        Switch that lets SLURM use your login environment on the compute
        nodes ("ALL") or not ("NONE").
        |br| Default: "NONE".
    gpu_res : `int`, optional
        Number of GPUs that the SLURM job will reserve. Set to 1 when
        `queue` is "gpuq" and this is `None`.
        |br| Default: `None`.
    mem : `int`, optional
        Memory in MB; the default template uses it for ``--mem-per-cpu``.
        |br| Default: 1024.
    cpu_threads : `int`, optional
        The number of CPU threads required for your SLURM job. The default
        template uses it for ``--ntasks-per-node`` and ``OMP_NUM_THREADS``.
        |br| Default: 1.
    temp_mem : `int`, optional
        Temporary disk to request in GB (``--tmp``), to combat I/O issues.
        Per the original note, this is only available on OzStar.
        |br| Default: `None` (not requested).
    nice : `int`, optional
        Value for ``#SBATCH --nice`` in the default template.
        |br| Default: 0.
    shebag : `str`, optional
        The first (interpreter) line of the script.
        |br| Default: "#!/bin/bash -l".
    module_dir : `str`, optional
        Directory passed to ``module use``.
        |br| Default: `None` (the config file's "module_dir").
    load_vcstools : `boolean`, optional
        Whether to add ``module load vcstools/<vcstools_version>``.
        |br| Default: `True`.

    Notes
    -----
    On hosts whose name starts with "x86" or "arm" (the Shanghai server),
    a few defaults are adjusted:

    - vcstools "master" becomes "cpu-master".
    - `export` "NONE" becomes "ALL".
    - The login-shell shebang becomes "#!/bin/bash".

    Returns
    -------
    jobid : `str` or `None`
        The SLURM job ID reported by sbatch. `None` is returned if `submit`
        is `False` or if no job ID could be parsed from sbatch's output.

    Raises
    ------
    ValueError
        If `queue` is not one of the supported queue names.
    """
    if slurm_kwargs is None:
        slurm_kwargs = {}
    if module_list is None:
        module_list = []

    # Work out which partition and cluster to use based on the supercomputer
    # (in config file) and queue required
    comp_config = load_config_file()
    cluster, partition = _queue_cluster_partition(comp_config, queue)
    if queue == "gpuq" and gpu_res is None:
        # No gpus reserved so change it to a default of 1
        gpu_res = 1

    if not batch_dir.endswith("/"):
        batch_dir += "/"
    # define file names (both the batch job file and the output file)
    jobfile = batch_dir + name + ".batch"
    if not outfile:
        outfile = batch_dir + name + ".out"

    # create the header from supplied arguments
    header = []
    for key, value in slurm_kwargs.items():
        if len(key) > 1:
            flag = "--" + key + "="
        else:
            flag = "-" + key + " "
        header.append("#SBATCH {0}{1}".format(flag, value))
    dependency = _dependency_header(depend, depend_type)
    if dependency is not None:
        header.append(dependency)
    if gpu_res is not None:
        header.append("#SBATCH --gres=gpu:{0}".format(gpu_res))
    # add temp SSD memory to combat I/O issues. Only available on OzStar
    if temp_mem is not None:
        header.append("#SBATCH --tmp={0}GB".format(temp_mem))
    header = "\n".join(header)

    if module_dir is None:
        module_dir = comp_config['module_dir']

    # Some little hacks to make jobs work on the Shanghai server. This must
    # run before the module lines are built, otherwise the vcstools_version
    # override never reaches the "module load" line.
    hostname = socket.gethostname()
    if hostname.startswith(("x86", "arm")):
        if vcstools_version == "master":
            vcstools_version = "cpu-master"
        if export == "NONE":
            export = "ALL"
        if shebag == "#!/bin/bash -l":
            shebag = "#!/bin/bash"

    # construct the module loads
    modules = []
    if load_vcstools:
        modules.append("module load vcstools/{0}\n".format(vcstools_version))
    switches = []
    for module in module_list:
        if module == "vcstools":
            # don't do anything as vcstools is loaded automatically
            continue
        if "module switch" in module:
            # switch commands go in their own slot, before "module use"
            switches.append(module)
        elif "module" in module:
            # already a full module command: include it verbatim
            modules.append("{0}\n".format(module))
        else:
            modules.append("module load {0}\n".format(module))

    # format the template script and write it out
    script = tmpl.format(shebag=shebag, script="\n".join(commands),
                         outfile=outfile, header=header,
                         switches="\n".join(switches),
                         modules="\n".join(modules),
                         cluster=cluster, partition=partition,
                         export=export,
                         account=comp_config['group_account'][queue],
                         module_dir=module_dir,
                         threads=cpu_threads, mem=mem, nice=nice)
    with open(jobfile, "w") as fh:
        fh.write(script)

    if not submit:
        return None

    # Argument list without a shell, so names containing spaces or shell
    # metacharacters cannot break or alter the command. run() also waits
    # for the process to finish.
    result = subprocess.run(["sbatch", jobfile], stdout=subprocess.PIPE)
    jobid = None
    for line in result.stdout.splitlines():
        if b"Submitted" in line:
            # expected form "Submitted batch job <id> ...": token 3 is the ID;
            # whitespace split also drops any trailing newline
            jobid = line.split()[3].decode()
    if jobid is None:
        logger.warning("No job ID reported by: sbatch %s", jobfile)
        logger.debug(result.stdout.decode(errors="replace"))
    return jobid