# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2013)
#
# This file is part of GWSumm.
#
# GWSumm is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWSumm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWSumm. If not, see <http://www.gnu.org/licenses/>.
"""Utilities for channel access
"""
import re
from functools import wraps
from astropy.units import Unit
from gwpy.detector import (Channel, ChannelList)
from gwpy.io.nds2 import Nds2ChannelType
from . import globalv
from .utils import re_quote
# module author metadata
__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'
# NDS2 channel-type names as returned by gwpy's `Nds2ChannelType.nds2names()`;
# used by `split()` to recognise a ``<channel>,<type>`` pair
NDS2_TYPES = Nds2ChannelType.nds2names()
# base URL from which `_new()` builds the per-channel ``url`` attribute
CIS_URL = 'https://cis.ligo.org'
# Pattern for one channel name as it may appear inside a longer string.
# The pieces are concatenated into a single regular expression:
re_channel = re.compile(
    r'[A-Z]\d:[a-zA-Z0-9]+'     # prefix + first name segment, e.g. L1:TEST
    r'(?:[-_][a-zA-Z0-9_]+)?'   # optional rest of name after one '-' or '_'
    r'(?:\.[a-z]+)?'            # optional trend suffix, e.g. '.mean'
    r'(?:,[a-z-]+)?'            # optional NDS channel type, e.g. ',m-trend'
)
# -- channel creation ---------------------------------------------------------
def _match(channel):
    """Look up ``channel`` in the `globalv.CHANNELS` cache

    The cache is first searched by name *and* type; if that gives nothing
    it is searched again by name only.

    Parameters
    ----------
    channel : `str`, `~gwpy.detector.Channel`
        name of a channel

    Returns
    -------
    found : list-like
        whatever `globalv.CHANNELS.sieve` returned for the last search
        performed; empty when nothing matched
    """
    wanted = Channel(channel)
    wanted_name = str(wanted)
    wanted_type = wanted.type

    # strict search: name and type must both agree
    hits = globalv.CHANNELS.sieve(
        name=wanted_name, type=wanted_type, exact_match=True)
    if hits:
        return hits

    # relaxed search: name only
    hits = globalv.CHANNELS.sieve(name=wanted_name, exact_match=True)
    if len(hits) == 1:
        # A single, less specific entry exists: stamp the requested type
        # onto it. A later request for the same name with another type
        # will not match it and so ends up creating a separate channel.
        hits[0].type = wanted_type
    return hits
def _find_parent(channel):
    """Return the parent channel of ``channel``

    The parent is either the raw channel from which a trend was generated,
    or the real channel for a mathematically-manipulated channel.

    Parameters
    ----------
    channel : `~gwpy.detector.Channel`
        trend channel or mathematically-manipulated channel

    Returns
    -------
    parent_channel : `~gwpy.detector.Channel`
        the channel returned by `get_channel` for the parent name

    Raises
    ------
    ValueError
        if the parent name equals the input name, or if a non-trend name
        does not contain exactly one `re_channel` match
    """
    name = str(channel)

    if channel.trend:
        # trend: keep everything before the first '.'
        parent = name.partition('.')[0]
    else:
        # non-trend: the name must contain exactly one channel-like match,
        # otherwise the unpacking below raises ValueError
        matches = re_channel.findall(name)
        (parent,) = matches

    # identical names mean there is nothing "above" this channel
    if parent == name:
        raise ValueError(f"Cannot find parent for '{str(channel)}'")

    return get_channel(parent)
def _update_dependent(channel):
    """Copy parameters onto ``channel`` from its parent channel

    The parent is located with `_find_parent`; if none can be found (or the
    parent is ``channel`` itself) the input is returned untouched.
    Otherwise ``url`` and ``unit`` are copied, followed by any public
    instance attribute of the parent that ``channel`` does not already
    have.

    Parameters
    ----------
    channel : `~gwpy.detector.Channel`
        the channel to update in place

    Returns
    -------
    channel : `~gwpy.detector.Channel`
        the same object that was passed in
    """
    try:
        parent = _find_parent(channel)
    except (ValueError, IndexError):
        return channel
    if parent is channel:
        return channel

    channel.url = parent.url
    channel.unit = parent.unit

    # Copy custom named params. This relies on the upstream Channel keeping
    # its own attributes under private (underscore-prefixed) names, so that
    # anything public in ``vars(parent)`` was attached by the user.
    for attr in vars(parent):
        if attr.startswith('_'):
            continue
        if hasattr(channel, attr):
            continue
        # NOTE: 'resample' is excluded so as not to attempt to upsample
        # trends
        if attr in ('resample',):
            continue
        try:
            setattr(channel, attr, getattr(parent, attr))
        except AttributeError:
            pass
    return channel
