Source code for gwsumm.archive

# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2013)
#
# This file is part of GWSumm.
#
# GWSumm is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWSumm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWSumm.  If not, see <http://www.gnu.org/licenses/>.

"""This module handles HDF archiving of data.

In production for LIGO, the LIGO Summary Pages (LSP) Service runs at regular
intervals (about every 10 minutes), so an HDF5 file is used to archive the
data read and produced from one instance so that the next instance doesn't
have to re-read and re-produce the same data.

All data products are stored using the standard gwpy `.write()` method
for each object.
"""

import tempfile
import shutil
import warnings
import re
import datetime
import os

from numpy import (unicode_, ndarray)
from h5py import File

from astropy.table import Table

from gwpy.time import (from_gps, to_gps)
from gwpy.timeseries import (StateVector, TimeSeries)
from gwpy.spectrogram import Spectrogram
from gwpy.segments import (SegmentList, Segment, DataQualityFlag)

from . import (globalv, mode)
from .channels import get_channel
from .data import (add_timeseries, add_spectrogram,
                   add_coherence_component_spectrogram)
from .triggers import (EventTable, add_triggers)

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'

re_rate = re.compile('_EVENT_RATE_')


def write_data_archive(outfile, channels=True, timeseries=True,
                       spectrogram=True, segments=True, triggers=True):
    """Build and save an HDF archive of data processed in this job.

    Parameters
    ----------
    outfile : `str`
        path to target HDF5 file

    channels : `bool`, optional
        include `Channel` metadata in archive

    timeseries : `bool`, optional
        include `TimeSeries` data in archive

    spectrogram : `bool`, optional
        include `Spectrogram` data in archive

    segments : `bool`, optional
        include `DataQualityFlag` data in archive

    triggers : `bool`, optional
        include `EventTable` data in archive
    """
    # initialise 'temp_outfile' with None to prevent potential
    # deletion errors in case the temporary file cannot be created
    temp_outfile = None
    try:
        # create a temporary file that won't be automatically deleted,
        # because it is opened again with the HDF5 file handler below;
        # writing the archive to a temporary file first avoids
        # overwriting the existing backup
        with tempfile.NamedTemporaryFile(prefix="gw_summary_archive_",
                                         delete=False,
                                         dir=None,
                                         suffix=".h5",
                                         ) as temp_file:
            temp_outfile = temp_file.name
        with File(temp_outfile, 'w') as h5file:

            # -- channels -----------------------

            if channels and globalv.CHANNELS:
                cols = ('name', 'sample_rate', 'frametype', 'unit')
                rows = []
                for chan in globalv.CHANNELS:
                    rows.append((
                        chan.ndsname,
                        chan.sample_rate.to('Hz').value
                        if chan.sample_rate is not None else 0,
                        str(getattr(chan, 'frametype', None) or ''),
                        str(chan.unit) if chan.unit else '',
                    ))
                Table(names=cols, rows=rows).write(h5file, 'channels')

            # -- timeseries ---------------------

            if timeseries:
                tgroup = h5file.create_group('timeseries')
                sgroup = h5file.create_group('statevector')
                # loop over channels
                for c, tslist in globalv.DATA.items():
                    c = get_channel(c)
                    # ignore trigger rate TimeSeries
                    if re_rate.search(str(c)):
                        continue
                    # loop over time-series
                    for ts in tslist:
                        # ignore fast channels that weren't used
                        # for a timeseries
                        if (not isinstance(ts, StateVector)
                                and ts.sample_rate.value > 16.01
                                and not getattr(c, '_timeseries', False)):
                            continue
                        # archive timeseries
                        try:
                            name = '%s,%s,%s' % (ts.name, ts.channel.ndsname,
                                                 ts.t0.value)
                        except AttributeError:
                            name = '%s,%s' % (ts.name, ts.t0.value)
                        if isinstance(ts, StateVector):
                            group = sgroup
                        else:
                            group = tgroup
                        _write_object(ts, group, path=name, format='hdf5')

            # -- spectrogram --------------------

            if spectrogram:
                for tag, gdict in zip(
                        ['spectrogram', 'coherence-components'],
                        [globalv.SPECTROGRAMS,
                         globalv.COHERENCE_COMPONENTS]):
                    group = h5file.create_group(tag)
                    # loop over channels
                    for key, speclist in gdict.items():
                        # loop over time-series
                        for spec in speclist:
                            name = '%s,%s' % (key, spec.t0.value)
                            _write_object(spec, group, path=name,
                                          format='hdf5')

            # -- segments -----------------------

            if segments:
                group = h5file.create_group('segments')
                # loop over channels
                for name, dqflag in globalv.SEGMENTS.items():
                    dqflag.write(group, path=name, format='hdf5')

            # -- triggers -----------------------

            if triggers:
                group = h5file.create_group('triggers')
                for key in globalv.TRIGGERS:
                    archive_table(globalv.TRIGGERS[key], key, group)

            # -- file corruption check ----------

            # make sure that the saved file is not corrupted by trying to
            # read back every item; the lambda does nothing but visit
            # each object
            h5file.visititems(lambda name, obj: None)

        # move the new file into place as the backup
        shutil.move(temp_outfile, outfile)

        # change permissions to allow reading by any user
        os.chmod(outfile, 0o664)
    except Exception:
        # if archiving fails for any reason, re-raise
        raise
    finally:
        # delete the temporary file if archiving encountered an error
        # and the file wasn't moved into place
        if temp_outfile is not None and os.path.isfile(temp_outfile):
            os.remove(temp_outfile)
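
# A quick, hedged way to check what `write_data_archive` wrote; the filename
# here is illustrative only, and `visit(print)` simply prints the path of
# every group and dataset in the archive:
#
#     >>> from h5py import File
#     >>> with File('archive.h5', 'r') as h5f:
#     ...     h5f.visit(print)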


def read_data_archive(sourcefile, rm_source_on_fail=True):
    """Read archived data from an HDF5 archive source

    This method reads all found data into the data containers defined by
    the `gwsumm.globalv` module, then returns nothing.

    Parameters
    ----------
    sourcefile : `str`
        path to source HDF5 file

    rm_source_on_fail : `bool`, optional
        remove the source HDF5 file if there was an `OSError` when opening
        the file
    """
    # Check that the HDF5 source file can be opened at all.
    # A common failure mode is that the HDF5 file is corrupted and this brings
    # down the whole workflow, requiring manual intervention, so we attempt
    # to catch that failure automatically
    try:
        h5file = File(sourcefile, 'r')
    except FileNotFoundError:
        raise
    except OSError as exc:
        # file is corrupt, so we remove it to start fresh
        if not rm_source_on_fail:
            raise
        warnings.warn(f"failed to read {sourcefile} [{exc}], removing...")
        os.remove(sourcefile)
        return
    # close the probe handle before re-opening the file below
    h5file.close()

    with File(sourcefile, 'r') as h5file:
        # Make sure that each part of the archive file is not corrupted by
        # trying to read the data. If any part is broken, delete the file and
        # return without loading anything into the gwsumm.globalv variables
        try:
            # simple lambda function here to do nothing but visit each item
            h5file.visititems(lambda name, obj: None)
        except RuntimeError as exc:
            if not rm_source_on_fail:
                raise
            warnings.warn(f"failed to read {sourcefile} [{exc}], removing...")
            os.remove(sourcefile)
            return

        # -- channels ---------------------------

        try:
            ctable = Table.read(h5file['channels'])
        except KeyError:
            # no channels table written
            pass
        else:
            for row in ctable:
                chan = get_channel(row['name'])
                for p in ctable.colnames[1:]:
                    if row[p]:
                        setattr(chan, p, row[p])

        # -- timeseries -------------------------

        for dataset in h5file.get('timeseries', {}).values():
            ts = TimeSeries.read(dataset, format='hdf5')
            if (re.search(r'\.(rms|min|mean|max|n)\Z', ts.channel.name)
                    and ts.sample_rate.value == 1.0):
                ts.channel.type = 's-trend'
            elif re.search(r'\.(rms|min|mean|max|n)\Z', ts.channel.name):
                ts.channel.type = 'm-trend'
            ts.channel = get_channel(ts.channel)
            try:
                add_timeseries(ts, key=ts.channel.ndsname)
            except ValueError:
                if mode.get_mode() != mode.Mode.day:
                    raise
                warnings.warn('Caught ValueError in combining daily archives')
                # get end time
                globalv.DATA[ts.channel.ndsname].pop(-1)
                t = globalv.DATA[ts.channel.ndsname][-1].span[-1]
                add_timeseries(ts.crop(start=t), key=ts.channel.ndsname)

        # -- statevector ------------------------

        for dataset in h5file.get('statevector', {}).values():
            sv = StateVector.read(dataset, format='hdf5')
            sv.channel = get_channel(sv.channel)
            add_timeseries(sv, key=sv.channel.ndsname)

        # -- spectrogram ------------------------

        for tag, add_ in zip(
                ['spectrogram', 'coherence-components'],
                [add_spectrogram, add_coherence_component_spectrogram]):
            for key, dataset in h5file.get(tag, {}).items():
                key = key.rsplit(',', 1)[0]
                spec = Spectrogram.read(dataset, format='hdf5')
                spec.channel = get_channel(spec.channel)
                add_(spec, key=key)

        # -- segments ---------------------------

        for name, dataset in h5file.get('segments', {}).items():
            dqflag = DataQualityFlag.read(h5file, path=dataset.name,
                                          format='hdf5')
            globalv.SEGMENTS += {name: dqflag}

        # -- triggers ---------------------------

        for dataset in h5file.get('triggers', {}).values():
            load_table(dataset)


def find_daily_archives(start, end, ifo, tag, basedir=os.curdir):
    """Find the daily archives spanning the given GPS [start, end) interval

    Parameters
    ----------
    start : `float`, `~datetime.datetime`, `~astropy.time.Time`, `str`
        start time of the archive file to find, any object that can be
        converted into a `LIGOTimeGPS`, `~astropy.time.Time`, or
        `~datetime.datetime` is acceptable

    end : `float`, `~datetime.datetime`, `~astropy.time.Time`, `str`
        end time of the archive file to find, any object that can be
        converted into a `LIGOTimeGPS`, `~astropy.time.Time`, or
        `~datetime.datetime` is acceptable

    ifo : `str`
        interferometer string, ex. 'H1'

    tag : `str`
        tag string for the archive file

    basedir : `path-like`, optional
        base path to archive files, default: '.'

    Returns
    -------
    archives : `list`
        list of matching archive files

    Notes
    -----
    This will only search the day directories with the format `YYYYMMDD`
    """
    archives = []
    # convert start and end to properly formatted datetime objects
    s = from_gps(to_gps(start))
    e = from_gps(to_gps(end))
    # append to the archives list as long as the iterating start time is
    # earlier than the end time
    while s < e:
        daybase = mode.get_base(s, mode=mode.Mode.day)
        ds = to_gps(s)
        s += datetime.timedelta(days=1)
        de = to_gps(s)
        archivedir = os.path.join(basedir, daybase, 'archive')
        arch = os.path.join(archivedir, f'{ifo}-{tag}-{ds}-{de-ds}.h5')
        if os.path.isfile(arch):
            archives.append(arch)
    return archives
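
# A hedged usage sketch: for each UTC day in [start, end), this looks for a
# file named <IFO>-<TAG>-<GPS day start>-<duration>.h5 inside an 'archive'
# sub-directory of that day's base directory (YYYYMMDD-style, see Notes
# above); the times and basedir below are illustrative only:
#
#     >>> find_daily_archives('Jan 1 2020', 'Jan 3 2020', 'H1',
#     ...                     'SUMMARY_ARCHIVE', basedir='.')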


# -- utility methods --------------------------------------------------------

def _write_object(data, *args, **kwargs):
    """Internal method to write something to HDF5 with error handling

    Parameters
    ----------
    data : gwpy object
        `TimeSeries` or `FrequencySeries`

    args, kwargs
        all other positional and keyword arguments are passed to the
        underlying write method

    Warns
    -----
    UserWarning
        if the write raises a `TypeError` or `ValueError`, or a
        `RuntimeError` because the name already exists in the HDF5 file

    Raises
    ------
    RuntimeError
        any error other than the object name already existing in the
        HDF5 file
    """
    try:
        return data.write(*args, **kwargs)
    except (TypeError, ValueError) as e:
        warnings.warn(str(e))
    except RuntimeError as e:
        if 'name already exists' in str(e).lower():
            warnings.warn(str(e))
        else:
            raise


def segments_to_array(segmentlist):
    """Convert a `SegmentList` to a 2-dimensional `numpy.ndarray`

    Parameters
    ----------
    segmentlist : `~gwpy.segments.SegmentList`
        input segment list to convert

    Returns
    -------
    out : `float` `numpy.ndarray`
        output segment list as a numpy array
    """
    out = ndarray((len(segmentlist), 2), dtype=float)
    for i, seg in enumerate(segmentlist):
        out[i] = seg
    return out


def segments_from_array(array):
    """Convert a 2-dimensional `numpy.ndarray` to a `SegmentList`

    Parameters
    ----------
    array : `float` `numpy.ndarray`
        input numpy array to convert into a segment list

    Returns
    -------
    out : `~gwpy.segments.SegmentList`
        output segment list
    """
    out = SegmentList()
    for row in array:
        out.append(Segment(*row))
    return out
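
# A hedged round-trip sketch for the two segment helpers above; the segment
# values are arbitrary examples:
#
#     >>> from gwpy.segments import (Segment, SegmentList)
#     >>> segs = SegmentList([Segment(0, 10), Segment(20, 30)])
#     >>> arr = segments_to_array(segs)
#     >>> segments_from_array(arr) == segs
#     True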


def archive_table(table, key, parent):
    """Add a table to the given HDF5 group

    .. warning::

       If the input ``table`` is empty, it will not be archived

    Parameters
    ----------
    table : `~astropy.table.Table`
        the data to archive

    key : `str`
        the path (relative to ``parent``) at which to store the table

    parent : `h5py.Group`
        the h5py group in which to add this dataset
    """
    # copy table so we can modify meta without impacting anything
    table = table.copy(copy_data=False)
    if len(table) == 0:
        warnings.warn("%r table is empty and will not be archived" % key)
        return
    table.meta.pop('psd', None)  # pycbc_live
    table.meta.pop('loudest', None)  # pycbc_live
    try:
        table.meta['segments'] = segments_to_array(table.meta['segments'])
    except KeyError:
        pass
    for col in table.columns:
        if table[col].dtype.type is unicode_:
            table.replace_column(col, table[col].astype(str))
    table.write(parent, path=key, format='hdf5')
    return key


def load_table(dataset):
    """Read table from the given HDF5 group

    The `EventTable` is read, stored in the memory archive, then returned

    Parameters
    ----------
    dataset : `h5py.Dataset`
        n-dimensional table to load from hdf5

    Returns
    -------
    table : `~gwpy.table.EventTable`
        the table of events loaded from hdf5
    """
    table = EventTable.read(dataset, format='hdf5')
    try:
        table.meta['segments'] = segments_from_array(table.meta['segments'])
    except KeyError:
        table.meta['segments'] = SegmentList()
    add_triggers(table, dataset.name.split('/')[-1])
    return table