""" Digitizers related sub-routines
Copyright (2021) SCS Team.
(contributions preferrably comply with pep8 code structure
guidelines.)
"""
import logging
import os
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from ..misc.bunch_pattern_external import is_pulse_at
from ..mnemonics_machinery import mnemonics_for_run
from extra_data import open_run
from extra_data.read_machinery import find_proposal
from extra.components import XrayPulses, OpticalLaserPulses
__all__ = [
'check_peak_params',
'get_digitizer_peaks',
'get_dig_avg_trace',
'load_processed_peaks',
'check_processed_peak_params'
]
log = logging.getLogger(__name__)
def peaks_from_raw_trace(traces, pulseStart, pulseStop, baseStart, baseStop,
period=None, npulses=None, extra_dim=None):
"""
Computes peaks from raw digitizer traces by trapezoidal integration.
Parameters
----------
    traces: xarray DataArray or numpy array
        Raw traces. If a numpy array is provided, the second dimension
        is that of the samples.
pulseStart: int or list or 1D-numpy array
trace index of integration start. If 1d array, each value is the start
of one peak. The period and npulses parameters are ignored.
pulseStop: int
trace index of integration stop.
baseStart: int
trace index of background start.
baseStop: int
trace index of background stop.
    period: int
        number of samples between two peaks. Ignored if pulseStart is a
        1D-array.
    npulses: int
        number of pulses. Ignored if pulseStart is a 1D-array.
extra_dim: str
Name given to the dimension along the peaks. Defaults to 'pulseId'.
Returns
-------
xarray DataArray
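    Example
    -------
    # minimal sketch with synthetic traces (all values are hypothetical):
    # 10 trains, 2 square pulses of 20 samples separated by 100 samples
    traces = np.zeros((10, 300))
    traces[:, 50:70] = 1.
    traces[:, 150:170] = 1.
    peaks = peaks_from_raw_trace(traces, pulseStart=45, pulseStop=75,
                                 baseStart=0, baseStop=40,
                                 period=100, npulses=2)
    # peaks has dims ('trainId', 'pulseId') and shape (10, 2)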
"""
assert traces.ndim == 2
if isinstance(traces, xr.DataArray):
ntid = traces.sizes['trainId']
coords = traces.coords
traces = traces.values
if traces.shape[0] != ntid:
traces = traces.T
else:
coords = None
if hasattr(pulseStart, '__len__'):
pulseStart = np.array(pulseStart)
pulses = pulseStart - pulseStart[0]
pulseStart = pulseStart[0]
else:
if period is None or period == 0:
period = 1
pulses = range(0, npulses*period, period)
if extra_dim is None:
extra_dim = 'pulseId'
results = xr.DataArray(np.empty((traces.shape[0], len(pulses))),
coords=coords,
dims=['trainId', extra_dim])
for i, p in enumerate(pulses):
a = pulseStart + p
b = pulseStop + p
bkga = baseStart + p
bkgb = baseStop + p
if b > traces.shape[1]:
break
bg = np.outer(np.median(traces[:, bkga:bkgb], axis=1),
np.ones(b-a))
results[:, i] = np.trapz(traces[:, a:b] - bg, axis=1)
return results
def peaks_from_apd(array, params, digitizer, bpt, bunchPattern):
"""
Extract peak-integrated data according to the bunch pattern.
Parameters
----------
array: xarray DataArray
2D array containing peak-integrated data
params: dict
peak-integration parameters of the digitizer
digitizer: str
digitizer type, one of {'FastADC', 'ADQ412'}
bpt: xarray DataArray
bunch pattern table
bunchPattern: str
one of {'sase1', 'sase3', 'scs_ppl'}, used to select pulses and
assign name of the pulse id dimension.
Returns
-------
xarray DataArray with pulse id coordinates.
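    Example
    -------
    # minimal sketch, assuming a run, a bunch pattern table `bpt` and a
    # 2D array `arr` of peak-integrated data have already been loaded:
    params = channel_peak_params(run, 'MCP2apd')
    peaks = peaks_from_apd(arr, params, 'ADQ412', bpt, 'sase3')
    # peaks is labelled by 'sa3_pId' pulse id coordinates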
"""
if params['enable'] == 0 or (array == 1.).all():
raise ValueError('The digitizer did not record integrated peaks. '
'Consider using raw traces from the same channel '
'for peak integration.')
    if digitizer == 'FastADC':
        min_distance = 24
    elif digitizer == 'ADQ412':
        min_distance = 440
    else:
        raise ValueError(f'Unknown digitizer type: {digitizer}')
period = params['period']
if period % min_distance != 0:
        log.warning(f'The pulse period ({period} samples) of the '
                    'digitizer is not a multiple of the pulse separation at '
                    f'4.5 MHz ({min_distance} samples). Pulse id assignment '
                    'is likely to fail.')
stride = int(period/min_distance)
npulses_apd = params['npulses']
dim_names = {'sase3': 'sa3_pId', 'sase1': 'sa1_pId', 'scs_ppl': 'ol_pId'}
pulse_dim = dim_names[bunchPattern]
arr_dim = [dim for dim in array.dims if dim != 'trainId'][0]
if npulses_apd > array.sizes[arr_dim]:
log.warning(f'The digitizer was set to record {npulses_apd} pulses '
f'but the array length is only {array.sizes[arr_dim]}.')
npulses_apd = array.sizes[arr_dim]
mask = is_pulse_at(bpt, bunchPattern).rename({'pulse_slot': pulse_dim})
mask = mask.where(mask.trainId.isin(array.trainId), drop=True)
mask = mask.assign_coords({pulse_dim: np.arange(bpt.sizes['pulse_slot'])})
pid = np.sort(np.unique(np.where(mask)[1]))
npulses_bpt = len(pid)
apd_coords = np.arange(pid[0], pid[0] + stride*npulses_apd, stride)
noalign = False
if len(np.intersect1d(apd_coords, pid, assume_unique=True)) < npulses_bpt:
        log.warning('Not all pulses were recorded. The digitizer '
                    'was set to record pulse ids '
                    f'{apd_coords[apd_coords < bpt.sizes["pulse_slot"]]} '
                    f'but the bunch pattern for {bunchPattern} is {pid}. '
                    'Skipping pulse ID alignment.')
noalign = True
array = array.isel({arr_dim: slice(0, npulses_apd)})
array = array.where(array != 1.)
if noalign:
return array
array = array.rename(
{arr_dim: pulse_dim}).assign_coords({pulse_dim: apd_coords})
array, mask = xr.align(array, mask, join='inner')
array = array.where(mask, drop=True)
return array
def get_dig_avg_trace(run, mnemonic, ntrains=None):
    """
    Compute the average of a digitizer trace over ntrains trains evenly
    spaced across the run.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
mnemonic: str
ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'.
ntrains: int
Number of trains used to calculate the average raw trace.
If None, all trains are used.
Returns
-------
trace: DataArray
The average digitizer trace
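    Example
    -------
    # minimal sketch (proposal/run numbers and mnemonic are hypothetical):
    run = open_run(900, 100)
    trace = get_dig_avg_trace(run, 'MCP2raw', ntrains=200)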
"""
run_mnemonics = mnemonics_for_run(run)
if ntrains is None:
sel = run
else:
total_tid = len(run.train_ids)
stride = int(np.max([1, np.floor(total_tid/ntrains)]))
sel = run.select_trains(np.s_[0:None:stride])
m = run_mnemonics[mnemonic]
raw_trace = sel.get_array(m['source'], m['key'], m['dim'])
raw_trace = raw_trace.mean(dim='trainId')
return raw_trace
def channel_peak_params(run, source, key=None, channel=None, board=None):
"""
Extract peak-integration parameters used by a channel of the digitizer.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
source: str
ToolBox mnemonic of a digitizer data, e.g. "MCP2apd" or
"FastADC4peaks", or name of digitizer source, e.g.
'SCS_UTC1_ADQ/ADC/1:network'.
    key: str
        optional, used in combination with source (if source is not a
        ToolBox mnemonic) instead of digitizer, channel and board.
channel: int or str
The digitizer channel for which to retrieve the parameters. If None,
inferred from the source mnemonic or source + key arguments.
board: int
Board of the ADQ412. If None, inferred from the source mnemonic or
source + key arguments.
Returns
-------
dict with peak integration parameters.
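    Example
    -------
    # minimal sketch, assuming an opened run (mnemonic is hypothetical):
    params = channel_peak_params(run, 'MCP2apd')
    # or with an explicit source, channel and board:
    params = channel_peak_params(run, 'SCS_UTC1_ADQ/ADC/1:network',
                                 channel=1, board=2)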
"""
run_mnemonics = mnemonics_for_run(run)
if source in run_mnemonics:
m = run_mnemonics[source]
source = m['source']
key = m['key']
if 'network' in source:
return adq412_channel_peak_params(run, source, key, channel, board)
else:
return fastADC_channel_peak_params(run, source, channel)
def fastADC_channel_peak_params(run, source, channel=None):
"""
Extract peak-integration parameters used by a channel of the Fast ADC.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
source: str
Name of Fast ADC source, e.g. 'SCS_UTC1_MCP/ADC/1:channel_5.output'.
channel: int
The Fast ADC channel for which to retrieve the parameters. If None,
inferred from the source.
Returns
-------
dict with peak integration parameters.
"""
fastADC_keys = {
'baseStart': ('baselineSettings.baseStart.value',
'baseStart.value'),
'baseStop': ('baseStop.value',),
'baseLength': ('baselineSettings.length.value',),
'enable': ('enablePeakComputation.value',),
'pulseStart': ('initialDelay.value',),
'pulseLength': ('peakSamples.value',),
'npulses': ('numPulses.value',),
'period': ('pulsePeriod.value',)
}
if channel is None:
channel = int(source.split(':')[1].split('.')[0].split('_')[1])
baseName = f'channel_{channel}.'
source = source.split(':')[0]
data = run.select(source).train_from_index(0)[1][source]
result = {}
for mnemo, versions in fastADC_keys.items():
for v in versions:
key = baseName + v
if key in data:
result[mnemo] = data[key]
if 'baseLength' in result:
result['baseStop'] = result['baseStart'] + \
result.pop('baseLength')
if 'pulseLength' in result:
result['pulseStop'] = result['pulseStart'] + \
result.pop('pulseLength')
return result
def adq412_channel_peak_params(run, source, key=None,
channel=None, board=None):
"""
Extract peak-integration parameters used by a channel of the ADQ412.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data.
    source: str
        Name of the ADQ412 source, e.g. 'SCS_UTC1_ADQ/ADC/1:network'.
    key: str
        optional, e.g. 'digitizers.channel_1_D.apd.pulseIntegral', used in
        combination with source instead of channel and board.
channel: int or str
The ADQ412 channel for which to retrieve the parameters. If None,
inferred from the source mnemonic or source + key arguments.
board: int
Board of the ADQ412. If None, inferred from the source mnemonic or
source + key arguments.
Returns
-------
dict with peak integration parameters.
"""
if key is None:
if channel is None or board is None:
raise ValueError('key or channel + board arguments are '
'missing.')
else:
k = key.split('.')[1].split('_')
ch_to_int = {'A': 0, 'B': 1, 'C': 2, 'D': 3}
channel = ch_to_int[k[2]]
board = k[1]
baseName = f'board{board}.apd.channel_{channel}.'
source = source.split(':')[0]
adq412_keys = {
'baseStart': (baseName + 'baseStart.value',),
'baseStop': (baseName + 'baseStop.value',),
'enable': (baseName + 'enable.value',),
'pulseStart': (baseName + 'pulseStart.value',),
'pulseStop': (baseName + 'pulseStop.value',),
'initialDelay': (baseName + 'initialDelay.value',),
'upperLimit': (baseName + 'upperLimit.value',),
'npulses': (f'board{board}.apd.numberOfPulses.value',)
}
data = run.select(source).train_from_index(0)[1][source]
result = {}
for mnemo, versions in adq412_keys.items():
for key in versions:
if key in data:
result[mnemo] = data[key]
result['period'] = result.pop('upperLimit') - \
result.pop('initialDelay')
return result
def find_peaks_in_raw_trace(trace, height=None, width=1, distance=24):
"""
Find integration parameters for peak integration of a raw
digitizer trace. Based on scipy find_peaks().
Parameters
----------
    trace: numpy array or xarray DataArray
        The digitizer raw trace used to find peaks
height: int
minimum height for peak determination
width: int
minimum width of peak
distance: int
minimum distance between two peaks
Returns
-------
dict with keys 'pulseStart', 'pulseStop', 'baseStart', 'baseStop',
'period', 'npulses' and values in number of samples.
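    Example
    -------
    # minimal sketch with a synthetic trace (all values are hypothetical):
    trace = np.random.randn(3000) * 0.01  # noise floor
    for i in range(5):  # 5 square pulses with a period of 440 samples
        trace[500 + i*440: 520 + i*440] += 1.
    params = find_peaks_in_raw_trace(trace, distance=440)
    # params['npulses'] == 5, params['period'] == 440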
"""
if isinstance(trace, xr.DataArray):
trace = trace.values
trace_norm = trace - np.median(trace)
trace_norm = -trace_norm if np.mean(trace_norm) < 0 else trace_norm
SNR = np.max(np.abs(trace_norm)) / np.std(trace_norm[:100])
if SNR < 10:
log.warning('signal-to-noise ratio too low: cannot '
'automatically find peaks.')
return {'pulseStart': 2, 'pulseStop': 3,
'baseStart': 0, 'baseStop': 1,
'period': 0, 'npulses': 1}
min_height = min(3 * SNR, np.max(np.abs(trace_norm)) / 3)
peaks, prop = find_peaks(trace_norm, distance=distance,
height=min_height,
width=2)
params = {}
start = int(prop['left_ips'][0])
if len(peaks) == 1:
params['pulseStart'] = start
params['period'] = 0
else:
pulse_period = int(np.median(np.diff(peaks)))
pulse_ids = np.digitize(peaks,
np.arange(peaks[0] - pulse_period/2,
peaks[-1] + pulse_period/2,
pulse_period)) - 1
if len(np.unique(np.diff(pulse_ids))) == 1:
# Regular pattern
params['pulseStart'] = start
params['period'] = pulse_period
else:
# Irregular pattern
params['pulseStart'] = start + pulse_ids * pulse_period
params['period'] = 0
params['pulseStop'] = int(prop['right_ips'][0])
params['baseStop'] = (3 * start - peaks[0]) / 2
params['baseStop'] = np.min([params['baseStop'],
int(prop['right_ips'][0])]).astype(int)
params['baseStart'] = params['baseStop'] - np.mean(prop['widths'])/2
params['baseStart'] = np.max([params['baseStart'], 0]).astype(int)
params['npulses'] = len(peaks)
return params
def find_peak_integration_parameters(run, mnemonic, raw_trace=None,
integParams=None, pattern=None,
ntrains=None):
    '''
    Find the peak integration parameters, either by inspecting the
    provided integParams or by peak finding on the (average) raw trace,
    and check them against the bunch pattern if one is provided. In case
    of mismatch, the bunch pattern parameters prevail.
    Returns a tuple (params, pulse_ids, regular, raw_trace).
    '''
run_mnemonics = mnemonics_for_run(run)
digitizer = digitizer_type(run, run_mnemonics[mnemonic]['source'])
pulse_period = 24 if digitizer == "FastADC" else 440
if integParams is None:
# load raw trace and find peaks
autoFind = True
if raw_trace is None:
raw_trace = get_dig_avg_trace(run, mnemonic, ntrains)
params = find_peaks_in_raw_trace(raw_trace, distance=pulse_period)
else:
# inspect provided parameters
autoFind = False
required_keys = ['pulseStart', 'pulseStop', 'baseStart',
'baseStop']
add_text = ''
        if pattern is None and not hasattr(integParams.get('pulseStart'),
                                           '__len__'):
required_keys += ['period', 'npulses']
add_text = 'Bunch pattern not provided. '
if not all(name in integParams for name in required_keys):
raise ValueError(add_text + 'All keys of integParams argument '
f'{required_keys} are required.')
params = integParams.copy()
# extract pulse ids from the parameters (starting at 0)
pulse_ids_params = None
if hasattr(params['pulseStart'], '__len__'):
if params.get('npulses') is not None and (
params.get('npulses') != len(params['pulseStart'])):
log.warning('The number of pulses does not match the length '
'of pulseStart. Using length of pulseStart as '
'the number of pulses.')
params['npulses'] = len(params['pulseStart'])
pulse_ids_params = ((np.array(params['pulseStart']) -
params['pulseStart'][0]) / pulse_period).astype(int)
elif 'npulses' in params and 'period' in params:
if params['npulses'] == 1:
pulse_ids_params = np.array([0])
else:
pulse_ids_params = np.arange(0,
params['npulses'] * params['period'] / pulse_period,
params['period'] / pulse_period).astype(int)
# Extract pulse_ids, period and npulses from bunch pattern
pulse_ids_bp, npulses_bp, period_bp = None, None, 0
regular = True
if pattern is not None:
bunchPattern = 'sase3' if hasattr(pattern, 'sase') else 'scs_ppl'
if pattern.is_constant_pattern() is False:
log.warning('The number of pulses changed during the run.')
pulse_ids_bp = np.unique(pattern.pulse_ids(labelled=False,
copy=False))
npulses_bp, period_bp = None, 0
regular = False
else:
pulse_ids_bp = pattern.peek_pulse_ids(labelled=False)
npulses_bp = len(pulse_ids_bp)
if npulses_bp > 1:
periods = np.diff(pulse_ids_bp)
if len(np.unique(periods)) > 1:
regular = False
else:
period_bp = np.unique(periods)[0] * pulse_period
# Compare parameters with bunch pattern
if pulse_ids_params is None:
params['npulses'] = npulses_bp
params['period'] = period_bp
pulse_ids_params = np.arange(0,
params['npulses'] * params['period'] / pulse_period,
params['period'] / pulse_period).astype(int)
if len(pulse_ids_params) == len(pulse_ids_bp):
if not (pulse_ids_params == pulse_ids_bp - pulse_ids_bp[0]).all():
log.warning('The provided pulseStart parameters do not match '
f'the {bunchPattern} bunch pattern pulse ids. '
'Using bunch pattern parameters.')
pulse_ids_params = pulse_ids_bp
if (npulses_bp != params.get('npulses') or
period_bp != params.get('period')):
if autoFind:
add_text = 'Automatically found '
else:
add_text = 'Provided '
log.warning(add_text + 'integration parameters '
f'(npulses={params.get("npulses")}, ' +
f'period={params.get("period")}) do not match the '
f'{bunchPattern} bunch pattern (npulses='
f'{npulses_bp}, period={period_bp}). Using bunch '
'pattern parameters.')
pulse_ids_params = pulse_ids_bp
params['npulses'] = npulses_bp
params['period'] = period_bp
if not regular:
# Irregular pattern
if hasattr(params['pulseStart'], '__len__'):
start = params['pulseStart'][0]
else:
start = params['pulseStart']
params['pulseStart'] = np.array(
[int(start + (pid - pulse_ids_params[0]) * pulse_period)
for pid in pulse_ids_params])
params['period'] = 0
else:
# Regular pattern
if hasattr(params['pulseStart'], '__len__'):
params['pulseStart'] = params['pulseStart'][0]
if len(pulse_ids_params) == 1:
params['period'] = 0
return params, pulse_ids_params, regular, raw_trace
def check_peak_params(proposal, runNB, mnemonic, raw_trace=None,
ntrains=200, integParams=None, bunchPattern='sase3',
plot=True, show_all=False):
"""
Checks and plots the peak parameters (pulse window and baseline window
of a raw digitizer trace) used to compute the peak integration. These
    parameters are either set by the digitizer peak-integration settings,
    or determined by a peak-finding algorithm. The parameters
can also be provided manually for visual inspection. The plot either
shows the first and last pulse of the trace or the entire trace.
Parameters
----------
proposal: int
the proposal number
runNB: int
the run number
mnemonic: str
ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'.
raw_trace: optional, 1D numpy array or xarray DataArray
Raw trace to display. If None, the average raw trace over
ntrains of the corresponding channel is loaded (this can
be time-consuming).
ntrains: optional, int
Only used if raw_trace is None. Number of trains used to
calculate the average raw trace of the corresponding channel.
plot: bool
If True, displays the raw trace and peak integration regions.
show_all: bool
If True, displays the entire raw trace and all peak integration
regions (this can be time-consuming).
If False, shows the first and last pulse according to the bunchPattern.
bunchPattern: optional, str
Only used if plot is True. Checks the bunch pattern against
the digitizer peak parameters and shows potential mismatch.
Returns
-------
    dictionary of peak integration parameters
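    Example
    -------
    # minimal sketch (proposal/run numbers and mnemonics are hypothetical):
    params = check_peak_params(900, 100, 'MCP2apd', bunchPattern='sase3')
    # or inspect manually provided parameters:
    integParams = {'pulseStart': 5750, 'pulseStop': 5780, 'baseStart': 5600,
                   'baseStop': 5700, 'period': 440, 'npulses': 30}
    params = check_peak_params(900, 100, 'MCP2raw', integParams=integParams)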
"""
run = open_run(proposal, runNB)
run_mnemonics = mnemonics_for_run(run)
digitizer = digitizer_type(run, run_mnemonics[mnemonic]['source'])
pulse_period = 24 if digitizer == "FastADC" else 440
pattern = None
try:
if bunchPattern == 'sase3':
pattern = XrayPulses(run)
if bunchPattern == 'scs_ppl':
pattern = OpticalLaserPulses(run)
except Exception as e:
log.warning(e)
bunchPattern = None
params, pulse_ids, regular, raw_trace = find_peak_integration_parameters(
run, mnemonic, raw_trace, integParams, pattern)
if bunchPattern:
if regular:
add_text = ''
if len(pulse_ids) > 1:
add_text = f's, {(pulse_ids[1]-pulse_ids[0]) * pulse_period}'\
+ ' samples between two pulses'
print(f'Bunch pattern {bunchPattern}: {len(pulse_ids)} pulse'
+ add_text)
else:
print(f'Bunch pattern {bunchPattern}: Not a regular pattern. '
f'{len(pulse_ids)} pulses, pulse_ids='
f'{pulse_ids}.')
if integParams is None:
title = 'Auto-find peak parameters'
else:
title = 'Provided peak parameters'
no_change = True
for k, v in integParams.items():
no_change = no_change & (v == params[k])
if hasattr(no_change, '__len__'):
no_change = no_change.all()
        if not no_change:
print('The provided parameters did not match the bunch '
'pattern and were adjusted.')
title += ' (adjusted)'
if regular:
add_text = ''
if params['npulses'] > 1:
add_text = f's, {params["period"]} samples between two pulses'
        print(f'{title}: {params["npulses"]} pulse' + add_text)
else:
print(f'{title}: Not a regular pattern. '
f'{len(pulse_ids)} pulses, pulse_ids='
f'{pulse_ids}.')
if plot:
if raw_trace is None:
raw_trace = get_dig_avg_trace(run, mnemonic)
fig, ax = plotPeakIntegrationWindow(raw_trace, params,
show_all=show_all)
fig.suptitle(f'p{proposal} r{runNB} ' + title, size=12)
return params
def plotPeakIntegrationWindow(raw_trace, params, show_all=False):
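    """
    Plot the peak and baseline integration windows of a raw digitizer
    trace, either for all pulses (show_all=True) or only for the first
    and last pulses. Helper for check_peak_params() and
    check_processed_peak_params().
    Returns the matplotlib figure and the list of axes.
    """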
if hasattr(params['pulseStart'], '__len__'):
starts = np.array(params['pulseStart'])
stops = params['pulseStop'] + starts - params['pulseStart'][0]
baseStarts = params['baseStart'] + starts - params['pulseStart'][0]
baseStops = params['baseStop'] + starts - params['pulseStart'][0]
else:
n = params['npulses']
p = params['period']
starts = [params['pulseStart'] + i*p for i in range(n)]
stops = [params['pulseStop'] + i*p for i in range(n)]
baseStarts = [params['baseStart'] + i*p for i in range(n)]
baseStops = [params['baseStop'] + i*p for i in range(n)]
if show_all:
fig, ax = plt.subplots(figsize=(6, 3), constrained_layout=True)
for i in range(len(starts)):
lbl = 'baseline' if i == 0 else None
lp = 'peak' if i == 0 else None
ax.axvline(baseStarts[i], ls='--', color='k')
ax.axvline(baseStops[i], ls='--', color='k')
ax.axvspan(baseStarts[i], baseStops[i],
alpha=0.5, color='grey', label=lbl)
ax.axvline(starts[i], ls='--', color='r')
ax.axvline(stops[i], ls='--', color='r')
ax.axvspan(starts[i], stops[i],
alpha=0.2, color='r', label=lp)
ax.plot(raw_trace, color='C0', label='raw trace')
ax.legend(fontsize=8)
return fig, [ax]
fig, ax = plt.subplots(1, 2, figsize=(6, 3), constrained_layout=True)
for plot in range(2):
title = 'First pulse' if plot == 0 else 'Last pulse'
        i = 0 if plot == 0 else -1
for j in range(min(2, len(starts))):
if plot == 1:
j = -j
label = 'baseline' if j == 0 else ''
ax[plot].axvline(baseStarts[i+j], ls='--', color='k')
ax[plot].axvline(baseStops[i+j], ls='--', color='k')
ax[plot].axvspan(baseStarts[i+j], baseStops[i+j],
alpha=0.5, color='grey', label=label)
label = 'peak' if j == 0 else ''
ax[plot].axvline(starts[i+j], ls='--', color='r')
ax[plot].axvline(stops[i+j], ls='--', color='r')
ax[plot].axvspan(starts[i+j], stops[i+j],
alpha=0.2, color='r', label=label)
if len(starts) > 1:
period = starts[1] - starts[0]
xmin = np.max([0, baseStarts[i] - int(1.5*period)])
xmax = np.min([stops[i] + int(1.5*period), raw_trace.size])
else:
xmin = np.max([0, baseStarts[i] - 200])
xmax = np.min([stops[i] + 200, raw_trace.size])
ax[plot].plot(np.arange(xmin, xmax),
raw_trace[xmin:xmax], color='C0', label='raw trace')
ax[plot].legend(fontsize=8)
ax[plot].set_xlim(xmin, xmax)
ax[plot].set_xlabel('digitizer samples')
ax[plot].set_title(title, size=10)
return fig, ax
def digitizer_type(run, source):
"""
Finds the digitizer type based on the class Id / name of the source.
Example source: 'SCS_UTC1_MCP/ADC/1'. Defaults to ADQ412 if not found.
"""
ret = None
if '_MCP/ADC/1' in source:
ret = 'FastADC'
if '_ADQ/ADC/1' in source:
ret = 'ADQ412'
if ret is None:
digi_dict = {'FastAdc': 'FastADC',
'FastAdcLegacy': 'FastADC',
'AdqDigitizer': 'ADQ412',
'PyADCChannel': 'FastADC',
'PyADCChannelLegacy': 'FastADC'}
try:
source = source.split(':')[0]
classId = run.get_run_value(source, 'classId.value')
ret = digi_dict.get(classId)
except Exception as e:
log.warning(str(e))
log.warning(f'Could not find digitizer type from source {source}.')
ret = 'ADQ412'
return ret
def get_digitizer_peaks(proposal, runNB, mnemonic, merge_with=None,
bunchPattern='sase3', integParams=None,
save=False, subdir='usr/processed_runs'):
"""
Extract the peaks from digitizer raw traces. The result can be merged
to an existing `merge_with` dataset and/or saved into an HDF file.
The calculation is a trapezoidal integration between 'pulseStart' and
'pulseStop' with subtraction of a baseline defined as the median between
'baseStart' and 'baseStop'.
The integration parameters can either be provided using integParams, or
computed by a peak finding algorithm if integParams is None.
If the bunchPattern argument is provided, the pulse ids are aligned to it.
If there is a mismatch between the provided parameters or the computed
parameters and the bunch pattern, the bunch pattern parameters prevail.
Parameters
----------
proposal: int
the proposal number
runNB: int
the run number
mnemonic: str
mnemonic for FastADC or ADQ412, e.g. "I0_ILHraw" or "MCP3apd".
The data is either loaded from the DataCollection or taken from
merge_with.
merge_with: xarray Dataset
If provided, the resulting Dataset will be merged with this one.
    bunchPattern: str
        'sase1', 'sase3' or 'scs_ppl': bunch pattern used to align the
        pulse ids. If None, no alignment is performed.
    integParams: dict
        dictionary of parameters for raw trace integration, e.g.
        {'pulseStart': 100, 'pulseStop': 200, 'baseStart': 50,
         'baseStop': 99, 'period': 24, 'npulses': 500}.
If None, integration parameters are computed automatically.
Returns
-------
    xarray Dataset with digitizer peak variables. Raw variables are
    replaced by the calculated peak values (e.g. "FastADC2raw" becomes
    "FastADC2peaks").
"""
# prepare resulting dataset
if bool(merge_with):
mw_ds = merge_with.drop(mnemonic, errors='ignore')
else:
mw_ds = xr.Dataset()
run = open_run(proposal, runNB)
run_mnemonics = mnemonics_for_run(run)
mnemo = run_mnemonics.get(mnemonic)
if mnemo is None:
        log.warning(f'Mnemonic {mnemonic} not found. Skipping.')
        return mw_ds
source, key = mnemo['source'], mnemo['key']
extra_dim = {'sase3': 'sa3_pId', 'scs_ppl': 'ol_pId'}.get(bunchPattern)
if extra_dim is None:
extra_dim = 'pulseId'
pattern = None
try:
if bunchPattern == 'sase3':
pattern = XrayPulses(run)
if bunchPattern == 'scs_ppl':
pattern = OpticalLaserPulses(run)
except Exception as e:
log.warning(e)
# load traces and calculate the average
traces = run[source, key].xarray(name=mnemonic, extra_dims=mnemo['dim'])
trace = traces.mean('trainId').rename(mnemonic.replace('raw', 'avg'))
# find integration parameters
params, pulse_ids, regular, trace = find_peak_integration_parameters(
run, mnemonic, trace, integParams, pattern)
# extract peaks
peaks = peaks_from_raw_trace(traces, **params, extra_dim=extra_dim)
peaks = peaks.rename(mnemonic.replace('raw', 'peaks'))
# Align pulse id
if regular is True:
peaks = peaks.assign_coords({extra_dim: pulse_ids})
elif pattern:
mask = pattern.pulse_mask(labelled=False)
mask = xr.DataArray(mask, dims=['trainId', extra_dim],
coords={'trainId': run[source, key].train_ids,
extra_dim: np.arange(mask.shape[1])})
mask = mask.sel({extra_dim: pulse_ids})
peaks = peaks.where(mask, drop=True)
# add peak integration parameters attributes
for p in params:
if params[p] is None:
params[p] = 'None'
peaks.attrs[f'{peaks.name}_{p}'] = params[p]
# save peaks
if save:
save_peaks(proposal, runNB, peaks, trace, subdir)
# merge with existing dataset
ds = mw_ds.merge(peaks, join='inner')
return ds
def save_peaks(proposal, runNB, peaks, avg_trace, subdir):
'''
    Save the peaks extracted with get_digitizer_peaks() as well as the
    average raw trace in a dataset at the proposal + subdir location.
If a dataset already exists, the new data is merged with it.
Parameters
----------
proposal: int
the proposal number
runNB: int
the run number
    peaks: xarray DataArray
        the 2D-array obtained by get_digitizer_peaks()
avg_trace: xarray DataArray
the average raw trace over the trains
subdir: str
subdirectory. The data is stored in
<proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
Returns
-------
None
'''
root = find_proposal(f'p{proposal:06d}')
path = os.path.join(root, subdir + f'/r{runNB:04d}/')
os.makedirs(path, exist_ok=True)
fname = path + f'r{runNB:04d}-digitizers-data.h5'
ds_peaks = peaks.to_dataset(promote_attrs=True)
if os.path.isfile(fname):
ds = xr.load_dataset(fname)
ds = ds.drop_vars([peaks.name, avg_trace.name], errors='ignore')
for dim in ds.dims:
if all(dim not in ds[v].dims for v in ds):
ds = ds.drop_dims(dim)
dim_name = 'sampleId'
if 'sampleId' in ds.dims and ds.sizes['sampleId'] != len(avg_trace):
dim_name = 'sampleId2'
avg_trace = avg_trace.rename({avg_trace.dims[0]: dim_name})
if f'params_{peaks.name}' in ds.attrs:
del ds.attrs[f'params_{peaks.name}']
ds = xr.merge([ds, ds_peaks, avg_trace],
combine_attrs='drop_conflicts', join='inner')
else:
ds = ds_peaks.merge(avg_trace.rename({avg_trace.dims[0]: 'sampleId'}))
ds.to_netcdf(fname, format='NETCDF4')
print(f'saved data into {fname}.')
return None
def load_processed_peaks(proposal, runNB, mnemonic=None,
data='usr/processed_runs', merge_with=None):
"""
Load processed digitizer peaks data.
Parameters
----------
proposal: int
the proposal number
runNB: int
the run number
mnemonic: str
the mnemonic containing peaks. Example: 'XRD_MCP_BIGpeaks'.
If None, the entire dataset is loaded
data: str
subdirectory. The data is stored in
<proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
merge_with: xarray Dataset
A dataset to merge the data with.
Returns
-------
    xarray DataArray if mnemonic is not None and merge_with is None
xarray Dataset if mnemonic is None or merge_with is not None.
Example
-------
# load the mono energy and the MCP_BIG peaks
run, ds = tb.load(proposal, runNB, 'nrj')
    ds = tb.load_processed_peaks(proposal, runNB, 'XRD_MCP_BIGpeaks',
                                 merge_with=ds)
"""
if mnemonic is None:
return load_all_processed_peaks(proposal, runNB, data, merge_with)
root = find_proposal(f'p{proposal:06d}')
path = os.path.join(root, data + f'/r{runNB:04d}/')
fname = path + f'r{runNB:04d}-digitizers-data.h5'
if os.path.isfile(fname):
ds = xr.load_dataset(fname)
if mnemonic not in ds:
log.warning(f'Mnemonic {mnemonic} not found in {fname}')
return {}
da = ds[mnemonic]
da.attrs = {k: ds.attrs[k] for k in ds.attrs if f'{mnemonic}_' in k}
if merge_with is not None:
return merge_with.merge(da.to_dataset(promote_attrs=True),
combine_attrs='drop_conflicts',
join='inner')
else:
return da
else:
        log.warning(f'{fname} not found.')
return merge_with
def load_all_processed_peaks(proposal, runNB, data='usr/processed_runs',
merge_with=None):
"""
Load processed digitizer peaks dataset. The data contains the peaks,
the average raw trace and the integration parameters (attribute) of
each processed digitizer source.
Parameters
----------
proposal: int
the proposal number
runNB: int
the run number
data: str
subdirectory. The data is stored in
<proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
merge_with: xarray Dataset
A dataset to merge the data with.
Returns
-------
xarray Dataset
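    Example
    -------
    # minimal sketch (proposal and run numbers are hypothetical):
    ds = load_all_processed_peaks(900, 100)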
"""
root = find_proposal(f'p{proposal:06d}')
path = os.path.join(root, data + f'/r{runNB:04d}/')
fname = path + f'r{runNB:04d}-digitizers-data.h5'
if os.path.isfile(fname):
if merge_with is not None:
return merge_with.merge(xr.load_dataset(fname),
combine_attrs='drop_conflicts',
join='inner')
return xr.load_dataset(fname)
else:
log.warning(f'{fname} not found.')
return merge_with
def check_processed_peak_params(proposal, runNB, mnemonic,
data='usr/processed_runs',
plot=True, show_all=False):
"""
Check the integration parameters used to generate the processed
peak values.
Parameters
----------
proposal: int
the proposal number
runNB: int
the run number
    mnemonic: str
        the mnemonic containing peaks. Example: 'XRD_MCP_BIGpeaks'.
data: str
subdirectory. The data is stored in
<proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
plot: bool
If True, displays the raw trace and peak integration regions.
show_all: bool
If True, displays the entire raw trace and all peak integration
regions (this can be time-consuming).
If False, shows the first and last pulses.
Returns
-------
params: dict
the integration parameters with keys ['pulseStart', 'pulseStop',
'baseStart', 'baseStop', 'period', 'npulses'].
        See `get_digitizer_peaks()`.
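    Example
    -------
    # minimal sketch (proposal/run number and mnemonic are hypothetical):
    params = check_processed_peak_params(900, 100, 'XRD_MCP_BIGpeaks')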
"""
root = find_proposal(f'p{proposal:06d}')
path = os.path.join(root, data + f'/r{runNB:04d}/')
fname = path + f'r{runNB:04d}-digitizers-data.h5'
if os.path.isfile(fname):
        ds = xr.load_dataset(fname)
        # normalize the mnemonic to the 'peaks' variable name, so that
        # e.g. 'FastADC2raw' and 'FastADC2peaks' both work
        name = mnemonic.replace('raw', 'peaks')
        if name not in ds:
            log.warning(f'Mnemonic {mnemonic} not found in {fname}')
            return {}
        params = {k.replace(f'{name}_', ''): ds.attrs[k] for
                  k in ds.attrs if f'{name}_' in k}
        if plot:
            title = 'Processed peak parameters'
            fig, ax = plotPeakIntegrationWindow(
                ds[name.replace('peaks', 'avg')],
                params,
                show_all=show_all)
fig.suptitle(f'p{proposal} r{runNB} ' + title, size=12)
return params
else:
log.warning(f'{fname} not found.')
return {}