Source code for toolbox_scs.load

# -*- coding: utf-8 -*-
"""
    Toolbox for SCS.

    Various utility functions to quickly process data measured at the SCS
    instruments.

    Copyright (2019) SCS Team.
"""

import logging
import os

import numpy as np
import xarray as xr
import extra_data as ed
from extra_data.read_machinery import find_proposal

from .constants import mnemonics as _mnemonics
from .mnemonics_machinery import mnemonics_for_run
from .util.exceptions import ToolBoxValueError
import toolbox_scs.detectors as tbdet
from .misc.bunch_pattern import (npulses_has_changed,
                                 get_unique_sase_pId, load_bpt)

__all__ = [
    'concatenateRuns',
    'find_run_path',
    'get_array',
    'load',
    'open_run',
    'run_by_path',
    'load_run_values',
    'check_data_rate',
]

log = logging.getLogger(__name__)


def load(proposalNB=None, runNB=None,
         fields=None,
         data='all',
         display=False,
         validate=False,
         subset=None,
         rois={},
         extract_digitizers=True,
         extract_xgm=True,
         extract_bam=True,
         bunchPattern='sase3',
         parallelize=True,
         ):
    """
    Load a run and extract the data. Output is an xarray with aligned
    trainIds.

    Parameters
    ----------
    proposalNB: str, int
        proposal number e.g. 'p002252' or 2252
    runNB: str, int
        run number as integer
    fields: str, list of str, list of dict
        list of mnemonics to load specific data such as "fastccd",
        "SCS_XGM", or dictionaries defining a custom mnemonic such as
        {"extra": {'source': 'SCS_CDIFFT_MAG/SUPPLY/CURRENT',
                   'key': 'actual_current.value', 'dim': None}}
    data: str or Sequence of str
        'raw', 'proc' (processed), or any other location relative to the
        proposal path with data per run to access. May also be 'all'
        (both 'raw' and 'proc') or a sequence of strings to load data
        from several locations, with later locations overwriting sources
        present in earlier ones. The default is 'all'.
    display: bool
        whether to show the run.info or not
    validate: bool
        whether to run extra-data-validate or not
    subset: slice or extra_data.by_index or numpy.s_
        a subset of trains that can be loaded with
        extra_data.by_index[:5] for the first 5 trains. If None, all
        trains are retrieved.
    rois: dict
        a dictionary of mnemonics with a list of ROI definitions and the
        desired names, for example:
        {'fastccd': {'ref': {'roi': by_index[730:890, 535:720],
                             'dim': ['ref_x', 'ref_y']},
                     'sam': {'roi': by_index[1050:1210, 535:720],
                             'dim': ['sam_x', 'sam_y']}}}
    extract_digitizers: bool
        If True, extracts the peaks from digitizer variables and aligns
        the pulse Id according to the fadc_bp bunch pattern.
    extract_xgm: bool
        If True, extracts the values from XGM variables (e.g. 'SCS_SA3',
        'XTD10_XGM') and aligns the pulse Id with the sase1 / sase3 bunch
        pattern.
    extract_bam: bool
        If True, extracts the values from BAM variables (e.g. 'BAM1932M')
        and aligns the pulse Id with the sase3 bunch pattern.
    bunchPattern: str or dict
        bunch pattern used to extract the Fast ADC pulses. A string or
        a dict as in::

            {'FFT_PD2': 'sase3', 'ILH_I0': 'scs_ppl'}

        Ignored if extract_digitizers=False.
    parallelize: bool
        from EXtra-Data: enable or disable opening files in parallel.
        Particularly useful if creating child processes is not allowed
        (e.g. in a daemonized multiprocessing.Process).

    Returns
    -------
    run, ds: DataCollection, xarray.Dataset
        extra_data DataCollection of the proposal and run number and an
        xarray Dataset with aligned trainIds and pulseIds

    Example
    -------
    >>> import toolbox_scs as tb
    >>> run, data = tb.load(2212, 208, ['SCS_SA3', 'MCP2apd', 'nrj'])
    """
    run = ed.open_run(proposalNB, runNB, data=data,
                      parallelize=parallelize)
    if subset is not None:
        run = run.select_trains(subset)
    if fields is None:
        return run, xr.Dataset()
    if isinstance(fields, str):
        fields = [fields]
    if validate:
        # get_ipython().system('extra-data-validate ' + runFolder)
        pass
    if display:
        run.info()
    data_arrays = []
    run_mnemonics = mnemonics_for_run(run)
    for f in fields:
        if isinstance(f, dict):
            # extracting mnemonic defined on the spot
            if len(f.keys()) > 1:
                print('Loading only one "on-the-spot" mnemonic at a time, '
                      'skipping all others!')
            k = list(f.keys())[0]
            v = f[k]
        else:
            # extracting mnemonic from the table
            if f in run_mnemonics:
                v = run_mnemonics[f]
                k = f
            else:
                if f in _mnemonics:
                    log.warning(f'Mnemonic "{f}" not found in run. Skipping!')
                    print(f'Mnemonic "{f}" not found in run. Skipping!')
                else:
                    log.warning(f'Unknown mnemonic "{f}". Skipping!')
                    print(f'Unknown mnemonic "{f}". Skipping!')
                continue
        if k in [d.name for d in data_arrays]:
            continue  # already loaded, skip
        if display:
            print(f'Loading {k}')
        if v['source'] not in run.all_sources:
            log.warning(f'Source {v["source"]} not found in run. Skipping!')
            print(f'Source {v["source"]} not found in run. Skipping!')
            continue
        if k == 'MTE3':
            arr = run.get_array(v['source'], v['key'],
                                extra_dims=v['dim'], name=k)
            tpi = run.get_array('SCS_XTD10_TPI/DCTRL/SHUTTER',
                                'hardwareStatusBitField.value', name=k)
            tpi_open = iter(tpi.trainId[tpi & (1 << 12) > 0])
            mte3_tids = []
            last = 0
            current = next(tpi_open, None)
            if current is None:
                data_arrays.append(arr)
            else:
                for tid in arr.trainId:
                    while current < tid:
                        last = current
                        current = next(tpi_open, tid)
                    mte3_tids.append(last)
                data_arrays.append(
                    arr.assign_coords(
                        trainId=np.array(mte3_tids, dtype='u8')))
        elif k not in rois:
            # no ROIs selection, we read everything
            arr = run.get_array(v['source'], v['key'],
                                extra_dims=v['dim'], name=k)
            if len(arr) == 0:
                log.warning(f'Empty array for {f}: {v["source"]}, '
                            f'{v["key"]}. Skipping!')
                print(f'Empty array for {f}: {v["source"]}, {v["key"]}. '
                      'Skipping!')
                continue
            data_arrays.append(arr)
        else:
            # ROIs selection, for each ROI we select a region of the data
            # and save it with new name and dimensions
            for nk, nv in rois[k].items():
                arr = run.get_array(v['source'], v['key'],
                                    extra_dims=nv['dim'],
                                    roi=nv['roi'], name=nk)
                if len(arr) == 0:
                    log.warning(f'Empty array for {f}: {v["source"]}, '
                                f'{v["key"]}. Skipping!')
                    print(f'Empty array for {f}: {v["source"]}, '
                          f'{v["key"]}. Skipping!')
                    continue
                data_arrays.append(arr)

    # Check missing trains
    for arr in data_arrays:
        if 'hRIXS' in arr.name:
            continue
        rate = arr.sizes["trainId"] / len(run.train_ids)
        if rate < 0.95:
            log.warning(f'{arr.name}: only {rate*100:.1f}% of trains '
                        f'({arr.sizes["trainId"]} out of '
                        f'{len(run.train_ids)}) contain data.')

    ds = xr.merge(data_arrays, join='inner')
    ds.attrs['runNB'] = runNB
    if isinstance(proposalNB, int):
        proposalNB = 'p{:06d}'.format(proposalNB)
    ds.attrs['proposal'] = find_proposal(proposalNB)
    ds.attrs['data'] = data

    if extract_digitizers:
        bp = bunchPattern
        for k, v in run_mnemonics.items():
            if k not in ds or v.get('extract') != 'peaks':
                continue
            if isinstance(bunchPattern, dict):
                bp = bunchPattern.get(k)
                if bp is None:
                    continue
            ds = tbdet.get_digitizer_peaks(
                run, mnemonic=k, merge_with=ds, bunchPattern=bp)
    if extract_xgm:
        for k, v in run_mnemonics.items():
            if k not in ds or v.get('extract') != 'XGM':
                continue
            ds = tbdet.get_xgm(run, mnemonics=k, merge_with=ds)
    if extract_bam:
        for k, v in run_mnemonics.items():
            if k not in ds or v.get('extract') != 'BAM':
                continue
            ds = tbdet.get_bam(run, mnemonics=k, merge_with=ds)

    return run, ds
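
# A hedged usage sketch (editor's addition, not part of the module): load
# a run with standard mnemonics, an on-the-spot mnemonic and an ROI
# selection. The proposal/run numbers, the 'magnet' name and the ROI
# bounds are illustrative placeholders; the source/key pair comes from
# the docstring above.
#
# >>> import toolbox_scs as tb
# >>> from extra_data import by_index
# >>> fields = ['SCS_SA3', 'nrj', 'fastccd',
# ...           {'magnet': {'source': 'SCS_CDIFFT_MAG/SUPPLY/CURRENT',
# ...                       'key': 'actual_current.value', 'dim': None}}]
# >>> rois = {'fastccd': {'sam': {'roi': by_index[1050:1210, 535:720],
# ...                             'dim': ['sam_x', 'sam_y']}}}
# >>> run, ds = tb.load(2212, 208, fields, rois=rois,
# ...                   subset=by_index[:100])
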
def run_by_path(path):
    """
    Return specified run.

    Wraps the extra_data RunDirectory routine, to ease its use for the
    scs-toolbox user.

    Parameters
    ----------
    path: str
        path to the run directory

    Returns
    -------
    run : extra_data.DataCollection
        DataCollection object containing information about the specified
        run. Data can be loaded using built-in class methods.
    """
    return ed.RunDirectory(path)
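
# Hedged usage sketch (editor's addition): open a run from an explicit
# directory; the path below is a placeholder, not a real run.
#
# >>> import toolbox_scs as tb
# >>> run = tb.run_by_path('/gpfs/exfel/exp/SCS/201901/p002212/raw/r0208')
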
def find_run_path(proposalNB, runNB, data='raw'):
    """
    Return the run path given the specified proposal and run numbers.

    Parameters
    ----------
    proposalNB: (str, int)
        proposal number e.g. 'p002252' or 2252
    runNB: (str, int)
        run number as integer
    data: str
        'raw', 'proc' (processed) or 'all' (both 'raw' and 'proc') to
        access data from either or both of those folders. If 'all' is
        used, sources present in 'proc' overwrite those in 'raw'. The
        default is 'raw'.

    Returns
    -------
    path: str
        The run path.
    """
    if isinstance(runNB, int):
        runNB = 'r{:04d}'.format(runNB)
    if isinstance(proposalNB, int):
        proposalNB = 'p{:06d}'.format(proposalNB)
    return os.path.join(find_proposal(proposalNB), data, runNB)
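
# Hedged usage sketch (editor's addition): resolve the on-disk location
# of a run. With the integer arguments below the result ends in
# '.../raw/r0208'; the proposal root is resolved by extra_data's
# find_proposal.
#
# >>> import toolbox_scs as tb
# >>> path = tb.find_run_path(2212, 208, data='raw')
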
def open_run(proposalNB, runNB, subset=None, **kwargs):
    """
    Get extra_data.DataCollection in a given proposal.

    Wraps the extra_data open_run routine and adds subset selection, out
    of convenience for the toolbox user. More information can be found in
    the extra_data documentation.

    Parameters
    ----------
    proposalNB: (str, int)
        proposal number e.g. 'p002252' or 2252
    runNB: (str, int)
        run number e.g. 17 or 'r0017'
    subset: slice or extra_data.by_index or numpy.s_
        a subset of trains that can be loaded with
        extra_data.by_index[:5] for the first 5 trains. If None, all
        trains are retrieved.
    **kwargs
    --------
    data: str
        default -> 'raw'
    include: str
        default -> '*'

    Returns
    -------
    run : extra_data.DataCollection
        DataCollection object containing information about the specified
        run. Data can be loaded using built-in class methods.
    """
    run = ed.open_run(proposalNB, runNB, **kwargs)
    if subset is not None:
        run = run.select_trains(subset)
    return run
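
# Hedged usage sketch (editor's addition): open a run restricted to its
# first 5 trains, forwarding extra_data keyword arguments unchanged.
#
# >>> import toolbox_scs as tb
# >>> from extra_data import by_index
# >>> run = tb.open_run(2212, 208, subset=by_index[:5], data='raw')
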
def get_array(run=None, mnemonic=None, stepsize=None, subset=None,
              data='raw', proposalNB=None, runNB=None):
    """
    Loads one data array for the specified mnemonic and rounds its values
    to integer multiples of stepsize for consistent grouping (except for
    stepsize=None). Returns a 1D array of ones if mnemonic is set to
    None.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the data. Used if proposalNB and runNB
        are None.
    mnemonic: str
        Identifier of a single item in the mnemonic collection. None
        creates a dummy 1D array of ones with length equal to the number
        of trains.
    stepsize: float
        nominal stepsize of the array data - values will be rounded to
        integer multiples of this value.
    subset: slice or extra_data.by_index or numpy.s_
        a subset of trains that can be loaded with
        extra_data.by_index[:5] for the first 5 trains. If None, all
        trains are retrieved.
    data: str or Sequence of str
        'raw', 'proc' (processed), or any other location relative to the
        proposal path with data per run to access. May also be 'all'
        (both 'raw' and 'proc') or a sequence of strings to load data
        from several locations, with later locations overwriting sources
        present in earlier ones. The default is 'raw'.
    proposalNB: (str, int)
        proposal number e.g. 'p002252' or 2252.
    runNB: (str, int)
        run number e.g. 17 or 'r0017'.

    Returns
    -------
    data : xarray.DataArray
        xarray DataArray containing rounded array values using the
        trainId as coordinate.

    Raises
    ------
    ToolBoxValueError: Exception
        Toolbox specific exception, indicating a non-valid mnemonic entry

    Example
    -------
    >>> import toolbox_scs as tb
    >>> run = tb.open_run(2212, 235)
    >>> mnemonic = 'PP800_PhaseShifter'
    >>> data_PhaseShifter = tb.get_array(run, mnemonic, 0.5)
    """
    if run is None:
        run = open_run(proposalNB, runNB, subset, data=data)
    else:
        if not isinstance(run, ed.DataCollection):
            raise TypeError(f'run argument has type {type(run)} but '
                            'expected type is extra_data.DataCollection')
        if subset is not None:
            run = run.select_trains(subset)
    run_mnemonics = mnemonics_for_run(run)

    try:
        if mnemonic is None:
            da = xr.DataArray(
                np.ones(len(run.train_ids), dtype=np.int16),
                dims=['trainId'], coords={'trainId': run.train_ids})
        elif mnemonic in run_mnemonics:
            mnem = run_mnemonics[mnemonic]
            da = run.get_array(mnem['source'], mnem['key'],
                               extra_dims=mnem['dim'], name=mnemonic)
        else:
            raise ToolBoxValueError("Invalid mnemonic", mnemonic)

        if stepsize is not None:
            da = stepsize * np.round(da / stepsize)
        log.debug(f"Got data for {mnemonic}")
    except ToolBoxValueError as err:
        log.error(f"{err.message}")
        raise

    return da
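
# Hedged usage sketch (editor's addition): the rounding step above is
# stepsize * np.round(da / stepsize), so stepsize=0.5 maps 12.3 -> 12.5
# and 12.1 -> 12.0. Proposal/run and mnemonic are taken from the
# docstring example.
#
# >>> import toolbox_scs as tb
# >>> run = tb.open_run(2212, 235)
# >>> da = tb.get_array(run, 'PP800_PhaseShifter', stepsize=0.5)
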
def load_run_values(prop_or_run, runNB=None, which='mnemonics'):
    """
    Load the run value for each mnemonic whose source is a CONTROL source
    (see extra-data DataCollection.get_run_value() for details).

    Parameters
    ----------
    prop_or_run: extra_data DataCollection or int
        The run (DataCollection) to check for mnemonics. Alternatively,
        the proposal number (int), for which the runNB is also required.
    runNB: int
        The run number. Only used if the first argument is the proposal
        number.
    which: str
        'mnemonics' or 'all'. If 'mnemonics', only the run values for the
        ToolBox mnemonics are retrieved. If 'all', a compiled dictionary
        of all control sources run values is returned.

    Output
    ------
    run_values: a dictionary containing the mnemonic or all run values.
    """
    if which not in ['mnemonics', 'all']:
        raise ValueError('`which` should be either "mnemonics" or "all"')
    run = prop_or_run
    if runNB is not None:
        run = open_run(prop_or_run, runNB)
    if which == 'all':
        run_values = {}
        for c in run.control_sources:
            v = run.get_run_values(c)
            run_values[c] = v
        return run_values
    mnemos = mnemonics_for_run(run)
    run_values = {}
    for m in mnemos:
        val = None
        try:
            if mnemos[m]['source'] in run.control_sources:
                val = run.get_run_value(mnemos[m]['source'],
                                        mnemos[m]['key'])
        except Exception as e:
            log.info(f'Error while retrieving {m} mnemonic: {e}')
            continue
        run_values[m] = val
    return run_values
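
# Hedged usage sketch (editor's addition): retrieve run values for all
# ToolBox mnemonics backed by CONTROL sources, or for every control
# source in the run.
#
# >>> import toolbox_scs as tb
# >>> run = tb.open_run(2212, 208)
# >>> values = tb.load_run_values(run)
# >>> all_values = tb.load_run_values(run, which='all')
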
def concatenateRuns(runs):
    """ Sorts and concatenates a list of runs with identical data
    variables along the trainId dimension.

    Input:
        runs: (list) the xarray Datasets to concatenate
    Output:
        a concatenated xarray Dataset
    """
    firstTid = {i: int(run.trainId[0].values) for i, run in enumerate(runs)}
    orderedDict = dict(sorted(firstTid.items(), key=lambda t: t[1]))
    orderedRuns = [runs[i] for i in orderedDict]
    keys = orderedRuns[0].keys()
    for run in orderedRuns[1:]:
        if run.keys() != keys:
            print('data fields between different runs are not identical. '
                  'Cannot combine runs.')
            return

    result = xr.concat(orderedRuns, dim='trainId')
    for k in orderedRuns[0].attrs.keys():
        result.attrs[k] = [run.attrs[k] for run in orderedRuns]
    return result
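
# Hedged usage sketch (editor's addition): concatenate datasets from two
# runs sharing the same data variables; the input order does not matter
# since datasets are sorted by their first trainId.
#
# >>> import toolbox_scs as tb
# >>> _, ds1 = tb.load(2212, 208, ['SCS_SA3'])
# >>> _, ds2 = tb.load(2212, 209, ['SCS_SA3'])
# >>> combined = tb.concatenateRuns([ds2, ds1])
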
def check_data_rate(run, fields=None):
    """
    Calculates the fraction of train ids that contain data in a run.

    Parameters
    ----------
    run: extra_data DataCollection
        the DataCollection associated to the data.
    fields: str, list of str or dict
        mnemonics to check. If None, all mnemonics in the run are
        checked. A custom mnemonic can be defined with a dictionary:
        {'extra': {'source': 'SCS_CDIFFT_MAG/SUPPLY/CURRENT',
                   'key': 'actual_current.value'}}

    Output
    ------
    ret: dictionary
        dictionary with mnemonics as keys and the fraction of train ids
        that contain data as values.
    """
    run_mnemonics = mnemonics_for_run(run)
    if fields is None:
        fields = run_mnemonics
    fields = [fields] if isinstance(fields, str) else fields
    ret = {}
    for f in fields:
        if isinstance(f, dict):
            name = list(f.keys())[0]
            val = f[name]
            f = name
        elif f not in run_mnemonics:
            log.warning(f'mnemonic {f} not found. Skipping!')
            continue
        else:
            val = run_mnemonics[f]
        counts = run[val['source']][val['key']].data_counts(False)
        npulses = counts.max()
        if npulses == 0:  # (only missing data)
            rate = 0.
        else:
            counts = counts / npulses  # to only count trains and not pulses
            rate = counts.sum() / len(run.train_ids)
        ret[f] = rate
    return ret
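
# Hedged usage sketch (editor's addition): fractions close to 1.0 mean
# nearly every train carries data; the mnemonics below are placeholders.
#
# >>> import toolbox_scs as tb
# >>> run = tb.open_run(2212, 208)
# >>> rates = tb.check_data_rate(run, ['SCS_SA3', 'nrj'])
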