""" Beam Arrival Monitor related sub-routines
Copyright (2021) SCS Team.
(contributions preferrably comply with pep8 code structure
guidelines.)
"""
import logging
import numpy as np
import xarray as xr
import extra_data as ed
import re
import os
from pathlib import Path
from multiprocessing import Pool
from extra.components import XrayPulses, AdqRawChannel
from ..misc.bunch_pattern_external import is_sase_3
from ..mnemonics_machinery import (mnemonics_to_process,
mnemonics_for_run)
from extra_data.read_machinery import find_proposal
from ..constants import mnemonics as _mnemonics
__all__ = [
'get_pes_params',
'get_pes_tof',
'save_pes_avg_traces',
'load_pes_avg_traces'
]
log = logging.getLogger(__name__)
[docs]def get_pes_tof(proposal, runNB, mnemonic, start=0, origin=None,
width=None, subtract_baseline=False,
baseStart=None, baseWidth=40,merge_with=None):
"""
Extracts time-of-flight spectra from raw digitizer traces. The spectra
are aligned by pulse Id using the SASE 3 bunch pattern. If origin is
not None, a time coordinate in nanoseconds 'time_ns' is computed and
added to the DataArray.
Parameters
----------
proposal:int
The proposal number.
runNB: int
The run number.
mnemonic: str
mnemonic for PES, e.g. "PES_2Araw".
start: int
starting sample of the first spectrum in the raw trace.
origin: int
sample of the trace that corresponds to time-of-flight origin,
also called prompt. Used to compute the 'time_ns'
coordinates. If None, computation of 'time_ns' is skipped.
width: int
number of samples per spectra. If None, the number of samples
for 4.5 MHz repetition rate is used.
subtract_baseline: bool
If True, subtract baseline defined by baseStart and baseWidth to each
spectrum.
baseStart: int
starting sample of the baseline.
baseWidth: int
number of samples to average (starting from baseStart) for baseline
calculation.
merge_with: xarray Dataset
If provided, the resulting Dataset will be merged with this
one.
Returns
-------
pes: xarray DataArray
DataArray containing the PES time-of-flight spectra.
Example
-------
>>> import toolbox_scs as tb
>>> import toolbox_scs.detectors as tbdet
>>> proposal, runNB = 900447, 12
>>> pes = tbdet.get_pes_tof(proposal, runNB, 'PES_2Araw',
>>> start=2557, origin=76)
"""
run = ed.open_run(proposal, runNB)
all_mnemonics = mnemonics_for_run(run)
mnemo = all_mnemonics.get(mnemonic)
if mnemo is None:
print(f'Mnemonic {mnemonic} not found in run. Skipping.')
return None
PULSE_PERIOD = 440
pattern = XrayPulses(run)
regular = True
if pattern.is_constant_pattern() is False:
print('The number of pulses changed during the run.')
pulse_ids = np.unique(pattern.pulse_ids(labelled=False, copy=False))
regular = False
else:
pulse_ids = pattern.peek_pulse_ids(labelled=False)
npulses = len(pulse_ids)
period = 1
if npulses > 1:
periods = np.diff(pulse_ids)
if len(np.unique(periods)) > 1:
regular = False
period = min(periods)
npulses = int((max(pulse_ids) - min(pulse_ids)) / period) + 1
period *= PULSE_PERIOD
kd = run[mnemo['source'], mnemo['key']]
nsamples = kd.shape[1]
npulses_trace = int(np.floor((nsamples - start) / period))
if npulses_trace < npulses:
log.warning(f'The digitizer only recorded {npulses_trace} pulses '
f'out of the {npulses} pulses in the train.')
else:
npulses_trace = npulses
indices = np.array([start + i*period for i in range(npulses_trace + 1)])
outp = kd.ndarray()
outp = outp[:, indices[0]:indices[-1]].reshape([outp.shape[0],
npulses_trace, period])
spectra = xr.DataArray(outp, dims=['trainId', 'sa3_pId', 'sampleId'],
coords={'trainId': kd.train_id_coordinates(),
'sa3_pId': pulse_ids[:npulses_trace],
'sampleId': np.arange(period)})
if subtract_baseline:
if baseStart is None:
baseStart = 0
spectra = spectra - spectra.isel(
sampleId=slice(baseStart, baseStart + baseWidth)).mean('sampleId')
if width is None:
width = PULSE_PERIOD
spectra = spectra.isel(sampleId=slice(0, width), drop=True)
spectra.attrs['start'] = start
if origin is not None:
try:
adq = AdqRawChannel(run, mnemonic.split('_')[1].split('raw')[0])
sample_rate = adq.sampling_rate
except Exception as e:
log.warning(e)
sample_rate = 1986111111.1111112
origin -= start
time_ns = (np.arange(spectra.sizes['sampleId'])
- origin)/sample_rate * 1e9
spectra = spectra.assign_coords(time_ns=('sampleId', time_ns))
spectra.attrs['origin'] = origin
spectra = spectra.rename(mnemonic.replace('raw', 'spectrum'))
if merge_with is not None:
return merge_with.merge(spectra.to_dataset(promote_attrs=True),
join='inner')
return spectra.rename(mnemonic.replace('raw', 'spectrum'))
[docs]def get_pes_params(run, channel=None):
"""
Extract PES parameters for a given extra_data DataCollection.
Parameters are gas, binding energy, retardation voltages or all
voltages of the MPOD.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing the digitizer data
channel: str
Channel name or PES mnemonic, e.g. '2A' or 'PES_1Craw'.
If None, or if the channel is not found in the data,
the retardation voltage for all channels is retrieved.
Returns
-------
params: dict
dictionnary of PES parameters
"""
params = {}
sel = run.select_trains(ed.by_index[:20])
gas_dict = {'N2': 409.9, 'Ne': 870.2, 'Kr': 1921, 'Xe': 1148.7}
for gas in gas_dict.keys():
mnemo = _mnemonics[f'PES_{gas}'][0]
arr = sel.get_run_value(mnemo['source'], mnemo['key'])
if arr == 'ON':
params['gas'] = gas
params['binding_energy'] = gas_dict[gas]
break
if 'gas' not in params:
params['gas'] = 'unknown'
log.warning('Could not find which PES gas was used.')
mpod_mapper = 'SA3_XTD10_PES/MDL/MPOD_MAPPER'
if mpod_mapper in run.all_sources:
if channel is None:
channel = [f'{a//4 + 1}{b}' for a, b in enumerate(
['A', 'B', 'C', 'D']*4)]
else:
if 'raw' in channel:
channel = channel.split('raw')[0].split('_')[1]
channel = [channel]
for c in channel:
rv = get_pes_rv(run, c, mpod_mapper)
if rv is not None:
params[f'{c}_RV'] = rv
elif 'SA3_XTD10_PES/MDL/DAQ_MPOD' in run.all_sources:
voltages = get_pes_voltages(run)
if voltages is not None:
params.update(voltages)
return params
def get_pes_rv(run, channel, mpod_mapper='SA3_XTD10_PES/MDL/MPOD_MAPPER'):
channel_to_group = {'4D': 1, '4B': 1, '4C': 1, '4A': 1,
'2A': 2, '1C': 2, '2C': 2, '2D': 2,
'3D': 3, '3B': 3, '2B': 3, '3A': 3,
'1A': 4, '3C': 4, '1B': 4, '1D': 4}
group = channel_to_group.get(channel)
if group is None:
log.warning(f'Could not find group for channel {channel}.')
return None
key = f'Group{group}D.channelMeasurementSenseVoltage.value'
voltage = run[mpod_mapper, key].ndarray().mean()
return voltage
def get_pes_voltages(run, device='SA3_XTD10_PES/MDL/DAQ_MPOD'):
"""
Extract PES voltages read by the MDL watchdog of the MPOD device.
Parameters
----------
run: extra_data.DataCollection
DataCollection containing PES data.
device: string
Name of the device containing the voltage data.
Returns
-------
voltages: dict
dictionnary of voltages, empty if device not found.
"""
if device not in run.all_sources:
log.warning(f'Could not find {device} in run. Skipping.')
return None
a = re.compile("[u]\d{3}.value")
tid, da = run.train_from_index(0, devices=device)
voltages = {}
for k in da[device]:
if len(a.findall(k)) == 1:
voltages[k.split('.')[0]] = da[device][k]
return voltages
def calculate_average(run, name, mnemo):
return run[mnemo['source'], mnemo['key']].xarray(
name=name, extra_dims=mnemo['dim']).mean('trainId')
[docs]def save_pes_avg_traces(proposal, runNB, channels=None,
subdir='usr/processed_runs'):
'''
Save average traces of PES into an h5 file.
Parameters
----------
proposal:int
The proposal number.
runNB: int
The run number.
channels: str or list
The PES channels or mnemonics, e.g. '2A', ['2A', '3C'],
['PES_1Araw', 'PES_4Draw', '3B']
subdir: str
subdirectory. The data is stored in
<proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-pes-data.h5'
Output
------
xarray Dataset saved in a h5 file containing the PES average traces.
'''
root = find_proposal(f'p{proposal:06d}')
path = os.path.join(root, subdir + f'/r{runNB:04d}/')
Path(path).mkdir(parents=True, exist_ok=True)
fname = path + f'r{runNB:04d}-pes-data.h5'
if channels is None:
channels = [f'{a//4 + 1}{b}' for a, b in enumerate(
['A', 'B', 'C', 'D']*4)]
if isinstance(channels, str):
channels = [channels]
run = ed.open_run(proposal, runNB, parallelize=False)
all_mnemos = mnemonics_for_run(run)
# use multiprocessing.Pool
args = []
for c in channels:
m = c
if 'raw' not in c:
m = f'PES_{c}raw'
if m not in all_mnemos:
continue
args.append([run, m.replace('raw', 'avg'), all_mnemos[m]])
if len(args) == 0:
log.warning('No pes average trace to save. Skipping')
return
with Pool(len(args)) as pool:
avg_traces = pool.starmap(calculate_average, args)
avg_traces = xr.merge(avg_traces)
ds = xr.Dataset()
if os.path.isfile(fname):
ds = xr.load_dataset(fname)
ds = ds.drop_vars(channels, errors='ignore')
ds = ds.merge(avg_traces)
ds.to_netcdf(fname, format='NETCDF4')
return
[docs]def load_pes_avg_traces(proposal, runNB, channels=None,
subdir='usr/processed_runs'):
'''
Load existing PES average traces.
Parameters
----------
proposal:int
The proposal number.
runNB: int
The run number.
channels: str or list
The PES channels or mnemonics, e.g. '2A', ['2A', '3C'],
['PES_1Araw', 'PES_4Draw', '3B']
subdir: str
subdirectory. The data is stored in
<proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-pes-data.h5'
Output
------
ds: xarray Dataset
dataset containing the PES average traces.
'''
root = find_proposal(f'p{proposal:06d}')
path = os.path.join(root, subdir + f'/r{runNB:04d}/')
fname = path + f'r{runNB:04d}-pes-data.h5'
if channels is None:
channels = [f'PES_{a//4 + 1}{b}avg' for a, b in
enumerate(['A', 'B', 'C', 'D']*4)]
if isinstance(channels, str):
channels = [channels]
for i, c in enumerate(channels):
if 'PES_' not in c and 'avg' not in c:
channels[i] = f'PES_{c}avg'
if os.path.isfile(fname):
ds = xr.load_dataset(fname)
channels = [c for c in channels if c in ds]
ds = ds[channels]
return ds
else:
log.warning(f'{fname} is not a valid file.')