# Source code for gwsumm.triggers

# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2013)
#
# This file is part of GWSumm.
#
# GWSumm is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWSumm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWSumm.  If not, see <http://www.gnu.org/licenses/>.

"""Read and store transient event triggers
"""

import warnings
from urllib.parse import urlparse

from astropy.table import vstack as vstack_tables

from lal.utils import CacheEntry

from ligo.lw import lsctables

from glue.lal import Cache

from gwpy.io.cache import cache_segments
from gwpy.table import (EventTable, filters as table_filters)
from gwpy.table.filter import parse_column_filters
from gwpy.table.io.pycbc import filter_empty_files as filter_pycbc_live_files
from gwpy.segments import (DataQualityFlag, SegmentList)

import gwtrigfind

from . import globalv
from .utils import (re_cchar, vprint, safe_eval)
from .config import GWSummConfigParser
from .channels import get_channel

# build list of default keyword arguments for reading ETGs
ETG_READ_KW = {
    'cwb': {
        'format': 'root',
        'treename': 'waveburst',
    },
    'daily_ihope': {
        'format': 'ligolw',
        'tablename': 'sngl_inspiral',
        'use_numpy_dtypes': True,
    },
    'daily_ahope': {
        'format': 'ligolw',
        'tablename': 'sngl_inspiral',
        'use_numpy_dtypes': True,
    },
    'dmt_omega': {
        'format': 'ligolw',
        'tablename': 'sngl_burst',
        'use_numpy_dtypes': True,
    },
    'dmt_wsearch': {
        'format': 'ligolw',
        'tablename': 'sngl_burst',
        'use_numpy_dtypes': True,
    },
    'kleinewelle': {
        'format': 'ligolw',
        'tablename': 'sngl_burst',
        'use_numpy_dtypes': True,
    },
    'kw': {
        'format': 'ligolw',
        'tablename': 'sngl_burst',
        'use_numpy_dtypes': True,
    },
    'omega': {
        'format': 'ascii',
    },
    'omegadq': {
        'format': 'ascii',
    },
    'omicron': {
        'format': 'hdf5',
        'path': '/triggers',
        'trigfind-ext': 'h5',
    },
    'pycbc_live': {
        'format': 'hdf5.pycbc_live',
        'timecolumn': 'end_time',
        'extended_metadata': False,
    },
}

# set default for all LIGO_LW
for name in lsctables.TableByName:
    ETG_READ_KW[name] = {
        'format': 'ligolw',
        'tablename': name,
        'use_numpy_dtypes': True,
    }


