# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2013)
#
# This file is part of GWSumm.
#
# GWSumm is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWSumm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWSumm. If not, see <http://www.gnu.org/licenses/>.
"""Utilities for GWSumm
"""
import os
import re
import sys
from socket import getfqdn
# import filter evals
from math import pi # noqa: F401
import numpy # noqa: F401
from . import globalv
# -- regular expressions ------------------------------------------------------

# one or more non-word characters or underscores
re_cchar = re.compile(r"[\W_]+")
# leading or trailing runs of whitespace and quote characters
re_quote = re.compile(r'^[\s\"\']+|[\s\"\']+$')
# the operators '&', '!=', '!' and '|'
re_flagdiv = re.compile(r"(&|!=|!|\|)")

# define some colours
# NOTE: these must NOT be raw strings, a raw r'\033' is the four literal
#       characters backslash-0-3-3, not the ESC control character that
#       starts an ANSI escape sequence
WARNC = '\033[93m'
ERRC = '\033[91m'
ENDC = '\033[0m'

# bad things to eval:
#   - 'os.' unless directly followed by '$', a quote character, or a space
#   - 'shutil', '.rm', '.mv' anywhere in the string
UNSAFE_EVAL_STRS = [r'os\.(?![$\'\" ])', 'shutil', r'\.rm', r'\.mv']
UNSAFE_EVAL = re.compile(r'(%s)' % '|'.join(UNSAFE_EVAL_STRS))
# -- utilities ----------------------------------------------------------------
[docs]
def elapsed_time():
    """Return the number of seconds elapsed since this job started

    The result is the difference between the current `time.time` value
    and the start time recorded in `globalv.START`.
    """
    from time import time as current_time
    return current_time() - globalv.START
[docs]
def vprint(message, verbose=True, stream=sys.stdout, profile=True):
    """Write a message to a stream, optionally stamped with the elapsed time

    For any stream other than `sys.stderr` the ``verbose`` and ``profile``
    flags are combined with the global `globalv.VERBOSE` and
    `globalv.PROFILE` switches; messages for `sys.stderr` use the flags
    exactly as given.

    Parameters
    ----------
    message : `str`
        string to print

    verbose : `bool`, optional, default: `True`
        flag to print or not, default: print

    stream : `file`, optional, default: `stdout`
        file object stream in which to print, default: stdout

    profile : `bool`, optional, default: `True`
        flag to append the elapsed time (in seconds) to the message, for
        debugging and profiling purposes; this is only applied to
        messages that end with a newline
    """
    # everything except stderr output is subject to the global switches
    if stream != sys.stderr:
        profile &= globalv.PROFILE
        verbose &= globalv.VERBOSE

    # stamp the end of newline-terminated messages with the elapsed time
    stamp = profile and message.endswith("\n")
    if stamp:
        text = message.rstrip("\n")
        message = "%s (%.2f)\n" % (text, elapsed_time())

    if not verbose:
        return
    stream.write(message)
    stream.flush()
[docs]
def mkdir(*paths):
    """Conditional mkdir operation, for convenience

    Each path is normalised and created, along with any missing parent
    directories; paths that already exist as directories are left alone.

    Parameters
    ----------
    *paths : `str`
        one or more directory paths to create

    Raises
    ------
    OSError
        if a path exists but is not a directory, or cannot be created
    """
    for path in paths:
        # exist_ok avoids the race between checking for the directory
        # and creating it (e.g. when parallel jobs share an output tree)
        os.makedirs(os.path.normpath(path), exist_ok=True)
[docs]
def nat_sorted(iterable, key=None):
    """Sort an iterable in the way that humans expect.

    Runs of the digits ``0-9`` are compared as integers rather than
    character-by-character, so that e.g. ``'a2'`` sorts before ``'a10'``.

    Parameters
    ----------
    iterable : `iterable`
        iterable to sort, any iterable is accepted (including generators)

    key : `callable`, optional
        function that maps each item onto the `str` to sort by, if not
        given the items themselves must be strings

    Returns
    -------
    lsorted : `list`
        a new, sorted `list` of the items of ``iterable``
    """
    def alphanum_key(item):
        text = item if key is None else key(item)
        # splitting on a capturing group alternates non-digit and digit
        # chunks, so odd-indexed chunks are always runs of [0-9]; this
        # keeps each position of the key a consistent type (str or int)
        chunks = re.split(r'([0-9]+)', text)
        return [int(chunk) if i % 2 else chunk
                for i, chunk in enumerate(chunks)]

    return sorted(iterable, key=alphanum_key)
