""" Digitizers related sub-routines
Copyright (2021) SCS Team.
(contributions preferrably comply with pep8 code structure
guidelines.)
"""
import logging
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from scipy.signal import fftconvolve
from ..misc.bunch_pattern_external import is_pulse_at
from ..util.exceptions import ToolBoxValueError
from ..mnemonics_machinery import (mnemonics_to_process,
mnemonics_for_run)
__all__ = [
'check_peak_params',
'get_digitizer_peaks',
'get_laser_peaks',
'get_peaks',
'get_tim_peaks',
'digitizer_signal_description',
'get_dig_avg_trace'
]
log = logging.getLogger(__name__)
def peaks_from_raw_trace(traces, pulseStart, pulseStop, baseStart, baseStop,
period, npulses, extra_dim):
"""
Computes peaks from raw digitizer traces by trapezoidal integration.
Parameters
----------
traces: xarray DataArray or numpy array containing raw traces. If
numpy array is provided, the second dimension is that of the samples.
pulseStart: int or list or 1D-numpy array
trace index of integration start. If 1d array, each value is the start
of one peak. The period and npulses parameters are ignored.
pulseStop: int
trace index of integration stop.
baseStart: int
trace index of background start.
baseStop: int
trace index of background stop.
period: int
        number of samples between two peaks. Ignored if pulseStart is a
        1D-array.
npulses: int
        number of pulses. Ignored if pulseStart is a 1D-array.
extra_dim: str
Name given to the dimension along the peaks.
Returns
-------
xarray DataArray
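    Example
    -------
    # A minimal sketch on synthetic data; the integration windows, period
    # and number of pulses below are illustrative only.
    import numpy as np
    traces = np.random.randn(5, 1000)  # 5 trains of 1000 samples
    peaks = peaks_from_raw_trace(traces, pulseStart=74, pulseStop=94,
                                 baseStart=30, baseStop=60, period=96,
                                 npulses=10, extra_dim='sa3_pId')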
"""
assert len(traces.shape) == 2
if isinstance(traces, xr.DataArray):
ntid = traces.sizes['trainId']
coords = traces.coords
traces = traces.values
if traces.shape[0] != ntid:
traces = traces.T
else:
coords = None
if hasattr(pulseStart, '__len__'):
pulseStart = np.array(pulseStart)
pulses = pulseStart - pulseStart[0]
pulseStart = pulseStart[0]
else:
pulses = range(0, npulses*period, period)
results = xr.DataArray(np.empty((traces.shape[0], len(pulses))),
coords=coords,
dims=['trainId', extra_dim])
for i, p in enumerate(pulses):
a = pulseStart + p
b = pulseStop + p
bkga = baseStart + p
bkgb = baseStop + p
if b > traces.shape[1]:
break
bg = np.outer(np.median(traces[:, bkga:bkgb], axis=1),
np.ones(b-a))
results[:, i] = np.trapz(traces[:, a:b] - bg, axis=1)
return results
def peaks_from_apd(array, params, digitizer, bpt, bunchPattern):
"""
Extract peak-integrated data according to the bunch pattern.
Parameters
----------
array: xarray DataArray
2D array containing peak-integrated data
params: dict
peak-integration parameters of the digitizer
digitizer: str
digitizer type, one of {'FastADC', 'ADQ412'}
bpt: xarray DataArray
bunch pattern table
bunchPattern: str
one of {'sase1', 'sase3', 'scs_ppl'}, used to select pulses and
assign name of the pulse id dimension.
Returns
-------
xarray DataArray with pulse id coordinates.
"""
if params['enable'] == 0 or (array == 1.).all():
raise ValueError('The digitizer did not record integrated peaks. '
'Consider using raw traces from the same channel '
'for peak integration.')
    if digitizer == 'FastADC':
        min_distance = 24
    elif digitizer == 'ADQ412':
        min_distance = 440
    else:
        raise ValueError(f'Unknown digitizer type: {digitizer}')
period = params['period']
if period % min_distance != 0:
        log.warning(f'The pulse period ({period} samples) of the '
                    'digitizer is not a multiple of the pulse separation '
                    f'at 4.5 MHz ({min_distance} samples). Pulse id '
                    'assignment is likely to fail.')
stride = int(period/min_distance)
npulses_apd = params['npulses']
dim_names = {'sase3': 'sa3_pId', 'sase1': 'sa1_pId', 'scs_ppl': 'ol_pId'}
pulse_dim = dim_names[bunchPattern]
arr_dim = [dim for dim in array.dims if dim != 'trainId'][0]
if npulses_apd > array.sizes[arr_dim]:
log.warning(f'The digitizer was set to record {npulses_apd} pulses '
f'but the array length is only {array.sizes[arr_dim]}.')
npulses_apd = array.sizes[arr_dim]
mask = is_pulse_at(bpt, bunchPattern).rename({'pulse_slot': pulse_dim})
mask = mask.where(mask.trainId.isin(array.trainId), drop=True)
mask = mask.assign_coords({pulse_dim: np.arange(bpt.sizes['pulse_slot'])})
pid = np.sort(np.unique(np.where(mask)[1]))
npulses_bpt = len(pid)
apd_coords = np.arange(pid[0], pid[0] + stride*npulses_apd, stride)
noalign = False
if len(np.intersect1d(apd_coords, pid, assume_unique=True)) < npulses_bpt:
        log.warning('Not all pulses were recorded. The digitizer '
                    'was set to record pulse ids '
                    f'{apd_coords[apd_coords < bpt.sizes["pulse_slot"]]} '
                    f'but the bunch pattern for {bunchPattern} is {pid}. '
                    'Skipping pulse ID alignment.')
noalign = True
array = array.isel({arr_dim: slice(0, npulses_apd)})
array = array.where(array != 1.)
if noalign:
return array
array = array.rename(
{arr_dim: pulse_dim}).assign_coords({pulse_dim: apd_coords})
array, mask = xr.align(array, mask, join='inner')
array = array.where(mask, drop=True)
return array
def get_peaks(run,
data,
mnemonic,
useRaw=True,
autoFind=True,
integParams=None,
bunchPattern='sase3',
bpt=None,
extra_dim=None,
indices=None,
):
"""
Extract peaks from one source (channel) of a digitizer.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data
    data: xarray DataArray or str
        array containing the raw traces or peak-integrated values from the
        digitizer. If str, must be one of the ToolBox mnemonics. If None,
        the data is loaded via the source and key of the mnemonic argument.
mnemonic: str or dict
ToolBox mnemonic or dict with source and key as in
{'source': 'SCS_UTC1_ADQ/ADC/1:network',
'key': 'digitizers.channel_1_A.raw.samples'}
useRaw: bool
If True, extract peaks from raw traces. If False, uses the APD (or
peaks) data from the digitizer.
autoFind: bool
If True, finds integration parameters by inspecting the average raw
trace. Only valid if useRaw is True.
    integParams: dict
        dictionary containing the integration parameters for raw trace
        integration: 'pulseStart', 'pulseStop', 'baseStart', 'baseStop',
        'period', 'npulses'. Not used if autoFind is True. All keys are
        required when the bunch pattern is missing.
    bunchPattern: str
        match the peaks to the bunch pattern: 'sase1', 'sase3', 'scs_ppl'.
        This dictates the name of the pulse ID coordinates: 'sa1_pId',
        'sa3_pId' or 'ol_pId'.
bpt: xarray DataArray
bunch pattern table
extra_dim: str
Name given to the dimension along the peaks. If None, the name is given
according to the bunchPattern.
indices: array, slice
indices from the peak-integrated data to retrieve. Only required
when bunch pattern is missing and useRaw is False.
Returns
-------
xarray.DataArray containing digitizer peaks with pulse coordinates
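    Example
    -------
    # A minimal sketch; the proposal / run numbers and the mnemonic are
    # placeholders:
    import toolbox_scs as tb
    run = tb.open_run(2212, 235)
    m = mnemonics_for_run(run)['MCP2raw']
    data = run.get_array(m['source'], m['key'], m['dim'])
    peaks = get_peaks(run, data, 'MCP2raw', useRaw=True,
                      bunchPattern='sase3')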
"""
arr = data
dim = [d for d in arr.dims if d != 'trainId'][0]
# Load bunch pattern table
run_mnemonics = mnemonics_for_run(run)
    pattern = bunchPattern
    if bunchPattern == 'None':
        bpt = None
    elif bpt is None and 'bunchPatternTable' in run_mnemonics:
        m = run_mnemonics['bunchPatternTable']
        bpt = run.get_array(m['source'], m['key'], m['dim'])
# Find digitizer type
m = mnemonic if isinstance(mnemonic, dict) else run_mnemonics[mnemonic]
digitizer = digitizer_type(run, m.get('source'))
# 1. Peak-integrated data from digitizer
if useRaw is False:
# 1.1 No bunch pattern provided
if bpt is None:
log.info('Missing bunch pattern info.')
if indices is None:
raise TypeError('indices argument must be provided '
'when bunch pattern info is missing.')
if extra_dim is None:
extra_dim = 'pulseId'
return arr.isel({dim: indices}).rename({dim: extra_dim})
# 1.2 Bunch pattern is provided
if isinstance(mnemonic, dict):
peak_params = channel_peak_params(run, mnemonic.get('source'),
mnemonic.get('key'))
else:
peak_params = channel_peak_params(run, mnemonic)
log.debug(f'Digitizer peak integration parameters: {peak_params}')
return peaks_from_apd(arr, peak_params, digitizer, bpt, bunchPattern)
# 2. Use raw data from digitizer
# minimum pulse period @ 4.5MHz, according to digitizer type
min_distance = 1
if digitizer == 'FastADC':
min_distance = 24
if digitizer == 'ADQ412':
min_distance = 440
if autoFind:
stride = int(np.max([1, np.floor(arr.sizes['trainId']/200)]))
trace = arr.isel(trainId=slice(0, None, stride)).mean(dim='trainId')
try:
integParams = find_integ_params(trace)
except ValueError as err:
log.warning(f'{err}, trying with averaged trace over all trains.')
trace = arr.mean(dim='trainId')
integParams = find_integ_params(trace)
log.debug(f'Auto find peaks result: {integParams}')
required_keys = ['pulseStart', 'pulseStop', 'baseStart',
'baseStop', 'period', 'npulses']
if integParams is None or not all(name in integParams
for name in required_keys):
raise TypeError('All keys of integParams argument '
f'{required_keys} are required when '
'bunch pattern info is missing.')
# 2.1. No bunch pattern provided
if bpt is None:
log.info('Missing bunch pattern info.')
log.debug(f'Retrieving {integParams["npulses"]} pulses.')
if extra_dim is None:
extra_dim = 'pulseId'
return peaks_from_raw_trace(arr, integParams['pulseStart'],
integParams['pulseStop'],
integParams['baseStart'],
integParams['baseStop'],
integParams['period'],
integParams['npulses'],
extra_dim=extra_dim)
# 2.2 Bunch pattern is provided
# load mask and extract pulse Id:
dim_names = {'sase3': 'sa3_pId', 'sase1': 'sa1_pId',
'scs_ppl': 'ol_pId', 'None': 'pulseId'}
extra_dim = dim_names[pattern]
valid_tid = np.intersect1d(arr.trainId, bpt.trainId, assume_unique=True)
mask = is_pulse_at(bpt, pattern)
pattern_changed = ~(mask == mask[0]).all().values
if not pattern_changed:
pid = np.nonzero(mask[0].values)[0]
valid_arr = arr
else:
mask = is_pulse_at(bpt.sel(trainId=valid_tid), pattern)
mask = mask.rename({'pulse_slot': extra_dim})
mask = mask.assign_coords({extra_dim: np.arange(bpt.sizes['pulse_slot'])})
mask_on = mask.where(mask, drop=True).fillna(False).astype(bool)
log.info(f'Bunch pattern of {pattern} changed during the run.')
pid = mask_on.coords[extra_dim]
# select trains containing pulses
valid_arr = arr.sel(trainId=mask_on.trainId)
npulses = len(pid)
log.debug(f'Bunch pattern: {npulses} pulses for {pattern}.')
if npulses == 1:
period_bpt = 0
else:
period_bpt = np.min(np.diff(pid))
    if autoFind and period_bpt * min_distance != integParams['period']:
        log.warning('The period from the bunch pattern differs from '
                    'that found by the peak-finding algorithm. Either '
                    'the algorithm failed or the bunch pattern source '
                    f'({bunchPattern}) is not correct.')
# create array of sample indices for peak integration
sample_id = (pid-pid[0])*min_distance
# override auto find parameters
if isinstance(integParams['pulseStart'], (int, np.integer)):
integParams['pulseStart'] = integParams['pulseStart'] + sample_id
peaks = peaks_from_raw_trace(valid_arr, integParams['pulseStart'],
integParams['pulseStop'],
integParams['baseStart'],
integParams['baseStop'],
integParams['period'],
integParams['npulses'],
extra_dim)
if pattern_changed:
peaks = peaks.where(mask_on, drop=True)
return peaks.assign_coords({extra_dim: pid})
def get_dig_avg_trace(run, mnemonic, ntrains=None):
"""
    Compute the average of a digitizer trace over ntrains evenly spaced
    trains.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
mnemonic: str
ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'.
ntrains: int
Number of trains used to calculate the average raw trace.
If None, all trains are used.
Returns
-------
trace: DataArray
The average digitizer trace
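    Example
    -------
    # Average the raw trace over ~100 evenly spaced trains (the
    # proposal / run numbers and mnemonic are placeholders):
    import toolbox_scs as tb
    run = tb.open_run(2212, 235)
    trace = get_dig_avg_trace(run, 'MCP2raw', ntrains=100)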
"""
run_mnemonics = mnemonics_for_run(run)
if ntrains is None:
sel = run
else:
total_tid = len(run.train_ids)
stride = int(np.max([1, np.floor(total_tid/ntrains)]))
sel = run.select_trains(np.s_[0:None:stride])
    m = run_mnemonics[mnemonic]
raw_trace = sel.get_array(m['source'], m['key'], m['dim'])
raw_trace = raw_trace.mean(dim='trainId')
return raw_trace
def channel_peak_params(run, source, key=None, channel=None, board=None):
"""
Extract peak-integration parameters used by a channel of the digitizer.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
    source: str
        ToolBox mnemonic of digitizer data, e.g. "MCP2apd" or
        "FastADC4peaks", or name of the digitizer source, e.g.
        'SCS_UTC1_ADQ/ADC/1:network'.
    key: str
        optional, used in combination with source (if source is not a
        ToolBox mnemonic) instead of channel and board.
channel: int or str
The digitizer channel for which to retrieve the parameters. If None,
inferred from the source mnemonic or source + key arguments.
board: int
Board of the ADQ412. If None, inferred from the source mnemonic or
source + key arguments.
Returns
-------
dict with peak integration parameters.
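    Example
    -------
    # Both calls below are equivalent, assuming that the 'MCP2apd'
    # mnemonic points to board 1, channel D of the ADQ412 (the mapping
    # is illustrative only):
    import toolbox_scs as tb
    run = tb.open_run(2212, 235)
    params = channel_peak_params(run, 'MCP2apd')
    params = channel_peak_params(
        run, 'SCS_UTC1_ADQ/ADC/1:network',
        key='digitizers.channel_1_D.apd.pulseIntegral')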
"""
run_mnemonics = mnemonics_for_run(run)
if source in run_mnemonics:
m = run_mnemonics[source]
source = m['source']
key = m['key']
if 'network' in source:
return adq412_channel_peak_params(run, source, key, channel, board)
else:
return fastADC_channel_peak_params(run, source, channel)
def fastADC_channel_peak_params(run, source, channel=None):
"""
Extract peak-integration parameters used by a channel of the Fast ADC.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
source: str
Name of Fast ADC source, e.g. 'SCS_UTC1_MCP/ADC/1:channel_5.output'.
channel: int
The Fast ADC channel for which to retrieve the parameters. If None,
inferred from the source.
Returns
-------
dict with peak integration parameters.
"""
fastADC_keys = {
'baseStart': ('baselineSettings.baseStart.value',
'baseStart.value'),
'baseStop': ('baseStop.value',),
'baseLength': ('baselineSettings.length.value',),
'enable': ('enablePeakComputation.value',),
'pulseStart': ('initialDelay.value',),
'pulseLength': ('peakSamples.value',),
'npulses': ('numPulses.value',),
'period': ('pulsePeriod.value',)
}
if channel is None:
channel = int(source.split(':')[1].split('.')[0].split('_')[1])
baseName = f'channel_{channel}.'
source = source.split(':')[0]
data = run.select(source).train_from_index(0)[1][source]
result = {}
for mnemo, versions in fastADC_keys.items():
for v in versions:
key = baseName + v
if key in data:
result[mnemo] = data[key]
if 'baseLength' in result:
result['baseStop'] = result['baseStart'] + \
result.pop('baseLength')
if 'pulseLength' in result:
result['pulseStop'] = result['pulseStart'] + \
result.pop('pulseLength')
return result
def adq412_channel_peak_params(run, source, key=None,
channel=None, board=None):
"""
Extract peak-integration parameters used by a channel of the ADQ412.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
source: str
        Name of the ADQ412 source, e.g. 'SCS_UTC1_ADQ/ADC/1:network'.
key: str
        optional, e.g. 'digitizers.channel_1_D.apd.pulseIntegral', used in
        combination with source instead of channel and board.
channel: int or str
The ADQ412 channel for which to retrieve the parameters. If None,
inferred from the source mnemonic or source + key arguments.
board: int
Board of the ADQ412. If None, inferred from the source mnemonic or
source + key arguments.
Returns
-------
dict with peak integration parameters.
"""
    if key is None:
        if channel is None or board is None:
            raise ValueError('Either key or both channel and board '
                             'arguments must be provided.')
else:
k = key.split('.')[1].split('_')
ch_to_int = {'A': 0, 'B': 1, 'C': 2, 'D': 3}
channel = ch_to_int[k[2]]
board = k[1]
baseName = f'board{board}.apd.channel_{channel}.'
source = source.split(':')[0]
adq412_keys = {
'baseStart': (baseName + 'baseStart.value',),
'baseStop': (baseName + 'baseStop.value',),
'enable': (baseName + 'enable.value',),
'pulseStart': (baseName + 'pulseStart.value',),
'pulseStop': (baseName + 'pulseStop.value',),
'initialDelay': (baseName + 'initialDelay.value',),
'upperLimit': (baseName + 'upperLimit.value',),
'npulses': (f'board{board}.apd.numberOfPulses.value',)
}
data = run.select(source).train_from_index(0)[1][source]
result = {}
for mnemo, versions in adq412_keys.items():
for key in versions:
if key in data:
result[mnemo] = data[key]
result['period'] = result.pop('upperLimit') - \
result.pop('initialDelay')
return result
def find_integ_params(trace, height=None, width=1):
"""
Find integration parameters for peak integration of a raw
digitizer trace. Based on scipy find_peaks().
Parameters
----------
    trace: numpy array or xarray DataArray
        The digitizer raw trace used to find peaks
    height: int
        minimum threshold for peak determination. If None, a threshold
        is derived from the signal-to-noise ratio of the trace.
    width: int
        minimum width of peak in samples
Returns
-------
dict with keys 'pulseStart', 'pulseStop', 'baseStart', 'baseStop',
'period', 'npulses' and values in number of samples.
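    Example
    -------
    # A minimal sketch on a synthetic trace: 10 Gaussian peaks of unit
    # height, spaced by 96 samples, on top of weak noise.
    import numpy as np
    rng = np.random.default_rng(0)
    x = np.arange(2000)
    trace = sum(np.exp(-(x - 200 - i*96)**2 / 8.) for i in range(10))
    trace = trace + 0.01 * rng.standard_normal(x.size)
    params = find_integ_params(trace)
    # expected: params['period'] close to 96, params['npulses'] == 10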
"""
if isinstance(trace, xr.DataArray):
trace = trace.values
trace_norm = trace - np.median(trace)
trace_norm = -trace_norm if np.mean(trace_norm) < 0 else trace_norm
SNR = np.max(np.abs(trace_norm)) / np.std(trace_norm[:100])
if SNR < 10:
log.warning('signal-to-noise ratio too low: cannot '
'automatically find peaks.')
return {'pulseStart': 2, 'pulseStop': 3,
'baseStart': 0, 'baseStop': 1,
'period': 0, 'npulses': 1}
# Compute autocorrelation using fftconvolve
# from "https://stackoverflow.com/questions/12323959/"
# "fast-cross-correlation-method-in-python"
if len(trace_norm) % 2 == 0:
n = len(trace_norm)
else:
n = len(trace_norm) - 1
hl = int(n/2)
a = trace_norm[:n]
b = np.zeros(2*n)
b[hl: hl + n] = a
# Do an array flipped convolution, which is a correlation.
ac_trace = fftconvolve(b, a[::-1], mode='valid')
# slower approach:
# ac_trace = np.correlate(trace_norm, trace_norm, mode='same')
n = int(len(ac_trace)/2)
# estimate quality of ac_trace and define min height to detect peaks
factor = np.abs(ac_trace.max() / np.median(ac_trace))
factor = 3 if factor > 20 else 1.5
ac_peaks = find_peaks(ac_trace[n:], height=ac_trace[n:].max()/factor)
if len(ac_peaks[0]) == 0:
period = 0
distance = 1
elif len(ac_peaks[0]) == 1:
period = ac_peaks[0][0]
distance = max(1, period-6)
else:
period = int(np.median(ac_peaks[0][1:] - ac_peaks[0][:-1]))
distance = max(1, period-6) # smaller than period to account for all peaks
    # define min height to detect peaks depending on signal quality
    if height is None:
        f = np.max([3, np.min([(SNR / 10), 4])])
        height = trace_norm.max() / f
    peaks, dic = find_peaks(trace_norm, distance=distance,
                            height=height, width=width)
# filter out peaks that are not periodically spaced
idx = np.ones(len(peaks), dtype=bool)
idx[1:] = np.isclose(peaks[1:] - peaks[:-1],
np.ones(len(peaks[1:]))*period, atol=6)
peaks = peaks[idx]
for d in dic:
dic[d] = dic[d][idx]
npulses = len(peaks)
if npulses == 0:
raise ValueError('Could not automatically find peaks.')
pulseStart = np.mean(dic['left_ips'] - peaks + peaks[0], dtype=int)
pulseStop = np.mean(dic['right_ips'] - peaks + peaks[0], dtype=int)
baseStop = pulseStart - np.mean(peaks - dic['left_ips'])/2 - 1
baseStop = np.min([baseStop, pulseStart]).astype(int)
baseStop = np.max([baseStop, 0]).astype(int)
baseStart = baseStop - np.mean(dic['widths'])/2
baseStart = np.max([baseStart, 0]).astype(int)
result = {'pulseStart': pulseStart, 'pulseStop': pulseStop,
'baseStart': baseStart, 'baseStop': baseStop,
'period': period, 'npulses': npulses}
return result
def get_peak_params(run, mnemonic, raw_trace=None, ntrains=200):
"""
Get the peak region and baseline region of a raw digitizer trace used
to compute the peak integration. These regions are either those of the
digitizer peak-integration settings or those determined in get_tim_peaks
or get_fast_adc_peaks when the inputs are raw traces.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
mnemonic: str
ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'.
raw_trace: optional, 1D numpy array or xarray DataArray
Raw trace to display. If None, the average raw trace over
ntrains of the corresponding channel is loaded (this can
be time-consuming).
ntrains: optional, int
Only used if raw_trace is None. Number of trains used to
calculate the average raw trace of the corresponding channel.
"""
run_mnemonics = mnemonics_for_run(run)
if mnemonic not in run_mnemonics:
        raise ToolBoxValueError('mnemonic must be a ToolBox mnemonic')
if "raw" not in mnemonic:
params = channel_peak_params(run, mnemonic)
if 'enable' in params and params['enable'] == 0:
log.warning('The digitizer did not record peak-integrated data.')
title = 'Digitizer peak params'
else:
title = 'Auto-find peak params'
if raw_trace is None:
raw_trace = get_dig_avg_trace(run, mnemonic, ntrains)
params = find_integ_params(raw_trace)
log.debug(f'{title} for {mnemonic}: {params}')
return params
def check_peak_params(run, mnemonic, raw_trace=None, ntrains=200, params=None,
plot=True, show_all=False, bunchPattern='sase3'):
"""
Checks and plots the peak parameters (pulse window and baseline window
of a raw digitizer trace) used to compute the peak integration. These
parameters are either set by the digitizer peak-integration settings,
or are determined by a peak finding algorithm (used in get_tim_peaks
or get_fast_adc_peaks) when the inputs are raw traces. The parameters
can also be provided manually for visual inspection. The plot either
shows the first and last pulse of the trace or the entire trace.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
mnemonic: str
ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'.
raw_trace: optional, 1D numpy array or xarray DataArray
Raw trace to display. If None, the average raw trace over
ntrains of the corresponding channel is loaded (this can
be time-consuming).
    ntrains: optional, int
        Only used if raw_trace is None. Number of trains used to
        calculate the average raw trace of the corresponding channel.
    params: optional, dict
        Peak-integration parameters to check. If None, they are taken
        from the digitizer settings or determined by the peak-finding
        algorithm.
plot: bool
If True, displays the raw trace and peak integration regions.
show_all: bool
If True, displays the entire raw trace and all peak integration
regions (this can be time-consuming).
If False, shows the first and last pulse according to the bunchPattern.
bunchPattern: optional, str
Only used if plot is True. Checks the bunch pattern against
the digitizer peak parameters and shows potential mismatch.
Returns
-------
    dictionary of peak integration parameters
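    Example
    -------
    # Visually check the integration windows of MCP2 against the SASE3
    # bunch pattern (the proposal / run numbers are placeholders):
    import toolbox_scs as tb
    run = tb.open_run(2212, 235)
    params = tb.check_peak_params(run, 'MCP2apd', bunchPattern='sase3')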
"""
run_mnemonics = mnemonics_for_run(run)
if "raw" in mnemonic:
mnemo_raw = mnemonic
title = 'Auto-find peak params'
else:
mnemo_raw = mnemonic.replace('peaks', 'raw').replace('apd', 'raw')
title = 'Digitizer peak params'
    if raw_trace is None:
        raw_trace = get_dig_avg_trace(run, mnemo_raw, ntrains)
if params is None:
params = get_peak_params(run, mnemonic, raw_trace)
if 'enable' in params and params['enable'] == 0:
log.warning('The digitizer did not record peak-integrated data.')
if not plot:
return params
digitizer = digitizer_type(run, run_mnemonics[mnemonic]['source'])
min_distance = 24 if digitizer == "FastADC" else 440
if 'bunchPatternTable' in run_mnemonics and bunchPattern != 'None':
sel = run.select_trains(np.s_[:ntrains])
bp_params = {}
m = run_mnemonics['bunchPatternTable']
bpt = sel.get_array(m['source'], m['key'], m['dim'])
mask = is_pulse_at(bpt, bunchPattern)
pid = np.sort(np.unique(np.where(mask)[1]))
bp_params['npulses'] = len(pid)
if bp_params['npulses'] == 1:
bp_params['period'] = 0
else:
bp_params['period'] = np.diff(pid)[0] * min_distance
print(f'bunch pattern {bunchPattern}: {bp_params["npulses"]} pulses,'
f' {bp_params["period"]} samples between two pulses')
else:
bp_params = None
print(f'{title}: {params["npulses"]} pulses, {params["period"]}'
' samples between two pulses')
fig, ax = plotPeakIntegrationWindow(raw_trace, params, bp_params, show_all)
ax[0].set_ylabel(mnemo_raw)
fig.suptitle(title, size=12)
return params
def plotPeakIntegrationWindow(raw_trace, params, bp_params, show_all=False):
if show_all:
fig, ax = plt.subplots(figsize=(6, 3), constrained_layout=True)
n = params['npulses']
p = params['period']
for i in range(n):
lbl = 'baseline' if i == 0 else None
lp = 'peak' if i == 0 else None
ax.axvline(params['baseStart'] + i*p, ls='--', color='k')
ax.axvline(params['baseStop'] + i*p, ls='--', color='k')
ax.axvspan(params['baseStart'] + i*p, params['baseStop'] + i*p,
alpha=0.5, color='grey', label=lbl)
ax.axvline(params['pulseStart'] + i*p, ls='--', color='r')
ax.axvline(params['pulseStop'] + i*p, ls='--', color='r')
ax.axvspan(params['pulseStart'] + i*p, params['pulseStop'] + i*p,
alpha=0.2, color='r', label=lp)
ax.plot(raw_trace, color='C0', label='raw trace')
ax.legend(fontsize=8)
return fig, [ax]
if bp_params is not None:
npulses = bp_params['npulses']
period = bp_params['period']
else:
npulses = params['npulses']
period = params['period']
xmin = np.max([0, params['baseStart']-100])
xmax = np.min([params['pulseStop']+100, raw_trace.size])
fig, ax = plt.subplots(1, 2, figsize=(6, 3), constrained_layout=True)
ax[0].axvline(params['baseStart'], ls='--', color='k')
ax[0].axvline(params['baseStop'], ls='--', color='k')
ax[0].axvspan(params['baseStart'], params['baseStop'],
alpha=0.5, color='grey', label='baseline')
ax[0].axvline(params['pulseStart'], ls='--', color='r')
ax[0].axvline(params['pulseStop'], ls='--', color='r')
ax[0].axvspan(params['pulseStart'], params['pulseStop'],
alpha=0.2, color='r', label='peak')
ax[0].plot(np.arange(xmin, xmax), raw_trace[xmin:xmax], color='C0',
label='1st pulse')
ax[0].legend(fontsize=8)
ax[0].set_xlim(xmin, xmax)
ax[0].set_xlabel('digitizer samples')
ax[0].set_title('First pulse', size=10)
xmin2 = xmin + (npulses-1) * period
xmax2 = xmax + (npulses-1) * period
p = params['period']
lbl = 'baseline'
lp = 'peak'
for i in range(params['npulses']):
mi = params['baseStart'] + i*p
if not xmin2 < mi < xmax2:
continue
ax[1].axvline(params['baseStart'] + i*p, ls='--', color='k')
ax[1].axvline(params['baseStop'] + i*p, ls='--', color='k')
ax[1].axvspan(params['baseStart'] + i*p, params['baseStop'] + i*p,
alpha=0.5, color='grey', label=lbl)
ax[1].axvline(params['pulseStart'] + i*p, ls='--', color='r')
ax[1].axvline(params['pulseStop'] + i*p, ls='--', color='r')
ax[1].axvspan(params['pulseStart'] + i*p, params['pulseStop'] + i*p,
alpha=0.2, color='r', label=lp)
lbl = None
lp = None
if xmax2 < raw_trace.size:
ax[1].plot(np.arange(xmin2, xmax2), raw_trace[xmin2:xmax2], color='C0',
label='last pulse')
else:
        log.warning('The digitizer raw trace is too short to contain '
                    'all the pulses.')
ax[1].legend(fontsize=8)
ax[1].set_xlabel('digitizer samples')
ax[1].set_xlim(xmin2, xmax2)
ax[1].set_title('Last pulse', size=10)
return fig, ax
def digitizer_type(run, source):
"""
Finds the digitizer type based on the class Id / name of the source.
Example source: 'SCS_UTC1_MCP/ADC/1'. Defaults to ADQ412 if not found.
"""
ret = None
if '_MCP/ADC/1' in source:
ret = 'FastADC'
if '_ADQ/ADC/1' in source:
ret = 'ADQ412'
if ret is None:
digi_dict = {'FastAdc': 'FastADC',
'FastAdcLegacy': 'FastADC',
'AdqDigitizer': 'ADQ412',
'PyADCChannel': 'FastADC',
'PyADCChannelLegacy': 'FastADC'
}
try:
source = source.split(':')[0]
classId = run.get_run_value(source, 'classId.value')
ret = digi_dict.get(classId)
except Exception as e:
log.warning(str(e))
log.warning(f'Could not find digitizer type from source {source}.')
ret = 'ADQ412'
return ret
def get_tim_peaks(run, mnemonic=None, merge_with=None,
bunchPattern='sase3', integParams=None,
keepAllSase=False):
"""
    Automatically computes TIM peaks. Sources can be loaded on the
    fly via the mnemonic argument, or processed from an existing data
    set (merge_with). The bunch pattern table is used to assign the
    pulse id coordinates.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
mnemonic: str
mnemonics for TIM, e.g. "MCP2apd".
merge_with: xarray Dataset
If provided, the resulting Dataset will be merged with this
one. The TIM variables of merge_with (if any) will also be
computed and merged.
bunchPattern: str
'sase1' or 'sase3' or 'scs_ppl', bunch pattern
used to extract peaks. The pulse ID dimension will be named
'sa1_pId', 'sa3_pId' or 'ol_pId', respectively.
    integParams: dict
        dictionary for raw trace integration, e.g.
        {'pulseStart':100, 'pulseStop':200, 'baseStart':50,
        'baseStop':99, 'period':24, 'npulses':500}.
        If None, integration parameters are computed automatically.
keepAllSase: bool
Only relevant in case of sase-dedicated trains. If True, all
trains are kept, else only those of the bunchPattern are kept.
Returns
-------
    xarray Dataset with TIM variables substituted by
    the calculated peak values (e.g. "MCP2raw" becomes
    "MCP2peaks"), merged with Dataset *merge_with* if provided.
"""
return get_digitizer_peaks(run, mnemonic, merge_with,
bunchPattern, integParams,
keepAllSase)
def get_laser_peaks(run, mnemonic=None, merge_with=None,
bunchPattern='scs_ppl', integParams=None):
"""
    Extracts laser photodiode signal (peak intensity) from the Fast ADC
    digitizer. Sources can be loaded on the fly via the mnemonic
    argument, and/or processed from an existing data set (merge_with).
    The PP laser bunch pattern is used to assign the pulse id coordinates.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
mnemonic: str
mnemonic for FastADC corresponding to laser signal, e.g.
"FastADC2peaks" or 'I0_ILHraw'.
merge_with: xarray Dataset
If provided, the resulting Dataset will be merged with this
one. The FastADC variables of merge_with (if any) will also be
computed and merged.
bunchPattern: str
'sase1' or 'sase3' or 'scs_ppl', bunch pattern
used to extract peaks.
    integParams: dict
        dictionary for raw trace integration, e.g.
        {'pulseStart':100, 'pulseStop':200, 'baseStart':50,
        'baseStop':99, 'period':24, 'npulses':500}.
        If None, integration parameters are computed
        automatically.
Returns
-------
    xarray Dataset with all Fast ADC variables substituted by
    the calculated peak values (e.g. "FastADC2raw" becomes
    "FastADC2peaks").
"""
return get_digitizer_peaks(run, mnemonic, merge_with,
bunchPattern, integParams, False)
def get_digitizer_peaks(run, mnemonic, merge_with=None,
bunchPattern='sase3', integParams=None,
keepAllSase=False):
"""
Automatically computes digitizer peaks. A source can be loaded on the
fly via the mnemonic argument, or processed from an existing data set
(merge_with). The bunch pattern table is used to assign the pulse
id coordinates.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
mnemonic: str
mnemonic for FastADC or ADQ412, e.g. "I0_ILHraw" or "MCP3apd".
The data is either loaded from the DataCollection or taken from
merge_with.
merge_with: xarray Dataset
If provided, the resulting Dataset will be merged with this one.
    bunchPattern: str
        'sase1', 'sase3', 'scs_ppl' or 'None': bunch pattern used to
        extract peaks.
    integParams: dict
        dictionary for raw trace integration, e.g.
        {'pulseStart':100, 'pulseStop':200, 'baseStart':50,
        'baseStop':99, 'period':24, 'npulses':500}.
        If None, integration parameters are computed automatically.
keepAllSase: bool
Only relevant in case of sase-dedicated trains. If True, all
trains are kept, else only those of the bunchPattern are kept.
Returns
-------
    xarray Dataset with digitizer peak variables. Raw variables are
    substituted by the calculated peak values (e.g. "FastADC2raw" becomes
    "FastADC2peaks").
"""
run_mnemonics = mnemonics_for_run(run)
if mnemonic not in run_mnemonics:
log.warning('Mnemonic not found in run. Skipping.')
return merge_with
if bool(merge_with) and mnemonic in merge_with:
        for d in merge_with[mnemonic].dims:
            if d in ['sa1_pId', 'sa3_pId', 'ol_pId']:
log.warning(f'{mnemonic} already extracted. '
'Skipping.')
return merge_with
# check if bunch pattern table exists
bpt = None
if bool(merge_with) and 'bunchPatternTable' in merge_with:
bpt = merge_with['bunchPatternTable']
log.debug('Using bpt from merge_with dataset.')
elif 'bunchPatternTable' in run_mnemonics:
m = run_mnemonics['bunchPatternTable']
bpt = run.get_array(m['source'], m['key'], m['dim'])
log.debug('Loaded bpt from DataCollection.')
elif 'bunchPatternTable_SA3' in run_mnemonics:
m = run_mnemonics['bunchPatternTable_SA3']
bpt = run.get_array(m['source'], m['key'], m['dim'])
log.debug('Loaded bpt from DataCollection.')
else:
log.warning('Could not load bunch pattern table.')
# prepare resulting dataset
if bool(merge_with):
mw_ds = merge_with.drop(mnemonic, errors='ignore')
else:
mw_ds = xr.Dataset()
    # process the mnemonic and merge the resulting array into the dataset
    autoFind = integParams is None
    m = run_mnemonics[mnemonic]
    useRaw = 'raw' in mnemonic
if bool(merge_with) and mnemonic in merge_with:
data = merge_with[mnemonic]
else:
data = run.get_array(m['source'], m['key'], m['dim'])
peaks = get_peaks(run, data, mnemonic,
useRaw=useRaw,
autoFind=autoFind,
integParams=integParams,
bunchPattern=bunchPattern,
bpt=bpt)
name = mnemonic.replace('raw', 'peaks').replace('apd', 'peaks')
join = 'outer' if keepAllSase else 'inner'
ds = mw_ds.merge(peaks.rename(name), join=join)
return ds
def digitizer_signal_description(run, digitizer=None):
"""
    Check for the existence of signal descriptions and return all
    corresponding channels in a dictionary.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
    digitizer: str or list of str (default=None)
        Name of the digitizer: one of ['FastADC', 'FastADC2',
        'ADQ412', 'ADQ412_2'].
        If None, all digitizers are used.
Returns
-------
    signal_description: dictionary containing the signal description of
        the digitizer channels.
Example
-------
import toolbox_scs as tb
run = tb.open_run(3481, 100)
signals = tb.digitizer_signal_description(run)
signals_fadc2 = tb.digitizer_signal_description(run, 'FastADC2')
"""
    if digitizer not in [None, 'FastADC', 'FastADC2',
                         'ADQ412', 'ADQ412_2']:
        raise ValueError("digitizer must be one of "
                         "['FastADC', 'FastADC2', 'ADQ412', 'ADQ412_2']")
if digitizer is None:
digitizer = ['FastADC', 'FastADC2', 'ADQ412', 'ADQ412_2']
else:
digitizer = [digitizer]
def key_fadc(i):
if i > 9:
return None
return f'channel_{i}.signalDescription.value'
def key_adq412(i):
if i > 3:
return None
return f'board1.channel_{i}.description.value'
sources = {'FastADC': ['SCS_UTC1_MCP/ADC/1', key_fadc],
'FastADC2': ['SCS_UTC2_FADC/ADC/1', key_fadc],
'ADQ412': ['SCS_UTC1_ADQ/ADC/1', key_adq412],
'ADQ412_2': ['SCS_UTC2_ADQ/ADC/1', key_adq412]}
res = {}
for d in digitizer:
if sources[d][0] not in run.all_sources:
continue
if sources[d][1](0) not in run.get_run_values(sources[d][0]):
raise ValueError('No signal description available for '
f'{d}: {sources[d][0]}')
for ch in range(10):
val = sources[d][1](ch)
if val is None:
break
desc = run.get_run_value(sources[d][0], val)
res[f'{d}_Ch{ch}'] = desc
return res
def calibrateTIM(data, rollingWindow=200, mcp=1, plot=False, use_apd=True,
                 intstart=None, intstop=None, bkgstart=None, bkgstop=None,
                 t_offset=None, npulses_apd=None):
''' Calibrate TIM signal (Peak-integrated signal) to the slow ion signal of SCS_XGM
(photocurrent read by Keithley, channel 'pulseEnergy.photonFlux.value').
The aim is to find F so that E_tim_peak[uJ] = F x TIM_peak. For this, we want to
match the SASE3-only average TIM pulse peak per train (TIM_avg) to the slow XGM
signal E_slow.
Since E_slow is the average energy per pulse over all SASE1 and SASE3
pulses (N1 and N3), we first extract the relative contribution C of the SASE3 pulses
by looking at the pulse-resolved signals of the SA3_XGM in the tunnel.
There, the signal of SASE1 is usually strong enough to be above noise level.
Let TIM_avg be the average of the TIM pulses (SASE3 only).
The calibration factor is then defined as: F = E_slow * C * (N1+N3) / ( N3 * TIM_avg ).
If N3 changes during the run, we locate the indices for which N3 is maximum and define
a window where to apply calibration (indices start/stop).
Warning: the calibration does not include the transmission by the KB mirrors!
Inputs:
data: xarray Dataset
rollingWindow: length of running average to calculate TIM_avg
mcp: MCP channel
plot: boolean. If True, plot calibration results.
        use_apd: boolean. If False, the TIM pulse peaks are extracted
            from raw traces using getTIMapd
intstart: trace index of integration start
intstop: trace index of integration stop
bkgstart: trace index of background start
bkgstop: trace index of background stop
t_offset: index separation between two pulses
npulses_apd: number of pulses
Output:
F: float, TIM calibration factor.
'''
start = 0
stop = None
npulses = data['npulses_sase3']
ntrains = npulses.shape[0]
if not np.all(npulses == npulses[0]):
start = np.argmax(npulses.values)
stop = ntrains + np.argmax(npulses.values[::-1]) - 1
if stop - start < rollingWindow:
print('not enough consecutive data points with the largest number of pulses per train')
start += rollingWindow
stop = np.min((ntrains, stop+rollingWindow))
filteredTIM = getTIMapd(data, mcp, use_apd, intstart, intstop, bkgstart, bkgstop, t_offset, npulses_apd)
sa3contrib = saseContribution(data, 'sase3', 'XTD10_XGM')
avgFast = filteredTIM.mean(axis=1).rolling(trainId=rollingWindow).mean()
ratio = ((data['npulses_sase3']+data['npulses_sase1']) *
data['SCS_photonFlux'] * sa3contrib) / (avgFast*data['npulses_sase3'])
F = float(ratio[start:stop].median().values)
if plot:
fig = plt.figure(figsize=(8,5))
ax = plt.subplot(211)
ax.set_title('E[uJ] = {:2e} x TIM (MCP{})'.format(F, mcp))
ax.plot(data['SCS_photonFlux'], label='SCS XGM slow (all SASE)', color='C0')
slow_avg_sase3 = data['SCS_photonFlux']*(data['npulses_sase1']
+data['npulses_sase3'])*sa3contrib/data['npulses_sase3']
ax.plot(slow_avg_sase3, label='SCS XGM slow (SASE3 only)', color='C1')
ax.plot(avgFast*F, label='Calibrated TIM rolling avg', color='C2')
ax.legend(loc='upper left', fontsize=8)
        ax.set_ylabel(r'Energy [$\mu$J]', size=10)
ax.plot(filteredTIM.mean(axis=1)*F, label='Calibrated TIM train avg', alpha=0.2, color='C9')
ax.legend(loc='best', fontsize=8, ncol=2)
plt.xlabel('train in run')
ax = plt.subplot(234)
xgm_fast = selectSASEinXGM(data)
ax.scatter(filteredTIM, xgm_fast, s=5, alpha=0.1, rasterized=True)
        fit, cov = np.polyfit(filteredTIM.values.flatten(),
                              xgm_fast.values.flatten(), 1, cov=True)
        y = np.poly1d(fit)
        x = np.linspace(filteredTIM.min(), filteredTIM.max(), 10)
        ax.plot(x, y(x), lw=2, color='r')
        ax.set_ylabel(r'Raw HAMP [$\mu$J]', size=10)
        ax.set_xlabel('TIM (MCP{}) signal'.format(mcp), size=10)
        msg = ('y(x) = F x + A\n'
               'F = %.3e\n'
               r'$\Delta$F/F = %.2e'
               '\nA = %.3e' % (fit[0],
                               np.abs(np.sqrt(cov[0, 0]) / fit[0]),
                               fit[1]))
        ax.annotate(msg, xy=(0.5, 0.6), xycoords='axes fraction',
                    fontsize=10, color='r')
print('TIM calibration factor: %e'%(F))
ax = plt.subplot(235)
ax.hist(filteredTIM.values.flatten()*F, bins=50, rwidth=0.8)
ax.set_ylabel('number of pulses', size=10)
ax.set_xlabel('Pulse energy MCP{} [uJ]'.format(mcp), size=10)
ax.set_yscale('log')
ax = plt.subplot(236)
if not use_apd:
pulseStart = intstart
pulseStop = intstop
else:
pulseStart = data.attrs['run'].get_array(
'SCS_UTC1_ADQ/ADC/1', 'board1.apd.channel_0.pulseStart.value')[0].values
pulseStop = data.attrs['run'].get_array(
'SCS_UTC1_ADQ/ADC/1', 'board1.apd.channel_0.pulseStop.value')[0].values
        if 'MCP{}raw'.format(mcp) not in data:
            tid, tdata = data.attrs['run'].train_from_index(0)
            trace = tdata['SCS_UTC1_ADQ/ADC/1:network'][
                'digitizers.channel_1_D.raw.samples']
            print('no raw data for MCP{}. Loading trace from MCP1'.format(mcp))
            label_trace = 'MCP1 Voltage [V]'
        else:
            trace = data['MCP{}raw'.format(mcp)][0]
            label_trace = 'MCP{} Voltage [V]'.format(mcp)
ax.plot(trace[:pulseStop+25], 'o-', ms=2, label='trace')
ax.axvspan(pulseStart, pulseStop, color='C2', alpha=0.2, label='APD region')
ax.axvline(pulseStart, color='gray', ls='--')
ax.axvline(pulseStop, color='gray', ls='--')
ax.set_xlim(pulseStart - 25, pulseStop + 25)
ax.set_ylabel(label_trace, size=10)
ax.set_xlabel('sample #', size=10)
ax.legend(fontsize=8)
return F
''' TIM calibration table
Dict with key= photon energy and value= array of polynomial coefficients for each MCP (1,2,3).
The polynomials correspond to a fit of the logarithm of the calibration factor as a function
of MCP voltage. If P is a polynomial and V the MCP voltage, the calibration factor (in microjoule
per APD signal) is given by -exp(P(V)).
This table was generated from the calibration of March 2019, proposal 900074, semester 201930,
runs 69 - 111 (Ni edge): https://in.xfel.eu/elog/SCS+Beamline/2323
runs 113 - 153 (Co edge): https://in.xfel.eu/elog/SCS+Beamline/2334
runs 163 - 208 (Fe edge): https://in.xfel.eu/elog/SCS+Beamline/2349
'''
tim_calibration_table = {
705.5: np.array([
[-6.85344690e-12, 5.00931986e-08, -1.27206912e-04, 1.15596821e-01, -3.15215367e+01],
[ 1.25613942e-11, -5.41566381e-08, 8.28161004e-05, -7.27230153e-02, 3.10984925e+01],
[ 1.14094964e-12, 7.72658935e-09, -4.27504907e-05, 4.07253378e-02, -7.00773062e+00]]),
779: np.array([
[ 4.57610777e-12, -2.33282497e-08, 4.65978738e-05, -6.43305156e-02, 3.73958623e+01],
[ 2.96325102e-11, -1.61393276e-07, 3.32600044e-04, -3.28468195e-01, 1.28328844e+02],
[ 1.14521506e-11, -5.81980336e-08, 1.12518434e-04, -1.19072484e-01, 5.37601559e+01]]),
851: np.array([
[ 3.15774215e-11, -1.71452934e-07, 3.50316512e-04, -3.40098861e-01, 1.31064501e+02],
[5.36341958e-11, -2.92533156e-07, 6.00574534e-04, -5.71083140e-01, 2.10547161e+02],
[ 3.69445588e-11, -1.97731342e-07, 3.98203522e-04, -3.78338599e-01, 1.41894119e+02]])
}
def timFactorFromTable(voltage, photonEnergy, mcp=1):
''' Returns an energy calibration factor for TIM integrated peak signal (APD)
according to calibration from March 2019, proposal 900074, semester 201930,
runs 69 - 111 (Ni edge): https://in.xfel.eu/elog/SCS+Beamline/2323
runs 113 - 153 (Co edge): https://in.xfel.eu/elog/SCS+Beamline/2334
runs 163 - 208 (Fe edge): https://in.xfel.eu/elog/SCS+Beamline/2349
Uses the tim_calibration_table declared above.
Inputs:
voltage: MCP voltage in volts.
photonEnergy: FEL photon energy in eV. Calibration factor is linearly
interpolated between the known values from the calibration table.
mcp: MCP channel (1, 2, or 3).
Output:
f: calibration factor in microjoule per APD signal
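    Example:
        # calibration factor for MCP2 at 1700 V and 779 eV photon energy;
        # the voltage and the APD value of 1000 are illustrative only:
        F = timFactorFromTable(1700, 779, mcp=2)
        energy_uJ = F * 1000.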
'''
energies = np.sort([key for key in tim_calibration_table])
if photonEnergy not in energies:
if photonEnergy > energies.max():
photonEnergy = energies.max()
elif photonEnergy < energies.min():
photonEnergy = energies.min()
else:
idx = np.searchsorted(energies, photonEnergy) - 1
polyA = np.poly1d(tim_calibration_table[energies[idx]][mcp-1])
polyB = np.poly1d(tim_calibration_table[energies[idx+1]][mcp-1])
fA = -np.exp(polyA(voltage))
fB = -np.exp(polyB(voltage))
f = fA + (fB-fA)/(energies[idx+1]-energies[idx])*(photonEnergy - energies[idx])
return f
poly = np.poly1d(tim_calibration_table[photonEnergy][mcp-1])
f = -np.exp(poly(voltage))
return f