Source code for gwsumm.batch

# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2013)
# This file is part of GWSumm.
# GWSumm is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# GWSumm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with GWSumm.  If not, see <>.

"""Pipeline generator for the Gravitational-wave interferometer
summary information system (`gwsumm`)

This module constructs a directed acyclic graph (DAG) that defines
a workflow to be submitted via the HTCondor scheduler.

import argparse
import os
import shutil
import sys

from glue import pipeline

from gwdatafind.utils import find_credential

from import kerberos as gwkerberos

from gwdetchar import cli

from . import __version__
from .utils import mkdir

__author__ = 'Duncan Macleod <>'
__credits__ = 'Alex Urban <>'

PROG = ('python -m gwsumm.batch' if sys.argv[0].endswith('.py')
        else os.path.basename(sys.argv[0]))

# -- utilities ----------------------------------------------------------------

[docs] class GWSummaryJob(pipeline.CondorDAGJob): """Job representing a configurable instance of gw_summary. """ logtag = '$(cluster)-$(process)' def __init__(self, universe, tag='gw_summary', subdir=None, logdir=None, **cmds): pipeline.CondorDAGJob.__init__(self, universe, sys.executable) if subdir: subdir = os.path.abspath(subdir) self.set_sub_file(os.path.join(subdir, '%s.sub' % (tag))) if logdir: logdir = os.path.abspath(logdir) self.set_log_file(os.path.join( logdir, '%s-%s.log' % (tag, self.logtag))) self.set_stderr_file(os.path.join( logdir, '%s-%s.err' % (tag, self.logtag))) self.set_stdout_file(os.path.join( logdir, '%s-%s.out' % (tag, self.logtag))) cmds.setdefault('getenv', 'True') for key, val in cmds.items(): if hasattr(self, 'set_%s' % key.lower()): getattr(self, 'set_%s' % key.lower())(val) else: self.add_condor_cmd(key, val) # add python module sub-command self._command = ' '.join(['-m', __package__])
[docs] def add_opt(self, opt, value=''): pipeline.CondorDAGJob.add_opt(self, opt, str(value))
add_opt.__doc__ = pipeline.CondorDAGJob.add_opt.__doc__
[docs] def set_command(self, command): self._command = ' '.join([ self._command, command, ])
[docs] def get_command(self): return self._command
[docs] def write_sub_file(self): pipeline.CondorDAGJob.write_sub_file(self) # insert positional arguments in the right place with open(self.get_sub_file(), 'r') as f: sub = sub = sub.replace( 'arguments = "', 'arguments = " {0}'.format(self.get_command()), ) with open(self.get_sub_file(), 'w') as f: f.write(sub)
[docs] class GWSummaryDAGNode(pipeline.CondorDAGNode):
[docs] def get_cmd_line(self): # merge positional arguments with options return ' '.join([ self.job().get_command(), pipeline.CondorDAGNode.get_cmd_line(self), ])
# -- parse command-line -------------------------------------------------------
[docs] class GWHelpFormatter(argparse.HelpFormatter): def __init__(self, *args, **kwargs): kwargs.setdefault('indent_increment', 4) super(GWHelpFormatter, self).__init__(*args, **kwargs)
[docs] def create_parser(): """Create a command-line parser for this entry point """ # initialize argument parser usage = ('%(prog)s --global-config defaults.ini --config-file ' 'myconfig.ini [--config-file myconfig2.ini] [options]') parser = argparse.ArgumentParser( prog=PROG, usage=usage, description=__doc__, formatter_class=GWHelpFormatter, ) bopts = parser.add_argument_group("Basic options") htcopts = parser.add_argument_group("Condor options") copts = parser.add_argument_group( "Configuration options", "Each --global-config file will be used in all nodes of the workflow, " "while a single node will be created for each other --config-file", ) popts = parser.add_argument_group( "Process options", "Configure how this summary will be processed.", ) outopts = parser.add_argument_group("Output options") topts = parser.add_argument_group( "Time mode options", "Choose a stadard time mode, or a GPS [start, stop) interval", ) # general arguments parser.add_argument( "-v", "--verbose", action="store_true", default=False, help="show verbose output, default: %(default)s", ) parser.add_argument( "-V", "--version", action="version", help="show program's version number and exit", ) parser.version = __version__ # basic options bopts.add_argument( '-i', '--ifo', action='store', type=str, metavar='IFO', help="Instrument to process. If this option is set in " "the [DEFAULT] of any of the INI files, giving it " "here is redundant.", ) wrapgroup = bopts.add_mutually_exclusive_group() wrapgroup.add_argument( '-w', '--skip-html-wrapper', action='store_true', default=False, help="Do not configure first job for HTML htmlnode, default: " "%(default)s. Useful for separating large summary pipeline " "across multiple DAGs", ) wrapgroup.add_argument( '-W', '--html-wrapper-only', action='store_true', help="Only run first job for HTML htmlnode.", ) bopts.add_argument( '-t', '--file-tag', action='store', type=str, default='gw_summary_pipe', help="file tag for pipeline files, default: %(default)s", ) # HTCondor options htcopts.add_argument( '-u', '--universe', action='store', type=str, default='vanilla', help="Universe for condor jobs, default: %(default)s", ) htcopts.add_argument( '-l', '--log-dir', action='store', type=str, default=os.environ.get('LOCALDIR', None), help="Directory path for condor log files, default: %(default)s", ) htcopts.add_argument( '-m', '--maxjobs', action='store', type=int, default=None, metavar='N', help="Restrict the DAG to submit only N jobs at any one " "time, default: %(default)s", ) htcopts.add_argument( '-T', '--condor-timeout', action='store', type=float, default=None, metavar='T', help='Configure condor to terminate jobs after T hours ' 'to prevent idling, default: %(default)s', ) htcopts.add_argument( '-c', '--condor-command', action='append', type=str, default=[], help="Extra condor submit commands to add to gw_summary submit file. " "Can be given multiple times in the form \"key=value\"", ) # configuration options copts.add_argument( '-f', '--config-file', action='append', type=str, metavar='FILE', default=[], help="INI file for analysis, may be given multiple times", ) copts.add_argument( '-g', '--global-config', action='append', type=str, metavar='FILE', default=[], help="INI file for use in all workflow jobs, may be given " "multiple times", ) copts.add_argument( '-p', '--priority', action='append', type=str, default=[], help="priority for DAG node, should be given once " "for each --config-file in the same order", ) # process options popts.add_argument( '--nds', action='store_true', default='guess', help='use NDS as the data source, default: %(default)s', ) popts.add_argument( '--single-process', action='store_true', default=False, help="restrict gw_summary to a single process, mainly for " "debugging purposes, default: %(default)s", ) popts.add_argument( '--multi-process', action='store', type=int, default=None, help="maximum number of concurrent sub-processes for each " "gw_summary job, {number of CPUs} / {min(number of jobs, 4)}", ) popts.add_argument( '-a', '--archive', action='store_true', default=False, help="Read archived data from the FILE, and " "write back to it at the end", ) popts.add_argument( '-S', '--on-segdb-error', action='store', type=str, default='raise', choices=['raise', 'ignore', 'warn'], help="action upon error fetching segments from SegDB", ) popts.add_argument( '-G', '--on-datafind-error', action='store', type=str, default='raise', choices=['raise', 'ignore', 'warn'], help="action upon error querying for frames from the " "datafind server: default: %(default)s", ) popts.add_argument( '--data-cache', action='append', default=[], help='path to LAL-format cache of TimeSeries data files', ) popts.add_argument( '--event-cache', action='append', default=[], help='path to LAL-format cache of event trigger files', ) popts.add_argument( '--segment-cache', action='append', default=[], help='path to LAL-format cache of state ' 'or data-quality segment files', ) popts.add_argument( '--no-htaccess', action='store_true', default=False, help='tell gw_summary to not write .htaccess files', ) # output options outopts.add_argument( '-o', '--output-dir', action='store', type=str, metavar='OUTDIR', default=os.curdir, help="Output directory for summary information, " "default: '%(default)s'", ) # time mode options topts.add_argument( "--day", action="store", type=str, metavar='YYYYMMDD', help="UTC date to process", ) topts.add_argument( "--week", action="store", type=str, metavar="YYYYMMDD", help="week to process (by UTC starting date)", ) topts.add_argument( "--month", action="store", type=str, metavar="YYYYMM", help="calendar month to process", ) topts.add_argument( "--year", action="store", type=str, metavar="YYYY", help="calendar year to process", ) topts.add_argument( "-s", "--gps-start-time", action="store", type=int, metavar="GPSSTART", help="GPS start time", ) topts.add_argument( "-e", "--gps-end-time", action="store", type=int, metavar="GPSEND", help="GPS end time", ) # return the argument parser return parser
# -- main code block ----------------------------------------------------------
[docs] def main(args=None): """Run the command-line Omega scan tool in batch mode """ parser = create_parser() args = parser.parse_args(args=args) # initialize logger logger = cli.logger( name=PROG.split('python -m ').pop(), level='DEBUG' if args.verbose else 'INFO', ) # check time options N = sum([ is not None, args.month is not None, args.gps_start_time is not None, args.gps_end_time is not None]) if N > 1 and not (args.gps_start_time and args.gps_end_time): raise parser.error("Please give only one of --day, --month, or " "--gps-start-time and --gps-end-time.") for (i, cf) in enumerate(args.config_file): args.config_file[i] = ','.join(map(os.path.abspath, cf.split(','))) args.global_config = list(map( os.path.abspath, [fp for csv in args.global_config for fp in csv.split(',')], )) # -- build workflow directories ----------------- # move to output directory indir = os.getcwd() mkdir(args.output_dir) os.chdir(args.output_dir) outdir = os.curdir # set node log path, and condor log path logdir = os.path.join(outdir, 'logs') htclogdir = args.log_dir or logdir mkdir(logdir, htclogdir) # set config directory and copy config files etcdir = os.path.join(outdir, 'etc') mkdir(etcdir) for (i, fp) in enumerate(args.global_config): inicopy = os.path.join(etcdir, os.path.basename(fp)) if not os.path.isfile(inicopy) or not os.path.samefile(fp, inicopy): shutil.copyfile(fp, inicopy) args.global_config[i] = os.path.abspath(inicopy) for (i, csv) in enumerate(args.config_file): inicopy = [] for fp in csv.split(','): fp2 = os.path.join(etcdir, os.path.basename(fp)) if not os.path.isfile(fp2) or not os.path.samefile(fp, fp2): shutil.copyfile(fp, fp2) inicopy.append(os.path.abspath(fp2)) args.config_file[i] = ','.join(inicopy) logger.debug("Copied all INI configuration files to %s" % etcdir) # -- configure X509 and kerberos for condor ----- if args.universe != 'local': # copy X509 grid certificate into local location (x509cert, _) = find_credential() x509copy = os.path.join(etcdir, os.path.basename(x509cert)) shutil.copyfile(x509cert, x509copy) # rerun kerberos with new path krb5cc = os.path.abspath(os.path.join(etcdir, 'krb5cc.krb5')) gwkerberos.kinit(krb5ccname=krb5cc) logger.debug("Configured Condor and Kerberos " "for NFS-shared credentials") # -- build DAG ---------------------------------- dag = pipeline.CondorDAG(os.path.join(htclogdir, '%s.log' % args.file_tag)) dag.set_dag_file(os.path.join(outdir, args.file_tag)) universe = args.universe # -- parse condor commands ---------------------- # parse into a dict condorcmds = {} if args.condor_timeout: condorcmds['periodic_remove'] = ( 'CurrentTime-EnteredCurrentStatus > %d' % (3600 * args.condor_timeout) ) for cmd_ in args.condor_command: (key, value) = cmd_.split('=', 1) condorcmds[key.rstrip().lower()] = value.strip() if args.universe != 'local': # add X509 to environment for (env_, val_) in zip(['X509_USER_PROXY', 'KRB5CCNAME'], [os.path.abspath(x509copy), krb5cc]): condorenv = '%s=%s' % (env_, val_) if ('environment' in condorcmds and env_ not in condorcmds['environment']): condorcmds['environment'] += ';%s' % condorenv elif 'environment' not in condorcmds: condorcmds['environment'] = condorenv # -- build individual gw_summary jobs ----------- globalconfig = ','.join(args.global_config) jobs = [] if not args.skip_html_wrapper: htmljob = GWSummaryJob( 'local', subdir=outdir, logdir=logdir, tag='%s_local' % args.file_tag, **condorcmds) jobs.append(htmljob) if not args.html_wrapper_only: datajob = GWSummaryJob( universe, subdir=outdir, logdir=logdir, tag=args.file_tag, **condorcmds) jobs.append(datajob) # add common command-line options for job in jobs: if job.set_command('day') job.add_arg( elif args.week: job.set_command('week') job.add_arg(args.week) elif args.month: job.set_command('month') job.add_arg(args.month) elif args.year: job.set_command('year') job.add_arg(args.year) elif args.gps_start_time or args.gps_end_time: job.set_command('gps') job.add_arg(str(args.gps_start_time)) job.add_arg(str(args.gps_end_time)) else: job.set_command('day') if args.nds is True: job.add_opt('nds') if args.single_process: job.add_opt('single-process') elif args.multi_process is not None: job.add_opt('multi-process', args.multi_process) if args.verbose: job.add_opt('verbose') if args.ifo: job.add_opt('ifo', args.ifo) job.add_opt('on-segdb-error', args.on_segdb_error) job.add_opt('on-datafind-error', args.on_datafind_error) job.add_opt('output-dir', outdir) for (opt, fplist) in zip( ['--data-cache', '--event-cache', '--segment-cache'], [args.data_cache, args.event_cache, args.segment_cache]): if fplist: job.add_arg('%s %s' % (opt, (' %s ' % opt).join(fplist))) if args.no_htaccess: job.add_opt('no-htaccess') # make surrounding HTML first if not args.skip_html_wrapper: htmljob.add_opt('html-only', '') htmljob.add_opt('config-file', ','.join( [globalconfig]+args.config_file).strip(',')) htmlnode = GWSummaryDAGNode(htmljob) for configfile in args.config_file: htmlnode.add_input_file(args.config_file) htmlnode.set_category('gw_summary') dag.add_node(htmlnode) logger.debug(" -- Configured HTML htmlnode job") # create node for each config file if not args.html_wrapper_only: # add html opts datajob.add_opt('no-html', '') if args.archive: datajob.add_condor_cmd('+SummaryNodeType', '"$(macroarchive)"') # configure each data node for (i, configfile) in enumerate(args.config_file): node = GWSummaryDAGNode(datajob) node.add_var_arg('--config-file %s' % ','.join( [globalconfig, configfile]).strip(',')) if args.archive: jobtag = os.path.splitext(os.path.basename(configfile))[0] archivetag = jobtag.upper().replace('-', '_') if args.ifo and archivetag.startswith('%s_' % args.ifo.upper()): archivetag = archivetag[3:] node.add_var_opt('archive', archivetag) for cf in configfile.split(','): node.add_input_file(cf) node.set_category('gw_summary') try: node.set_priority(args.priority[i]) except IndexError: node.set_priority(0) node.set_retry(1) if not args.skip_html_wrapper: node.add_parent(htmlnode) dag.add_node(node) logger.debug(" -- Configured job for config %s" % configfile) if args.maxjobs: dag.add_maxjobs_category('gw_summary', args.maxjobs) # -- finish up ---------------------------------- dag.write_sub_files() dag.write_dag() dag.write_script() logger.debug("Setup complete, DAG written to: {}".format( os.path.abspath(dag.get_dag_file()))) # return to original directory os.chdir(indir)
# -- run from command-line ---------------------------------------------------- if __name__ == "__main__": main()