# the parts of an ODC channel name that are swapped for 'BITMASK'
_re_odc = re.compile('(OUTMON|OUT_DQ|LATCH)')


def get_odc_bitmask(odcchannel):
    """Return the given ODC channel name with its suffix set to 'BITMASK'

    The input is converted to a `str`, and every occurrence of
    ``OUTMON``, ``OUT_DQ``, or ``LATCH`` is replaced with ``BITMASK``.
    """
    channel_name = str(odcchannel)
    return re.sub(_re_odc, 'BITMASK', channel_name)
[docs]
def safe_eval(val, strict=False, globals_=None, locals_=None):
    """Evaluate the given string as a line of python, if possible

    If the :meth:`eval` fails, a `str` is returned instead, unless
    `strict=True` is given.

    Parameters
    ----------
    val : `str`
        input text to evaluate, anything that is not a `str` is returned
        unmodified

    strict : `bool`, optional, default: `False`
        raise an exception when the `eval` call fails (`True`) otherwise
        return the input as a `str` (`False`, default)

    globals_ : `dict`, optional
        dict of global variables to pass to `eval`, defaults to the
        `globals` of this module

    locals_ : `dict`, optional
        dict of local variables to pass to `eval`, defaults to the same
        namespace as ``globals_``

    .. note::

       Note the trailing underscore on the `globals_` and `locals_`
       kwargs, this is required to not clash with the builtin `globals`
       and `locals` methods`.

    Raises
    ------
    ValueError
        if the input string is considered unsafe to evaluate, normally
        meaning it contains something that might interact with the filesystem
        (e.g. `os.path` calls)
    NameError
    SyntaxError
        if the input cannot be evaluated, and `strict=True` is given

    See also
    --------
    eval
        for more documentation on the underlying evaluation method
    """
    # don't evaluate non-strings
    if not isinstance(val, str):
        return val

    # check that we aren't evaluating something dangerous
    # NOTE(review): this pattern match is a best-effort blacklist, not a
    #               sandbox, `eval` should only ever see trusted input
    unsafe = UNSAFE_EVAL.search(val)
    if unsafe is not None:
        raise ValueError("Will not evaluate string containing %r: %r"
                         % (unsafe.group(), val))

    # format args for eval; the globals are always given explicitly so
    # that `eval` never falls back to the local variables of this
    # function (otherwise e.g. 'strict' would evaluate to `False`)
    if globals_ is None:
        globals_ = globals()
    if locals_ is None:
        args = (globals_,)
    else:
        args = (globals_, locals_)

    # try and eval str
    try:
        return eval(val, *args)
    except (NameError, SyntaxError):
        # honour the documented contract: only fall back to returning
        # the input as text when the caller has not asked to be strict
        if strict:
            raise
        return str(val)
# -- IFO parsing --------------------------------------------------------------
# name of the observatory for each interferometer prefix
OBSERVATORY_MAP = dict(
    G1='GEO',
    H1='LIGO Hanford',
    K1='KAGRA',
    L1='LIGO Livingston',
    V1='Virgo',
)
[docs]
def get_default_ifo(fqdn=None):
    """Find the default interferometer prefix (IFO) for the given host

    Parameters
    ----------
    fqdn : `str`, optional
        the fully-qualified domain name (FQDN) of the host on which you
        wish to find the default IFO, defaults to the FQDN of the
        current host (as returned by `socket.getfqdn`)

    Returns
    -------
    IFO : `str`
        the upper-case X1-style prefix for the default IFO, if found, e.g. `L1`

    Raises
    ------
    ValueError
        if no default interferometer prefix can be parsed
    """
    # look up the local host name when called, rather than as a default
    # argument, which would run the lookup when this module is imported
    if fqdn is None:
        fqdn = getfqdn()

    # (domain fragments, IFO) pairs, checked in order
    domains = (
        (('.uni-hannover.', '.atlas.'), 'G1'),
        (('.ligo-wa.',), 'H1'),
        (('.ligo-la.',), 'L1'),
        (('.virgo.', '.ego-gw.'), 'V1'),
    )
    for fragments, ifo in domains:
        if any(fragment in fqdn for fragment in fragments):
            return ifo
    raise ValueError("Cannot determine default IFO for host %r" % fqdn)