Source code for gridmap.job

# -*- coding: utf-8 -*-

# Written (W) 2008-2012 Christian Widmer
# Written (W) 2008-2010 Cheng Soon Ong
# Written (W) 2012-2014 Daniel Blanchard, dblanchard@ets.org
# Copyright (C) 2008-2012 Max-Planck-Society, 2012-2014 ETS

# This file is part of GridMap.

# GridMap is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# GridMap is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with GridMap.  If not, see <http://www.gnu.org/licenses/>.

"""
This module provides wrappers that simplify submission and collection of jobs,
in a more 'pythonic' fashion.

We use pyZMQ to provide a heartbeat feature that allows close monitoring
of submitted jobs and taking appropriate action in case of failure.

:author: Christian Widmer
:author: Cheng Soon Ong
:author: Dan Blanchard (dblanchard@ets.org)

"""

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import inspect
import logging
import multiprocessing
import os
import smtplib
import sys
import traceback
import functools
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from io import open
from importlib import import_module
from multiprocessing import Pool
from socket import gethostname, gethostbyname, getaddrinfo, getfqdn
from smtplib import (SMTPRecipientsRefused, SMTPHeloError, SMTPSenderRefused,
                     SMTPDataError)

import zmq

from gridmap.conf import (CHECK_FREQUENCY, CREATE_PLOTS, DEFAULT_QUEUE,
                          DRMAA_PRESENT, ERROR_MAIL_RECIPIENT,
                          ERROR_MAIL_SENDER, HEARTBEAT_FREQUENCY,
                          IDLE_THRESHOLD, MAX_IDLE_HEARTBEATS,
                          MAX_TIME_BETWEEN_HEARTBEATS, NUM_RESUBMITS,
                          SEND_ERROR_MAIL, SMTP_SERVER, USE_MEM_FREE,
                          DEFAULT_TEMP_DIR, DEFAULT_PAR_ENV)
from gridmap.data import zdumps, zloads
from gridmap.runner import _heart_beat


class DRMAANotPresentException(ImportError):
    pass


if DRMAA_PRESENT:
    from drmaa import (ExitTimeoutException, InvalidJobException,
                       JobControlAction, JOB_IDS_SESSION_ALL, Session,
                       TIMEOUT_NO_WAIT)

# Python 2.x backward compatibility
if sys.version_info < (3, 0):
    range = xrange

# Setup back-end if we're using matplotlib
if CREATE_PLOTS:
    import matplotlib
    matplotlib.use('AGG')
    import matplotlib.pyplot as plt

# Placeholder string, since a job could potentially return None on purpose
_JOB_NOT_FINISHED = '*@#%$*@#___GRIDMAP___NOT___DONE___@#%**#*$&*%'

class JobException(Exception):
    '''
    New exception type for when one of the jobs crashed.
    '''
    pass

class Job(object):
    """
    Central entity that wraps a function and its data. Basically, a job
    consists of a function, its argument list, its keyword list and a field
    "ret" which is filled when the execute method gets called.

    .. note::

       This can only be used to wrap picklable functions (i.e., those that
       are defined at the module or class level).
    """

    __slots__ = ('function', 'args', 'id', 'kwlist', 'cleanup', 'ret',
                 'traceback', 'num_slots', 'mem_free', 'white_list', 'path',
                 'uniq_id', 'name', 'queue', 'environment', 'working_dir',
                 'cause_of_death', 'num_resubmits', 'home_address',
                 'log_stderr_fn', 'log_stdout_fn', 'timestamp', 'host_name',
                 'heart_beat', 'track_mem', 'track_cpu', 'interpreting_shell',
                 'copy_env', 'par_env', 'gpu', 'h_vmem', 'h_rt', 'resources')

    def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
                 name='gridmap_job', num_slots=1, queue=DEFAULT_QUEUE,
                 interpreting_shell=None, copy_env=True, add_env=None,
                 par_env=DEFAULT_PAR_ENV, gpu=0, h_vmem=None, h_rt=None,
                 resources=None):
        """
        Initializes a new Job.

        :param f: a function, which should be executed.
        :type f: function
        :param args: argument list of function f
        :type args: list
        :param kwlist: dictionary of keyword arguments for f
        :type kwlist: dict
        :param cleanup: flag that determines the cleanup of input and log file
        :type cleanup: boolean
        :param mem_free: Estimate of how much memory this job will need (for
                         scheduling)
        :type mem_free: str
        :param name: Name to give this job
        :type name: str
        :param num_slots: Number of slots this job should use.
        :type num_slots: int
        :param queue: SGE queue to schedule job on.
        :type queue: str
        :param interpreting_shell: The interpreting shell for the job
        :type interpreting_shell: str
        :param copy_env: copy environment from master node to worker node?
        :type copy_env: boolean
        :param add_env: Environment variables to add to the environment.
                        Overwrites variables which already exist due to
                        ``copy_env=True``.
        :type add_env: dict
        :param par_env: parallel environment to use.
        :type par_env: str
        :param gpu: number of GPUs to request
        :type gpu: int
        :param h_vmem: hard virtual memory limit (e.g. "4G")
        :type h_vmem: str, optional
        :param h_rt: hard runtime limit (e.g. "00:59:00")
        :type h_rt: str, optional
        :param resources: list of additional custom resource specifications
        :type resources: list of str, optional
        """
        self.track_mem = []
        self.track_cpu = []
        self.heart_beat = None
        self.traceback = None
        self.host_name = ''
        self.timestamp = None
        self.log_stdout_fn = ''
        self.log_stderr_fn = ''
        self.home_address = ''
        self.num_resubmits = 0
        self.cause_of_death = ''
        self.path = None
        self.function = f
        self.args = args
        self.id = -1
        self.kwlist = kwlist if kwlist is not None else {}
        self.cleanup = cleanup
        self.ret = _JOB_NOT_FINISHED
        self.num_slots = num_slots
        self.mem_free = mem_free
        self.white_list = []
        self.name = name.replace(' ', '_')
        self.queue = queue
        self.interpreting_shell = interpreting_shell
        self.copy_env = copy_env

        # Save copy of environment variables
        self.environment = {}

        def _add_env(env_vars):
            for env_var, value in env_vars.items():
                try:
                    if not isinstance(env_var, bytes):
                        env_var = env_var.encode()
                    if not isinstance(value, bytes):
                        value = value.encode()
                except UnicodeEncodeError:
                    logger = logging.getLogger(__name__)
                    logger.warning('Skipping non-ASCII environment variable.')
                else:
                    self.environment[env_var] = value

        if self.copy_env:
            _add_env(os.environ)
        if add_env is not None:
            _add_env(add_env)

        self.working_dir = os.getcwd()
        self.par_env = par_env
        self.gpu = gpu
        self.h_vmem = h_vmem
        self.h_rt = h_rt
        self.resources = resources

    def execute(self):
        """
        Executes function f with given arguments
        and writes return value to field ret.
        If an exception is encountered during execution, ret will
        contain a pickled version of it.
        Input data is removed after execution to save space.
        """
        try:
            self.ret = self.function(*self.args, **self.kwlist)
        except Exception as exception:
            self.ret = exception
            self.traceback = traceback.format_exc()
            traceback.print_exc()

    @property
    def native_specification(self):
        """
        Build the SGE native specification string from this job's resource
        requirements (python-style getter).
        """
        ret = "-shell yes"
        if self.interpreting_shell:
            ret += " -S {}".format(self.interpreting_shell)
        ret += " -b yes"

        if self.mem_free and USE_MEM_FREE:
            ret += " -l mem_free={}".format(self.mem_free)
        if self.num_slots and self.num_slots > 1:
            ret += " -pe {} {}".format(self.par_env, self.num_slots)
        if self.white_list:
            ret += " -l h={}".format('|'.join(self.white_list))
        if self.queue:
            ret += " -q {}".format(self.queue)
        if self.gpu:
            ret += " -l gpu={}".format(self.gpu)
        if self.h_vmem:
            ret += " -l h_vmem={}".format(self.h_vmem)
        if self.h_rt:
            ret += " -l h_rt={}".format(self.h_rt)
        if self.resources:
            ret += "".join([" -l {}".format(x) for x in self.resources])

        return ret
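
    # A worked example of the string this property produces (a sketch; flags
    # appear only for the fields that are set, and the parallel-environment
    # name comes from DEFAULT_PAR_ENV, assumed here to be "smp"):
    #
    #     job = Job(max, [[1, 2, 3]], mem_free="2G", num_slots=4,
    #               queue="all.q", h_rt="01:00:00")
    #     job.native_specification
    #     # '-shell yes -b yes -l mem_free=2G -pe smp 4 -q all.q -l h_rt=01:00:00'
    #     # (the mem_free flag is included only when USE_MEM_FREE is enabled)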

###############################
# Job Submission and Monitoring
###############################

class JobMonitor(object):
    """
    Job monitor that communicates with other nodes via 0MQ.
    """
    def __init__(self, temp_dir=DEFAULT_TEMP_DIR):
        """
        set up socket
        """
        self.logger = logging.getLogger(__name__)
        context = zmq.Context()
        self.temp_dir = temp_dir
        self.socket = context.socket(zmq.REP)

        self.host_name = gethostname()
        self.ip_address = gethostbyname(self.host_name)
        for _, _, _, _, (ip, _) in getaddrinfo(getfqdn(), 0):
            if ip != '127.0.0.1':
                self.ip_address = ip
                self.interface = "tcp://%s" % (self.ip_address)
                break
        else:
            self.logger.warning('IP address for JobMonitor server is '
                                '127.0.0.1. Runners on other machines will be'
                                ' unable to connect.')
            self.ip_address = '127.0.0.1'
            self.interface = "tcp://%s" % (self.ip_address)

        # bind to random port and remember it
        self.port = self.socket.bind_to_random_port(self.interface)
        self.home_address = "%s:%i" % (self.interface, self.port)
        self.logger.info("Setting up JobMonitor on %s", self.home_address)

        # uninitialized fields (set in check method)
        self.jobs = []
        self.ids = []
        self.session_id = None
        self.id_to_job = {}

    def __enter__(self):
        '''
        Enable JobMonitor to be used as a context manager.
        '''
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        '''
        Gracefully handle exceptions by terminating all jobs, and closing
        sockets.
        '''
        # Always close socket
        self.socket.close()

        # Clean up if we have a valid session
        if self.session_id is not None:
            with Session(self.session_id) as session:
                # If we encounter an exception, kill all jobs
                if exc_type is not None:
                    self.logger.info('Encountered %s, so killing all jobs.',
                                     exc_type.__name__)
                    # try to kill off all old jobs
                    try:
                        session.control(JOB_IDS_SESSION_ALL,
                                        JobControlAction.TERMINATE)
                    except InvalidJobException:
                        self.logger.debug("Could not kill all jobs for " +
                                          "session.", exc_info=True)

                # Get rid of job info to prevent memory leak
                try:
                    session.synchronize([JOB_IDS_SESSION_ALL],
                                        TIMEOUT_NO_WAIT, dispose=True)
                except ExitTimeoutException:
                    pass

    def check(self, session_id, jobs):
        """
        serves input and output data
        """
        # save list of jobs
        self.jobs = jobs
        self.id_to_job = {job.id: job for job in self.jobs}

        # keep track of DRMAA session_id (for resubmissions)
        self.session_id = session_id

        # determines in which interval to check if jobs are alive
        self.logger.debug('Starting local heartbeat')
        local_heart = multiprocessing.Process(target=_heart_beat,
                                              args=(-1, self.home_address, -1,
                                                    "", CHECK_FREQUENCY))
        local_heart.start()
        try:
            self.logger.debug("Starting ZMQ event loop")
            # main loop
            while not self.all_jobs_done():
                self.logger.debug('Waiting for message')
                msg_str = self.socket.recv()
                msg = zloads(msg_str)
                self.logger.debug('Received message: %s', msg)
                return_msg = ""

                job_id = msg["job_id"]

                # only if it's not the local beat
                if job_id != -1:
                    # If message is from a valid job, process that message
                    if job_id in self.id_to_job:
                        job = self.id_to_job[job_id]

                        if msg["command"] == "fetch_input":
                            return_msg = self.id_to_job[job_id]
                            job.timestamp = datetime.now()
                            self.logger.debug("Received input request from %s",
                                              job_id)

                        if msg["command"] == "store_output":
                            # be nice
                            return_msg = "thanks"

                            # store tmp job object
                            if isinstance(msg["data"], Job):
                                tmp_job = msg["data"]
                                # copy relevant fields
                                job.ret = tmp_job.ret
                                job.traceback = tmp_job.traceback
                                self.logger.info("Received output from %s",
                                                 job_id)
                            # Returned exception instead of job, so store that
                            elif isinstance(msg["data"], tuple):
                                job.ret, job.traceback = msg["data"]
                                self.logger.info("Received exception from %s",
                                                 job_id)
                            else:
                                self.logger.error(("Received message with " +
                                                   "invalid data: %s"), msg)
                                job.ret = msg["data"]
                            job.timestamp = datetime.now()

                        if msg["command"] == "heart_beat":
                            job.heart_beat = msg["data"]

                            # keep track of mem and cpu
                            try:
                                job.track_mem.append(job.heart_beat["memory"])
                                job.track_cpu.append(job.heart_beat["cpu_load"])
                            except (ValueError, TypeError):
                                self.logger.error("Error decoding heart-beat",
                                                  exc_info=True)
                            return_msg = "all good"
                            job.timestamp = datetime.now()

                        if msg["command"] == "get_job":
                            # serve job for display
                            return_msg = job
                        else:
                            # update host name
                            job.host_name = msg["host_name"]

                    # If this is an unknown job, report it and reply
                    else:
                        self.logger.error(('Received message from unknown job'
                                           ' with ID %s. Known job IDs are: '
                                           '%s'), job_id,
                                          list(self.id_to_job.keys()))
                        return_msg = 'thanks, but no thanks'
                else:
                    # run check
                    self.check_if_alive()

                    if msg["command"] == "get_jobs":
                        # serve list of jobs for display
                        return_msg = self.jobs

                # send back compressed response
                self.logger.debug('Sending reply: %s', return_msg)
                self.socket.send(zdumps(return_msg))
        finally:
            # Kill child processes that we don't need anymore
            local_heart.terminate()

    def check_if_alive(self):
        """
        check if jobs are alive and determine cause of death if not
        """
        self.logger.debug('Checking if jobs are alive')
        for job in self.jobs:
            # nothing was returned yet
            if job.ret == _JOB_NOT_FINISHED:
                # exclude first-timers
                if job.timestamp is not None:
                    # check heart-beats if there was a long delay
                    current_time = datetime.now()
                    time_delta = current_time - job.timestamp
                    if time_delta.seconds > MAX_TIME_BETWEEN_HEARTBEATS:
                        self.logger.debug("It has been %s seconds since we "
                                          "received a message from job %s",
                                          time_delta.seconds, job.id)
                        self.logger.error("Job died for unknown reason")
                        job.cause_of_death = "unknown"
                    elif (len(job.track_cpu) > MAX_IDLE_HEARTBEATS and
                          all(cpu_load <= IDLE_THRESHOLD and not running
                              for cpu_load, running in
                              job.track_cpu[-MAX_IDLE_HEARTBEATS:])):
                        self.logger.error('Job stalled for unknown reason.')
                        job.cause_of_death = 'stalled'

            # could have been an exception, we check right away
            elif isinstance(job.ret, Exception):
                job.cause_of_death = 'exception'

                # Send error email, in addition to raising and logging
                # exception
                if SEND_ERROR_MAIL:
                    send_error_mail(job)

                # Format traceback much like joblib does
                self.logger.error("-" * 80)
                self.logger.error("GridMap job traceback for %s:", job.name)
                self.logger.error("-" * 80)
                self.logger.error("Exception: %s", type(job.ret).__name__)
                self.logger.error("Job ID: %s", job.id)
                self.logger.error("Host: %s", job.host_name)
                self.logger.error("." * 80)
                self.logger.error(job.traceback)
                raise job.ret

            # attempt to resubmit
            if job.cause_of_death:
                self.logger.info("Creating error report")

                # send report
                if SEND_ERROR_MAIL:
                    send_error_mail(job)

                # try to resubmit
                old_id = job.id
                job.track_cpu = []
                job.track_mem = []
                handle_resubmit(self.session_id, job, temp_dir=self.temp_dir)

                # Update job ID if successfully resubmitted
                self.logger.info('Resubmitted job %s; it now has ID %s',
                                 old_id, job.id)
                del self.id_to_job[old_id]
                self.id_to_job[job.id] = job

                # break out of loop to avoid too long delay
                break

    def all_jobs_done(self):
        """
        checks for all jobs if they are done
        """
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            num_jobs = len(self.jobs)
            num_completed = sum((job.ret != _JOB_NOT_FINISHED and
                                 not isinstance(job.ret, Exception))
                                for job in self.jobs)
            self.logger.debug('%i out of %i jobs completed', num_completed,
                              num_jobs)

        # exceptions will be handled in check_if_alive
        return all((job.ret != _JOB_NOT_FINISHED and
                    not isinstance(job.ret, Exception)) for job in self.jobs)

def _send_mail(subject, body_text, attachments=None):
    """
    Send out job status email.

    This is a helper function for send_error_mail and send_completion_mail.
    """
    logger = logging.getLogger(__name__)

    # Connect to server
    try:
        s = smtplib.SMTP(SMTP_SERVER)
    except smtplib.SMTPConnectError:
        logger.error('Failed to connect to SMTP server to send status '
                     'email.', exc_info=True)
        return

    # create message
    msg = MIMEMultipart()
    msg["subject"] = subject
    msg["From"] = ERROR_MAIL_SENDER
    msg["To"] = ERROR_MAIL_RECIPIENT

    logger.info('Email body: %s', body_text)

    body_msg = MIMEText(body_text)
    msg.attach(body_msg)

    if attachments is not None:
        for attachment in attachments:
            msg.attach(attachment)

    # Send mail
    try:
        s.sendmail(ERROR_MAIL_SENDER, ERROR_MAIL_RECIPIENT, msg.as_string())
    except (SMTPRecipientsRefused, SMTPHeloError, SMTPSenderRefused,
            SMTPDataError):
        logger.error('Failed to send status email.', exc_info=True)

    s.quit()

def send_error_mail(job):
    """
    send out diagnostic email
    """
    logger = logging.getLogger(__name__)

    # create message
    subject = "GridMap error {}".format(job.name)

    attachments = list()

    # compose error message
    body_text = ""
    body_text += "Job {}\n".format(job.name)
    body_text += "Last timestamp: {}\n".format(job.timestamp)
    body_text += "Resubmissions: {}\n".format(job.num_resubmits)
    body_text += "Cause of death: {}\n".format(job.cause_of_death)

    if job.heart_beat:
        body_text += "Last memory usage: {}\n".format(job.heart_beat["memory"])
        body_text += "Last cpu load: {}\n".format(job.heart_beat["cpu_load"][0])
        body_text += ("Process was running at last check: " +
                      "{}\n\n").format(job.heart_beat["cpu_load"][1])

    body_text += "Host: {}\n\n".format(job.host_name)

    if isinstance(job.ret, Exception):
        body_text += "Job encountered exception: {}\n".format(job.ret)
        body_text += "Stacktrace: {}\n\n".format(job.traceback)

    # attach log file
    if job.heart_beat and "log_file" in job.heart_beat:
        log_file_attachement = MIMEText(job.heart_beat['log_file'])
        log_file_attachement.add_header('Content-Disposition', 'attachment',
                                        filename='{}_log.txt'.format(job.id))
        attachments.append(log_file_attachement)

    # if matplotlib is installed
    if CREATE_PLOTS:
        # TODO: plot to cstring directly (some code is there)
        # imgData = cStringIO.StringIO()
        # plt.savefig(imgData, format='png')
        # rewind the data
        # imgData.seek(0)
        # plt.savefig(imgData, format="png")

        time = [HEARTBEAT_FREQUENCY * i for i in range(len(job.track_mem))]

        # attach mem plot
        img_mem_fn = os.path.join('/tmp', "{}_mem.png".format(job.id))
        plt.figure(1)
        plt.plot(time, job.track_mem, "-o")
        plt.xlabel("time (s)")
        plt.ylabel("memory usage")
        plt.savefig(img_mem_fn)
        plt.close()
        with open(img_mem_fn, "rb") as img_mem:
            img_data = img_mem.read()
        img_mem_attachement = MIMEImage(img_data)
        img_mem_attachement.add_header('Content-Disposition', 'attachment',
                                       filename=os.path.basename(img_mem_fn))
        attachments.append(img_mem_attachement)

        # attach cpu plot
        img_cpu_fn = os.path.join("/tmp", "{}_cpu.png".format(job.id))
        plt.figure(2)
        plt.plot(time, [cpu_load for cpu_load, _ in job.track_cpu], "-o")
        plt.xlabel("time (s)")
        plt.ylabel("cpu load")
        plt.savefig(img_cpu_fn)
        plt.close()
        with open(img_cpu_fn, "rb") as img_cpu:
            img_data = img_cpu.read()
        img_cpu_attachement = MIMEImage(img_data)
        img_cpu_attachement.add_header('Content-Disposition', 'attachment',
                                       filename=os.path.basename(img_cpu_fn))
        attachments.append(img_cpu_attachement)

    # Send mail
    _send_mail(subject, body_text, attachments)

    # Clean up plot temporary files
    if CREATE_PLOTS:
        os.unlink(img_cpu_fn)
        os.unlink(img_mem_fn)

def handle_resubmit(session_id, job, temp_dir=DEFAULT_TEMP_DIR):
    """
    heuristic to determine if the job should be resubmitted

    side-effect:
    job.num_resubmits incremented
    job.id set to new ID
    """
    # reset some fields
    job.timestamp = None
    job.heart_beat = None

    if job.num_resubmits < NUM_RESUBMITS:
        logger = logging.getLogger(__name__)
        logger.warning("Looks like job %s (%s) died an unnatural death, "
                       "resubmitting (previous resubmits = %i)", job.name,
                       job.id, job.num_resubmits)

        # remove node from white_list
        node_name = '{}@{}'.format(job.queue, job.host_name)
        if job.white_list and node_name in job.white_list:
            job.white_list.remove(node_name)

        # increment number of resubmits
        job.num_resubmits += 1
        job.cause_of_death = ""

        _resubmit(session_id, job, temp_dir)
    else:
        raise JobException(("Job {0} ({1}) failed after {2} "
                            "resubmissions").format(job.name, job.id,
                                                    NUM_RESUBMITS))

def _execute(job):
    """
    Cannot pickle method instances, so fake a function.
    Used by _process_jobs_locally.
    """
    job.execute()
    return job.ret


def _process_jobs_locally(jobs, max_processes=1):
    """
    Local execution using the package multiprocessing, if present

    :param jobs: jobs to be executed
    :type jobs: list of Job
    :param max_processes: maximal number of processes
    :type max_processes: int

    :return: list of jobs, each with return in job.ret
    :rtype: list of Job
    """
    logger = logging.getLogger(__name__)
    logger.info("using %i processes", max_processes)

    if max_processes == 1:
        # perform sequential computation
        for job in jobs:
            job.execute()
    else:
        pool = Pool(max_processes)
        result = pool.map(_execute, jobs)
        for ret_val, job in zip(result, jobs):
            job.ret = ret_val
        pool.close()
        pool.join()

    return jobs


def _submit_jobs(jobs, home_address, temp_dir=DEFAULT_TEMP_DIR,
                 white_list=None, quiet=True):
    """
    Method used to send a list of jobs onto the cluster.

    :param jobs: list of jobs to be executed
    :type jobs: list of `Job`
    :param home_address: Full address (including IP and port) of JobMonitor on
                         submitting host. Running jobs will communicate with
                         the parent process at that address via ZMQ.
    :type home_address: str
    :param temp_dir: Local temporary directory for storing output for an
                     individual job.
    :type temp_dir: str
    :param white_list: List of acceptable nodes to use for scheduling job. If
                       None, all are used.
    :type white_list: list of str
    :param quiet: When true, do not output information about the jobs that
                  have been submitted.
    :type quiet: bool

    :returns: Session ID
    """
    with Session() as session:
        for job in jobs:
            # set job white list
            job.white_list = white_list

            # remember address of submission host
            job.home_address = home_address

            # append jobs
            _append_job_to_session(session, job, temp_dir=temp_dir,
                                   quiet=quiet)

        sid = session.contact
    return sid


def _append_job_to_session(session, job, temp_dir=DEFAULT_TEMP_DIR,
                           quiet=True):
    """
    For an active session, append new job based on information stored in job
    object. Also sets job.id to the ID of the job on the grid.

    :param session: The current DRMAA session with the grid engine.
    :type session: Session
    :param job: The Job to add to the queue.
    :type job: `Job`
    :param temp_dir: Local temporary directory for storing output for an
                     individual job.
    :type temp_dir: str
    :param quiet: When true, do not output information about the jobs that
                  have been submitted.
    :type quiet: bool
    """
    jt = session.createJobTemplate()
    logger = logging.getLogger(__name__)
    # logger.debug('{}'.format(job.environment))
    jt.jobEnvironment = job.environment

    # Run module using python -m to avoid ImportErrors when unpickling jobs
    jt.remoteCommand = sys.executable
    jt.args = ['-m', 'gridmap.runner', '{}'.format(job.home_address), job.path]
    jt.nativeSpecification = job.native_specification
    jt.jobName = job.name
    jt.workingDirectory = job.working_dir
    jt.outputPath = ":{}".format(temp_dir)
    jt.errorPath = ":{}".format(temp_dir)

    # Create temp directory if necessary
    if not os.path.exists(temp_dir):
        try:
            os.makedirs(temp_dir)
        except OSError:
            logger.warning(("Failed to create temporary directory "
                            "{}. Your jobs may not start "
                            "correctly.").format(temp_dir))

    job_id = session.runJob(jt)

    # set job fields that depend on the job_id assigned by grid engine
    job.id = job_id
    job.log_stdout_fn = os.path.join(temp_dir, '{}.o{}'.format(job.name,
                                                               job_id))
    job.log_stderr_fn = os.path.join(temp_dir, '{}.e{}'.format(job.name,
                                                               job_id))

    if not quiet:
        print('Your job {} has been submitted with id {}'.format(job.name,
                                                                 job_id),
              file=sys.stderr)

    session.deleteJobTemplate(jt)

def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True,
                 max_processes=1, local=False, require_cluster=False):
    """
    Take a list of jobs and process them on the cluster.

    :param jobs: Jobs to run.
    :type jobs: list of Job
    :param temp_dir: Local temporary directory for storing output for an
                     individual job.
    :type temp_dir: str
    :param white_list: If specified, limit nodes used to only those in list.
    :type white_list: list of str
    :param quiet: When true, do not output information about the jobs that
                  have been submitted.
    :type quiet: bool
    :param max_processes: The maximum number of concurrent processes to use if
                          processing jobs locally.
    :type max_processes: int
    :param local: Should we execute the jobs locally in separate processes
                  instead of on the cluster?
    :type local: bool
    :param require_cluster: Should we raise an exception if access to cluster
                            is not available?
    :type require_cluster: bool

    :returns: List of Job results
    """
    if not local and not DRMAA_PRESENT:
        logger = logging.getLogger(__name__)
        if require_cluster:
            raise DRMAANotPresentException(
                'Could not import drmaa, but cluster access required.'
            )
        logger.warning('Could not import drmaa. Processing jobs locally.')
        local = True

    if not local:
        # initialize monitor to get port number
        with JobMonitor(temp_dir=temp_dir) as monitor:
            # get interface and port
            home_address = monitor.home_address

            # job_id field is attached to each job object
            sid = _submit_jobs(jobs, home_address, temp_dir=temp_dir,
                               white_list=white_list, quiet=quiet)

            # handling of inputs, outputs and heartbeats
            monitor.check(sid, jobs)
    else:
        _process_jobs_locally(jobs, max_processes=max_processes)

    return [job.ret for job in jobs]
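
# A lower-level sketch of the same pipeline without grid_map (illustrative
# only; the temp_dir path below is made up): Job objects are built by hand
# and handed to process_jobs, which either submits them via DRMAA or falls
# back to local multiprocessing.
#
#     jobs = [Job(sum, [[i, i + 1]], name='sum_{}'.format(i), mem_free='1G')
#             for i in range(3)]
#     results = process_jobs(jobs, temp_dir='/tmp/gridmap_demo/', local=True,
#                            max_processes=2)
#     # results == [1, 3, 5]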

def _resubmit(session_id, job, temp_dir):
    """
    Resubmit a failed job.

    :returns: ID of new job
    """
    logger = logging.getLogger(__name__)
    logger.info("starting resubmission process")

    if DRMAA_PRESENT:
        # append to session
        with Session(session_id) as session:
            # try to kill off old job
            try:
                session.control(job.id, JobControlAction.TERMINATE)
                logger.info("zombie job killed")
            except Exception:
                logger.error("Could not kill job with SGE id %s", job.id,
                             exc_info=True)
            # create new job
            _append_job_to_session(session, job, temp_dir=temp_dir)
    else:
        logger.error("Could not restart job because we're in local mode.")


#####################
# MapReduce Interface
#####################

def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
             num_slots=1, temp_dir=DEFAULT_TEMP_DIR, white_list=None,
             queue=DEFAULT_QUEUE, quiet=True, local=False, max_processes=1,
             interpreting_shell=None, copy_env=True, add_env=None, gpu=0,
             h_vmem=None, h_rt=None, resources=None, completion_mail=False,
             require_cluster=False, par_env=DEFAULT_PAR_ENV):
    """
    Maps a function onto the cluster.

    .. note::

       This can only be used with picklable functions (i.e., those that are
       defined at the module or class level).

    :param f: The function to map on args_list
    :type f: function
    :param args_list: List of arguments to pass to f
    :type args_list: list
    :param cleanup: Should we remove the stdout and stderr temporary files for
                    each job when we're done? (They are left in place if
                    there's an error.)
    :type cleanup: bool
    :param mem_free: Estimate of how much memory each job will need (for
                     scheduling). (Not currently used, because our cluster
                     does not have that setting enabled.)
    :type mem_free: str
    :param name: Base name to give each job (will have a number added to end)
    :type name: str
    :param num_slots: Number of slots each job should use.
    :type num_slots: int
    :param temp_dir: Local temporary directory for storing output for an
                     individual job.
    :type temp_dir: str
    :param white_list: If specified, limit nodes used to only those in list.
    :type white_list: list of str
    :param queue: The SGE queue to use for scheduling.
    :type queue: str
    :param quiet: When true, do not output information about the jobs that
                  have been submitted.
    :type quiet: bool
    :param local: Should we execute the jobs locally in separate processes
                  instead of on the cluster?
    :type local: bool
    :param max_processes: The maximum number of concurrent processes to use if
                          processing jobs locally.
    :type max_processes: int
    :param interpreting_shell: The interpreting shell for the jobs.
    :type interpreting_shell: str
    :param copy_env: copy environment from master node to worker node?
    :type copy_env: boolean
    :param add_env: Environment variables to add to the environment.
                    Overwrites variables which already exist due to
                    ``copy_env=True``.
    :type add_env: dict
    :param gpu: number of GPUs to request
    :type gpu: int
    :param h_vmem: hard virtual memory limit (e.g. "4G")
    :type h_vmem: str, optional
    :param h_rt: hard runtime limit (e.g. "00:59:00")
    :type h_rt: str, optional
    :param resources: list of additional custom resource specifications
    :type resources: list of str, optional
    :param par_env: parallel environment to use.
    :type par_env: str
    :param completion_mail: whether to send an e-mail upon completion of all
                            jobs
    :type completion_mail: boolean
    :param require_cluster: Should we raise an exception if access to cluster
                            is not available?
    :type require_cluster: bool

    :returns: List of Job results
    """
    # construct jobs
    jobs = [Job(f, [args] if not isinstance(args, list) else args,
                cleanup=cleanup, mem_free=mem_free,
                name='{}{}'.format(name, job_num), num_slots=num_slots,
                queue=queue, interpreting_shell=interpreting_shell,
                copy_env=copy_env, add_env=add_env, par_env=par_env, gpu=gpu,
                h_vmem=h_vmem, h_rt=h_rt, resources=resources)
            for job_num, args in enumerate(args_list)]

    # process jobs
    job_results = process_jobs(jobs, temp_dir=temp_dir, white_list=white_list,
                               quiet=quiet, local=local,
                               max_processes=max_processes,
                               require_cluster=require_cluster)

    # send a completion mail (if requested and configured)
    if completion_mail and SEND_ERROR_MAIL:
        send_completion_mail(name=name)

    return job_results

def send_completion_mail(name):
    """
    send out success email
    """
    # create message
    subject = "GridMap completed grid_map {}".format(name)

    # compose message body
    body_text = ""
    body_text += "Job {}\n".format(name)

    # Send mail
    _send_mail(subject, body_text)