# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2013)
#
# This file is part of GWSumm.
#
# GWSumm is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWSumm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWSumm.  If not, see <http://www.gnu.org/licenses/>.

"""Utilities for data handling and display
"""

import os
import re
import operator
import warnings
from time import sleep
from functools import reduce
from math import (floor, ceil)
from urllib.parse import urlparse
from collections import OrderedDict
from configparser import (NoSectionError, NoOptionError)

from astropy import units

import gwdatafind

from gwpy.io import nds2 as io_nds2
from gwpy.io.cache import (file_segment, cache_segments)
from gwpy.io.gwf import data_segments
from gwpy.segments import (Segment, SegmentList)
from gwpy.timeseries import (TimeSeriesList, TimeSeriesDict,
                             StateVector, StateVectorList, StateVectorDict)
from gwpy.timeseries.io.gwf import get_default_gwf_api
from gwpy.utils.mp import multiprocess_with_queues

from .. import globalv
from ..utils import vprint
from ..config import GWSummConfigParser
from ..channels import (get_channel, update_missing_channel_params,
                        split_combination as split_channel_combination,
                        update_channel_params)
from .utils import (use_configparser, use_segmentlist, make_globalv_key)
from .mathutils import get_with_math

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'

warnings.filterwarnings("ignore", "LAL has no unit corresponding")

OPERATOR = {
    '*': operator.mul,
    '-': operator.sub,
    '+': operator.add,
    '/': operator.truediv,
}

FRAMETYPE_REGEX = {
    'commissioning': re.compile(
        r'[A-Z][0-9]_C\Z'),
    'raw data': re.compile(
        r'([A-Z][0-9]_R\Z|raw)'),
    'second-trend': re.compile(
        r'[A-Z][0-9]_T\Z'),
    'minute-trend': re.compile(
        r'[A-Z][0-9]_M\Z'),
    'low-latency h(t)': re.compile(
        r'([A-Z][0-9]_DMT_C00\Z|[A-Z][0-9]_llhoft)'),
    'calibrated h(t) version 0': re.compile(
        r'[A-Z][0-9]_HOFT_C00\Z'),
    'calibrated h(t) version 1': re.compile(
        r'([A-Z][0-9]_HOFT_C01|G1_RDS_C01_L3)\Z'),
    'calibrated h(t) version 2': re.compile(
        r'[A-Z][0-9]_HOFT_C02\Z'),
    'DMT SenseMon on GDS h(t)': re.compile(
        r'SenseMonitor_hoft_[A-Z][0-9]_M\Z'),
    'DMT SenseMon on front-end h(t)': re.compile(
        r'SenseMonitor_CAL_[A-Z][0-9]_M\Z'),
}
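
# For illustration, a frametype name can be classified by checking which of
# the patterns above matches (a sketch with a hypothetical input; the dict
# itself is the authority):
#
#    >>> [desc for desc, regex in FRAMETYPE_REGEX.items()
#    ...  if regex.search('H1_HOFT_C00')]
#    ['calibrated h(t) version 0']
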

# find the default GWF reading API (None if no GWF library is available)
try:
    GWF_API = get_default_gwf_api()
except ImportError:
    GWF_API = None

# frameCPP I/O optimisation: GWF frametypes that contain only ADC channels
# (allows big memory/time savings when reading with frameCPP)
ADC_TYPES = {
    'R', 'C',  # old LIGO raw and commissioning types
    'T', 'M',  # old LIGO trend types
    'H1_R', 'H1_C', 'L1_R', 'L1_C',  # new LIGO raw and commissioning types
    'H1_T', 'H1_M', 'L1_T', 'L1_M',  # new LIGO trend types
    'K1_R', 'K1_C', 'K1_T', 'K1_M',  # KAGRA raw, commissioning, trend types
    'raw',  # Virgo raw type
}

SHORT_HOFT_TYPES = {  # map aggregated h(t) type to short h(t) type
    'H1_HOFT_C00': 'H1_llhoft',
    'L1_HOFT_C00': 'L1_llhoft',
    'HoftOnline': 'V1_llhoft',
    'V1Online': 'V1_llhoft',
    'H1_HOFT_TEST': 'H1_DMT_TEST',
    'L1_HOFT_TEST': 'L1_DMT_TEST',
}

VIRGO_HOFT_CHANNELS = {
    "V1:Hrec_hoft_16384Hz",
    "V1:DQ_ANALYSIS_STATE_VECTOR",
}


# -- utilities ----------------------------------------------------------------

re_gwf_gps_epoch = re.compile(r'[-\/](?P<gpsepoch>\d+)$')


def _urlpath(url):
    return urlparse(url).path


def sieve_cache(cache, ifo=None, tag=None, segment=None):
    """Filter a cache of GWF URLs by IFO, tag, and/or segment
    """
    def _sieve(url):
        try:
            uifo, utag, useg = gwdatafind.utils.filename_metadata(url)
        except (AttributeError, TypeError):  # CacheEntry
            uifo = url.observatory
            utag = url.description
            useg = url.segment
        if segment is not None:
            # cast type to prevent TypeErrors
            useg = type(segment)(*useg)
        return (
            (ifo is None or uifo == ifo)
            and (tag is None or utag == tag)
            and (segment is None or useg.intersects(segment))
        )

    return type(cache)(filter(_sieve, cache))

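
# A minimal usage sketch for `sieve_cache` (the file paths below are
# hypothetical, following the standard O-TAG-GPS-DURATION.gwf convention):
#
#    >>> cache = ['/data/H-H1_R-1187000000-64.gwf',
#    ...          '/data/H-H1_R-1187000064-64.gwf']
#    >>> sieve_cache(cache, ifo='H', tag='H1_R',
#    ...             segment=Segment(1187000000, 1187000032))
#    ['/data/H-H1_R-1187000000-64.gwf']
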
@use_configparser
def find_frames(ifo, frametype, gpsstart, gpsend, config=GWSummConfigParser(),
                urltype='file', gaps='warn', onerror='raise'):
    """Query the datafind server for GWF files for the given type

    Parameters
    ----------
    ifo : `str`
        prefix for the IFO of interest (either one or two characters)

    frametype : `str`
        name of the frametype to find

    gpsstart : `int`
        GPS start time of the query

    gpsend : `int`
        GPS end time of the query

    config : `~ConfigParser.ConfigParser`, optional
        configuration with `[datafind]` section containing `server`
        specification, otherwise taken from the environment

    urltype : `str`, optional
        what type of file paths to return, default: `file`

    gaps : `str`, optional
        what to do when gaps are detected, one of

        - `ignore` : do nothing
        - `warn` : display the existence of gaps but carry on
        - `raise` : raise an exception

    onerror : `str`, optional
        what to do when the `gwdatafind` query itself fails, same options
        as for ``gaps``

    Returns
    -------
    cache : `list` of `str`
        a list of file paths pointing at GWF files matching the request
    """
    vprint('    Finding %s-%s frames for [%d, %d)...'
           % (ifo[0], frametype, int(gpsstart), int(gpsend)))

    # find datafind host
    try:
        host = config.get('datafind', 'server')
    except (NoOptionError, NoSectionError):
        host = None

    # XXX HACK: LLO changed frame types on Dec 6 2013:
    LLOCHANGE = 1070291904
    if re.match(r'L1_[CRMT]', frametype) and gpsstart < LLOCHANGE:
        frametype = frametype[-1]

    # query frames
    ifo = ifo[0].upper()
    gpsstart = int(floor(gpsstart))
    gpsend = int(ceil(min(globalv.NOW, gpsend)))
    if gpsend <= gpsstart:
        return []

    # parse match
    try:
        frametype, match = frametype.split('|', 1)
    except ValueError:
        match = None

    def _query():
        return gwdatafind.find_urls(ifo[0].upper(), frametype,
                                    gpsstart, gpsend, urltype=urltype,
                                    on_gaps=gaps, match=match, host=host)

    try:
        cache = _query()
    except RuntimeError as e:
        sleep(1)
        try:
            cache = _query()
        except RuntimeError:
            if 'Invalid GPS times' in str(e):
                e.args = ('%s: %d ... %s' % (str(e), gpsstart, gpsend),)
            if onerror in ['ignore', None]:
                pass
            elif onerror in ['warn']:
                warnings.warn('Caught %s: %s' % (type(e).__name__, str(e)))
            else:
                raise
            cache = []

    # XXX: if querying for day of LLO frame type change, do both
    if (ifo[0].upper() == 'L' and frametype in ['C', 'R', 'M', 'T']
            and gpsstart < LLOCHANGE < gpsend):
        start = len(cache) and file_segment(cache[-1])[1] or gpsstart
        if start < gpsend:
            cache.extend(gwdatafind.find_urls(
                ifo[0].upper(), 'L1_%s' % frametype, start, gpsend,
                urltype=urltype, on_gaps=gaps, host=host)[1:])

    # extend cache beyond datafind's knowledge to reduce latency
    try:
        latest = cache[-1]
        ngps = len(re_gwf_gps_epoch.search(
            os.path.dirname(latest)).groupdict()['gpsepoch'])
    except (IndexError, AttributeError):
        pass
    else:
        while True:
            s, e = file_segment(latest)
            if s >= gpsend:
                break
            # replace GPS time of file basename
            new = latest.replace('-%d-' % s, '-%d-' % e)
            # replace GPS epoch in dirname
            new = new.replace('%s/' % str(s)[:ngps], '%s/' % str(e)[:ngps])
            if os.path.isfile(new):
                cache.append(new)
                latest = new
            else:
                break

    # validate files existing and return
    cache = list(filter(os.path.exists, map(_urlpath, cache)))
    vprint(' %d found.\n' % len(cache))
    return cache

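
# Usage sketch for `find_frames` (requires a reachable datafind server;
# the IFO, frametype, and GPS times below are hypothetical):
#
#    >>> cache = find_frames('L1', 'L1_R', 1187008880, 1187008884,
#    ...                     gaps='ignore')
#    >>> all(path.endswith('.gwf') for path in cache)
#    True
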
def find_best_frames(ifo, frametype, start, end, **kwargs):
    """Find frames for the given type, replacing with a better type if needed
    """
    # find cache for this frametype
    cache = find_frames(ifo, frametype, start, end, **kwargs)

    # check for gaps in current cache
    span = SegmentList([Segment(start, end)])
    gaps = span - cache_segments(cache)

    # if gaps and using aggregated h(t), check short files
    if abs(gaps) and frametype in SHORT_HOFT_TYPES:
        f2 = SHORT_HOFT_TYPES[frametype]
        vprint("    Gaps discovered in aggregated h(t) type "
               "%s, checking %s\n" % (frametype, f2))
        kwargs['gaps'] = 'ignore'
        cache.extend(filter(lambda e: file_segment(e) in gaps,
                            find_frames(ifo, f2, start, end, **kwargs)))
        new = int(abs(gaps - cache_segments(cache)))
        if new:
            vprint("        %ss extra coverage with frametype %s\n"
                   % (new, f2))
        else:
            vprint("        No extra coverage with frametype %s\n" % f2)

    return cache, frametype

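
# Usage sketch for `find_best_frames`: for an aggregated h(t) type, the
# low-latency fallback from SHORT_HOFT_TYPES is tried automatically when
# gaps are found (hypothetical GPS times):
#
#    >>> cache, ftype = find_best_frames('H1', 'H1_HOFT_C00',
#    ...                                 1187008880, 1187009000,
#    ...                                 gaps='ignore')
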
def find_frame_type(channel):
    """Find the frametype associated with the given channel

    If the input channel has a `frametype` attribute, that will be used,
    otherwise the frametype will be guessed based on the channel name and
    any trend options given
    """
    if (channel.frametype is None and channel.type is None
            and channel.trend is not None):
        raise ValueError("Cannot automatically determine frametype for {0}, "
                         "please manually select frametype or give NDS-style "
                         "channel suffix".format(channel.name))
    if channel.frametype is None:
        try:
            ndstype = io_nds2.Nds2ChannelType.find(channel.type)
        except (AttributeError, KeyError, ValueError):
            ndstype = None
        if ndstype == io_nds2.Nds2ChannelType.MTREND:
            ftype = 'M'
        elif ndstype == io_nds2.Nds2ChannelType.STREND:
            ftype = 'T'
        elif ndstype == io_nds2.Nds2ChannelType.RDS:
            ftype = 'LDAS_C02_L2'
        elif ndstype == io_nds2.Nds2ChannelType.ONLINE:
            ftype = 'lldetchar'
        else:
            ftype = 'R'
        if channel.ifo == 'C1':
            channel.frametype = ftype
        else:
            channel.frametype = '%s1_%s' % (channel.ifo[0], ftype)
    return channel.frametype

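
# Sketch of the frametype guess for a channel with no explicit type
# (hypothetical channel name; with no NDS2 type, the raw type is assumed):
#
#    >>> find_frame_type(get_channel('H1:IMC-F_OUT_DQ'))
#    'H1_R'
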
def frame_trend_type(ifo, frametype):
    """Returns the trend type of a channel based on the given frametype
    """
    if ifo == 'C1' and frametype == 'M':
        return 'minute'
    if re.match(r'(?:(.*)_)?[A-Z]\d_M', str(frametype)):
        return 'minute'
    if re.match(r'(?:(.*)_)?[A-Z]\d_T', str(frametype)):
        return 'second'
    return None

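
# For example (both calls are cheap string checks):
#
#    >>> frame_trend_type('H1', 'H1_M')
#    'minute'
#    >>> frame_trend_type('L1', 'L1_T')
#    'second'
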
def get_channel_type(name):
    """Returns the probable type of this channel, based on the name

    Parameters
    ----------
    name : `str`
        the name of the channel

    Returns
    -------
    type : `str`
        one of ``'adc'``, ``'proc'``, or ``'sim'``
    """
    channel = get_channel(name)
    # find GEO h(t) channels
    if channel.ifo == 'G1' and channel.system in ['DER']:
        return 'proc'
    # find LIGO h(t) or LOSC channels
    if channel.ifo in ['L1', 'H1'] and channel.system in ['GDS', 'LOSC']:
        return 'proc'
    # default to ADC
    return 'adc'

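
# For example, LIGO calibrated-strain channels are classified as 'proc':
#
#    >>> get_channel_type('H1:GDS-CALIB_STRAIN')
#    'proc'
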
def exclude_short_trend_segments(segments, ifo, frametype):
    """Remove segments from a list shorter than 1 trend sample
    """
    frametype = frametype or ''
    trend = frame_trend_type(ifo, frametype)
    if trend == 'minute':
        mindur = 60.
    elif trend == 'second':
        mindur = 1.
    else:
        mindur = 0.
    return type(segments)([s for s in segments if abs(s) >= mindur])

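
# For example, for a minute-trend frametype, segments shorter than one
# 60-second sample are dropped:
#
#    >>> segs = SegmentList([Segment(0, 30), Segment(100, 400)])
#    >>> exclude_short_trend_segments(segs, 'H1', 'H1_M')
#    ... # only Segment(100, 400) survives
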
def all_adc(cache):
    """Returns `True` if all cache entries point to GWF files known to
    contain only ADC channels

    This is useful to set `type='adc'` when reading with frameCPP, which
    can greatly speed things up.
    """
    for path in cache:
        try:
            tag = os.path.basename(path).split('-')[1]
        except (AttributeError, TypeError):  # CacheEntry
            tag = path.description
            path = path.path
        if not path.endswith('.gwf') or tag not in ADC_TYPES:
            return False
    return True

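
# For example (hypothetical paths; the tag is parsed from the basename and
# compared against ADC_TYPES):
#
#    >>> all_adc(['/data/H-H1_R-1187000000-64.gwf'])
#    True
#    >>> all_adc(['/data/H-H1_HOFT_C00-1187000000-4096.gwf'])
#    False
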
# -- data accessors -----------------------------------------------------------
@use_configparser
@use_segmentlist
def get_timeseries_dict(channels, segments, config=GWSummConfigParser(),
                        cache=None, query=True, nds=None, nproc=1,
                        frametype=None, statevector=False, return_=True,
                        datafind_error='raise', **ioargs):
    """Retrieve the data for a set of channels

    Parameters
    ----------
    channels : `list` of `str` or `~gwpy.detector.Channel`
        the channels you want to get

    segments : `~gwpy.segments.SegmentList`
        the data segments of interest

    config : `~gwsumm.config.GWSummConfigParser`
        the configuration for this analysis

    query : `bool`, optional
        whether you want to retrieve new data from the source if it
        hasn't been loaded already

    nds : `bool`, optional
        whether to try and use NDS2 for data access, default is to guess
        based on other arguments and the environment

    nproc : `int`, optional
        number of parallel cores to use for file reading, default: ``1``

    frametype : `str`, optional
        the frametype of the target channels, if not given, this will be
        guessed based on the channel name(s)

    statevector : `bool`, optional
        whether you want to load `~gwpy.timeseries.StateVector` rather
        than `~gwpy.timeseries.TimeSeries` data

    datafind_error : `str`, optional
        what to do in the event of a datafind error, one of

        - 'raise' : stop immediately upon error
        - 'warn' : print warning and continue as if no frames had been
          found
        - 'ignore' : print nothing and continue with no frames

    return_ : `bool`, optional
        whether you actually want anything returned to you, or you are
        just calling this function to load data for use later

    **ioargs
        all other keyword arguments are passed to the relevant data
        reading method (either `~gwpy.timeseries.TimeSeriesDict.read` or
        `~gwpy.timeseries.TimeSeriesDict.fetch` or state-vector
        equivalents)

    Returns
    -------
    datalist : `dict` of `~gwpy.timeseries.TimeSeriesList`
        a set of ``(channel, TimeSeriesList)`` pairs
    """
    # separate channels by type
    if query:
        if frametype is not None:
            frametypes = {(None, frametype): channels}
        else:
            frametypes = dict()
            allchannels = set([
                c for group in map(split_channel_combination, channels)
                for c in group])
            for channel in allchannels:
                channel = get_channel(channel)
                ifo = channel.ifo
                ftype = (None if cache and not channel.frametype
                         else find_frame_type(channel))
                id_ = (ifo, ftype)
                if id_ in frametypes:
                    frametypes[id_].append(channel)
                else:
                    frametypes[id_] = [channel]
        for ftype, channellist in frametypes.items():
            _get_timeseries_dict(channellist, segments, config=config,
                                 cache=cache, query=query, nds=nds,
                                 nproc=nproc, frametype=ftype[1],
                                 statevector=statevector, return_=False,
                                 datafind_error=datafind_error, **ioargs)
    if not return_:
        return
    else:
        out = OrderedDict()
        for name in channels:
            channel = get_channel(name)
            out[channel.ndsname] = get_with_math(
                name, segments, _get_timeseries_dict, get_timeseries,
                config=config, query=False, statevector=statevector,
                **ioargs)
        return out

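
# Usage sketch for `get_timeseries_dict` (hypothetical channels and times;
# requires frame files visible via datafind, or an NDS2 server). The return
# value is an OrderedDict of per-channel TimeSeriesList objects, keyed by
# NDS name:
#
#    >>> span = SegmentList([Segment(1187008880, 1187008884)])
#    >>> data = get_timeseries_dict(
#    ...     ['L1:IMC-F_OUT_DQ', 'L1:PSL-ISS_PDB_OUT_DQ'], span)
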
@use_segmentlist
def _get_timeseries_dict(channels, segments, config=None, cache=None,
                         query=True, nds=None, frametype=None, nproc=1,
                         return_=True, statevector=False, archive=True,
                         datafind_error='raise', dtype=None, **ioargs):
    """Internal method to retrieve the data for a set of like-typed
    channels using the :meth:`TimeSeriesDict.read` accessor.
    """
    channels = list(map(get_channel, channels))

    # set classes
    if statevector:
        ListClass = StateVectorList
        DictClass = StateVectorDict
    else:
        ListClass = TimeSeriesList
        DictClass = TimeSeriesDict

    # check we have a configparser
    if config is None:
        config = GWSummConfigParser()

    # read segments from global memory
    keys = dict((c.ndsname, make_globalv_key(c)) for c in channels)
    havesegs = reduce(operator.and_,
                      (globalv.DATA.get(keys[channel.ndsname],
                                        ListClass()).segments
                       for channel in channels))
    new = segments - havesegs

    # read channel information
    filter_ = dict()
    resample = dict()
    dtype_ = dict()
    for channel in channels:
        name = str(channel)
        try:
            filter_[name] = channel.filter
        except AttributeError:
            pass
        try:
            resample[name] = float(channel.resample)
        except AttributeError:
            pass
        if channel.dtype is not None:
            dtype_[name] = channel.dtype
        elif dtype is not None:
            dtype_[name] = dtype

    # work out whether to use NDS or not
    if nds is None and cache is not None:
        nds = False
    elif nds is None:
        nds = 'LIGO_DATAFIND_SERVER' not in os.environ

    # read new data
    query &= (abs(new) > 0)
    if cache is not None:
        query &= len(cache) > 0
    if query:
        for channel in channels:
            globalv.DATA.setdefault(keys[channel.ndsname], ListClass())
        ifo = channels[0].ifo
        # open NDS connection
        if nds:
            if config.has_option('nds', 'host'):
                host = config.get('nds', 'host')
                port = config.getint('nds', 'port')
                ndsconnection = io_nds2.connect(host, port)
            else:
                ndsconnection = None
            frametype = source = 'nds'
            ndstype = channels[0].type
            # get NDS channel segments
            if ndsconnection is not None and ndsconnection.get_protocol() > 1:
                span = list(map(int, new.extent()))
                avail = io_nds2.get_availability(
                    channels, *span, connection=ndsconnection,
                )
                new &= avail.intersection(avail.keys())
        # or find frame type and check cache
        else:
            frametype = frametype or channels[0].frametype
            new = exclude_short_trend_segments(new, ifo, frametype)
            if cache is not None:
                fcache = sieve_cache(cache, ifo=ifo[0], tag=frametype)
            else:
                fcache = []
            if (cache is None or len(fcache) == 0) and len(new):
                span = new.extent()
                fcache, frametype = find_best_frames(
                    ifo, frametype, span[0], span[1],
                    config=config, gaps='ignore', onerror=datafind_error)

            # parse discontiguous cache blocks and rebuild segment list
            new &= cache_segments(fcache)
            source = 'files'

            # if reading Virgo h(t) GWF data, filter out files that don't
            # contain the channel (Virgo state-vector only)
            _names = set(map(str, channels))
            _virgohoft = _names.intersection(VIRGO_HOFT_CHANNELS)
            if _virgohoft:
                vprint("    Determining available segments for "
                       "Virgo h(t) data...")
                new &= data_segments(fcache, _virgohoft.pop())

            # set channel type if reading with frameCPP
            if fcache and all_adc(fcache):
                ioargs['type'] = 'adc'

        # store frametype for display in Channel Information tables
        for channel in channels:
            channel.frametype = frametype

        # check whether each channel exists for all new times already
        qchannels = []
        for channel in channels:
            oldsegs = globalv.DATA.get(keys[channel.ndsname],
                                       ListClass()).segments
            if abs(new - oldsegs) != 0 and nds:
                qchannels.append(channel.ndsname)
            elif abs(new - oldsegs) != 0:
                qchannels.append(str(channel))

        # loop through segments, recording data for each
        if len(new):
            vprint("    Fetching data (from %s) for %d channels [%s]:\n"
                   % (source, len(qchannels),
                      nds and ndstype or frametype or ''))
        vstr = "        [{0[0]}, {0[1]})"
        for segment in new:
            # force reading integer-precision segments
            segment = type(segment)(int(segment[0]), int(segment[1]))
            if abs(segment) < 1:
                continue
            # reset to minute trend sample times
            if frame_trend_type(ifo, frametype) == 'minute':
                segment = Segment(*io_nds2.minute_trend_times(*segment))
                if abs(segment) < 60:
                    continue
            if nds:  # fetch
                tsd = DictClass.fetch(qchannels, segment[0], segment[1],
                                      connection=ndsconnection,
                                      type=ndstype,
                                      verbose=vstr.format(segment),
                                      **ioargs)
            else:  # read
                # NOTE: this sieve explicitly casts our segment to
                #       ligo.segments.segment to prevent `TypeError` from
                #       a mismatch with ligo.segments.segment
                segcache = sieve_cache(fcache, segment=segment)
                segstart, segend = map(float, segment)
                tsd = DictClass.read(segcache, qchannels,
                                     start=segstart, end=segend,
                                     nproc=nproc,
                                     verbose=vstr.format(segment),
                                     **ioargs)

            vprint("        post-processing...\n")

            # apply type casting (copy=False means same type just returns)
            for chan, ts in tsd.items():
                tsd[chan] = ts.astype(dtype_.get(chan, ts.dtype),
                                      casting='unsafe', copy=False)

            # apply resampling
            tsd = resample_timeseries_dict(tsd, nproc=1, **resample)

            # post-process
            for c, data in tsd.items():
                channel = get_channel(c)
                key = keys[channel.ndsname]
                if (key in globalv.DATA
                        and data.span in globalv.DATA[key].segments):
                    continue
                if data.unit is None:
                    data.unit = 'undef'
                for i, seg in enumerate(globalv.DATA[key].segments):
                    if seg in data.span:
                        # new data completely covers existing segment
                        # (and more), so just remove the old stuff
                        globalv.DATA[key].pop(i)
                        break
                    elif seg.intersects(data.span):
                        # new data extends existing segment, so only keep
                        # the really new stuff
                        data = data.crop(*(data.span - seg))
                        break

                # filter
                try:
                    filt = filter_[str(channel)]
                except KeyError:
                    pass
                else:
                    data = filter_timeseries(data, filt)

                if isinstance(data, StateVector) or ':GRD-' in str(channel):
                    data.override_unit(units.dimensionless_unscaled)
                    if hasattr(channel, 'bits'):
                        data.bits = channel.bits
                elif data.unit is None:
                    data.override_unit(channel.unit)

                # update channel type for trends
                if data.channel.type is None and (
                        data.channel.trend is not None):
                    if data.dt.to('s').value == 1:
                        data.channel.type = 's-trend'
                    elif data.dt.to('s').value == 60:
                        data.channel.type = 'm-trend'

                # append and coalesce
                add_timeseries(data, key=key, coalesce=True)

    # rebuild global channel list with new parameters
    update_channel_params()

    if not return_:
        return
    return locate_data(channels, segments, list_class=ListClass)

def locate_data(channels, segments, list_class=TimeSeriesList):
    """Find and return available (already loaded) data
    """
    keys = dict((c.ndsname, make_globalv_key(c)) for c in channels)
    # return correct data
    out = OrderedDict()
    for channel in channels:
        data = list_class()
        if keys[channel.ndsname] not in globalv.DATA:
            out[channel.ndsname] = list_class()
        else:
            for ts in globalv.DATA[keys[channel.ndsname]]:
                for seg in segments:
                    if abs(seg) == 0 or abs(seg) < ts.dt.value:
                        continue
                    if ts.span.intersects(seg):
                        common = map(float, ts.span & seg)
                        cropped = ts.crop(*common, copy=False)
                        if cropped.size:
                            data.append(cropped)
            out[channel.ndsname] = data.coalesce()
    return out

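
# Usage sketch: once data are in global memory (e.g. via get_timeseries),
# `locate_data` crops out the requested spans without touching the source
# (hypothetical channel and times):
#
#    >>> out = locate_data([get_channel('L1:IMC-F_OUT_DQ')],
#    ...                   SegmentList([Segment(1187008880, 1187008882)]))
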
@use_segmentlist
def get_timeseries(channel, segments, config=None, cache=None,
                   query=True, nds=None, nproc=1,
                   frametype=None, statevector=False, return_=True,
                   datafind_error='raise', **ioargs):
    """Retrieve data for a channel

    Parameters
    ----------
    channel : `str` or `~gwpy.detector.Channel`
        the name of the channel you want

    segments : `~gwpy.segments.SegmentList`
        the data segments of interest

    config : `~gwsumm.config.GWSummConfigParser`
        the configuration for this analysis

    cache : `~glue.lal.Cache` or `list` of `str`
        a cache of data files from which to read

    query : `bool`, optional
        whether you want to retrieve new data from the source if it
        hasn't been loaded already

    nds : `bool`, optional
        whether to try and use NDS2 for data access, default is to guess
        based on other arguments and the environment

    nproc : `int`, optional
        number of parallel cores to use for file reading, default: ``1``

    frametype : `str`, optional
        the frametype of the target channels, if not given, this will be
        guessed based on the channel name(s)

    statevector : `bool`, optional
        whether you want to load `~gwpy.timeseries.StateVector` rather
        than `~gwpy.timeseries.TimeSeries` data

    datafind_error : `str`, optional
        what to do in the event of a datafind error, one of

        - 'raise' : stop immediately upon error
        - 'warn' : print warning and continue as if no frames had been
          found
        - 'ignore' : print nothing and continue with no frames

    return_ : `bool`, optional
        whether you actually want anything returned to you, or you are
        just calling this function to load data for use later

    **ioargs
        all other keyword arguments are passed to the relevant data
        reading method (either `~gwpy.timeseries.TimeSeries.read` or
        `~gwpy.timeseries.TimeSeries.fetch` or state-vector equivalents)

    Returns
    -------
    data : `~gwpy.timeseries.TimeSeriesList`
        a list of `TimeSeries`
    """
    channel = get_channel(channel)
    out = get_timeseries_dict([channel.ndsname], segments, config=config,
                              cache=cache, query=query, nds=nds, nproc=nproc,
                              frametype=frametype, statevector=statevector,
                              return_=return_, datafind_error=datafind_error,
                              **ioargs)
    if return_:
        return out[channel.ndsname]
    return

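
# Usage sketch for the single-channel accessor (hypothetical channel and
# times; data-source resolution is the same as for get_timeseries_dict):
#
#    >>> tslist = get_timeseries(
#    ...     'H1:GDS-CALIB_STRAIN',
#    ...     SegmentList([Segment(1187008880, 1187008884)]))
#    >>> print(tslist[0].sample_rate)
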
def add_timeseries(timeseries, key=None, coalesce=True):
    """Add a `TimeSeries` to the global memory cache

    Parameters
    ----------
    timeseries : `TimeSeries` or `StateVector`
        the data series to add

    key : `str`, optional
        the key with which to store these data, defaults to the
        `~gwpy.timeseries.TimeSeries.name` of the series

    coalesce : `bool`, optional
        coalesce contiguous series after adding, defaults to `True`
    """
    if timeseries.channel is not None:
        # transfer parameters from timeseries.channel to the globalv channel
        update_missing_channel_params(timeseries.channel)
    if key is None:
        key = timeseries.name or timeseries.channel.ndsname
    if isinstance(timeseries, StateVector):
        globalv.DATA.setdefault(key, StateVectorList())
    else:
        globalv.DATA.setdefault(key, TimeSeriesList())
    globalv.DATA[key].append(timeseries)
    if coalesce:
        globalv.DATA[key].coalesce()

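
# A self-contained sketch of the global-cache round-trip (hypothetical
# channel name):
#
#    >>> from gwpy.timeseries import TimeSeries
#    >>> ts = TimeSeries([1., 2., 3.], t0=0, dt=1, name='X1:TEST')
#    >>> add_timeseries(ts, key='X1:TEST')
#    >>> len(globalv.DATA['X1:TEST'])
#    1
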
def resample_timeseries_dict(tsd, nproc=1, **sampling_dict):
    """Resample a `TimeSeriesDict`

    Parameters
    ----------
    tsd : `~gwpy.timeseries.TimeSeriesDict`
        the input dict to resample

    nproc : `int`, optional
        the number of parallel processes to use

    **sampling_dict
        ``<name>=<sampling frequency>`` pairs defining new sampling
        frequencies for keys of ``tsd``

    Returns
    -------
    resampled : `~gwpy.timeseries.TimeSeriesDict`
        a new dict with the keys from ``tsd`` and resampled values, if
        that key was included in ``sampling_dict``, or the original value
    """
    # define resample function (must take single argument)
    def _resample(args):
        ts, fs = args
        if fs and units.Quantity(fs, "Hz") == ts.sample_rate:
            warnings.warn(
                "requested resample rate for {0} matches native rate ({1}), "
                "please update configuration".format(ts.name, ts.sample_rate),
                UserWarning,
            )
        elif fs:
            return ts.resample(fs, ftype='fir', window='hamming')
        return ts

    # group timeseries with new sampling frequencies
    inputs = ((ts, sampling_dict.get(name)) for name, ts in tsd.items())

    # apply resampling
    resampled = multiprocess_with_queues(nproc, _resample, inputs)

    # map back to original dict keys
    return dict(zip(list(tsd), resampled))

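
# Usage sketch: channel names contain colons, so the sampling map must be
# passed by dict-unpacking rather than as literal keywords (hypothetical
# channel name):
#
#    >>> import numpy
#    >>> from gwpy.timeseries import TimeSeries, TimeSeriesDict
#    >>> tsd = TimeSeriesDict()
#    >>> tsd['X1:TEST'] = TimeSeries(numpy.zeros(1024), sample_rate=256)
#    >>> out = resample_timeseries_dict(tsd, nproc=1, **{'X1:TEST': 64})
#    >>> print(out['X1:TEST'].sample_rate)
#    64.0 Hz
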
def filter_timeseries(ts, filt):
    """Filter a `TimeSeries` using a function or a ZPK definition.
    """
    # filter with function
    if callable(filt):
        try:
            return filt(ts)
        except TypeError as e:
            if 'Can only apply' in str(e):  # units error
                ts.value[:] = filt(ts.value)
                return ts
            else:
                raise
    # filter with ZPK definition
    else:
        return ts.filter(*filt)
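
# For example, a pure-gain ZPK (no zeros, no poles, gain of 2) or an
# arbitrary callable can be applied the same way (``ts`` is any
# `~gwpy.timeseries.TimeSeries`):
#
#    >>> scaled = filter_timeseries(ts, ([], [], 2.))
#    >>> doubled = filter_timeseries(ts, lambda x: x * 2)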