def _with_update_dependent(func):
    """Decorate ``func`` so that `_update_dependent()` runs on its result

    The wrapper consumes a ``find_parent`` keyword (default `True`). When it
    is true and the returned object has a truthy ``trend`` attribute, the
    result is passed through `_update_dependent` before being returned.

    NOTE(review): ``find_parent`` is *popped* from the keyword arguments, so
    a value given by keyword never reaches ``func`` itself; ``func`` then
    falls back to its own default for that parameter.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        want_update = kwargs.pop('find_parent', True)
        result = func(*args, **kwargs)
        if not (want_update and result.trend):
            return result
        return _update_dependent(result)

    return wrapper
def _new(channel, find_parent=True):
    """Create a new `~gwpy.detector.Channel` in the globalv cache.

    Parameters
    ----------
    channel : `str`, `~gwpy.detector.Channel`
        channel string or object to create in the global memory

    find_parent : `bool`, optional, default: `True`
        if `True`, call `_update_dependent` on the new channel before it
        is stored

    Returns
    -------
    Channel : `~gwpy.detector.Channel`
        the channel returned by `get_channel` once the new object has been
        appended to `globalv.CHANNELS`

    Raises
    ------
    RuntimeError
        if the final `get_channel` lookup fails; a failure whose message
        mentions 'maximum recursion depth' is re-raised with a message
        naming the channel (chained to the original error)
    """
    # convert to Channel
    new = channel if isinstance(channel, Channel) else Channel(channel)
    name = str(channel)
    type_ = new.type

    # work out what kind of channel it is
    parts = re_channel.findall(name)
    single = len(parts) == 1

    # single H1/L1 channel whose name does not end in a '.suffix':
    # attach the CIS lookup URL
    if (
        single and
        new.ifo in ('H1', 'L1') and
        not re.search(r'\.[a-z]+\Z', name)
    ):
        new.url = '%s/channel/byname/%s' % (CIS_URL, str(new))
    # any other single channel
    elif single:
        # DMT channels with no explicit type are treated as m-trend
        if type_ is None and ':DMT-' in name:
            new.type = 'm-trend'
    # zero or several matches: treat as a composite channel
    else:
        new.subchannels = parts
        # join the distinct IFO prefixes of the parts; sorted so that the
        # result does not depend on set iteration (hash) order
        ifos = {p.ifo for p in map(Channel, parts) if p.ifo}
        new._ifo = "".join(sorted(ifos))

    if find_parent:
        _update_dependent(new)

    # store new channel and return
    globalv.CHANNELS.append(new)
    try:
        return get_channel(new)
    except RuntimeError as e:
        if 'maximum recursion depth' in str(e):
            raise RuntimeError("Recursion error while accessing channel "
                               "information for %s" % str(channel)) from e
        raise
@_with_update_dependent
def get_channel(channel, find_parent=True, timeout=5):
    """Find (or create) a :class:`~gwpy.detector.Channel`.

    If ``channel`` is already present in `globalv.CHANNELS` the cached copy
    is returned, otherwise a new `~gwpy.detector.Channel` is created via
    `_new`.

    Parameters
    ----------
    channel : `str`
        name of new channel

    find_parent : `bool`, optional, default: `True`
        query for raw version of trend channel (trends not in CIS)

    timeout : `float`, optional, default: `5`
        number of seconds to wait before connection times out
        (not referenced anywhere in this function's body)

    Returns
    -------
    Channel : :class:`~gwpy.detector.Channel`
        the cached or newly created channel

    Raises
    ------
    ValueError
        if several distinct cached channels match the request
    """
    name = str(channel)
    subnames = re_channel.findall(name)
    count = len(subnames)

    # A compound name either holds several channel names, or holds one
    # channel name plus extra text around it.
    compound = count > 1 or (count == 1 and subnames[0] != name)

    if compound:
        # backslash-escape the operator characters before searching
        pattern = name
        for special in ('+', '*', '^', '|'):
            pattern = pattern.replace(special, r'\%s' % special)
        found = globalv.CHANNELS.sieve(name=pattern, exact_match=True)
    else:
        found = _match(channel)

    # exactly one hit: done
    if len(found) == 1:
        return found[0]

    # several hits: acceptable only if they all describe the same channel
    if len(found) > 1:
        cstrings = {
            '%s [%s, %s]' % (c.ndsname, c.sample_rate, c.unit)
            for c in found
        }
        if len(cstrings) == 1:  # somehow all channels are the same
            return found[0]
        raise ValueError("Ambiguous channel request '%s', multiple "
                         "existing channels recovered:\n    %s"
                         % (str(channel), '\n    '.join(cstrings)))

    # nothing cached yet, so create the channel
    return _new(channel, find_parent=find_parent)
def get_channels(channels, **kwargs):
    """Find (or create) several channels by calling `get_channel` on each

    Parameters
    ----------
    channels : `list`
        list of channels as `str` or `~gwpy.detector.Channel` objects

    **kwargs
        keyword arguments forwarded unchanged to every `get_channel` call

    Returns
    -------
    ChannelList : `~gwpy.detector.ChannelList`
        the resolved channels, in input order

    See Also
    --------
    get_channel
    """
    resolved = [get_channel(chan, **kwargs) for chan in channels]
    return ChannelList(resolved)
# -- channel manipulation -----------------------------------------------------
def update_missing_channel_params(channel, **kwargs):
    """Fill in unset parameters of the cached copy of ``channel``

    A parameter on the cached channel is only overwritten when its current
    value is `None` (or, for ``unit`` copied from a `Channel` input, when it
    is the ``Unit('undef')`` object).

    Parameters
    ----------
    channel : `~gwpy.detector.Channel`
        channel to update; looked up in the cache by ``str(channel)``

    **kwargs
        `(key, value)` pairs to set

    Returns
    -------
    target : `~gwpy.detector.Channel`
        the cached channel after updating parameters
    """
    target = get_channel(str(channel))

    # take over the standard parameters from a Channel input
    if isinstance(channel, Channel):
        for param in ('unit', 'sample_rate', 'frametype'):
            current = getattr(target, param)
            unset = current is None or (
                param == 'unit' and current is Unit('undef'))
            if unset:
                setattr(target, param, getattr(channel, param))

    # then apply the explicit keyword values
    for param, value in kwargs.items():
        if getattr(target, param) is None:
            setattr(target, param, value)

    return target
def update_channel_params():
    """Update the `globalv.CHANNELS` list based on internal parameter changes

    For every cached channel this sets a missing ``type`` from the
    ``frametype`` (``<ifo>_M`` -> ``'m-trend'``, ``<ifo>_T`` ->
    ``'s-trend'``) and then a missing ``sample_rate`` from the trend type
    (``1/60.`` for m-trend, ``1.`` for s-trend). Values that are already
    set are left alone.
    """
    for chan in globalv.CHANNELS:
        # infer the type from the frametype when no type is set
        if chan.type is None:
            if chan.frametype == '{0.ifo}_M'.format(chan):
                chan.type = 'm-trend'
            elif chan.frametype == '{0.ifo}_T'.format(chan):
                chan.type = 's-trend'

        # infer the sample rate from the trend type when no rate is set
        if chan.type == 'm-trend':
            if chan.sample_rate is None:
                chan.sample_rate = 1/60.
        elif chan.type == 's-trend':
            if chan.sample_rate is None:
                chan.sample_rate = 1.
# -- string parsing -----------------------------------------------------------
def split(channelstring):
    """Split a comma-separated list of channels that may, or may not
    contain NDS2 channel types as well

    A channel name is paired with an NDS2 type only when the token that
    directly follows the name's comma (up to the next comma or newline,
    ignoring surrounding spaces and quotes) is one of `NDS2_TYPES`.

    Parameters
    ----------
    channelstring : `str`
        comma-separated string of channels

    Returns
    -------
    out : `list`
        list of strings for each channel; typed channels are returned as
        ``'<name>,<type>'``
    """
    out = []
    # remove whatever `re_quote` matches (defined in `.utils`; presumably
    # surrounding quote characters -- confirm there)
    channelstring = re_quote.sub('', channelstring)
    while True:
        channelstring = channelstring.strip('\' \n')
        if ',' not in channelstring:
            break

        # check for complete line without NDS type
        line = channelstring.split('\n')[0].rstrip('\', \n')
        if ',' not in line:
            try:
                channelstring = channelstring.split('\n', 1)[1]
            except IndexError:
                # no further lines: fall through and split on the comma
                pass
            else:
                out.append(line)
                continue

        # split off the first name, then inspect only the token directly
        # after it; a type appearing later in the string belongs to some
        # other channel and must not be paired with this one
        channel, rest = channelstring.split(',', 1)
        pieces = re.split(r'[,\n]', rest, maxsplit=1)
        ctype = pieces[0].strip('\' ')
        if ctype in NDS2_TYPES:
            out.append('%s,%s' % (channel, ctype))
            channelstring = pieces[1] if len(pieces) > 1 else ''
        else:
            out.append(channel)
            channelstring = rest

    # whatever is left (no commas) is the final channel
    if channelstring:
        out.append(channelstring)
    return out
def split_combination(channelstring):
    """Split a math-combination of channels

    Every `re_channel` match found in ``str(channelstring)`` is resolved
    through `get_channels` with ``find_parent=False``.

    Parameters
    ----------
    channelstring : `str`
        string containing one or more channel names

    Returns
    -------
    ChannelList : `~gwpy.detector.ChannelList`
        one entry per channel name found, in order of appearance
    """
    names = re_channel.findall(str(channelstring))
    return get_channels(names, find_parent=False)