""" Digitizers related sub-routines

    Copyright (2021) SCS Team.

    (contributions preferably comply with PEP 8 code structure
    guidelines.)
"""

import logging
import os

import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
from scipy.signal import find_peaks

from extra_data import open_run
from extra_data.read_machinery import find_proposal
from extra.components import XrayPulses, OpticalLaserPulses

from ..misc.bunch_pattern_external import is_pulse_at
from ..mnemonics_machinery import mnemonics_for_run

__all__ = [
    'check_peak_params',
    'get_digitizer_peaks',
    'get_dig_avg_trace',
    'load_processed_peaks',
    'check_processed_peak_params'
]

log = logging.getLogger(__name__)


def peaks_from_raw_trace(traces, pulseStart, pulseStop, baseStart, baseStop,
                         period=None, npulses=None, extra_dim=None):
    """
    Computes peaks from raw digitizer traces by trapezoidal integration.

    Parameters
    ----------
    traces: xarray DataArray or numpy array
        Raw traces. If a numpy array is provided, the second dimension is
        that of the samples.
    pulseStart: int or list or 1D-numpy array
        trace index of integration start. If a 1D array, each value is the
        start of one peak. The period and npulses parameters are ignored.
    pulseStop: int
        trace index of integration stop.
    baseStart: int
        trace index of background start.
    baseStop: int
        trace index of background stop.
    period: int
        number of samples between two peaks. Ignored if pulseStart is a
        1D-array.
    npulses: int
        number of pulses. Ignored if pulseStart is a 1D-array.
    extra_dim: str
        Name given to the dimension along the peaks. Defaults to 'pulseId'.

    Returns
    -------
    xarray DataArray
    """

    assert traces.ndim == 2

    if isinstance(traces, xr.DataArray):
        ntid = traces.sizes['trainId']
        coords = traces.coords
        traces = traces.values
        if traces.shape[0] != ntid:
            traces = traces.T
    else:
        coords = None

    if hasattr(pulseStart, '__len__'):
        pulseStart = np.array(pulseStart)
        pulses = pulseStart - pulseStart[0]
        pulseStart = pulseStart[0]
    else:
        if period is None or period == 0:
            period = 1
        pulses = range(0, npulses*period, period)

    if extra_dim is None:
        extra_dim = 'pulseId'
    results = xr.DataArray(np.empty((traces.shape[0], len(pulses))),
                           coords=coords,
                           dims=['trainId', extra_dim])

    for i, p in enumerate(pulses):
        a = pulseStart + p
        b = pulseStop + p
        bkga = baseStart + p
        bkgb = baseStop + p
        if b > traces.shape[1]:
            break
        bg = np.outer(np.median(traces[:, bkga:bkgb], axis=1),
                      np.ones(b-a))
        results[:, i] = np.trapz(traces[:, a:b] - bg, axis=1)
    return results
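
# Example (sketch, illustrative numbers): integrate ten peaks that start at
# sample 1000 and repeat every 440 samples, with the baseline taken from the
# 90 samples preceding each peak, on a (trainId x samples) array `traces`:
#
#   peaks = peaks_from_raw_trace(traces, pulseStart=1000, pulseStop=1100,
#                                baseStart=900, baseStop=990,
#                                period=440, npulses=10)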


def peaks_from_apd(array, params, digitizer, bpt, bunchPattern):
    """
    Extract peak-integrated data according to the bunch pattern.

    Parameters
    ----------
    array: xarray DataArray
        2D array containing peak-integrated data
    params: dict
        peak-integration parameters of the digitizer
    digitizer: str
        digitizer type, one of {'FastADC', 'ADQ412'}
    bpt: xarray DataArray
        bunch pattern table
    bunchPattern: str
        one of {'sase1', 'sase3', 'scs_ppl'}, used to select pulses and
        assign name of the pulse id dimension.

    Returns
    -------
    xarray DataArray with pulse id coordinates.
    """
    if params['enable'] == 0 or (array == 1.).all():
        raise ValueError('The digitizer did not record integrated peaks. '
                         'Consider using raw traces from the same channel '
                         'for peak integration.')
    if digitizer == 'FastADC':
        min_distance = 24
    if digitizer == 'ADQ412':
        min_distance = 440
    period = params['period']
    if period % min_distance != 0:
        log.warning(f'The pulse period ({period} samples) of the '
                    'digitizer is not a multiple of the pulse separation at '
                    f'4.5 MHz ({min_distance} samples). Pulse id assignment '
                    'is likely to fail.')
    stride = int(period/min_distance)
    npulses_apd = params['npulses']
    dim_names = {'sase3': 'sa3_pId', 'sase1': 'sa1_pId', 'scs_ppl': 'ol_pId'}
    pulse_dim = dim_names[bunchPattern]
    arr_dim = [dim for dim in array.dims if dim != 'trainId'][0]
    if npulses_apd > array.sizes[arr_dim]:
        log.warning(f'The digitizer was set to record {npulses_apd} pulses '
                    f'but the array length is only {array.sizes[arr_dim]}.')
        npulses_apd = array.sizes[arr_dim]
    mask = is_pulse_at(bpt, bunchPattern).rename({'pulse_slot': pulse_dim})
    mask = mask.where(mask.trainId.isin(array.trainId), drop=True)
    mask = mask.assign_coords({pulse_dim: np.arange(bpt.sizes['pulse_slot'])})
    pid = np.sort(np.unique(np.where(mask)[1]))
    npulses_bpt = len(pid)
    apd_coords = np.arange(pid[0], pid[0] + stride*npulses_apd, stride)
    noalign = False
    if len(np.intersect1d(apd_coords, pid, assume_unique=True)) < npulses_bpt:
        log.warning('Not all pulses were recorded. The digitizer '
                    'was set to record pulse ids '
                    f'{apd_coords[apd_coords<bpt.sizes["pulse_slot"]]} but the'
                    ' bunch pattern for'
                    f' {bunchPattern} is {pid}. Skipping pulse ID alignment.')
        noalign = True
    array = array.isel({arr_dim: slice(0, npulses_apd)})
    array = array.where(array != 1.)
    if noalign:
        return array
    array = array.rename(
        {arr_dim: pulse_dim}).assign_coords({pulse_dim: apd_coords})
    array, mask = xr.align(array, mask, join='inner')
    array = array.where(mask, drop=True)
    return array
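
# Sketch (hypothetical inputs): align peak-integrated ADQ412 data `arr`
# (trainId x peaks) with the SASE 3 pulses of a bunch pattern table `bpt`:
#
#   params = channel_peak_params(run, 'MCP2apd')
#   aligned = peaks_from_apd(arr, params, 'ADQ412', bpt, 'sase3')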


def get_dig_avg_trace(run, mnemonic, ntrains=None):
    """
    Compute the average of a digitizer trace over ntrains trains evenly
    spaced across the run.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    mnemonic: str
        ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'.
    ntrains: int
        Number of trains used to calculate the average raw trace.
        If None, all trains are used.

    Returns
    -------
    trace: DataArray
        The average digitizer trace
    """
    run_mnemonics = mnemonics_for_run(run)
    if ntrains is None:
        sel = run
    else:
        total_tid = len(run.train_ids)
        stride = int(np.max([1, np.floor(total_tid/ntrains)]))
        sel = run.select_trains(np.s_[0:None:stride])
    m = run_mnemonics[mnemonic]
    raw_trace = sel.get_array(m['source'], m['key'], m['dim'])
    raw_trace = raw_trace.mean(dim='trainId')
    return raw_trace
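

# Usage sketch (hypothetical proposal, run number and mnemonic): average the
# raw trace of a Fast ADC channel over 200 evenly spaced trains:
#
#   run = open_run(proposal, runNB)
#   avg = get_dig_avg_trace(run, 'FastADC2raw', ntrains=200)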


def channel_peak_params(run, source, key=None, channel=None, board=None):
    """
    Extract peak-integration parameters used by a channel of the digitizer.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    source: str
        ToolBox mnemonic of a digitizer data, e.g. "MCP2apd" or
        "FastADC4peaks", or name of digitizer source, e.g.
        'SCS_UTC1_ADQ/ADC/1:network'.
    key: str
        optional, used in combination with source (if source is not a
        ToolBox mnemonic) instead of digitizer, channel and board.
    channel: int or str
        The digitizer channel for which to retrieve the parameters. If None,
        inferred from the source mnemonic or source + key arguments.
    board: int
        Board of the ADQ412. If None, inferred from the source mnemonic or
        source + key arguments.

    Returns
    -------
    dict with peak integration parameters.
    """
    run_mnemonics = mnemonics_for_run(run)
    if source in run_mnemonics:
        m = run_mnemonics[source]
        source = m['source']
        key = m['key']
    if 'network' in source:
        return adq412_channel_peak_params(run, source, key, channel, board)
    else:
        return fastADC_channel_peak_params(run, source, channel)


def fastADC_channel_peak_params(run, source, channel=None):
    """
    Extract peak-integration parameters used by a channel of the Fast ADC.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    source: str
        Name of the Fast ADC source, e.g.
        'SCS_UTC1_MCP/ADC/1:channel_5.output'.
    channel: int
        The Fast ADC channel for which to retrieve the parameters. If None,
        inferred from the source.

    Returns
    -------
    dict with peak integration parameters.
    """
    fastADC_keys = {
        'baseStart': ('baselineSettings.baseStart.value',
                      'baseStart.value'),
        'baseStop': ('baseStop.value',),
        'baseLength': ('baselineSettings.length.value',),
        'enable': ('enablePeakComputation.value',),
        'pulseStart': ('initialDelay.value',),
        'pulseLength': ('peakSamples.value',),
        'npulses': ('numPulses.value',),
        'period': ('pulsePeriod.value',)
    }
    if channel is None:
        channel = int(source.split(':')[1].split('.')[0].split('_')[1])
    baseName = f'channel_{channel}.'
    source = source.split(':')[0]
    data = run.select(source).train_from_index(0)[1][source]
    result = {}
    # key names differ between device versions: try all known variants
    for mnemo, versions in fastADC_keys.items():
        for v in versions:
            key = baseName + v
            if key in data:
                result[mnemo] = data[key]
    if 'baseLength' in result:
        result['baseStop'] = result['baseStart'] + result.pop('baseLength')
    if 'pulseLength' in result:
        result['pulseStop'] = result['pulseStart'] + result.pop('pulseLength')
    return result


def adq412_channel_peak_params(run, source, key=None,
                               channel=None, board=None):
    """
    Extract peak-integration parameters used by a channel of the ADQ412.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    source: str
        Name of the ADQ412 source, e.g. 'SCS_UTC1_ADQ/ADC/1:network'.
    key: str
        optional, e.g. 'digitizers.channel_1_D.apd.pulseIntegral', used in
        combination with source instead of channel and board.
    channel: int or str
        The ADQ412 channel for which to retrieve the parameters. If None,
        inferred from the source mnemonic or source + key arguments.
    board: int
        Board of the ADQ412. If None, inferred from the source mnemonic or
        source + key arguments.

    Returns
    -------
    dict with peak integration parameters.
    """
    if key is None:
        if channel is None or board is None:
            raise ValueError('key or channel + board arguments are '
                             'missing.')
    else:
        k = key.split('.')[1].split('_')
        ch_to_int = {'A': 0, 'B': 1, 'C': 2, 'D': 3}
        channel = ch_to_int[k[2]]
        board = k[1]
    baseName = f'board{board}.apd.channel_{channel}.'
    source = source.split(':')[0]
    adq412_keys = {
        'baseStart': (baseName + 'baseStart.value',),
        'baseStop': (baseName + 'baseStop.value',),
        'enable': (baseName + 'enable.value',),
        'pulseStart': (baseName + 'pulseStart.value',),
        'pulseStop': (baseName + 'pulseStop.value',),
        'initialDelay': (baseName + 'initialDelay.value',),
        'upperLimit': (baseName + 'upperLimit.value',),
        'npulses': (f'board{board}.apd.numberOfPulses.value',)
    }
    data = run.select(source).train_from_index(0)[1][source]
    result = {}
    for mnemo, versions in adq412_keys.items():
        for key in versions:
            if key in data:
                result[mnemo] = data[key]
    result['period'] = result.pop('upperLimit') - result.pop('initialDelay')
    return result


def find_peaks_in_raw_trace(trace, height=None, width=1, distance=24):
    """
    Find integration parameters for peak integration of a raw
    digitizer trace. Based on scipy find_peaks().

    Parameters
    ----------
    trace: numpy array or xarray DataArray
        The digitizer raw trace used to find peaks.
    height: int
        minimum height for peak determination
    width: int
        minimum width of peak
    distance: int
        minimum distance between two peaks

    Returns
    -------
    dict with keys 'pulseStart', 'pulseStop', 'baseStart', 'baseStop',
    'period', 'npulses' and values in number of samples.
    """
    if isinstance(trace, xr.DataArray):
        trace = trace.values
    # center the trace and make the peaks positive
    trace_norm = trace - np.median(trace)
    trace_norm = -trace_norm if np.mean(trace_norm) < 0 else trace_norm
    SNR = np.max(np.abs(trace_norm)) / np.std(trace_norm[:100])
    if SNR < 10:
        log.warning('signal-to-noise ratio too low: cannot '
                    'automatically find peaks.')
        return {'pulseStart': 2, 'pulseStop': 3,
                'baseStart': 0, 'baseStop': 1,
                'period': 0, 'npulses': 1}
    min_height = min(3 * SNR, np.max(np.abs(trace_norm)) / 3)
    peaks, prop = find_peaks(trace_norm, distance=distance,
                             height=min_height, width=2)
    params = {}
    start = int(prop['left_ips'][0])
    if len(peaks) == 1:
        params['pulseStart'] = start
        params['period'] = 0
    else:
        pulse_period = int(np.median(np.diff(peaks)))
        pulse_ids = np.digitize(
            peaks,
            np.arange(peaks[0] - pulse_period/2,
                      peaks[-1] + pulse_period/2, pulse_period)) - 1
        if len(np.unique(np.diff(pulse_ids))) == 1:
            # Regular pattern
            params['pulseStart'] = start
            params['period'] = pulse_period
        else:
            # Irregular pattern
            params['pulseStart'] = start + pulse_ids * pulse_period
            params['period'] = 0
    params['pulseStop'] = int(prop['right_ips'][0])
    params['baseStop'] = (3 * start - peaks[0]) / 2
    params['baseStop'] = np.min([params['baseStop'],
                                 int(prop['right_ips'][0])]).astype(int)
    params['baseStart'] = params['baseStop'] - np.mean(prop['widths'])/2
    params['baseStart'] = np.max([params['baseStart'], 0]).astype(int)
    params['npulses'] = len(peaks)
    return params


def find_peak_integration_parameters(run, mnemonic, raw_trace=None,
                                     integParams=None, pattern=None,
                                     ntrains=None):
    '''
    Finds peak integration parameters, either by automatically detecting
    peaks in the (averaged) raw trace or by inspecting the provided
    integParams. If a bunch pattern is given, the parameters are checked
    against it and the bunch pattern values prevail in case of mismatch.

    Returns
    -------
    params: dict
        the peak integration parameters
    pulse_ids: np.ndarray
        the pulse ids covered by the parameters
    regular: bool
        True if the pulses are regularly spaced
    raw_trace: xarray DataArray or None
        the raw trace used for automatic peak finding (None if integParams
        was provided and no trace was given)
    '''
    run_mnemonics = mnemonics_for_run(run)
    digitizer = digitizer_type(run, run_mnemonics[mnemonic]['source'])
    pulse_period = 24 if digitizer == "FastADC" else 440

    if integParams is None:
        # load raw trace and find peaks
        autoFind = True
        if raw_trace is None:
            raw_trace = get_dig_avg_trace(run, mnemonic, ntrains)
        params = find_peaks_in_raw_trace(raw_trace, distance=pulse_period)
    else:
        # inspect provided parameters
        autoFind = False
        required_keys = ['pulseStart', 'pulseStop', 'baseStart', 'baseStop']
        add_text = ''
        if pattern is None and not hasattr(integParams['pulseStart'],
                                           '__len__'):
            required_keys += ['period', 'npulses']
            add_text = 'Bunch pattern not provided. '
        if not all(name in integParams for name in required_keys):
            raise ValueError(add_text + 'All keys of integParams argument '
                             f'{required_keys} are required.')
        params = integParams.copy()

    # extract pulse ids from the parameters (starting at 0)
    pulse_ids_params = None
    if hasattr(params['pulseStart'], '__len__'):
        if params.get('npulses') is not None and (
                params.get('npulses') != len(params['pulseStart'])):
            log.warning('The number of pulses does not match the length '
                        'of pulseStart. Using the length of pulseStart as '
                        'the number of pulses.')
        params['npulses'] = len(params['pulseStart'])
        pulse_ids_params = ((np.array(params['pulseStart'])
                             - params['pulseStart'][0])
                            / pulse_period).astype(int)
    elif 'npulses' in params and 'period' in params:
        if params['npulses'] == 1:
            pulse_ids_params = np.array([0])
        else:
            pulse_ids_params = np.arange(
                0, params['npulses'] * params['period'] / pulse_period,
                params['period'] / pulse_period).astype(int)

    # extract pulse ids, period and npulses from the bunch pattern
    pulse_ids_bp, npulses_bp, period_bp = None, None, 0
    regular = True
    if pattern is not None:
        bunchPattern = 'sase3' if hasattr(pattern, 'sase') else 'scs_ppl'
        if pattern.is_constant_pattern() is False:
            log.warning('The number of pulses changed during the run.')
            pulse_ids_bp = np.unique(pattern.pulse_ids(labelled=False,
                                                       copy=False))
            npulses_bp, period_bp = None, 0
            regular = False
        else:
            pulse_ids_bp = pattern.peek_pulse_ids(labelled=False)
            npulses_bp = len(pulse_ids_bp)
            if npulses_bp > 1:
                periods = np.diff(pulse_ids_bp)
                if len(np.unique(periods)) > 1:
                    regular = False
                else:
                    period_bp = np.unique(periods)[0] * pulse_period

        # compare the parameters with the bunch pattern
        if pulse_ids_params is None:
            params['npulses'] = npulses_bp
            params['period'] = period_bp
            pulse_ids_params = np.arange(
                0, params['npulses'] * params['period'] / pulse_period,
                params['period'] / pulse_period).astype(int)
        if len(pulse_ids_params) == len(pulse_ids_bp):
            if not (pulse_ids_params
                    == pulse_ids_bp - pulse_ids_bp[0]).all():
                log.warning('The provided pulseStart parameters do not '
                            f'match the {bunchPattern} bunch pattern pulse '
                            'ids. Using bunch pattern parameters.')
                pulse_ids_params = pulse_ids_bp
        if (npulses_bp != params.get('npulses')
                or period_bp != params.get('period')):
            add_text = 'Automatically found ' if autoFind else 'Provided '
            log.warning(add_text + 'integration parameters '
                        f'(npulses={params.get("npulses")}, '
                        f'period={params.get("period")}) do not match the '
                        f'{bunchPattern} bunch pattern (npulses='
                        f'{npulses_bp}, period={period_bp}). Using bunch '
                        'pattern parameters.')
            pulse_ids_params = pulse_ids_bp
            params['npulses'] = npulses_bp
            params['period'] = period_bp

    if not regular:
        # Irregular pattern: one integration start per pulse
        if hasattr(params['pulseStart'], '__len__'):
            start = params['pulseStart'][0]
        else:
            start = params['pulseStart']
        params['pulseStart'] = np.array(
            [int(start + (pid - pulse_ids_params[0]) * pulse_period)
             for pid in pulse_ids_params])
        params['period'] = 0
    else:
        # Regular pattern
        if hasattr(params['pulseStart'], '__len__'):
            params['pulseStart'] = params['pulseStart'][0]
        if len(pulse_ids_params) == 1:
            params['period'] = 0
    return params, pulse_ids_params, regular, raw_trace
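

# Sketch of a typical result for a regular pattern (illustrative numbers):
#
#   params, pulse_ids, regular, trace = find_peak_integration_parameters(
#       run, 'FastADC2raw', pattern=XrayPulses(run))
#   # params  -> {'pulseStart': 1000, 'pulseStop': 1100, 'baseStart': 900,
#   #             'baseStop': 990, 'period': 96, 'npulses': 50}
#   # regular -> True, pulse_ids -> the 50 pulse ids of the bunch pattern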


def check_peak_params(proposal, runNB, mnemonic, raw_trace=None,
                      ntrains=200, integParams=None, bunchPattern='sase3',
                      plot=True, show_all=False):
    """
    Checks and plots the peak parameters (pulse window and baseline window
    of a raw digitizer trace) used to compute the peak integration. These
    parameters are either set by the digitizer peak-integration settings,
    or determined by a peak-finding algorithm. The parameters can also be
    provided manually for visual inspection. The plot either shows the
    first and last pulse of the trace or the entire trace.

    Parameters
    ----------
    proposal: int
        the proposal number
    runNB: int
        the run number
    mnemonic: str
        ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'.
    raw_trace: optional, 1D numpy array or xarray DataArray
        Raw trace to display. If None, the average raw trace over ntrains
        of the corresponding channel is loaded (this can be time-consuming).
    ntrains: optional, int
        Only used if raw_trace is None. Number of trains used to calculate
        the average raw trace of the corresponding channel.
    integParams: optional, dict
        Peak-integration parameters to inspect, with keys ['pulseStart',
        'pulseStop', 'baseStart', 'baseStop', 'period', 'npulses']. If
        None, the parameters are determined automatically.
    bunchPattern: optional, str
        Only used if plot is True. Checks the bunch pattern against the
        digitizer peak parameters and shows potential mismatch.
    plot: bool
        If True, displays the raw trace and peak integration regions.
    show_all: bool
        If True, displays the entire raw trace and all peak integration
        regions (this can be time-consuming). If False, shows the first
        and last pulse according to the bunchPattern.

    Returns
    -------
    dictionary of peak integration parameters
    """
    run = open_run(proposal, runNB)
    run_mnemonics = mnemonics_for_run(run)
    digitizer = digitizer_type(run, run_mnemonics[mnemonic]['source'])
    pulse_period = 24 if digitizer == "FastADC" else 440
    pattern = None
    try:
        if bunchPattern == 'sase3':
            pattern = XrayPulses(run)
        if bunchPattern == 'scs_ppl':
            pattern = OpticalLaserPulses(run)
    except Exception as e:
        log.warning(e)
        bunchPattern = None

    params, pulse_ids, regular, raw_trace = find_peak_integration_parameters(
        run, mnemonic, raw_trace, integParams, pattern)

    if bunchPattern:
        if regular:
            add_text = ''
            if len(pulse_ids) > 1:
                add_text = (f's, {(pulse_ids[1]-pulse_ids[0]) * pulse_period}'
                            ' samples between two pulses')
            print(f'Bunch pattern {bunchPattern}: {len(pulse_ids)} pulse'
                  + add_text)
        else:
            print(f'Bunch pattern {bunchPattern}: Not a regular pattern. '
                  f'{len(pulse_ids)} pulses, pulse_ids={pulse_ids}.')

    if integParams is None:
        title = 'Auto-find peak parameters'
    else:
        title = 'Provided peak parameters'
        no_change = True
        for k, v in integParams.items():
            no_change = no_change & (v == params[k])
        if hasattr(no_change, '__len__'):
            no_change = no_change.all()
        if not no_change:
            print('The provided parameters did not match the bunch '
                  'pattern and were adjusted.')
            title += ' (adjusted)'

    if regular:
        add_text = ''
        if params['npulses'] > 1:
            add_text = f's, {params["period"]} samples between two pulses'
        print(title + f': {params["npulses"]} pulse' + add_text)
    else:
        print(f'{title}: Not a regular pattern. '
              f'{len(pulse_ids)} pulses, pulse_ids={pulse_ids}.')
    if plot:
        if raw_trace is None:
            raw_trace = get_dig_avg_trace(run, mnemonic)
        fig, ax = plotPeakIntegrationWindow(raw_trace, params,
                                            show_all=show_all)
        fig.suptitle(f'p{proposal} r{runNB} ' + title, size=12)
    return params
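

# Usage sketch (hypothetical proposal, run number and mnemonic):
#
#   # check the automatically found parameters against the SASE 3 pattern
#   params = check_peak_params(2953, 537, 'MCP2raw', bunchPattern='sase3')
#   # or inspect manually provided parameters
#   manual = {'pulseStart': 1000, 'pulseStop': 1100, 'baseStart': 900,
#             'baseStop': 990, 'period': 440, 'npulses': 10}
#   params = check_peak_params(2953, 537, 'MCP2raw', integParams=manual)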


def plotPeakIntegrationWindow(raw_trace, params, show_all=False):
    """
    Plot the peak and baseline integration windows defined by `params` on
    top of the raw trace, either over the entire trace or around the first
    and last pulses only.
    """
    if hasattr(params['pulseStart'], '__len__'):
        starts = np.array(params['pulseStart'])
        stops = params['pulseStop'] + starts - params['pulseStart'][0]
        baseStarts = params['baseStart'] + starts - params['pulseStart'][0]
        baseStops = params['baseStop'] + starts - params['pulseStart'][0]
    else:
        n = params['npulses']
        p = params['period']
        starts = [params['pulseStart'] + i*p for i in range(n)]
        stops = [params['pulseStop'] + i*p for i in range(n)]
        baseStarts = [params['baseStart'] + i*p for i in range(n)]
        baseStops = [params['baseStop'] + i*p for i in range(n)]

    if show_all:
        fig, ax = plt.subplots(figsize=(6, 3), constrained_layout=True)
        for i in range(len(starts)):
            lbl = 'baseline' if i == 0 else None
            lp = 'peak' if i == 0 else None
            ax.axvline(baseStarts[i], ls='--', color='k')
            ax.axvline(baseStops[i], ls='--', color='k')
            ax.axvspan(baseStarts[i], baseStops[i],
                       alpha=0.5, color='grey', label=lbl)
            ax.axvline(starts[i], ls='--', color='r')
            ax.axvline(stops[i], ls='--', color='r')
            ax.axvspan(starts[i], stops[i], alpha=0.2, color='r', label=lp)
        ax.plot(raw_trace, color='C0', label='raw trace')
        ax.legend(fontsize=8)
        return fig, [ax]

    fig, ax = plt.subplots(1, 2, figsize=(6, 3), constrained_layout=True)
    for plot in range(2):
        title = 'First pulse' if plot == 0 else 'Last pulse'
        i = 0 if plot == 0 else -1
        for j in range(min(2, len(starts))):
            if plot == 1:
                j = -j
            label = 'baseline' if j == 0 else ''
            ax[plot].axvline(baseStarts[i+j], ls='--', color='k')
            ax[plot].axvline(baseStops[i+j], ls='--', color='k')
            ax[plot].axvspan(baseStarts[i+j], baseStops[i+j],
                             alpha=0.5, color='grey', label=label)
            label = 'peak' if j == 0 else ''
            ax[plot].axvline(starts[i+j], ls='--', color='r')
            ax[plot].axvline(stops[i+j], ls='--', color='r')
            ax[plot].axvspan(starts[i+j], stops[i+j],
                             alpha=0.2, color='r', label=label)
        if len(starts) > 1:
            period = starts[1] - starts[0]
            xmin = np.max([0, baseStarts[i] - int(1.5*period)])
            xmax = np.min([stops[i] + int(1.5*period), raw_trace.size])
        else:
            xmin = np.max([0, baseStarts[i] - 200])
            xmax = np.min([stops[i] + 200, raw_trace.size])
        ax[plot].plot(np.arange(xmin, xmax), raw_trace[xmin:xmax],
                      color='C0', label='raw trace')
        ax[plot].legend(fontsize=8)
        ax[plot].set_xlim(xmin, xmax)
        ax[plot].set_xlabel('digitizer samples')
        ax[plot].set_title(title, size=10)
    return fig, ax


def digitizer_type(run, source):
    """
    Finds the digitizer type based on the class Id / name of the source.
    Example source: 'SCS_UTC1_MCP/ADC/1'. Defaults to ADQ412 if not found.
    """
    ret = None
    if '_MCP/ADC/1' in source:
        ret = 'FastADC'
    if '_ADQ/ADC/1' in source:
        ret = 'ADQ412'
    if ret is None:
        digi_dict = {'FastAdc': 'FastADC',
                     'FastAdcLegacy': 'FastADC',
                     'AdqDigitizer': 'ADQ412',
                     'PyADCChannel': 'FastADC',
                     'PyADCChannelLegacy': 'FastADC'}
        try:
            source = source.split(':')[0]
            classId = run.get_run_value(source, 'classId.value')
            ret = digi_dict.get(classId)
        except Exception as e:
            log.warning(str(e))
            log.warning(f'Could not find digitizer type from source '
                        f'{source}.')
            ret = 'ADQ412'
    return ret


def get_digitizer_peaks(proposal, runNB, mnemonic, merge_with=None,
                        bunchPattern='sase3', integParams=None,
                        save=False, subdir='usr/processed_runs'):
    """
    Extract the peaks from digitizer raw traces. The result can be merged
    to an existing `merge_with` dataset and/or saved into an HDF file.
    The calculation is a trapezoidal integration between 'pulseStart' and
    'pulseStop' with subtraction of a baseline defined as the median
    between 'baseStart' and 'baseStop'.
    The integration parameters can either be provided using integParams,
    or computed by a peak-finding algorithm if integParams is None.
    If the bunchPattern argument is provided, the pulse ids are aligned
    to it. If there is a mismatch between the provided or computed
    parameters and the bunch pattern, the bunch pattern parameters prevail.

    Parameters
    ----------
    proposal: int
        the proposal number
    runNB: int
        the run number
    mnemonic: str
        mnemonic for FastADC or ADQ412, e.g. "I0_ILHraw" or "MCP3apd".
        The data is either loaded from the DataCollection or taken from
        merge_with.
    merge_with: xarray Dataset
        If provided, the resulting Dataset will be merged with this one.
    bunchPattern: str
        'sase1', 'sase3', 'scs_ppl' or None: bunch pattern
    integParams: dict
        dictionary for raw trace integration, e.g.
        {'pulseStart': 100, 'pulseStop': 200, 'baseStart': 50,
        'baseStop': 99, 'period': 24, 'npulses': 500}.
        If None, integration parameters are computed automatically.
    save: bool
        If True, save the computed peaks and the average raw trace in a
        dataset at the proposal + subdir location.
    subdir: str
        subdirectory in which the dataset is saved (see save_peaks()).

    Returns
    -------
    xarray Dataset with digitizer peak variables. Raw variables are
    substituted by the calculated peak values (e.g. "FastADC2raw" becomes
    "FastADC2peaks").
    """
    # prepare resulting dataset
    if bool(merge_with):
        mw_ds = merge_with.drop(mnemonic, errors='ignore')
    else:
        mw_ds = xr.Dataset()
    run = open_run(proposal, runNB)
    run_mnemonics = mnemonics_for_run(run)
    mnemo = run_mnemonics.get(mnemonic)
    if mnemo is None:
        log.warning('Mnemonic not found. Skipping.')
        return xr.DataArray()
    source, key = mnemo['source'], mnemo['key']
    extra_dim = {'sase3': 'sa3_pId', 'scs_ppl': 'ol_pId'}.get(bunchPattern)
    if extra_dim is None:
        extra_dim = 'pulseId'
    pattern = None
    try:
        if bunchPattern == 'sase3':
            pattern = XrayPulses(run)
        if bunchPattern == 'scs_ppl':
            pattern = OpticalLaserPulses(run)
    except Exception as e:
        log.warning(e)

    # load traces and calculate the average
    traces = run[source, key].xarray(name=mnemonic, extra_dims=mnemo['dim'])
    trace = traces.mean('trainId').rename(mnemonic.replace('raw', 'avg'))

    # find integration parameters
    params, pulse_ids, regular, trace = find_peak_integration_parameters(
        run, mnemonic, trace, integParams, pattern)

    # extract peaks
    peaks = peaks_from_raw_trace(traces, **params, extra_dim=extra_dim)
    peaks = peaks.rename(mnemonic.replace('raw', 'peaks'))

    # align pulse ids
    if regular is True:
        peaks = peaks.assign_coords({extra_dim: pulse_ids})
    elif pattern:
        mask = pattern.pulse_mask(labelled=False)
        mask = xr.DataArray(mask, dims=['trainId', extra_dim],
                            coords={'trainId': run[source, key].train_ids,
                                    extra_dim: np.arange(mask.shape[1])})
        mask = mask.sel({extra_dim: pulse_ids})
        peaks = peaks.where(mask, drop=True)

    # add peak integration parameters as attributes
    for p in params:
        if params[p] is None:
            params[p] = 'None'
        peaks.attrs[f'{peaks.name}_{p}'] = params[p]

    # save peaks
    if save:
        save_peaks(proposal, runNB, peaks, trace, subdir)

    # merge with existing dataset
    ds = mw_ds.merge(peaks, join='inner')
    return ds
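

# Usage sketch (hypothetical proposal, run number and mnemonic):
#
#   ds = get_digitizer_peaks(2953, 537, 'MCP2raw', bunchPattern='sase3',
#                            save=True)
#   # ds contains 'MCP2peaks' with dims ('trainId', 'sa3_pId') and the
#   # integration parameters stored as attributes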


def save_peaks(proposal, runNB, peaks, avg_trace, subdir):
    '''
    Save the peaks extracted with get_digitizer_peaks() as well as the
    average raw trace in a dataset at the proposal + subdir location.
    If a dataset already exists, the new data is merged with it.

    Parameters
    ----------
    proposal: int
        the proposal number
    runNB: int
        the run number
    peaks: xarray DataArray
        the 2D-array obtained by get_digitizer_peaks()
    avg_trace: xarray DataArray
        the average raw trace over the trains
    subdir: str
        subdirectory. The data is stored in
        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5

    Returns
    -------
    None
    '''
    root = find_proposal(f'p{proposal:06d}')
    path = os.path.join(root, subdir + f'/r{runNB:04d}/')
    os.makedirs(path, exist_ok=True)
    fname = path + f'r{runNB:04d}-digitizers-data.h5'
    ds_peaks = peaks.to_dataset(promote_attrs=True)
    if os.path.isfile(fname):
        ds = xr.load_dataset(fname)
        ds = ds.drop_vars([peaks.name, avg_trace.name], errors='ignore')
        for dim in ds.dims:
            if all(dim not in ds[v].dims for v in ds):
                ds = ds.drop_dims(dim)
        dim_name = 'sampleId'
        if 'sampleId' in ds.dims and ds.sizes['sampleId'] != len(avg_trace):
            dim_name = 'sampleId2'
        avg_trace = avg_trace.rename({avg_trace.dims[0]: dim_name})
        if f'params_{peaks.name}' in ds.attrs:
            del ds.attrs[f'params_{peaks.name}']
        ds = xr.merge([ds, ds_peaks, avg_trace],
                      combine_attrs='drop_conflicts', join='inner')
    else:
        ds = ds_peaks.merge(avg_trace.rename({avg_trace.dims[0]:
                                              'sampleId'}))
    ds.to_netcdf(fname, format='NETCDF4')
    print(f'saved data into {fname}.')
    return None


def load_processed_peaks(proposal, runNB, mnemonic=None,
                         data='usr/processed_runs', merge_with=None):
    """
    Load processed digitizer peaks data.

    Parameters
    ----------
    proposal: int
        the proposal number
    runNB: int
        the run number
    mnemonic: str
        the mnemonic containing peaks. Example: 'XRD_MCP_BIGpeaks'.
        If None, the entire dataset is loaded.
    data: str
        subdirectory. The data is stored in
        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
    merge_with: xarray Dataset
        A dataset to merge the data with.

    Returns
    -------
    xarray DataArray if mnemonic is not None and merge_with is None,
    xarray Dataset if mnemonic is None or merge_with is not None.

    Example
    -------
    # load the mono energy and the MCP_BIG peaks
    run, ds = tb.load(proposal, runNB, 'nrj')
    ds = tb.load_processed_peaks(proposal, runNB, 'XRD_MCP_BIGpeaks',
                                 merge_with=ds)
    """
    if mnemonic is None:
        return load_all_processed_peaks(proposal, runNB, data, merge_with)
    root = find_proposal(f'p{proposal:06d}')
    path = os.path.join(root, data + f'/r{runNB:04d}/')
    fname = path + f'r{runNB:04d}-digitizers-data.h5'
    if os.path.isfile(fname):
        ds = xr.load_dataset(fname)
        if mnemonic not in ds:
            log.warning(f'Mnemonic {mnemonic} not found in {fname}')
            return {}
        da = ds[mnemonic]
        da.attrs = {k: ds.attrs[k] for k in ds.attrs if f'{mnemonic}_' in k}
        if merge_with is not None:
            return merge_with.merge(da.to_dataset(promote_attrs=True),
                                    combine_attrs='drop_conflicts',
                                    join='inner')
        else:
            return da
    else:
        log.warning(f'{fname} not found.')
        return merge_with


def load_all_processed_peaks(proposal, runNB, data='usr/processed_runs',
                             merge_with=None):
    """
    Load a processed digitizer peaks dataset. The data contains the peaks,
    the average raw trace and the integration parameters (attributes) of
    each processed digitizer source.

    Parameters
    ----------
    proposal: int
        the proposal number
    runNB: int
        the run number
    data: str
        subdirectory. The data is stored in
        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
    merge_with: xarray Dataset
        A dataset to merge the data with.

    Returns
    -------
    xarray Dataset
    """
    root = find_proposal(f'p{proposal:06d}')
    path = os.path.join(root, data + f'/r{runNB:04d}/')
    fname = path + f'r{runNB:04d}-digitizers-data.h5'
    if os.path.isfile(fname):
        if merge_with is not None:
            return merge_with.merge(xr.load_dataset(fname),
                                    combine_attrs='drop_conflicts',
                                    join='inner')
        return xr.load_dataset(fname)
    else:
        log.warning(f'{fname} not found.')
        return merge_with


def check_processed_peak_params(proposal, runNB, mnemonic,
                                data='usr/processed_runs',
                                plot=True, show_all=False):
    """
    Check the integration parameters used to generate the processed
    peak values.

    Parameters
    ----------
    proposal: int
        the proposal number
    runNB: int
        the run number
    mnemonic: str
        the mnemonic containing peaks. Example: 'XRD_MCP_BIGpeaks'.
    data: str
        subdirectory. The data is stored in
        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
    plot: bool
        If True, displays the raw trace and peak integration regions.
    show_all: bool
        If True, displays the entire raw trace and all peak integration
        regions (this can be time-consuming). If False, shows the first
        and last pulses.

    Returns
    -------
    params: dict
        the integration parameters with keys ['pulseStart', 'pulseStop',
        'baseStart', 'baseStop', 'period', 'npulses'].
        See `get_digitizer_peaks()`.
    """
    root = find_proposal(f'p{proposal:06d}')
    path = os.path.join(root, data + f'/r{runNB:04d}/')
    fname = path + f'r{runNB:04d}-digitizers-data.h5'
    if os.path.isfile(fname):
        ds = xr.load_dataset(fname)
        if mnemonic.replace('raw', 'peaks') not in ds:
            log.warning(f'Mnemonic {mnemonic} not found in {fname}')
            return {}
        params = {k.replace(f'{mnemonic}_', ''): ds.attrs[k]
                  for k in ds.attrs if f'{mnemonic}_' in k}
        if plot:
            title = 'Processed peak parameters'
            fig, ax = plotPeakIntegrationWindow(
                ds[mnemonic.replace('peaks', 'avg')], params,
                show_all=show_all)
            fig.suptitle(f'p{proposal} r{runNB} ' + title, size=12)
        return params
    else:
        log.warning(f'{fname} not found.')
        return {}
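

# Usage sketch (hypothetical values): retrieve and plot the integration
# parameters that produced previously saved peaks:
#
#   params = check_processed_peak_params(2953, 537, 'XRD_MCP_BIGpeaks')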