Source code for gwsumm.segments

# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2013)
#               Evan Goetz (2023)
#
# This file is part of GWSumm.
#
# GWSumm is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWSumm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWSumm.  If not, see <http://www.gnu.org/licenses/>.

"""Utilities for segment handling and display
"""

import sys
import operator
import warnings
from collections import OrderedDict
from configparser import (
    DEFAULTSECT,
    ConfigParser,
    NoSectionError,
    NoOptionError,
)

from astropy.io.registry import IORegistryError

from gwpy.segments import (DataQualityFlag, DataQualityDict,
                           SegmentListDict, SegmentList, Segment)

from . import globalv
from .utils import (
    re_flagdiv,
    vprint,
    WARNC,
    ENDC,
)

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'

SEGDB_URLS = [
    'https://segdb.ligo.caltech.edu',
    'https://metaserver.phy.syr.edu',
    'https://geosegdb.atlas.aei.uni-hannover.de',
    'http://10.20.50.30'  # geosegdb internal
]


[docs] def get_segments(flag, validity=None, config=ConfigParser(), cache=None, query=True, return_=True, coalesce=True, padding=None, ignore_undefined=False, segdb_error='raise', url=None, **read_kw): """Retrieve the segments for a given flag Segments will be loaded from global memory if already defined, otherwise they will be loaded from the given :class:`~glue.lal.Cache`, or finally from the segment database Parameters ---------- flag : `str`, `list` either the name of one flag, or a list of names validity : `~gwpy.segments.SegmentList` the segments over which to search for other segments query : `bool`, optional, default: `True` actually execute a read/query operation (if needed), otherwise just retrieve segments that have already been cached config : `~configparser.ConfigParser`, optional the configuration for your analysis, if you have one. If present the ``[segment-database]`` section will be queried for the following options - ``gps-start-time``, and ``gps-end-time``, if ``validity`` is not given - ``url`` (the remote hostname for the segment database) if the ``url`` keyword is not given cache : :class:`glue.lal.Cache`, optional a cache of files from which to read segments, otherwise segments will be downloaded from the segment database coalesce : `bool`, optional, default: `True` coalesce all segmentlists before returning, otherwise just return segments as they were downloaded/read padding : `tuple`, or `dict` of `tuples`, optional `(start, end)` padding with which to pad segments that are downloaded/read ignore_undefined : `bool`, optional, default: `False` Special case needed for network calculation compound flags so that when this is True, DataQualityFlag.known values are set to the same value as ``validity`` segdb_error : `str`, optional, default: ``'raise'`` how to handle errors returned from the segment database, one of - ``'raise'`` (default) : raise the exception as normal - ``'warn'`` : print the exception as a warning, but return no segments - ``'ignore'`` : silently ignore the error and return no segments url : `str`, optional the remote hostname for the target segment database return_ : `bool`, optional, default: `True` internal flag to enable (True) or disable (False) actually returning anything. This is useful if you want to download/read segments now but not use them until later (e.g. plotting) **read_kw : `dict`, optional additional keyword arguments to `~gwpy.segments.DataQualityDict.read` or `~gwpy.segments.DataQualityFlag.read` Returns ------- flag : `~gwpy.segments.DataQualityFlag` the flag object representing segments for the given single flag, OR flagdict : `~gwpy.segments.DataQualityDict` the dict of `~gwpy.segments.DataQualityFlag` objects for multiple flags, if ``flag`` is given as a `list`, OR None if ``return_=False`` """ if isinstance(flag, str): flags = flag.split(',') else: flags = flag allflags = set([f for cf in flags for f in re_flagdiv.split(str(cf))[::2] if f]) if padding is None and isinstance(flag, DataQualityFlag): padding = {flag: flag.padding} elif padding is None: padding = dict((flag, isinstance(flag, DataQualityFlag) and flag.padding or None) for flag in flags) # check validity if validity is None: start = config.get(DEFAULTSECT, 'gps-start-time') end = config.get(DEFAULTSECT, 'gps-end-time') span = SegmentList([Segment(start, end)]) elif isinstance(validity, DataQualityFlag): validity = validity.active try: span = SegmentList([validity.extent()]) except ValueError: span = SegmentList() else: try: span = SegmentList([SegmentList(validity).extent()]) except ValueError: span = SegmentList() validity = SegmentList(validity) # generate output object out = DataQualityDict() for f in flags: out[f] = DataQualityFlag(f, known=validity, active=validity) for f in allflags: globalv.SEGMENTS.setdefault(f, DataQualityFlag(f)) # read segments from global memory and get the intersection of needed times try: old = SegmentListDict() for f in flags: old[f] = globalv.SEGMENTS.get(f, DataQualityFlag(f)).known old = SegmentList(old.intersection(flags)) except TypeError: old = SegmentList() newsegs = validity - old # load new segments query &= abs(newsegs) != 0 query &= len(allflags) > 0 if cache is not None: query &= len(cache) != 0 if query: if cache is not None: if isinstance(cache, str) and cache.endswith( (".h5", ".hdf", ".hdf5")) and ( 'path' not in read_kw): read_kw['path'] = config.get( 'DEFAULT', 'segments-hdf5-path', fallback='segments') try: new = DataQualityDict.read(cache, list(allflags), **read_kw) except IORegistryError as e: # can remove when astropy >= 1.2 is required if type(e) is not IORegistryError: raise if len(allflags) == 1: f = list(allflags)[0] new = DataQualityDict() new[f] = DataQualityFlag.read( cache, f, coalesce=False, **read_kw) for f in new: new[f].known &= newsegs new[f].active &= newsegs if coalesce: new[f].coalesce() vprint(" Read %d segments for %s (%.2f%% coverage).\n" % (len(new[f].active), f, float(abs(new[f].known))/float(abs(newsegs))*100)) else: if len(newsegs) >= 10: qsegs = span else: qsegs = newsegs # parse configuration for query kwargs = {} if url is not None: kwargs['url'] = url else: try: kwargs['url'] = config.get('segment-database', 'url') except (NoSectionError, NoOptionError): pass if kwargs.get('url', None) in SEGDB_URLS: query_func = DataQualityDict.query_segdb else: query_func = DataQualityDict.query_dqsegdb try: new = query_func(allflags, qsegs, on_error=segdb_error, **kwargs) except Exception as e: # ignore error from SegDB if segdb_error in ['ignore', None]: pass # convert to warning elif segdb_error in ['warn']: print('%sWARNING: %sCaught %s: %s [gwsumm.segments]' % (WARNC, ENDC, type(e).__name__, str(e)), file=sys.stderr) warnings.warn('%s: %s' % (type(e).__name__, str(e))) # otherwise raise as normal else: raise new = DataQualityDict() for f in new: new[f].known &= newsegs new[f].active &= newsegs if coalesce: new[f].coalesce() vprint(" Downloaded %d segments for %s (%.2f%% coverage).\n" % (len(new[f].active), f, float(abs(new[f].known))/float(abs(newsegs))*100)) # record new segments globalv.SEGMENTS += new for f in new: globalv.SEGMENTS[f].description = str(new[f].description) # return what was asked for if return_: for compound in flags: union, intersection, exclude, notequal = split_compound_flag( compound) if len(f := (union + intersection)) == 1: out[compound].description = globalv.SEGMENTS[f[0]].description out[compound].padding = padding.get(f[0], (0, 0)) for flist, op in zip([exclude, intersection, union, notequal], [operator.sub, operator.and_, operator.or_, not_equal]): for f in flist: pad = padding.get(f, (0, 0)) segs = globalv.SEGMENTS[f].copy() if ignore_undefined: segs.known = validity if isinstance(pad, (float, int)): segs = segs.pad(pad, pad) elif pad is not None: segs = segs.pad(*pad) if coalesce: segs = segs.coalesce() out[compound] = op(out[compound], segs) out[compound].known &= validity out[compound].active &= validity if coalesce: out[compound].coalesce() if isinstance(flag, str): return out[flag] else: return out
[docs] def not_equal(a, b, f): diff1 = a - b diff2 = b - a return diff1 | diff2
[docs] def split_compound_flag(compound): """Parse the configuration for this state. Returns ------- flags : `tuple` a 2-tuple containing lists of flags defining required ON and OFF segments respectively for this state """ # find flags divs = re_flagdiv.findall(compound) keys = re_flagdiv.split(compound) # load flags and vetoes union = [] intersection = [] exclude = [] notequal = [] for i, key in enumerate(keys[::2]): if not key: continue # get veto bit if i != 0 and divs[i-1] == '!': exclude.append(key) elif i != 0 and divs[i-1] == '|': union.append(key) elif i != 0 and divs[i-1] == '!=': notequal.append(key) else: intersection.append(key) return union, intersection, exclude, notequal
[docs] def format_padding(flags, padding): """Format an arbitrary collection of paddings into a `dict` """ # parse string to start with if isinstance(padding, str): padding = list(eval(str)) # zip list into dict if (isinstance(padding, (list)) or (isinstance(padding, tuple) and len(padding) and (any(isinstance(p, (list, tuple)) for p in padding) or len(padding) > 2))): return OrderedDict(list(zip(flags, padding))) # otherwise copy single padding param for all flags elif not isinstance(padding, dict): return OrderedDict((c, padding) for c in flags) else: return padding