# -*- coding: utf-8 -*-
"""
Toolbox for SCS.
Various utility functions to quickly process data measured at the SCS
instruments.
Copyright (2019) SCS Team.
"""
import logging
import os
import numpy as np
import xarray as xr
import extra_data as ed
from extra_data.read_machinery import find_proposal
from .constants import mnemonics as _mnemonics
from .mnemonics_machinery import mnemonics_for_run
from .util.exceptions import ToolBoxValueError
import toolbox_scs.detectors as tbdet
from .misc.bunch_pattern import (npulses_has_changed,
get_unique_sase_pId, load_bpt)
__all__ = [
'concatenateRuns',
'find_run_path',
'get_array',
'load',
'open_run',
'run_by_path',
'load_run_values',
'check_data_rate',
]
log = logging.getLogger(__name__)
def load(proposalNB=None, runNB=None,
fields=None,
data='all',
display=False,
validate=False,
subset=None,
rois={},
extract_digitizers=True,
extract_xgm=True,
extract_bam=True,
bunchPattern='sase3',
parallelize=True,
):
"""
    Load a run and extract the data. Output is an xarray Dataset with
    aligned trainIds.
Parameters
----------
proposalNB: str, int
proposal number e.g. 'p002252' or 2252
runNB: str, int
        run number e.g. 17 or 'r0017'
fields: str, list of str, list of dict
        list of mnemonics to load specific data such as "fastccd",
        "SCS_XGM", or dictionaries defining a custom mnemonic such as
{"extra": {'source': 'SCS_CDIFFT_MAG/SUPPLY/CURRENT',
'key': 'actual_current.value',
'dim': None}}
data: str or Sequence of str
        'raw', 'proc' (processed), or any other location relative to the
        proposal path with data per run to access. May also be 'all'
        (both 'raw' and 'proc') or a sequence of strings to load data
        from several locations, with later locations overwriting sources
        present in earlier ones. The default is 'all'.
display: bool
whether to show the run.info or not
validate: bool
whether to run extra-data-validate or not
subset: slice or extra_data.by_index or numpy.s_
        a subset of trains that can be loaded with extra_data.by_index[:5]
        for the first 5 trains. If None, all trains are retrieved.
the first 5 trains. If None, all trains are retrieved.
rois: dict
        a dictionary of mnemonics with a list of ROI definitions and
the desired names, for example:
{'fastccd': {'ref': {'roi': by_index[730:890, 535:720],
'dim': ['ref_x', 'ref_y']},
'sam': {'roi':by_index[1050:1210, 535:720],
'dim': ['sam_x', 'sam_y']}}}
extract_digitizers: bool
If True, extracts the peaks from digitizer variables and aligns the
pulse Id according to the fadc_bp bunch pattern.
extract_xgm: bool
If True, extracts the values from XGM variables (e.g. 'SCS_SA3',
'XTD10_XGM') and aligns the pulse Id with the sase1 / sase3 bunch
pattern.
extract_bam: bool
If True, extracts the values from BAM variables (e.g. 'BAM1932M')
and aligns the pulse Id with the sase3 bunch pattern.
bunchPattern: str
bunch pattern used to extract the Fast ADC pulses.
A string or a dict as in::
{'FFT_PD2': 'sase3', 'ILH_I0': 'scs_ppl'}
Ignored if extract_digitizers=False.
parallelize: bool
from EXtra-Data: enable or disable opening files in parallel.
Particularly useful if creating child processes is not allowed
(e.g. in a daemonized multiprocessing.Process).
Returns
-------
run, ds: DataCollection, xarray.Dataset
extra_data DataCollection of the proposal and run number and an
xarray Dataset with aligned trainIds and pulseIds
Example
-------
>>> import toolbox_scs as tb
>>> run, data = tb.load(2212, 208, ['SCS_SA3', 'MCP2apd', 'nrj'])
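
    A hypothetical call combining a ROI selection and an on-the-spot
    mnemonic (both taken from the parameter descriptions above; the run
    contents are assumed):

    >>> from extra_data import by_index
    >>> rois = {'fastccd': {'ref': {'roi': by_index[730:890, 535:720],
    ...                             'dim': ['ref_x', 'ref_y']}}}
    >>> extra = {'extra': {'source': 'SCS_CDIFFT_MAG/SUPPLY/CURRENT',
    ...                    'key': 'actual_current.value', 'dim': None}}
    >>> run, data = tb.load(2212, 208, ['fastccd', extra], rois=rois)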
"""
run = ed.open_run(proposalNB, runNB, data=data, parallelize=parallelize)
if subset is not None:
run = run.select_trains(subset)
if fields is None:
return run, xr.Dataset()
if isinstance(fields, str):
fields = [fields]
if validate:
# get_ipython().system('extra-data-validate ' + runFolder)
pass
if display:
run.info()
data_arrays = []
run_mnemonics = mnemonics_for_run(run)
for f in fields:
        if isinstance(f, dict):
            # extracting a mnemonic defined on the spot
if len(f.keys()) > 1:
print('Loading only one "on-the-spot" mnemonic at a time, '
'skipping all others !')
k = list(f.keys())[0]
v = f[k]
else:
            # extracting a mnemonic from the table
if f in run_mnemonics:
v = run_mnemonics[f]
k = f
else:
if f in _mnemonics:
log.warning(f'Mnemonic "{f}" not found in run. Skipping!')
print(f'Mnemonic "{f}" not found in run. Skipping!')
else:
                    log.warning(f'Unknown mnemonic "{f}". Skipping!')
                    print(f'Unknown mnemonic "{f}". Skipping!')
continue
if k in [d.name for d in data_arrays]:
continue # already loaded, skip
if display:
print(f'Loading {k}')
if v['source'] not in run.all_sources:
log.warning(f'Source {v["source"]} not found in run. Skipping!')
print(f'Source {v["source"]} not found in run. Skipping!')
continue
if k == 'MTE3':
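            # Special case for MTE3: re-assign each train to the trainId of
            # the most recent train during which the TPI shutter was open
            # (bit 12 of hardwareStatusBitField set), presumably because the
            # MTE3 reading is only meaningful for shutter-open trains.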
arr = run.get_array(v['source'], v['key'],
extra_dims=v['dim'], name=k)
tpi = run.get_array('SCS_XTD10_TPI/DCTRL/SHUTTER',
'hardwareStatusBitField.value', name=k)
tpi_open = iter(tpi.trainId[tpi & (1 << 12) > 0])
mte3_tids = []
last = 0
current = next(tpi_open, None)
if current is None:
data_arrays.append(arr)
else:
for tid in arr.trainId:
while current < tid:
last = current
current = next(tpi_open, tid)
mte3_tids.append(last)
data_arrays.append(
arr.assign_coords(trainId=np.array(mte3_tids, dtype='u8')))
elif k not in rois:
# no ROIs selection, we read everything
arr = run.get_array(v['source'], v['key'],
extra_dims=v['dim'], name=k)
if len(arr) == 0:
log.warning(f'Empty array for {f}: {v["source"]}, {v["key"]}. '
'Skipping!')
print(f'Empty array for {f}: {v["source"]}, {v["key"]}. '
'Skipping!')
continue
data_arrays.append(arr)
else:
# ROIs selection, for each ROI we select a region of the data and
# save it with new name and dimensions
for nk, nv in rois[k].items():
arr = run.get_array(v['source'], v['key'],
extra_dims=nv['dim'],
roi=nv['roi'],
name=nk)
if len(arr) == 0:
log.warning(f'Empty array for {f}: {v["source"]}, '
f'{v["key"]}. Skipping!')
print(f'Empty array for {f}: {v["source"]}, {v["key"]}. '
'Skipping!')
continue
data_arrays.append(arr)
# Check missing trains
for arr in data_arrays:
if 'hRIXS' in arr.name:
continue
rate = arr.sizes["trainId"] / len(run.train_ids)
if rate < 0.95:
log.warning(f'{arr.name}: only {rate*100:.1f}% of trains '
f'({arr.sizes["trainId"]} out of '
f'{len(run.train_ids)}) contain data.')
ds = xr.merge(data_arrays, join='inner')
ds.attrs['runNB'] = runNB
if isinstance(proposalNB, int):
proposalNB = 'p{:06d}'.format(proposalNB)
ds.attrs['proposal'] = find_proposal(proposalNB)
ds.attrs['data'] = data
if extract_digitizers:
bp = bunchPattern
for k, v in run_mnemonics.items():
if k not in ds or v.get('extract') != 'peaks':
continue
if isinstance(bunchPattern, dict):
bp = bunchPattern.get(k)
if bp is None:
continue
ds = tbdet.get_digitizer_peaks(
run, mnemonic=k, merge_with=ds, bunchPattern=bp)
if extract_xgm:
for k, v in run_mnemonics.items():
if k not in ds or v.get('extract') != 'XGM':
continue
ds = tbdet.get_xgm(run, mnemonics=k, merge_with=ds)
if extract_bam:
for k, v in run_mnemonics.items():
if k not in ds or v.get('extract') != 'BAM':
continue
ds = tbdet.get_bam(run, mnemonics=k, merge_with=ds)
return run, ds
def run_by_path(path):
"""
    Return the specified run.
Wraps the extra_data RunDirectory routine, to ease its use for the
scs-toolbox user.
Parameters
----------
path: str
path to the run directory
Returns
-------
run : extra_data.DataCollection
DataCollection object containing information about the specified
run. Data can be loaded using built-in class methods.
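    Example
    -------
    A minimal sketch; the run directory path below is hypothetical:

    >>> import toolbox_scs as tb
    >>> run = tb.run_by_path(
    ...     '/gpfs/exfel/exp/SCS/201901/p002212/raw/r0208')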
"""
return ed.RunDirectory(path)
def find_run_path(proposalNB, runNB, data='raw'):
"""
Return the run path given the specified proposal and run numbers.
Parameters
----------
proposalNB: (str, int)
proposal number e.g. 'p002252' or 2252
runNB: (str, int)
        run number e.g. 17 or 'r0017'
data: str
'raw', 'proc' (processed) or 'all' (both 'raw' and 'proc') to access
data from either or both of those folders. If 'all' is used, sources
present in 'proc' overwrite those in 'raw'. The default is 'raw'.
Returns
-------
path: str
The run path.
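
    Example
    -------
    A short example reusing the proposal and run numbers from the other
    examples in this module:

    >>> import toolbox_scs as tb
    >>> path = tb.find_run_path(2212, 208, 'raw')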
"""
if isinstance(runNB, int):
runNB = 'r{:04d}'.format(runNB)
if isinstance(proposalNB, int):
proposalNB = 'p{:06d}'.format(proposalNB)
return os.path.join(find_proposal(proposalNB), data, runNB)
def open_run(proposalNB, runNB, subset=None, **kwargs):
"""
    Get the extra_data.DataCollection for a given proposal and run.
Wraps the extra_data open_run routine and adds subset selection, out of
convenience for the toolbox user. More information can be found in the
extra_data documentation.
Parameters
----------
proposalNB: (str, int)
proposal number e.g. 'p002252' or 2252
runNB: (str, int)
run number e.g. 17 or 'r0017'
subset: slice or extra_data.by_index or numpy.s_
        a subset of trains that can be loaded with extra_data.by_index[:5]
        for the first 5 trains. If None, all trains are retrieved.
the first 5 trains. If None, all trains are retrieved.
    **kwargs
        Additional keyword arguments passed to extra_data.open_run,
        e.g. data (default 'raw') or include (default '*').
Returns
-------
run : extra_data.DataCollection
DataCollection object containing information about the specified
run. Data can be loaded using built-in class methods.
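
    Example
    -------
    A minimal sketch selecting only the first 5 trains of a run:

    >>> import toolbox_scs as tb
    >>> from extra_data import by_index
    >>> run = tb.open_run(2212, 208, subset=by_index[:5])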
"""
run = ed.open_run(proposalNB, runNB, **kwargs)
if subset is not None:
run = run.select_trains(subset)
return run
def get_array(run=None, mnemonic=None, stepsize=None,
subset=None, data='raw',
proposalNB=None, runNB=None):
"""
    Loads one data array for the specified mnemonic and rounds its values
    to integer multiples of stepsize for consistent grouping (no rounding
    is applied if stepsize is None).
Returns a 1D array of ones if mnemonic is set to None.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the data.
Used if proposalNB and runNB are None.
mnemonic: str
Identifier of a single item in the mnemonic collection. None creates a
dummy 1D array of ones with length equal to the number of trains.
stepsize : float
nominal stepsize of the array data - values will be rounded to integer
multiples of this value.
subset: slice or extra_data.by_index or numpy.s_
        a subset of trains that can be loaded with extra_data.by_index[:5]
        for the first 5 trains. If None, all trains are retrieved.
the first 5 trains. If None, all trains are retrieved.
data: str or Sequence of str
'raw', 'proc' (processed), or any other location relative to the
        proposal path with data per run to access. May also be 'all'
        (both 'raw' and 'proc') or a sequence of strings to load data
from several locations, with later locations overwriting sources
present in earlier ones. The default is 'raw'.
proposalNB: (str, int)
proposal number e.g. 'p002252' or 2252.
runNB: (str, int)
run number e.g. 17 or 'r0017'.
Returns
-------
data : xarray.DataArray
xarray DataArray containing rounded array values using the trainId as
coordinate.
Raises
------
ToolBoxValueError: Exception
        Toolbox-specific exception, indicating an invalid mnemonic entry.
Example
-------
>>> import toolbox_scs as tb
>>> run = tb.open_run(2212, 235)
>>> mnemonic = 'PP800_PhaseShifter'
>>> data_PhaseShifter = tb.get_array(run, mnemonic, 0.5)
"""
if run is None:
run = open_run(proposalNB, runNB, subset, data=data)
else:
if not isinstance(run, ed.DataCollection):
raise TypeError(f'run argument has type {type(run)} but '
'expected type is extra_data.DataCollection')
if subset is not None:
run = run.select_trains(subset)
run_mnemonics = mnemonics_for_run(run)
try:
if mnemonic is None:
da = xr.DataArray(
np.ones(len(run.train_ids), dtype=np.int16),
dims=['trainId'], coords={'trainId': run.train_ids})
elif mnemonic in run_mnemonics:
mnem = run_mnemonics[mnemonic]
da = run.get_array(mnem['source'], mnem['key'],
extra_dims=mnem['dim'], name=mnemonic)
else:
raise ToolBoxValueError("Invalid mnemonic", mnemonic)
if stepsize is not None:
da = stepsize * np.round(da / stepsize)
log.debug(f"Got data for {mnemonic}")
except ToolBoxValueError as err:
log.error(f"{err.message}")
raise
return da
def load_run_values(prop_or_run, runNB=None, which='mnemonics'):
"""
    Load the run value for each mnemonic whose source is a CONTROL
    source (see extra-data DataCollection.get_run_value() for details).
Parameters
----------
prop_or_run: extra_data DataCollection or int
The run (DataCollection) to check for mnemonics.
Alternatively, the proposal number (int), for which the runNB
is also required.
runNB: int
The run number. Only used if the first argument is the proposal
number.
which: str
'mnemonics' or 'all'. If 'mnemonics', only the run values for the
        ToolBox mnemonics are retrieved. If 'all', a dictionary of the run
        values of all control sources is returned.
    Returns
    -------
    run_values: dict
        dictionary containing the run value of each mnemonic (or of all
        control sources if which='all').
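
    Example
    -------
    A minimal sketch; the 'nrj' mnemonic is borrowed from the load()
    example and may not be available in every run:

    >>> import toolbox_scs as tb
    >>> values = tb.load_run_values(2212, 208)
    >>> values.get('nrj')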
"""
if which not in ['mnemonics', 'all']:
raise ValueError('`which` should be either "mnemonics" or "all"')
run = prop_or_run
if runNB is not None:
run = open_run(prop_or_run, runNB)
if which == 'all':
run_values = {}
for c in run.control_sources:
v = run.get_run_values(c)
run_values[c] = v
return run_values
mnemos = mnemonics_for_run(run)
run_values = {}
for m in mnemos:
val = None
try:
if mnemos[m]['source'] in run.control_sources:
val = run.get_run_value(mnemos[m]['source'],
mnemos[m]['key'])
except Exception as e:
log.info(f'Error while retrieving {m} mnemonic: {e}')
continue
run_values[m] = val
return run_values
def concatenateRuns(runs):
    """ Sorts and concatenates a list of runs with identical data variables
along the trainId dimension.
Input:
runs: (list) the xarray Datasets to concatenate
Output:
a concatenated xarray Dataset
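    Example:
        a sketch assuming two runs with identical data variables
        (run 209 is hypothetical):
        >>> import toolbox_scs as tb
        >>> _, ds1 = tb.load(2212, 208, ['nrj'])
        >>> _, ds2 = tb.load(2212, 209, ['nrj'])
        >>> ds = tb.concatenateRuns([ds1, ds2])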
"""
firstTid = {i: int(run.trainId[0].values) for i, run in enumerate(runs)}
orderedDict = dict(sorted(firstTid.items(), key=lambda t: t[1]))
orderedRuns = [runs[i] for i in orderedDict]
keys = orderedRuns[0].keys()
for run in orderedRuns[1:]:
if run.keys() != keys:
print('data fields between different runs are not identical. '
'Cannot combine runs.')
return
result = xr.concat(orderedRuns, dim='trainId')
for k in orderedRuns[0].attrs.keys():
result.attrs[k] = [run.attrs[k] for run in orderedRuns]
return result
def check_data_rate(run, fields=None):
"""
Calculates the fraction of train ids that contain data in a run.
Parameters
----------
run: extra_data DataCollection
the DataCollection associated to the data.
fields: str, list of str or dict
mnemonics to check. If None, all mnemonics in the run are checked.
        A custom mnemonic can be defined with a dictionary: {'extra':
{'source': 'SCS_CDIFFT_MAG/SUPPLY/CURRENT', 'key':
'actual_current.value'}}
    Returns
    -------
    ret: dict
        dictionary with mnemonics as keys and the fraction of train ids
        that contain data as values.
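
    Example
    -------
    A short example using mnemonics from the load() example; the actual
    rates depend on the run contents:

    >>> import toolbox_scs as tb
    >>> run = tb.open_run(2212, 208)
    >>> rates = tb.check_data_rate(run, fields=['SCS_SA3', 'MCP2apd'])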
"""
run_mnemonics = mnemonics_for_run(run)
if fields is None:
fields = run_mnemonics
fields = [fields] if isinstance(fields, str) else fields
ret = {}
for f in fields:
if isinstance(f, dict):
name = list(f.keys())[0]
val = f[name]
f = name
elif f not in run_mnemonics:
log.warning(f'mnemonic {f} not found. Skipping!')
continue
else:
val = run_mnemonics[f]
counts = run[val['source']][val['key']].data_counts(False)
npulses = counts.max()
if npulses == 0: # (only missing data)
rate = 0.
else:
counts = counts / npulses # to only count trains and not pulses
rate = counts.sum() / len(run.train_ids)
ret[f] = rate
return ret