""" Beam Arrival Monitor related sub-routines
Copyright (2021) SCS Team.
(contributions preferrably comply with pep8 code structure
guidelines.)
"""
import logging
import numpy as np
import xarray as xr
import extra_data as ed
import re
from ..misc.bunch_pattern_external import is_sase_3
from ..mnemonics_machinery import (mnemonics_to_process,
mnemonics_for_run)
from ..constants import mnemonics as _mnemonics
__all__ = [
'get_pes_params',
'get_pes_tof',
]
log = logging.getLogger(__name__)
[docs]def get_pes_tof(run, mnemonics=None, merge_with=None,
start=31390, width=300, origin=None, width_ns=None,
subtract_baseline=True,
baseStart=None, baseWidth=80,
sample_rate=2e9):
"""
Extracts time-of-flight spectra from raw digitizer traces. The
tracesvare either loaded via ToolBox mnemonics or those in the
optionally provided merge_with dataset. The spectra are aligned
by pulse Id using the SASE 3 bunch pattern, and have time coordinates
in nanoseconds.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data
mnemonics: str or list of str
mnemonics for PES, e.g. "PES_W_raw" or ["PES_W_raw", "PES_ENE_raw"].
If None and no merge_with dataset is provided, defaults to "PES_W_raw".
merge_with: xarray Dataset
If provided, the resulting Dataset will be merged with this
one. The PES variables of merge_with (if any) will also be
computed and merged.
start: int
starting sample of the first spectrum in the raw trace.
width: int
number of samples per spectra.
origin: int
sample of the raw trace that corresponds to time-of-flight origin.
If None, origin is equal to start.
width_ns: float
time window for one spectrum. If None, the time window is defined by
width / sample rate.
subtract_baseline: bool
If True, subtract baseline defined by baseStart and baseWidth to each
spectrum.
baseStart: int
starting sample of the baseline.
baseWidth: int
number of samples to average (starting from baseStart) for baseline
calculation.
sample_rate: float
sample rate of the digitizer.
Returns
-------
pes: xarray Dataset
Dataset containing the PES time-of-flight spectra (e.g. "PES_W_tof"),
merged with optionally provided merg_with dataset.
Example
-------
>>> import toolbox_scs as tb
>>> import toolbox_scs.detectors as tbdet
>>> run, ds = tb.load(2927, 100, "PES_W_raw")
>>> pes = tbdet.get_pes_tof(run, merge_with=ds)
"""
def to_processed_name(name):
return name.replace('raw', 'tof')
to_process = mnemonics_to_process(mnemonics, merge_with,
'PES', to_processed_name)
run_mnemonics = mnemonics_for_run(run)
# check if bunch pattern table exists
if bool(merge_with) and 'bunchPatternTable' in merge_with:
bpt = merge_with['bunchPatternTable']
elif 'bunchPatternTable' in run_mnemonics:
bpt = run.get_array(*run_mnemonics['bunchPatternTable'].values())
elif 'bunchPatternTable_SA3' in run_mnemonics:
bpt = run.get_array(*run_mnemonics['bunchPatternTable_SA3'].values())
else:
bpt = None
mask = is_sase_3(bpt).assign_coords({'pulse_slot': np.arange(2700)})
mask_on = mask.where(mask, drop=True)
npulses = mask.sum(dim='pulse_slot')[0].values
if npulses > 1:
period = mask_on['pulse_slot'].diff(dim='pulse_slot')[0].values
else:
period = 0
if origin is None:
origin = start
if baseStart is None:
baseStart = start
if width_ns is not None:
width = int(sample_rate * width_ns * 1e-9)
time_ns = 1e9 * (np.arange(start, start + width) - origin) / sample_rate
ds = xr.Dataset()
for m in to_process:
if bool(merge_with) and m in merge_with:
arr = merge_with[m]
else:
arr = run.get_array(*run_mnemonics[m].values(), name=m)
if arr.sizes['PESsampleId'] < npulses*period*440 + start + width:
log.warning('Not all pulses were recorded. The number of samples '
f'on the digitizer {arr.sizes["PESsampleId"]} is not '
f'enough to cover the {npulses} spectra. Missing '
'spectra will be filled with NaNs.')
spectra = []
for p in range(npulses):
begin = p*period*440 + start
end = begin + width
if end > arr.sizes['PESsampleId']:
break
pes = arr.isel(PESsampleId=slice(begin, end))
if subtract_baseline:
baseBegin = p*period*440 + baseStart
baseEnd = baseBegin + baseWidth
bl = arr.isel(
PESsampleId=slice(baseBegin, baseEnd)).mean(dim='PESsampleId')
pes = pes - bl
spectra.append(pes)
spectra = xr.concat(spectra,
dim='sa3_pId').rename(m.replace('raw', 'tof'))
ds = ds.merge(spectra)
if len(ds.variables) > 0:
ds = ds.assign_coords(
{'sa3_pId': mask_on['pulse_slot'][:ds.sizes['sa3_pId']].values})
ds = ds.rename({'PESsampleId': 'time_ns'})
ds = ds.assign_coords({'time_ns': time_ns})
if bool(merge_with):
ds = merge_with.drop(to_process,
errors='ignore').merge(ds, join='left')
return ds
[docs]def get_pes_params(run):
"""
Extract PES parameters for a given extra_data DataCollection.
Parameters are gas, binding energy, voltages of the MPOD.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data
Returns
-------
params: dict
dictionnary of PES parameters
"""
params = {}
sel = run.select_trains(ed.by_index[:20])
gas_dict = {'N2': 409.9, 'Ne': 870.2, 'Kr': 1921, 'Xe': 1148.7}
for gas in gas_dict.keys():
mnemo = _mnemonics[f'PES_{gas}'][0]
arr = sel.get_run_value(mnemo['source'], mnemo['key'])
if arr == 'ON':
params['gas'] = gas
params['binding_energy'] = gas_dict[gas]
break
if 'gas' not in params:
params['gas'] = 'unknown'
log.warning('Could not find which PES gas was used.')
voltages = get_pes_voltages(run)
params.update(voltages)
return params
def get_pes_voltages(run, device='SA3_XTD10_PES/MDL/DAQ_MPOD'):
"""
Extract PES voltages read by the MDL watchdog of the MPOD device.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing PES data.
device: string
Name of the device containing the voltage data.
Returns
-------
voltages: dict
dictionnary of voltages
"""
a = re.compile('[u]\d{3}.value')
tid, da = run.train_from_index(0, devices=device)
voltages = {}
for k in da[device]:
if len(a.findall(k)) == 1:
voltages[k.split('.')[0]] = da[device][k]
return voltages