Source code for gwsumm.config

# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2013)
#
# This file is part of GWSumm.
#
# GWSumm is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWSumm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWSumm.  If not, see <http://www.gnu.org/licenses/>.

"""Thin wrapper around configparser
"""

import configparser
import os
import re

# import these for evaluating lambda expressions in the configuration file
import math  # noqa: F401
import numpy  # noqa: F401

from collections import OrderedDict
from importlib import import_module
from io import StringIO
from http.client import HTTPException

from matplotlib import rcParams

from astropy import units

from gwpy.detector import (Channel, ChannelList)
from gwpy.time import tconvert

from ..channels import (get_channels, split as split_channels,
                        update_channel_params)
from ..html import (get_css, get_js)
from ..utils import (nat_sorted, re_cchar, re_quote, safe_eval,
                     OBSERVATORY_MAP)

__all__ = [
    'GWSummConfigParser',
]

CONFIGDIR = os.path.dirname(__file__)


# -- class definitions --------------------------------------------------------

[docs] class GWSummConfigParser(configparser.ConfigParser): # preserve case in options optionxform = str # disable colon separator OPTCRE = re.compile( r'(?P<option>[^=\s][^=]*)\s*(?P<vi>[=])\s*(?P<value>.*)$') # set interpolation match _interpvar_re = re.compile(r"%\(([^)]+)\)s") def __init__(self, *args, **kwargs): kwargs.setdefault('dict_type', OrderedDict) configparser.ConfigParser.__init__(self, *args, **kwargs) __init__.__doc__ = configparser.ConfigParser.__init__.__doc__
[docs] def ndoptions(self, section, **kwargs): options = configparser.ConfigParser.options(self, section, **kwargs) return [o for o in options if o not in self._defaults]
[docs] def nditems(self, section, **kwargs): items = configparser.ConfigParser.items(self, section, **kwargs) return [i for i in items if i[0] not in self._defaults]
[docs] def read(self, filenames): readok = configparser.ConfigParser.read(self, filenames) if isinstance(filenames, str): filenames = filenames.split(',') for fp in filenames: if fp not in readok: raise IOError("Cannot read file: %s" % fp) self.files = list(map(os.path.abspath, readok)) return readok
[docs] @classmethod def from_configparser(cls, cp): """Copy an existing :class:`~configparser.ConfigParser`. """ # if its already what we need, just return this instance if isinstance(cp, cls): return cp # set up temporary buffer buf = StringIO() # write to buffer cp.write(buf) buf.seek(0) # read new GWSummConfigParser new = cls() new.read_file(buf) return new
def __repr__(self): return '<GWSummConfigParser()>'
[docs] def interpolate_section_names(self, **kwargs): """Interpolate a specific key in a section name using val """ for section in self.sections(): s = section for key in self._interpvar_re.findall(section): try: val = kwargs[key] except KeyError: raise configparser.InterpolationMissingOptionError( '[%s]' % section, section, key, '%%(%s)s' % key) s = section % {key: val} self._sections[s] = self._sections.pop(section)
[docs] def set_ifo_options(self, ifo, observatory=None, section=configparser.DEFAULTSECT): """Set configurations options in [DEFAULT] based on the given `ifo` The following options are set - `IFO` - the two-character interferometer prefix, e.g. ``L1`` - `ifo` - the two-character interferometer prefix in lower-case, e.g. ``l1`` - `SITE` - the single-character site ID, e.g. ``L`` - `site` - the single-character site ID,n lower-case e.g. ``l`` Additionally, if `observatory` is given, or the `ifo` matches known observatories, the following option is set - `observatory` - the name of the observatory, e.g. ``LIGO Livingston`` """ if observatory is None: observatory = OBSERVATORY_MAP.get(ifo) self.set(section, 'IFO', ifo) self.set(section, 'ifo', ifo.lower()) self.set(section, 'SITE', ifo[0].upper()) self.set(section, 'site', ifo[0].lower()) if observatory is not None: self.set(section, 'observatory', observatory)
[docs] def set_date_options(self, start, end, section=configparser.DEFAULTSECT): """Set datetime options in [DEFAULT] based on the given times The following options are set - `gps-start-time` - the integer GPS start time of this job - `gps-end-time` - the integer GPS end time of this job - `yyyy` - the four-digit year of the start date - `mm` - the two-digit month of the start date - `dd` - the two-digit day-of-month of the start date - `yyyymm` - the six-digit year and month of the start date - `yyyymmdd` - the eight-digit year-month-day of the start date - `duration` - the duration of the job (seconds) Additionally, if LAL is available, the following extra options are also set - `leap-seconds` - the number of leap seconds for the start date - `gps-start-time-noleap` - the leap-corrected integer GPS start time - `gps-end-time-noleap` - the leap-corrected integer GPS end time """ utc = tconvert(start) self.set(section, 'gps-start-time', str(int(start))) self.set(section, 'gps-end-time', str(int(end))) self.set(section, 'yyyy', utc.strftime('%Y')) self.set(section, 'yy', utc.strftime('%y')) self.set(section, 'mm', utc.strftime('%m')) self.set(section, 'dd', utc.strftime('%d')) self.set(section, 'yyyymm', utc.strftime('%Y%m')) self.set(section, 'yyyymmdd', utc.strftime('%Y%m%d')) self.set(section, 'duration', str(int(end - start))) try: from lal import GPSLeapSeconds as leap_seconds except ImportError: pass else: nleap = leap_seconds(int(start)) self.set(section, 'leap-seconds', str(nleap)) self.set(section, 'gps-start-time-noleap', str(int(start) - nleap)) self.set(section, 'gps-end-time-noleap', str(int(end) - nleap))
[docs] def finalize(self): """Finalize this `GWSummConfigParser` by running all the loaders This method is just a shortcut to run each of the following .. autosummary:: ~GWSummConfigParser.load_plugins ~GWSummConfigParser.load_units ~GWSummConfigParser.load_channels ~GWSummConfigParser.load_states """ self.load_plugins() self.load_units() self.load_channels() self.load_states()
[docs] def load_plugins(self): """Load all plugin modules as defined in the [plugins] section """ mods = [] try: plugins = self.ndoptions('plugins') except configparser.NoSectionError: pass else: for plugin in plugins: mods.append(import_module(plugin)) return mods
[docs] def load_units(self): """Load all unit definitions as defined in the [units] section """ try: customunits = self.nditems('units') except configparser.NoSectionError: return [] else: new_ = [] for unit, b in customunits: if b.lower() == 'dimensionless': b = '' new_.append(units.def_unit([unit], units.Unit(b))) units.add_enabled_units(new_) return new_
[docs] def load_channels(self): """Load all channel definitions as given in the selfuration Channels are loaded from sections named [channels-...] or those sections whose name is a channel name in itself """ channels_re = re.compile(r'channels[-\s]') # parse channel grouns into individual sections for section in filter(channels_re.match, self.sections()): names = split_channels(self.get(section, 'channels')) items = dict(self.nditems(section, raw=True)) items.pop('channels') for name in names: name = name.strip(' \n') if not self.has_section(name): self.add_section(name) for key, val in items.items(): if not self.has_option(name, key): self.set(name, key, val) # read all channels raw = set() trend = set() for section in self.sections(): try: m = Channel.MATCH.match(section).groupdict() except AttributeError: continue else: if not m['ifo']: continue if m['trend']: trend.add(section) else: raw.add(section) channels = ChannelList() for group in [raw, trend]: try: newchannels = get_channels(group) except HTTPException: newchannels = [] # read custom channel definitions for channel, section in zip(newchannels, group): for key, val in nat_sorted(self.nditems(section), key=lambda x: x[0]): key = re_cchar.sub('_', key.rstrip()) if key.isdigit(): if not hasattr(channel, 'bits'): channel.bits = [] while len(channel.bits) < int(key): channel.bits.append(None) if val.startswith('r"') or val.startswith('r\''): val = eval(val) channel.bits.append(str(val)) else: setattr(channel, key, safe_eval(val.rstrip())) channels.append(channel) # rebuild channel list with new parameters update_channel_params() return channels
[docs] def load_states(self, section='states'): """Read and format a list of `SummaryState` definitions from the given :class:`~configparser.ConfigParser` """ from ..state import (register_state, SummaryState, ALLSTATE, generate_all_state, get_state) # parse the [states] section into individual state definitions try: states = dict(self.nditems(section)) except configparser.NoSectionError: self.add_section(section) states = {} for state in states: if not (self.has_section('state-%s' % state) or self.has_section('state %s' % state)): section = 'state-%s' % state self.add_section(section) self.set(section, 'name', state) self.set(section, 'definition', states[state]) # parse each state section into a new state states = [] for section in self.sections(): if re.match(r'state[-\s]', section): states.append(register_state( SummaryState.from_ini(self, section))) # register All state start = self.getint(section, 'gps-start-time') end = self.getint(section, 'gps-end-time') try: all_ = generate_all_state(start, end) except ValueError: # user defined an all state themselves all_ = get_state(ALLSTATE) else: states.insert(0, all_) return states
[docs] def load_rcParams(self, section='rcParams'): """Load custom `matplotlib.rcParams` for plots in this analysis """ try: new = dict(self.nditems(section)) except configparser.NoSectionError: return dict() else: for key, value in new.items(): new[key] = safe_eval(value) rcParams.update(new) return new
[docs] def get_css(self, section='html'): # get critical CSS css = get_css().copy() for key in css: try: css[key] = self.get(section, '%s-css' % key) except configparser.NoSectionError: # no overrides are present return list(css.values()) except configparser.NoOptionError: continue files = list(css.values()) # get extra CSS try: extras = self.get(section, 'extra-css') except configparser.NoOptionError: return files else: files.extend([re_quote.sub('', x) for x in extras.split(',')]) return files
[docs] def get_javascript(self, section='html'): # get critical JS js = get_js().copy() for key in js: try: js[key] = self.get(section, '%s-js' % key) except configparser.NoSectionError: return list(js.values()) except configparser.NoOptionError: continue files = list(js.values()) # get extra JS try: extras = self.get(section, 'extra-js') except configparser.NoOptionError: return files else: files.extend([re_quote.sub('', x) for x in extras.split(',')]) return files