Source code for toolbox_scs.detectors.digitizers

""" Digitizers related sub-routines

    Copyright (2021) SCS Team.

    (contributions preferably comply with pep8 code structure
    guidelines.)
"""

import logging

import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from scipy.signal import fftconvolve

from ..misc.bunch_pattern_external import is_pulse_at
from ..util.exceptions import ToolBoxValueError
from ..mnemonics_machinery import (mnemonics_to_process,
                                   mnemonics_for_run)

__all__ = [
    'check_peak_params',
    'get_digitizer_peaks',
    'get_laser_peaks',
    'get_peaks',
    'get_tim_peaks',
    'digitizer_signal_description',
    'get_dig_avg_trace'
]

log = logging.getLogger(__name__)


def peaks_from_raw_trace(traces, pulseStart, pulseStop, baseStart, baseStop,
                         period, npulses, extra_dim):
    """
    Computes peaks from raw digitizer traces by trapezoidal integration.

    Parameters
    ----------
    traces: xarray DataArray or numpy array
        array containing the raw traces. If a numpy array is provided, the
        second dimension is that of the samples.
    pulseStart: int or list or 1D-numpy array
        trace index of integration start. If a 1D array, each value is the
        start of one peak. The period and npulses parameters are then
        ignored.
    pulseStop: int
        trace index of integration stop.
    baseStart: int
        trace index of background start.
    baseStop: int
        trace index of background stop.
    period: int
        number of samples between two peaks. Ignored if pulseStart is a
        1D array.
    npulses: int
        number of pulses. Ignored if pulseStart is a 1D array.
    extra_dim: str
        Name given to the dimension along the peaks.

    Returns
    -------
    xarray DataArray
    """

    assert len(traces.shape) == 2

    if isinstance(traces, xr.DataArray):
        ntid = traces.sizes['trainId']
        coords = traces.coords
        traces = traces.values
        if traces.shape[0] != ntid:
            traces = traces.T
    else:
        coords = None

    if hasattr(pulseStart, '__len__'):
        pulseStart = np.array(pulseStart)
        pulses = pulseStart - pulseStart[0]
        pulseStart = pulseStart[0]
    else:
        pulses = range(0, npulses*period, period)

    results = xr.DataArray(np.empty((traces.shape[0], len(pulses))),
                           coords=coords,
                           dims=['trainId', extra_dim])

    for i, p in enumerate(pulses):
        a = pulseStart + p
        b = pulseStop + p
        bkga = baseStart + p
        bkgb = baseStop + p
        if b > traces.shape[1]:
            break
        bg = np.outer(np.median(traces[:, bkga:bkgb], axis=1),
                      np.ones(b-a))
        results[:, i] = np.trapz(traces[:, a:b] - bg, axis=1)
    return results
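

# A minimal usage sketch (illustration only, not executed on import):
# integrating two synthetic pulses with peaks_from_raw_trace. All values
# below are made up.
#
#   tid = np.arange(10)
#   trace = np.zeros((10, 200))
#   trace[:, 20:30] = 1.     # first pulse
#   trace[:, 120:130] = 1.   # second pulse, period of 100 samples
#   arr = xr.DataArray(trace, coords={'trainId': tid},
#                      dims=['trainId', 'samples'])
#   peaks = peaks_from_raw_trace(arr, pulseStart=18, pulseStop=32,
#                                baseStart=0, baseStop=15, period=100,
#                                npulses=2, extra_dim='pulseId')
#   # -> DataArray of shape (10, 2): one integrated value per train and pulse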


def peaks_from_apd(array, params, digitizer, bpt, bunchPattern):
    """
    Extract peak-integrated data according to the bunch pattern.

    Parameters
    ----------
    array: xarray DataArray
        2D array containing peak-integrated data
    params: dict
        peak-integration parameters of the digitizer
    digitizer: str
        digitizer type, one of {'FastADC', 'ADQ412'}
    bpt: xarray DataArray
        bunch pattern table
    bunchPattern: str
        one of {'sase1', 'sase3', 'scs_ppl'}, used to select pulses and
        assign name of the pulse id dimension.

    Returns
    -------
    xarray DataArray with pulse id coordinates.
    """
    if params['enable'] == 0 or (array == 1.).all():
        raise ValueError('The digitizer did not record integrated peaks. '
                         'Consider using raw traces from the same channel '
                         'for peak integration.')
    if digitizer == 'FastADC':
        min_distance = 24
    if digitizer == 'ADQ412':
        min_distance = 440
    period = params['period']
    if period % min_distance != 0:
        log.warning(f'The pulse period ({period} samples) of the '
                    'digitizer is not a multiple of the pulse separation '
                    f'at 4.5 MHz ({min_distance} samples). Pulse id '
                    'assignment is likely to fail.')
    stride = int(period/min_distance)
    npulses_apd = params['npulses']
    dim_names = {'sase3': 'sa3_pId', 'sase1': 'sa1_pId', 'scs_ppl': 'ol_pId'}
    pulse_dim = dim_names[bunchPattern]
    arr_dim = [dim for dim in array.dims if dim != 'trainId'][0]
    if npulses_apd > array.sizes[arr_dim]:
        log.warning(f'The digitizer was set to record {npulses_apd} pulses '
                    f'but the array length is only {array.sizes[arr_dim]}.')
        npulses_apd = array.sizes[arr_dim]
    mask = is_pulse_at(bpt, bunchPattern).rename({'pulse_slot': pulse_dim})
    mask = mask.where(mask.trainId.isin(array.trainId), drop=True)
    mask = mask.assign_coords({pulse_dim: np.arange(bpt.sizes['pulse_slot'])})
    pid = np.sort(np.unique(np.where(mask)[1]))
    npulses_bpt = len(pid)
    apd_coords = np.arange(pid[0], pid[0] + stride*npulses_apd, stride)
    noalign = False
    if len(np.intersect1d(apd_coords, pid, assume_unique=True)) < npulses_bpt:
        log.warning('Not all pulses were recorded. The digitizer '
                    'was set to record pulse ids '
                    f'{apd_coords[apd_coords<bpt.sizes["pulse_slot"]]} but '
                    f'the bunch pattern for {bunchPattern} is {pid}. '
                    'Skipping pulse ID alignment.')
        noalign = True
    array = array.isel({arr_dim: slice(0, npulses_apd)})
    array = array.where(array != 1.)
    if noalign:
        return array
    array = array.rename(
        {arr_dim: pulse_dim}).assign_coords({pulse_dim: apd_coords})
    array, mask = xr.align(array, mask, join='inner')
    array = array.where(mask, drop=True)
    return array


def get_peaks(run,
              data,
              mnemonic,
              useRaw=True,
              autoFind=True,
              integParams=None,
              bunchPattern='sase3',
              bpt=None,
              extra_dim=None,
              indices=None,
              ):
    """
    Extract peaks from one source (channel) of a digitizer.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data
    data: xarray DataArray or str
        array containing the raw traces or peak-integrated values from the
        digitizer. If str, must be one of the ToolBox mnemonics. If None,
        the data is loaded using the source and key of the mnemonic.
    mnemonic: str or dict
        ToolBox mnemonic or dict with source and key as in
        {'source': 'SCS_UTC1_ADQ/ADC/1:network',
         'key': 'digitizers.channel_1_A.raw.samples'}
    useRaw: bool
        If True, extract peaks from raw traces. If False, uses the APD (or
        peaks) data from the digitizer.
    autoFind: bool
        If True, finds integration parameters by inspecting the average raw
        trace. Only valid if useRaw is True.
    integParams: dict
        dictionary containing the integration parameters for raw trace
        integration: 'pulseStart', 'pulseStop', 'baseStart', 'baseStop',
        'period', 'npulses'. Not used if autoFind is True. All keys are
        required when the bunch pattern is missing.
    bunchPattern: str
        match the peaks to the bunch pattern: 'sase1', 'sase3', 'scs_ppl'.
        This will dictate the name of the pulse ID coordinates: 'sa1_pId',
        'sa3_pId' or 'ol_pId'.
    bpt: xarray DataArray
        bunch pattern table
    extra_dim: str
        Name given to the dimension along the peaks. If None, the name is
        given according to the bunchPattern.
    indices: array, slice
        indices from the peak-integrated data to retrieve. Only required
        when the bunch pattern is missing and useRaw is False.

    Returns
    -------
    xarray.DataArray containing digitizer peaks with pulse coordinates
    """
    arr = data
    dim = [d for d in arr.dims if d != 'trainId'][0]

    # Load bunch pattern table
    run_mnemonics = mnemonics_for_run(run)
    pattern = bunchPattern
    if bpt is None and bunchPattern != 'None':
        if 'bunchPatternTable' in run_mnemonics:
            m = run_mnemonics['bunchPatternTable']
            bpt = run.get_array(m['source'], m['key'], m['dim'])
    if bunchPattern == 'None':
        bpt = None

    # Find digitizer type
    m = mnemonic if isinstance(mnemonic, dict) else run_mnemonics[mnemonic]
    digitizer = digitizer_type(run, m.get('source'))

    # 1. Peak-integrated data from digitizer
    if useRaw is False:
        # 1.1 No bunch pattern provided
        if bpt is None:
            log.info('Missing bunch pattern info.')
            if indices is None:
                raise TypeError('indices argument must be provided '
                                'when bunch pattern info is missing.')
            if extra_dim is None:
                extra_dim = 'pulseId'
            return arr.isel({dim: indices}).rename({dim: extra_dim})

        # 1.2 Bunch pattern is provided
        if isinstance(mnemonic, dict):
            peak_params = channel_peak_params(run, mnemonic.get('source'),
                                              mnemonic.get('key'))
        else:
            peak_params = channel_peak_params(run, mnemonic)
        log.debug(f'Digitizer peak integration parameters: {peak_params}')
        return peaks_from_apd(arr, peak_params, digitizer, bpt, bunchPattern)

    # 2. Use raw data from digitizer
    # minimum pulse period @ 4.5MHz, according to digitizer type
    min_distance = 1
    if digitizer == 'FastADC':
        min_distance = 24
    if digitizer == 'ADQ412':
        min_distance = 440
    if autoFind:
        stride = int(np.max([1, np.floor(arr.sizes['trainId']/200)]))
        trace = arr.isel(trainId=slice(0, None, stride)).mean(dim='trainId')
        try:
            integParams = find_integ_params(trace)
        except ValueError as err:
            log.warning(f'{err}, trying with averaged trace over all trains.')
            trace = arr.mean(dim='trainId')
            integParams = find_integ_params(trace)
        log.debug(f'Auto find peaks result: {integParams}')

    required_keys = ['pulseStart', 'pulseStop', 'baseStart',
                     'baseStop', 'period', 'npulses']
    if integParams is None or not all(name in integParams
                                      for name in required_keys):
        raise TypeError('All keys of integParams argument '
                        f'{required_keys} are required when '
                        'bunch pattern info is missing.')

    # 2.1. No bunch pattern provided
    if bpt is None:
        log.info('Missing bunch pattern info.')
        log.debug(f'Retrieving {integParams["npulses"]} pulses.')
        if extra_dim is None:
            extra_dim = 'pulseId'
        return peaks_from_raw_trace(arr, integParams['pulseStart'],
                                    integParams['pulseStop'],
                                    integParams['baseStart'],
                                    integParams['baseStop'],
                                    integParams['period'],
                                    integParams['npulses'],
                                    extra_dim=extra_dim)

    # 2.2 Bunch pattern is provided
    # load mask and extract pulse Id:
    dim_names = {'sase3': 'sa3_pId', 'sase1': 'sa1_pId',
                 'scs_ppl': 'ol_pId', 'None': 'pulseId'}
    extra_dim = dim_names[pattern]
    valid_tid = np.intersect1d(arr.trainId, bpt.trainId, assume_unique=True)
    mask = is_pulse_at(bpt, pattern)
    pattern_changed = ~(mask == mask[0]).all().values
    if not pattern_changed:
        pid = np.nonzero(mask[0].values)[0]
        valid_arr = arr
    else:
        mask = is_pulse_at(bpt.sel(trainId=valid_tid), pattern)
        mask = mask.rename({'pulse_slot': extra_dim})
        mask = mask.assign_coords(
            {extra_dim: np.arange(bpt.sizes['pulse_slot'])})
        mask_on = mask.where(mask, drop=True).fillna(False).astype(bool)
        log.info(f'Bunch pattern of {pattern} changed during the run.')
        pid = mask_on.coords[extra_dim]
        # select trains containing pulses
        valid_arr = arr.sel(trainId=mask_on.trainId)
    npulses = len(pid)
    log.debug(f'Bunch pattern: {npulses} pulses for {pattern}.')
    if npulses == 1:
        period_bpt = 0
    else:
        period_bpt = np.min(np.diff(pid))
    if autoFind and period_bpt*min_distance != integParams['period']:
        log.warning('The period from the bunch pattern is different than '
                    'that found by the peak-finding algorithm. Either '
                    'the algorithm failed or the bunch pattern source '
                    f'({bunchPattern}) is not correct.')
    # create array of sample indices for peak integration
    sample_id = (pid-pid[0])*min_distance
    # override auto find parameters
    if isinstance(integParams['pulseStart'], (int, np.integer)):
        integParams['pulseStart'] = integParams['pulseStart'] + sample_id
    peaks = peaks_from_raw_trace(valid_arr, integParams['pulseStart'],
                                 integParams['pulseStop'],
                                 integParams['baseStart'],
                                 integParams['baseStop'],
                                 integParams['period'],
                                 integParams['npulses'],
                                 extra_dim)
    if pattern_changed:
        peaks = peaks.where(mask_on, drop=True)
    return peaks.assign_coords({extra_dim: pid})
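

# Usage sketch (hedged, the run and 'MCP2raw' mnemonic are placeholders):
# extract SASE3 peaks from raw MCP traces with manually provided
# integration parameters instead of auto-detection.
#
#   import toolbox_scs as tb
#   run = tb.open_run(3481, 100)
#   run_mnemonics = mnemonics_for_run(run)
#   m = run_mnemonics['MCP2raw']
#   data = run.get_array(m['source'], m['key'], m['dim'])
#   params = {'pulseStart': 2730, 'pulseStop': 2780, 'baseStart': 2120,
#             'baseStop': 2420, 'period': 440, 'npulses': 30}
#   peaks = get_peaks(run, data, 'MCP2raw', useRaw=True, autoFind=False,
#                     integParams=params, bunchPattern='sase3')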


def get_dig_avg_trace(run, mnemonic, ntrains=None):
    """
    Compute the average over ntrains evenly spaced across all trains
    of a digitizer trace.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    mnemonic: str
        ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'.
    ntrains: int
        Number of trains used to calculate the average raw trace.
        If None, all trains are used.

    Returns
    -------
    trace: DataArray
        The average digitizer trace
    """
    run_mnemonics = mnemonics_for_run(run)
    if ntrains is None:
        sel = run
    else:
        total_tid = len(run.train_ids)
        stride = int(np.max([1, np.floor(total_tid/ntrains)]))
        sel = run.select_trains(np.s_[0:None:stride])
    m = run_mnemonics[mnemonic]
    raw_trace = sel.get_array(m['source'], m['key'], m['dim'])
    raw_trace = raw_trace.mean(dim='trainId')
    return raw_trace
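

# Usage sketch (hedged, 'MCP2raw' is a placeholder mnemonic): average a
# channel over ~100 evenly spaced trains, e.g. before feeding it to
# find_integ_params below.
#
#   avg_trace = get_dig_avg_trace(run, 'MCP2raw', ntrains=100)
#   params = find_integ_params(avg_trace)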


def channel_peak_params(run, source, key=None, channel=None, board=None):
    """
    Extract peak-integration parameters used by a channel of the digitizer.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    source: str
        ToolBox mnemonic of a digitizer data, e.g. "MCP2apd" or
        "FastADC4peaks", or name of digitizer source, e.g.
        'SCS_UTC1_ADQ/ADC/1:network'.
    key: str
        optional, used in combination with source (if source is not a
        ToolBox mnemonic) instead of digitizer, channel and board.
    channel: int or str
        The digitizer channel for which to retrieve the parameters. If None,
        inferred from the source mnemonic or source + key arguments.
    board: int
        Board of the ADQ412. If None, inferred from the source mnemonic or
        source + key arguments.

    Returns
    -------
    dict with peak integration parameters.
    """
    run_mnemonics = mnemonics_for_run(run)
    if source in run_mnemonics:
        m = run_mnemonics[source]
        source = m['source']
        key = m['key']
    if 'network' in source:
        return adq412_channel_peak_params(run, source, key, channel, board)
    else:
        return fastADC_channel_peak_params(run, source, channel)


def fastADC_channel_peak_params(run, source, channel=None):
    """
    Extract peak-integration parameters used by a channel of the Fast ADC.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    source: str
        Name of the Fast ADC source, e.g.
        'SCS_UTC1_MCP/ADC/1:channel_5.output'.
    channel: int
        The Fast ADC channel for which to retrieve the parameters. If None,
        inferred from the source.

    Returns
    -------
    dict with peak integration parameters.
    """
    fastADC_keys = {
        'baseStart': ('baselineSettings.baseStart.value',
                      'baseStart.value'),
        'baseStop': ('baseStop.value',),
        'baseLength': ('baselineSettings.length.value',),
        'enable': ('enablePeakComputation.value',),
        'pulseStart': ('initialDelay.value',),
        'pulseLength': ('peakSamples.value',),
        'npulses': ('numPulses.value',),
        'period': ('pulsePeriod.value',)
    }
    if channel is None:
        channel = int(source.split(':')[1].split('.')[0].split('_')[1])
    baseName = f'channel_{channel}.'
    source = source.split(':')[0]
    data = run.select(source).train_from_index(0)[1][source]
    result = {}
    for mnemo, versions in fastADC_keys.items():
        for v in versions:
            key = baseName + v
            if key in data:
                result[mnemo] = data[key]
    if 'baseLength' in result:
        result['baseStop'] = result['baseStart'] + \
            result.pop('baseLength')
    if 'pulseLength' in result:
        result['pulseStop'] = result['pulseStart'] + \
            result.pop('pulseLength')
    return result


def adq412_channel_peak_params(run, source, key=None,
                               channel=None, board=None):
    """
    Extract peak-integration parameters used by a channel of the ADQ412.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    source: str
        Name of the ADQ412 source, e.g. 'SCS_UTC1_ADQ/ADC/1:network'.
    key: str
        optional, e.g. 'digitizers.channel_1_D.apd.pulseIntegral', used in
        combination with source instead of channel and board.
    channel: int or str
        The ADQ412 channel for which to retrieve the parameters. If None,
        inferred from the source mnemonic or source + key arguments.
    board: int
        Board of the ADQ412. If None, inferred from the source mnemonic or
        source + key arguments.

    Returns
    -------
    dict with peak integration parameters.
    """
    if key is None:
        if channel is None or board is None:
            raise ValueError('key or channel + board arguments are '
                             'missing.')
    else:
        k = key.split('.')[1].split('_')
        ch_to_int = {'A': 0, 'B': 1, 'C': 2, 'D': 3}
        channel = ch_to_int[k[2]]
        board = k[1]
    baseName = f'board{board}.apd.channel_{channel}.'
    source = source.split(':')[0]
    adq412_keys = {
        'baseStart': (baseName + 'baseStart.value',),
        'baseStop': (baseName + 'baseStop.value',),
        'enable': (baseName + 'enable.value',),
        'pulseStart': (baseName + 'pulseStart.value',),
        'pulseStop': (baseName + 'pulseStop.value',),
        'initialDelay': (baseName + 'initialDelay.value',),
        'upperLimit': (baseName + 'upperLimit.value',),
        'npulses': (f'board{board}.apd.numberOfPulses.value',)
    }
    data = run.select(source).train_from_index(0)[1][source]
    result = {}
    for mnemo, versions in adq412_keys.items():
        for key in versions:
            if key in data:
                result[mnemo] = data[key]
    result['period'] = result.pop('upperLimit') - \
        result.pop('initialDelay')
    return result


def find_integ_params(trace, height=None, width=1):
    """
    Find integration parameters for peak integration of a raw digitizer
    trace. Based on scipy find_peaks().

    Parameters
    ----------
    trace: numpy array or xarray DataArray
        The digitizer raw trace used to find peaks
    height: int
        minimum threshold for peak determination
    width: int
        minimum width of peak

    Returns
    -------
    dict with keys 'pulseStart', 'pulseStop', 'baseStart', 'baseStop',
    'period', 'npulses' and values in number of samples.
    """
    if isinstance(trace, xr.DataArray):
        trace = trace.values
    trace_norm = trace - np.median(trace)
    trace_norm = -trace_norm if np.mean(trace_norm) < 0 else trace_norm
    SNR = np.max(np.abs(trace_norm)) / np.std(trace_norm[:100])
    if SNR < 10:
        log.warning('signal-to-noise ratio too low: cannot '
                    'automatically find peaks.')
        return {'pulseStart': 2, 'pulseStop': 3,
                'baseStart': 0, 'baseStop': 1,
                'period': 0, 'npulses': 1}
    # Compute autocorrelation using fftconvolve
    # from "https://stackoverflow.com/questions/12323959/"
    # "fast-cross-correlation-method-in-python"
    if len(trace_norm) % 2 == 0:
        n = len(trace_norm)
    else:
        n = len(trace_norm) - 1
    hl = int(n/2)
    a = trace_norm[:n]
    b = np.zeros(2*n)
    b[hl: hl + n] = a
    # Do an array flipped convolution, which is a correlation.
    ac_trace = fftconvolve(b, a[::-1], mode='valid')
    # slower approach:
    # ac_trace = np.correlate(trace_norm, trace_norm, mode='same')
    n = int(len(ac_trace)/2)
    # estimate quality of ac_trace and define min height to detect peaks
    factor = np.abs(ac_trace.max() / np.median(ac_trace))
    factor = 3 if factor > 20 else 1.5
    ac_peaks = find_peaks(ac_trace[n:], height=ac_trace[n:].max()/factor)
    if len(ac_peaks[0]) == 0:
        period = 0
        distance = 1
    elif len(ac_peaks[0]) == 1:
        period = ac_peaks[0][0]
        distance = max(1, period-6)
    else:
        period = int(np.median(ac_peaks[0][1:] - ac_peaks[0][:-1]))
        # distance smaller than period to account for all peaks
        distance = max(1, period-6)
    # define min height to detect peaks depending on signal quality
    f = np.max([3, np.min([(SNR/10), 4])])
    height = trace_norm.max() / f
    peaks, dic = find_peaks(trace_norm, distance=distance,
                            height=height, width=1)
    # filter out peaks that are not periodically spaced
    idx = np.ones(len(peaks), dtype=bool)
    idx[1:] = np.isclose(peaks[1:] - peaks[:-1],
                         np.ones(len(peaks[1:]))*period, atol=6)
    peaks = peaks[idx]
    for d in dic:
        dic[d] = dic[d][idx]
    npulses = len(peaks)
    if npulses == 0:
        raise ValueError('Could not automatically find peaks.')
    pulseStart = np.mean(dic['left_ips'] - peaks + peaks[0], dtype=int)
    pulseStop = np.mean(dic['right_ips'] - peaks + peaks[0], dtype=int)
    baseStop = pulseStart - np.mean(peaks - dic['left_ips'])/2 - 1
    baseStop = np.min([baseStop, pulseStart]).astype(int)
    baseStop = np.max([baseStop, 0]).astype(int)
    baseStart = baseStop - np.mean(dic['widths'])/2
    baseStart = np.max([baseStart, 0]).astype(int)
    result = {'pulseStart': pulseStart, 'pulseStop': pulseStop,
              'baseStart': baseStart, 'baseStop': baseStop,
              'period': period, 'npulses': npulses}
    return result


def get_peak_params(run, mnemonic, raw_trace=None, ntrains=200):
    """
    Get the peak region and baseline region of a raw digitizer trace used
    to compute the peak integration. These regions are either those of the
    digitizer peak-integration settings or those determined in
    get_tim_peaks or get_fast_adc_peaks when the inputs are raw traces.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    mnemonic: str
        ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'.
    raw_trace: optional, 1D numpy array or xarray DataArray
        Raw trace to display. If None, the average raw trace over ntrains
        of the corresponding channel is loaded (this can be
        time-consuming).
    ntrains: optional, int
        Only used if raw_trace is None. Number of trains used to calculate
        the average raw trace of the corresponding channel.
    """
    run_mnemonics = mnemonics_for_run(run)
    if mnemonic not in run_mnemonics:
        raise ToolBoxValueError("mnemonic must be a ToolBox mnemonic")
    if "raw" not in mnemonic:
        params = channel_peak_params(run, mnemonic)
        if 'enable' in params and params['enable'] == 0:
            log.warning('The digitizer did not record peak-integrated '
                        'data.')
        title = 'Digitizer peak params'
    else:
        title = 'Auto-find peak params'
        if raw_trace is None:
            raw_trace = get_dig_avg_trace(run, mnemonic, ntrains)
        params = find_integ_params(raw_trace)
    log.debug(f'{title} for {mnemonic}: {params}')
    return params
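

# Usage sketch (hedged): run the peak finder on a synthetic periodic trace.
# All numbers are made up for illustration.
#
#   rng = np.random.default_rng(0)
#   t = rng.normal(0, 1, 4000)               # noise floor
#   for i in range(10):
#       t[500 + i*300: 520 + i*300] += 100.  # 10 pulses, period 300
#   p = find_integ_params(t)
#   # expected: p['npulses'] == 10 and p['period'] close to 300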


def check_peak_params(run, mnemonic, raw_trace=None, ntrains=200,
                      params=None, plot=True, show_all=False,
                      bunchPattern='sase3'):
    """
    Checks and plots the peak parameters (pulse window and baseline window
    of a raw digitizer trace) used to compute the peak integration. These
    parameters are either set by the digitizer peak-integration settings,
    or are determined by a peak-finding algorithm (used in get_tim_peaks
    or get_fast_adc_peaks) when the inputs are raw traces. The parameters
    can also be provided manually for visual inspection. The plot either
    shows the first and last pulse of the trace or the entire trace.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    mnemonic: str
        ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'.
    raw_trace: optional, 1D numpy array or xarray DataArray
        Raw trace to display. If None, the average raw trace over ntrains
        of the corresponding channel is loaded (this can be
        time-consuming).
    ntrains: optional, int
        Only used if raw_trace is None. Number of trains used to calculate
        the average raw trace of the corresponding channel.
    params: optional, dict
        Peak-integration parameters to check. If None, they are retrieved
        via get_peak_params.
    plot: bool
        If True, displays the raw trace and peak integration regions.
    show_all: bool
        If True, displays the entire raw trace and all peak integration
        regions (this can be time-consuming). If False, shows the first
        and last pulse according to the bunchPattern.
    bunchPattern: optional, str
        Only used if plot is True. Checks the bunch pattern against the
        digitizer peak parameters and shows potential mismatch.

    Returns
    -------
    dictionary of peak integration parameters
    """
    run_mnemonics = mnemonics_for_run(run)
    if "raw" in mnemonic:
        mnemo_raw = mnemonic
        title = 'Auto-find peak params'
    else:
        mnemo_raw = mnemonic.replace('peaks', 'raw').replace('apd', 'raw')
        title = 'Digitizer peak params'
    if raw_trace is None:
        raw_trace = get_dig_avg_trace(run, mnemonic, ntrains)
    if params is None:
        params = get_peak_params(run, mnemonic, raw_trace)
    if 'enable' in params and params['enable'] == 0:
        log.warning('The digitizer did not record peak-integrated data.')
    if not plot:
        return params
    digitizer = digitizer_type(run, run_mnemonics[mnemonic]['source'])
    min_distance = 24 if digitizer == "FastADC" else 440
    if 'bunchPatternTable' in run_mnemonics and bunchPattern != 'None':
        sel = run.select_trains(np.s_[:ntrains])
        bp_params = {}
        m = run_mnemonics['bunchPatternTable']
        bpt = sel.get_array(m['source'], m['key'], m['dim'])
        mask = is_pulse_at(bpt, bunchPattern)
        pid = np.sort(np.unique(np.where(mask)[1]))
        bp_params['npulses'] = len(pid)
        if bp_params['npulses'] == 1:
            bp_params['period'] = 0
        else:
            bp_params['period'] = np.diff(pid)[0] * min_distance
        print(f'bunch pattern {bunchPattern}: {bp_params["npulses"]} '
              f'pulses, {bp_params["period"]} samples between two pulses')
    else:
        bp_params = None
    print(f'{title}: {params["npulses"]} pulses, {params["period"]}'
          ' samples between two pulses')
    fig, ax = plotPeakIntegrationWindow(raw_trace, params,
                                        bp_params, show_all)
    ax[0].set_ylabel(mnemo_raw)
    fig.suptitle(title, size=12)
    return params
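

# Usage sketch (hedged, 'MCP2apd' is a placeholder mnemonic): visually
# check the integration windows against the SASE3 bunch pattern before
# extracting peaks.
#
#   params = check_peak_params(run, 'MCP2apd', bunchPattern='sase3')
#   # prints pulse number and period, and plots the first and last pulse
#   # with the baseline and peak regions highlighted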


def plotPeakIntegrationWindow(raw_trace, params, bp_params, show_all=False):
    if show_all:
        fig, ax = plt.subplots(figsize=(6, 3), constrained_layout=True)
        n = params['npulses']
        p = params['period']
        for i in range(n):
            lbl = 'baseline' if i == 0 else None
            lp = 'peak' if i == 0 else None
            ax.axvline(params['baseStart'] + i*p, ls='--', color='k')
            ax.axvline(params['baseStop'] + i*p, ls='--', color='k')
            ax.axvspan(params['baseStart'] + i*p,
                       params['baseStop'] + i*p,
                       alpha=0.5, color='grey', label=lbl)
            ax.axvline(params['pulseStart'] + i*p, ls='--', color='r')
            ax.axvline(params['pulseStop'] + i*p, ls='--', color='r')
            ax.axvspan(params['pulseStart'] + i*p,
                       params['pulseStop'] + i*p,
                       alpha=0.2, color='r', label=lp)
        ax.plot(raw_trace, color='C0', label='raw trace')
        ax.legend(fontsize=8)
        return fig, [ax]

    if bp_params is not None:
        npulses = bp_params['npulses']
        period = bp_params['period']
    else:
        npulses = params['npulses']
        period = params['period']
    xmin = np.max([0, params['baseStart']-100])
    xmax = np.min([params['pulseStop']+100, raw_trace.size])
    fig, ax = plt.subplots(1, 2, figsize=(6, 3), constrained_layout=True)
    ax[0].axvline(params['baseStart'], ls='--', color='k')
    ax[0].axvline(params['baseStop'], ls='--', color='k')
    ax[0].axvspan(params['baseStart'], params['baseStop'],
                  alpha=0.5, color='grey', label='baseline')
    ax[0].axvline(params['pulseStart'], ls='--', color='r')
    ax[0].axvline(params['pulseStop'], ls='--', color='r')
    ax[0].axvspan(params['pulseStart'], params['pulseStop'],
                  alpha=0.2, color='r', label='peak')
    ax[0].plot(np.arange(xmin, xmax), raw_trace[xmin:xmax], color='C0',
               label='1st pulse')
    ax[0].legend(fontsize=8)
    ax[0].set_xlim(xmin, xmax)
    ax[0].set_xlabel('digitizer samples')
    ax[0].set_title('First pulse', size=10)

    xmin2 = xmin + (npulses-1) * period
    xmax2 = xmax + (npulses-1) * period
    p = params['period']
    lbl = 'baseline'
    lp = 'peak'
    for i in range(params['npulses']):
        mi = params['baseStart'] + i*p
        if not xmin2 < mi < xmax2:
            continue
        ax[1].axvline(params['baseStart'] + i*p, ls='--', color='k')
        ax[1].axvline(params['baseStop'] + i*p, ls='--', color='k')
        ax[1].axvspan(params['baseStart'] + i*p, params['baseStop'] + i*p,
                      alpha=0.5, color='grey', label=lbl)
        ax[1].axvline(params['pulseStart'] + i*p, ls='--', color='r')
        ax[1].axvline(params['pulseStop'] + i*p, ls='--', color='r')
        ax[1].axvspan(params['pulseStart'] + i*p, params['pulseStop'] + i*p,
                      alpha=0.2, color='r', label=lp)
        lbl = None
        lp = None
    if xmax2 < raw_trace.size:
        ax[1].plot(np.arange(xmin2, xmax2), raw_trace[xmin2:xmax2],
                   color='C0', label='last pulse')
    else:
        log.warning('The digitizer raw trace is too short to contain '
                    'all the pulses.')
    ax[1].legend(fontsize=8)
    ax[1].set_xlabel('digitizer samples')
    ax[1].set_xlim(xmin2, xmax2)
    ax[1].set_title('Last pulse', size=10)
    return fig, ax


def digitizer_type(run, source):
    """
    Finds the digitizer type based on the class Id / name of the source.
    Example source: 'SCS_UTC1_MCP/ADC/1'. Defaults to ADQ412 if not found.
    """
    ret = None
    if '_MCP/ADC/1' in source:
        ret = 'FastADC'
    if '_ADQ/ADC/1' in source:
        ret = 'ADQ412'
    if ret is None:
        digi_dict = {'FastAdc': 'FastADC',
                     'FastAdcLegacy': 'FastADC',
                     'AdqDigitizer': 'ADQ412',
                     'PyADCChannel': 'FastADC',
                     'PyADCChannelLegacy': 'FastADC'}
        try:
            source = source.split(':')[0]
            classId = run.get_run_value(source, 'classId.value')
            ret = digi_dict.get(classId)
        except Exception as e:
            log.warning(str(e))
            log.warning('Could not find digitizer type from source '
                        f'{source}.')
            ret = 'ADQ412'
    return ret


def get_tim_peaks(run, mnemonic=None, merge_with=None,
                  bunchPattern='sase3', integParams=None,
                  keepAllSase=False):
    """
    Automatically computes TIM peaks. Sources can be loaded on the fly
    via the mnemonics argument, or processed from an existing data set
    (merge_with). The bunch pattern table is used to assign the pulse
    id coordinates.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    mnemonic: str
        mnemonic for TIM, e.g. "MCP2apd".
    merge_with: xarray Dataset
        If provided, the resulting Dataset will be merged with this
        one. The TIM variables of merge_with (if any) will also be
        computed and merged.
    bunchPattern: str
        'sase1' or 'sase3' or 'scs_ppl', bunch pattern used to extract
        peaks. The pulse ID dimension will be named 'sa1_pId', 'sa3_pId'
        or 'ol_pId', respectively.
    integParams: dict
        dictionary for raw trace integration, e.g.
        {'pulseStart': 100, 'pulseStop': 200, 'baseStart': 50,
         'baseStop': 99, 'period': 24, 'npulses': 500}.
        If None, integration parameters are computed automatically.
    keepAllSase: bool
        Only relevant in case of sase-dedicated trains. If True, all
        trains are kept, else only those of the bunchPattern are kept.

    Returns
    -------
    xarray Dataset with TIM variables substituted by the peak calculated
    values (e.g. "MCP2raw" becomes "MCP2peaks"), merged with Dataset
    *merge_with* if provided.
    """
    return get_digitizer_peaks(run, mnemonic, merge_with, bunchPattern,
                               integParams, keepAllSase)


def get_laser_peaks(run, mnemonic=None, merge_with=None,
                    bunchPattern='scs_ppl', integParams=None):
    """
    Extracts laser photodiode signal (peak intensity) from Fast ADC
    digitizer. Sources can be loaded on the fly via the mnemonics
    argument, and/or processed from an existing data set (merge_with).
    The PP laser bunch pattern is used to assign the pulse id coordinates.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    mnemonic: str
        mnemonic for FastADC corresponding to laser signal, e.g.
        "FastADC2peaks" or 'I0_ILHraw'.
    merge_with: xarray Dataset
        If provided, the resulting Dataset will be merged with this
        one. The FastADC variables of merge_with (if any) will also be
        computed and merged.
    bunchPattern: str
        'sase1' or 'sase3' or 'scs_ppl', bunch pattern used to extract
        peaks.
    integParams: dict
        dictionary for raw trace integration, e.g.
        {'pulseStart': 100, 'pulseStop': 200, 'baseStart': 50,
         'baseStop': 99, 'period': 24, 'npulses': 500}.
        If None, integration parameters are computed automatically.

    Returns
    -------
    xarray Dataset with all Fast ADC variables substituted by the peak
    calculated values (e.g. "FastADC2raw" becomes "FastADC2peaks").
    """
    return get_digitizer_peaks(run, mnemonic, merge_with, bunchPattern,
                               integParams, False)


def get_digitizer_peaks(run, mnemonic, merge_with=None,
                        bunchPattern='sase3', integParams=None,
                        keepAllSase=False):
    """
    Automatically computes digitizer peaks. A source can be loaded on the
    fly via the mnemonic argument, or processed from an existing data set
    (merge_with). The bunch pattern table is used to assign the pulse id
    coordinates.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    mnemonic: str
        mnemonic for FastADC or ADQ412, e.g. "I0_ILHraw" or "MCP3apd".
        The data is either loaded from the DataCollection or taken from
        merge_with.
    merge_with: xarray Dataset
        If provided, the resulting Dataset will be merged with this one.
    bunchPattern: str or dict
        'sase1' or 'sase3' or 'scs_ppl', 'None': bunch pattern
    integParams: dict
        dictionary for raw trace integration, e.g.
        {'pulseStart': 100, 'pulseStop': 200, 'baseStart': 50,
         'baseStop': 99, 'period': 24, 'npulses': 500}.
        If None, integration parameters are computed automatically.
    keepAllSase: bool
        Only relevant in case of sase-dedicated trains. If True, all
        trains are kept, else only those of the bunchPattern are kept.

    Returns
    -------
    xarray Dataset with digitizer peak variables. Raw variables are
    substituted by the peak calculated values (e.g. "FastADC2raw" becomes
    "FastADC2peaks").
    """
    run_mnemonics = mnemonics_for_run(run)
    if mnemonic not in run_mnemonics:
        log.warning('Mnemonic not found in run. Skipping.')
        return merge_with
    if bool(merge_with) and mnemonic in merge_with:
        for d in merge_with[mnemonic].dims:
            if d in ['sa3_pId', 'ol_pId']:
                log.warning(f'{mnemonic} already extracted. '
                            'Skipping.')
                return merge_with
    # check if bunch pattern table exists
    bpt = None
    if bool(merge_with) and 'bunchPatternTable' in merge_with:
        bpt = merge_with['bunchPatternTable']
        log.debug('Using bpt from merge_with dataset.')
    elif 'bunchPatternTable' in run_mnemonics:
        m = run_mnemonics['bunchPatternTable']
        bpt = run.get_array(m['source'], m['key'], m['dim'])
        log.debug('Loaded bpt from DataCollection.')
    elif 'bunchPatternTable_SA3' in run_mnemonics:
        m = run_mnemonics['bunchPatternTable_SA3']
        bpt = run.get_array(m['source'], m['key'], m['dim'])
        log.debug('Loaded bpt from DataCollection.')
    else:
        log.warning('Could not load bunch pattern table.')
    # prepare resulting dataset
    if bool(merge_with):
        mw_ds = merge_with.drop(mnemonic, errors='ignore')
    else:
        mw_ds = xr.Dataset()
    # extract peaks and merge them into the dataset
    autoFind = True if integParams is None else False
    m = run_mnemonics[mnemonic]
    useRaw = True if 'raw' in mnemonic else False
    if bool(merge_with) and mnemonic in merge_with:
        data = merge_with[mnemonic]
    else:
        data = run.get_array(m['source'], m['key'], m['dim'])
    peaks = get_peaks(run, data, mnemonic,
                      useRaw=useRaw,
                      autoFind=autoFind,
                      integParams=integParams,
                      bunchPattern=bunchPattern,
                      bpt=bpt)
    name = mnemonic.replace('raw', 'peaks').replace('apd', 'peaks')
    join = 'outer' if keepAllSase else 'inner'
    ds = mw_ds.merge(peaks.rename(name), join=join)
    return ds
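

# Usage sketch (hedged, 'FastADC5raw' is a placeholder mnemonic): compute
# laser photodiode peaks aligned on the PP laser bunch pattern.
#
#   ds = get_digitizer_peaks(run, 'FastADC5raw', bunchPattern='scs_ppl')
#   # -> Dataset containing 'FastADC5peaks' with an 'ol_pId' dimension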


def digitizer_signal_description(run, digitizer=None):
    """
    Check for the existence of signal description and return all
    corresponding channels in a dictionary.

    Parameters
    ----------
    run: extra_data.DataCollection
        DataCollection containing the digitizer data.
    digitizer: str or list of str (default=None)
        Name of the digitizer: one in ['FastADC', 'FastADC2',
        'ADQ412', 'ADQ412_2'].
        If None, all digitizers are used.

    Returns
    -------
    signal_description: dictionary containing the signal description of
        the digitizer channels.

    Example
    -------
    import toolbox_scs as tb
    run = tb.open_run(3481, 100)
    signals = tb.digitizer_signal_description(run)
    signals_fadc2 = tb.digitizer_signal_description(run, 'FastADC2')
    """
    if digitizer not in [None, 'FastADC', 'FastADC2',
                         'ADQ412', 'ADQ412_2']:
        raise ValueError('digitizer must be one in '
                         '["FastADC", "FastADC2", "ADQ412", "ADQ412_2"]')
    if digitizer is None:
        digitizer = ['FastADC', 'FastADC2', 'ADQ412', 'ADQ412_2']
    else:
        digitizer = [digitizer]

    def key_fadc(i):
        if i > 9:
            return None
        return f'channel_{i}.signalDescription.value'

    def key_adq412(i):
        if i > 3:
            return None
        return f'board1.channel_{i}.description.value'

    sources = {'FastADC': ['SCS_UTC1_MCP/ADC/1', key_fadc],
               'FastADC2': ['SCS_UTC2_FADC/ADC/1', key_fadc],
               'ADQ412': ['SCS_UTC1_ADQ/ADC/1', key_adq412],
               'ADQ412_2': ['SCS_UTC2_ADQ/ADC/1', key_adq412]}
    res = {}
    for d in digitizer:
        if sources[d][0] not in run.all_sources:
            continue
        if sources[d][1](0) not in run.get_run_values(sources[d][0]):
            raise ValueError('No signal description available for '
                             f'{d}: {sources[d][0]}')
        for ch in range(10):
            val = sources[d][1](ch)
            if val is None:
                break
            desc = run.get_run_value(sources[d][0], val)
            res[f'{d}_Ch{ch}'] = desc
    return res


def calibrateTIM(data, rollingWindow=200, mcp=1, plot=False, use_apd=True,
                 intstart=None, intstop=None, bkgstart=None, bkgstop=None,
                 t_offset=None, npulses_apd=None):
    ''' Calibrate TIM signal (Peak-integrated signal) to the slow ion
        signal of SCS_XGM (photocurrent read by Keithley, channel
        'pulseEnergy.photonFlux.value').
        The aim is to find F so that E_tim_peak[uJ] = F x TIM_peak.
        For this, we want to match the SASE3-only average TIM pulse peak
        per train (TIM_avg) to the slow XGM signal E_slow.
        Since E_slow is the average energy per pulse over all SASE1 and
        SASE3 pulses (N1 and N3), we first extract the relative
        contribution C of the SASE3 pulses by looking at the
        pulse-resolved signals of the SA3_XGM in the tunnel.
        There, the signal of SASE1 is usually strong enough to be above
        noise level.
        Let TIM_avg be the average of the TIM pulses (SASE3 only).
        The calibration factor is then defined as:
        F = E_slow * C * (N1+N3) / (N3 * TIM_avg).
        If N3 changes during the run, we locate the indices for which N3
        is maximum and define a window where to apply calibration
        (indices start/stop).

        Warning: the calibration does not include the transmission by
        the KB mirrors!

        Inputs:
            data: xarray Dataset
            rollingWindow: length of running average to calculate TIM_avg
            mcp: MCP channel
            plot: boolean. If True, plot calibration results.
            use_apd: boolean. If False, the TIM pulse peaks are extracted
                from raw traces using getTIMapd
            intstart: trace index of integration start
            intstop: trace index of integration stop
            bkgstart: trace index of background start
            bkgstop: trace index of background stop
            t_offset: index separation between two pulses
            npulses_apd: number of pulses

        Output:
            F: float, TIM calibration factor.
    '''
    start = 0
    stop = None
    npulses = data['npulses_sase3']
    ntrains = npulses.shape[0]
    if not np.all(npulses == npulses[0]):
        start = np.argmax(npulses.values)
        stop = ntrains + np.argmax(npulses.values[::-1]) - 1
        if stop - start < rollingWindow:
            print('not enough consecutive data points with the largest '
                  'number of pulses per train')
        start += rollingWindow
        stop = np.min((ntrains, stop+rollingWindow))
    filteredTIM = getTIMapd(data, mcp, use_apd, intstart, intstop,
                            bkgstart, bkgstop, t_offset, npulses_apd)
    sa3contrib = saseContribution(data, 'sase3', 'XTD10_XGM')
    avgFast = filteredTIM.mean(axis=1).rolling(
        trainId=rollingWindow).mean()
    ratio = ((data['npulses_sase3']+data['npulses_sase1']) *
             data['SCS_photonFlux'] *
             sa3contrib) / (avgFast*data['npulses_sase3'])
    F = float(ratio[start:stop].median().values)

    if plot:
        fig = plt.figure(figsize=(8, 5))
        ax = plt.subplot(211)
        ax.set_title('E[uJ] = {:2e} x TIM (MCP{})'.format(F, mcp))
        ax.plot(data['SCS_photonFlux'], label='SCS XGM slow (all SASE)',
                color='C0')
        slow_avg_sase3 = data['SCS_photonFlux'] * (
            data['npulses_sase1'] + data['npulses_sase3']
            ) * sa3contrib / data['npulses_sase3']
        ax.plot(slow_avg_sase3, label='SCS XGM slow (SASE3 only)',
                color='C1')
        ax.plot(avgFast*F, label='Calibrated TIM rolling avg', color='C2')
        ax.legend(loc='upper left', fontsize=8)
        ax.set_ylabel('Energy [$\mu$J]', size=10)
        ax.plot(filteredTIM.mean(axis=1)*F,
                label='Calibrated TIM train avg', alpha=0.2, color='C9')
        ax.legend(loc='best', fontsize=8, ncol=2)
        plt.xlabel('train in run')

        ax = plt.subplot(234)
        xgm_fast = selectSASEinXGM(data)
        ax.scatter(filteredTIM, xgm_fast, s=5, alpha=0.1, rasterized=True)
        fit, cov = np.polyfit(filteredTIM.values.flatten(),
                              xgm_fast.values.flatten(), 1, cov=True)
        y = np.poly1d(fit)
        x = np.linspace(filteredTIM.min(), filteredTIM.max(), 10)
        ax.plot(x, y(x), lw=2, color='r')
        ax.set_ylabel('Raw HAMP [$\mu$J]', size=10)
        ax.set_xlabel('TIM (MCP{}) signal'.format(mcp), size=10)
        ax.annotate(s='y(x) = F x + A\n' +
                    'F = %.3e\n$\Delta$F/F = %.2e\n' % (
                        fit[0], np.abs(np.sqrt(cov[0, 0])/fit[0])) +
                    'A = %.3e' % fit[1],
                    xy=(0.5, 0.6), xycoords='axes fraction',
                    fontsize=10, color='r')
        print('TIM calibration factor: %e' % (F))

        ax = plt.subplot(235)
        ax.hist(filteredTIM.values.flatten()*F, bins=50, rwidth=0.8)
        ax.set_ylabel('number of pulses', size=10)
        ax.set_xlabel('Pulse energy MCP{} [uJ]'.format(mcp), size=10)
        ax.set_yscale('log')

        ax = plt.subplot(236)
        if not use_apd:
            pulseStart = intstart
            pulseStop = intstop
        else:
            pulseStart = data.attrs['run'].get_array(
                'SCS_UTC1_ADQ/ADC/1',
                'board1.apd.channel_0.pulseStart.value')[0].values
            pulseStop = data.attrs['run'].get_array(
                'SCS_UTC1_ADQ/ADC/1',
                'board1.apd.channel_0.pulseStop.value')[0].values

        if 'MCP{}raw'.format(mcp) not in data:
            tid, data = data.attrs['run'].train_from_index(0)
            trace = data['SCS_UTC1_ADQ/ADC/1:network'][
                'digitizers.channel_1_D.raw.samples']
            print('no raw data for MCP{}. Loading trace from '
                  'MCP1'.format(mcp))
            label_trace = 'MCP1 Voltage [V]'
        else:
            trace = data['MCP{}raw'.format(mcp)][0]
            label_trace = 'MCP{} Voltage [V]'.format(mcp)
        ax.plot(trace[:pulseStop+25], 'o-', ms=2, label='trace')
        ax.axvspan(pulseStart, pulseStop, color='C2', alpha=0.2,
                   label='APD region')
        ax.axvline(pulseStart, color='gray', ls='--')
        ax.axvline(pulseStop, color='gray', ls='--')
        ax.set_xlim(pulseStart - 25, pulseStop + 25)
        ax.set_ylabel(label_trace, size=10)
        ax.set_xlabel('sample #', size=10)
        ax.legend(fontsize=8)
    return F


''' TIM calibration table
    Dict with key=photon energy and value=array of polynomial
    coefficients for each MCP (1,2,3).
    The polynomials correspond to a fit of the logarithm of the
    calibration factor as a function of MCP voltage. If P is a
    polynomial and V the MCP voltage, the calibration factor (in
    microjoule per APD signal) is given by -exp(P(V)).
    This table was generated from the calibration of March 2019,
    proposal 900074, semester 201930,
    runs 69 - 111 (Ni edge): https://in.xfel.eu/elog/SCS+Beamline/2323
    runs 113 - 153 (Co edge): https://in.xfel.eu/elog/SCS+Beamline/2334
    runs 163 - 208 (Fe edge): https://in.xfel.eu/elog/SCS+Beamline/2349
'''
tim_calibration_table = {
    705.5: np.array([
        [-6.85344690e-12, 5.00931986e-08, -1.27206912e-04,
         1.15596821e-01, -3.15215367e+01],
        [1.25613942e-11, -5.41566381e-08, 8.28161004e-05,
         -7.27230153e-02, 3.10984925e+01],
        [1.14094964e-12, 7.72658935e-09, -4.27504907e-05,
         4.07253378e-02, -7.00773062e+00]]),
    779: np.array([
        [4.57610777e-12, -2.33282497e-08, 4.65978738e-05,
         -6.43305156e-02, 3.73958623e+01],
        [2.96325102e-11, -1.61393276e-07, 3.32600044e-04,
         -3.28468195e-01, 1.28328844e+02],
        [1.14521506e-11, -5.81980336e-08, 1.12518434e-04,
         -1.19072484e-01, 5.37601559e+01]]),
    851: np.array([
        [3.15774215e-11, -1.71452934e-07, 3.50316512e-04,
         -3.40098861e-01, 1.31064501e+02],
        [5.36341958e-11, -2.92533156e-07, 6.00574534e-04,
         -5.71083140e-01, 2.10547161e+02],
        [3.69445588e-11, -1.97731342e-07, 3.98203522e-04,
         -3.78338599e-01, 1.41894119e+02]])
}


def timFactorFromTable(voltage, photonEnergy, mcp=1):
    ''' Returns an energy calibration factor for TIM integrated peak
        signal (APD) according to calibration from March 2019,
        proposal 900074, semester 201930,
        runs 69 - 111 (Ni edge): https://in.xfel.eu/elog/SCS+Beamline/2323
        runs 113 - 153 (Co edge): https://in.xfel.eu/elog/SCS+Beamline/2334
        runs 163 - 208 (Fe edge): https://in.xfel.eu/elog/SCS+Beamline/2349
        Uses the tim_calibration_table declared above.

        Inputs:
            voltage: MCP voltage in volts.
            photonEnergy: FEL photon energy in eV. Calibration factor is
                linearly interpolated between the known values from the
                calibration table.
            mcp: MCP channel (1, 2, or 3).

        Output:
            f: calibration factor in microjoule per APD signal
    '''
    energies = np.sort([key for key in tim_calibration_table])
    if photonEnergy not in energies:
        if photonEnergy > energies.max():
            photonEnergy = energies.max()
        elif photonEnergy < energies.min():
            photonEnergy = energies.min()
        else:
            idx = np.searchsorted(energies, photonEnergy) - 1
            polyA = np.poly1d(tim_calibration_table[energies[idx]][mcp-1])
            polyB = np.poly1d(
                tim_calibration_table[energies[idx+1]][mcp-1])
            fA = -np.exp(polyA(voltage))
            fB = -np.exp(polyB(voltage))
            f = fA + (fB-fA)/(energies[idx+1]-energies[idx])*(
                photonEnergy - energies[idx])
            return f
    poly = np.poly1d(tim_calibration_table[photonEnergy][mcp-1])
    f = -np.exp(poly(voltage))
    return f
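

# Usage sketch (hedged, placeholder values): convert peak-integrated MCP2
# data to pulse energy at a photon energy between two table entries, where
# the factor is linearly interpolated.
#
#   F = timFactorFromTable(voltage=1900., photonEnergy=750., mcp=2)
#   energy_uJ = F * mcp2_peaks    # mcp2_peaks: peak-integrated TIM signal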