[docs] def get_etg_table(etg): """Find which table should be used for the given etg Parameters ---------- etg : `str` name of Event Trigger Generator for which to query Returns ------- table : `type`, subclass of `~ligo.lw.table.Table` LIGO_LW table registered to the given ETG Raises ------ KeyError if the ETG is not registered """ try: kw_ = get_etg_read_kwargs(etg) form = kw_['format'] tablename = kw_['tablename'] except KeyError as e: e.args = ('No LIGO_LW table registered to etg %r' % etg,) raise if form == 'ligolw': return lsctables.TableByName[tablename] raise KeyError("No LIGO_LW table registered to etg %r" % etg)
[docs] def get_triggers(channel, etg, segments, config=GWSummConfigParser(), cache=None, columns=None, format=None, query=True, nproc=1, ligolwtable=None, filter=None, timecolumn=None, verbose=False, return_=True): """Read a table of transient event triggers for a given channel. """ key = '%s,%s' % (str(channel), etg.lower()) # convert input segments to a segmentlist (for convenience) if isinstance(segments, DataQualityFlag): segments = segments.active segments = SegmentList(segments) # get read keywords for this etg read_kw = get_etg_read_kwargs(etg, config=config, exclude=[]) read_kw['verbose'] = verbose # extract columns (using function keyword if given) if columns: read_kw['columns'] = columns columns = read_kw.pop('columns', None) # override with user options if format: read_kw['format'] = format elif not read_kw.get('format', None): read_kw['format'] = etg.lower() if timecolumn: read_kw['timecolumn'] = timecolumn elif columns is not None and 'time' in columns: read_kw['timecolumn'] = 'time' # replace columns keyword if read_kw['format'].startswith('ascii.'): read_kw['include_names'] = columns else: read_kw['columns'] = columns # parse filters if filter: read_kw['selection'].extend(parse_column_filters(filter)) # read segments from global memory try: havesegs = globalv.TRIGGERS[key].meta['segments'] except KeyError: new = segments else: new = segments - havesegs # read new triggers if query and abs(new) != 0: ntrigs = 0 vprint(" Grabbing %s triggers for %s" % (etg, str(channel))) # -- setup ---------- # get find/read kwargs trigfindkwargs = dict( (k[9:], read_kw.pop(k)) for k in list(read_kw) if k.startswith('trigfind-')) trigfindetg = trigfindkwargs.pop('etg', etg) # customise kwargs for this ETG if etg.lower().replace('-', '_') in ['pycbc_live']: read_kw['ifo'] = get_channel(channel).ifo if etg.lower() in ['kw', 'kleinewelle']: read_kw['selection'].append('channel == "%s"' % channel) if etg.lower() in ['cwb'] and 'root' not in read_kw['format']: 
read_kw.pop('treename') # filter on segments if 'timecolumn' in read_kw: read_kw['selection'].append(( read_kw['timecolumn'], table_filters.in_segmentlist, new)) # -- read ----------- # if single file if cache is not None and len(cache) == 1: trigs = read_cache(cache, new, etg, nproc=nproc, **read_kw) if trigs is not None: add_triggers(trigs, key) ntrigs += len(trigs) # otherwise, loop over segments else: for segment in new: # find trigger files if cache is None and not etg.lower() == 'hacr': try: segcache = gwtrigfind.find_trigger_files( str(channel), trigfindetg, segment[0], segment[1], **trigfindkwargs) except ValueError as e: warnings.warn("Caught %s: %s" % (type(e).__name__, str(e))) continue elif cache is not None: segcache = cache # read table if etg.lower() == 'hacr': from gwpy.table.io.hacr import get_hacr_triggers trigs = get_hacr_triggers(channel, segment[0], segment[1], columns=columns) trigs.meta['segments'] = SegmentList([segment]) else: trigs = read_cache(segcache, SegmentList([segment]), etg, nproc=nproc, **read_kw) # extend columns if possible to include bandwidth, central # frequency, and duration if trigs is not None: if 'fstart' in columns and 'fend' in columns: if 'bandwidth' not in columns: trigs.add_column(trigs['fend'] - trigs['fstart'], name='bandwidth') if 'central_freq' not in columns: trigs.add_column( 0.5*(trigs['fstart'] + trigs['fend']), name='central_freq') if ('tstart' in columns and 'tend' in columns and 'duration' not in columns): trigs.add_column(trigs['tend'] - trigs['tstart'], name='duration') # record triggers if trigs is not None: # add metadata add_triggers(trigs, key) ntrigs += len(trigs) vprint(".") vprint(" | %d events read\n" % ntrigs) # if asked to read triggers, but didn't actually read any, # create an empty table so that subsequent calls don't raise KeyErrors if query and key not in globalv.TRIGGERS: # find LIGO_LW table for this ETG try: if columns is not None: # don't need to map to LIGO_LW raise KeyError 
TableClass = get_etg_table(etg) except KeyError: # build simple table if 'fstart' in columns and 'fend' in columns: for colval in ['bandwidth', 'central_freq']: if colval not in columns: columns.extend([colval]) if ('tstart' in columns and 'tend' in columns and 'duration' not in columns): columns.extend(['duration']) tab = EventTable(names=columns) else: # map to LIGO_LW table with full column listing tab = EventTable(lsctables.New(TableClass)) tab.meta['segments'] = SegmentList() for metakey in ('timecolumn', 'tablename',): if metakey in read_kw: tab.meta[metakey] = read_kw[metakey] add_triggers(tab, key) # work out time function if return_: return keep_in_segments(globalv.TRIGGERS[key], segments, etg)
[docs] def add_triggers(table, key, segments=None): """Add a `EventTable` to the global memory cache """ if segments is not None: table.meta['segments'] = segments try: old = globalv.TRIGGERS[key] except KeyError: new = globalv.TRIGGERS[key] = table new.meta.setdefault('segments', SegmentList()) else: new = globalv.TRIGGERS[key] = vstack_tables((old, table)) new.meta = old.meta new.meta['segments'] |= table.meta.get('segments', SegmentList()) new.meta['segments'].coalesce() return new
[docs] def keep_in_segments(table, segmentlist, etg=None): """Return a view of the table containing only those rows in the segmentlist """ times = get_times(table, etg) keep = table_filters.in_segmentlist(times, segmentlist) out = table[keep] out.meta['segments'] = segmentlist & table.meta['segments'] return out
[docs] def get_times(table, etg): """Get the time data for this table See Also -------- get_time_column """ try: return table[get_time_column(table, etg)] except ValueError: # match well-defined LIGO_LW table types tablename = table.meta.get('tablename') or '' for ttype, tcol in ( ('_burst', 'peak'), ('_inspiral', 'end'), ('_ringdown', 'start'), ): if tablename.endswith(ttype): sec = '{0}_time'.format(tcol) nanosec = '{0}_time_ns'.format(tcol) return table[sec] + table[nanosec] * 1e-9 raise
[docs] def get_time_column(table, etg): """Get the time column name for this table """ # allow user to have selected the time column if table.meta.get('timecolumn'): return table.meta['timecolumn'] # otherwise search for it try: return table._get_time_column() except ValueError: # shortcut pycbc if etg == 'pycbc_live': 'end_time' # match well-defined LIGO_LW table types tablename = table.meta.get('tablename') or '' for ttype, tcol in ( ('_burst', 'peak'), ('_inspiral', 'end'), ('_ringdown', 'start'), ): if tablename.endswith(ttype) and tcol in table.columns: table.meta['timecolumn'] = tcol return tcol raise
[docs] def get_etg_read_kwargs(etg, config=None, exclude=['columns']): """Read keyword arguments to pass to the trigger reader for a given etg """ # use global defaults kwargs = { 'format': None, 'selection': [], } # get kwargs from config if config is not None and config.has_section(etg): config_kw = dict(config.nditems(etg)) elif config is not None and config.has_section(etg.lower()): config_kw = dict(config.nditems(etg.lower())) else: config_kw = {} usrfmt = config_kw.get('format', None) # get ETG defaults : only if user didn't specify the read format, # or the format they did specify matches our default etgl = re_cchar.sub('_', etg).lower() if etgl in ETG_READ_KW and usrfmt in (None, ETG_READ_KW[etgl]['format']): kwargs.update(ETG_READ_KW.get(etgl, {})) # now add the config kwargs (so they override our defaults) kwargs.update(config_kw) # format kwargs for key in list(kwargs): # remove unwanted keys if key in exclude: kwargs.pop(key) continue # eval string to object (safely) kwargs[key] = safe_eval(kwargs[key]) if key.endswith('columns') and isinstance(kwargs[key], str): kwargs[key] = kwargs[key].replace(' ', '').split(',') if 'selection' in kwargs: kwargs['selection'] = parse_column_filters(kwargs['selection']) return kwargs
[docs] def read_cache(cache, segments, etg, nproc=1, timecolumn=None, **kwargs): """Read a table of events from a cache This function is mainly meant for use from the `get_triggers` method Parameters ---------- cache : :class:`glue.lal.Cache` the formatted list of files to read segments : `~gwpy.segments.SegmentList` the list of segments to read etg : `str` the name of the trigger generator that created the files nproc : `int`, optional the number of parallel processes to use when reading **kwargs other keyword arguments are passed to the `EventTable.read` or `{tableclass}.read` methods Returns ------- table : `~gwpy.table.EventTable`, `None` a table of events, or `None` if the cache has no overlap with the segments """ if isinstance(cache, Cache): cache = cache.sieve(segmentlist=segments) cache = cache.checkfilesexist()[0] cache.sort(key=lambda x: x.segment[0]) cache = cache.pfnlist() # some readers only like filenames else: cache = [urlparse(url).path for url in cache] if etg == 'pycbc_live': # remove empty HDF5 files cache = filter_pycbc_live_files(cache, ifo=kwargs['ifo']) # Currently the EventTable.read() method fails if there are zero # length trigger files in addition to non-zero length trigger files. # Here, we read trigger files one-by-one and remove any with zero length # from the cache list. 
cache = [this_trigger_file for this_trigger_file in cache if len(EventTable.read(this_trigger_file, **kwargs)) != 0] if len(cache) == 0: return # read triggers table = EventTable.read(cache, **kwargs) # store read keywords in the meta table if timecolumn: table.meta['timecolumn'] = timecolumn # get back from cache entry if isinstance(cache, CacheEntry): cache = Cache([cache]) # append new events to existing table try: csegs = cache_segments(cache) & segments except (AttributeError, TypeError, ValueError): csegs = SegmentList() table.meta['segments'] = csegs if timecolumn: # already filtered on-the-fly return table # filter now return keep_in_segments(table, segments, etg)