Source code for toolbox_scs.detectors.pes

""" Beam Arrival Monitor related sub-routines

    Copyright (2021) SCS Team.

    (contributions preferrably comply with pep8 code structure
    guidelines.)
"""

import logging
import numpy as np
import xarray as xr
import extra_data as ed
import re
import os
from pathlib import Path
from multiprocessing import Pool
from extra.components import XrayPulses, AdqRawChannel

from ..misc.bunch_pattern_external import is_sase_3
from ..mnemonics_machinery import (mnemonics_to_process,
                                   mnemonics_for_run)
from extra_data.read_machinery import find_proposal
from ..constants import mnemonics as _mnemonics

__all__ = [
    'get_pes_params',
    'get_pes_tof',
    'save_pes_avg_traces',
    'load_pes_avg_traces'
]


log = logging.getLogger(__name__)


[docs]def get_pes_tof(proposal, runNB, mnemonic, start=0, origin=None, width=None, subtract_baseline=False, baseStart=None, baseWidth=40,merge_with=None): """ Extracts time-of-flight spectra from raw digitizer traces. The spectra are aligned by pulse Id using the SASE 3 bunch pattern. If origin is not None, a time coordinate in nanoseconds 'time_ns' is computed and added to the DataArray. Parameters ---------- proposal:int The proposal number. runNB: int The run number. mnemonic: str mnemonic for PES, e.g. "PES_2Araw". start: int starting sample of the first spectrum in the raw trace. origin: int sample of the trace that corresponds to time-of-flight origin, also called prompt. Used to compute the 'time_ns' coordinates. If None, computation of 'time_ns' is skipped. width: int number of samples per spectra. If None, the number of samples for 4.5 MHz repetition rate is used. subtract_baseline: bool If True, subtract baseline defined by baseStart and baseWidth to each spectrum. baseStart: int starting sample of the baseline. baseWidth: int number of samples to average (starting from baseStart) for baseline calculation. merge_with: xarray Dataset If provided, the resulting Dataset will be merged with this one. Returns ------- pes: xarray DataArray DataArray containing the PES time-of-flight spectra. Example ------- >>> import toolbox_scs as tb >>> import toolbox_scs.detectors as tbdet >>> proposal, runNB = 900447, 12 >>> pes = tbdet.get_pes_tof(proposal, runNB, 'PES_2Araw', >>> start=2557, origin=76) """ run = ed.open_run(proposal, runNB) all_mnemonics = mnemonics_for_run(run) mnemo = all_mnemonics.get(mnemonic) if mnemo is None: print(f'Mnemonic {mnemonic} not found in run. Skipping.') return None PULSE_PERIOD = 440 pattern = XrayPulses(run) regular = True if pattern.is_constant_pattern() is False: print('The number of pulses changed during the run.') pulse_ids = np.unique(pattern.pulse_ids(labelled=False, copy=False)) regular = False else: pulse_ids = pattern.peek_pulse_ids(labelled=False) npulses = len(pulse_ids) period = 1 if npulses > 1: periods = np.diff(pulse_ids) if len(np.unique(periods)) > 1: regular = False period = min(periods) npulses = int((max(pulse_ids) - min(pulse_ids)) / period) + 1 period *= PULSE_PERIOD kd = run[mnemo['source'], mnemo['key']] nsamples = kd.shape[1] npulses_trace = int(np.floor((nsamples - start) / period)) if npulses_trace < npulses: log.warning(f'The digitizer only recorded {npulses_trace} pulses ' f'out of the {npulses} pulses in the train.') else: npulses_trace = npulses indices = np.array([start + i*period for i in range(npulses_trace + 1)]) outp = kd.ndarray() outp = outp[:, indices[0]:indices[-1]].reshape([outp.shape[0], npulses_trace, period]) spectra = xr.DataArray(outp, dims=['trainId', 'sa3_pId', 'sampleId'], coords={'trainId': kd.train_id_coordinates(), 'sa3_pId': pulse_ids[:npulses_trace], 'sampleId': np.arange(period)}) if subtract_baseline: if baseStart is None: baseStart = 0 spectra = spectra - spectra.isel( sampleId=slice(baseStart, baseStart + baseWidth)).mean('sampleId') if width is None: width = PULSE_PERIOD spectra = spectra.isel(sampleId=slice(0, width), drop=True) spectra.attrs['start'] = start if origin is not None: try: adq = AdqRawChannel(run, mnemonic.split('_')[1].split('raw')[0]) sample_rate = adq.sampling_rate except Exception as e: log.warning(e) sample_rate = 1986111111.1111112 origin -= start time_ns = (np.arange(spectra.sizes['sampleId']) - origin)/sample_rate * 1e9 spectra = spectra.assign_coords(time_ns=('sampleId', time_ns)) spectra.attrs['origin'] = origin spectra = spectra.rename(mnemonic.replace('raw', 'spectrum')) if merge_with is not None: return merge_with.merge(spectra.to_dataset(promote_attrs=True), join='inner') return spectra.rename(mnemonic.replace('raw', 'spectrum'))
[docs]def get_pes_params(run, channel=None): """ Extract PES parameters for a given extra_data DataCollection. Parameters are gas, binding energy, retardation voltages or all voltages of the MPOD. Parameters ---------- run: extra_data.DataCollection DataCollection containing the digitizer data channel: str Channel name or PES mnemonic, e.g. '2A' or 'PES_1Craw'. If None, or if the channel is not found in the data, the retardation voltage for all channels is retrieved. Returns ------- params: dict dictionnary of PES parameters """ params = {} sel = run.select_trains(ed.by_index[:20]) gas_dict = {'N2': 409.9, 'Ne': 870.2, 'Kr': 1921, 'Xe': 1148.7} for gas in gas_dict.keys(): mnemo = _mnemonics[f'PES_{gas}'][0] arr = sel.get_run_value(mnemo['source'], mnemo['key']) if arr == 'ON': params['gas'] = gas params['binding_energy'] = gas_dict[gas] break if 'gas' not in params: params['gas'] = 'unknown' log.warning('Could not find which PES gas was used.') mpod_mapper = 'SA3_XTD10_PES/MDL/MPOD_MAPPER' if mpod_mapper in run.all_sources: if channel is None: channel = [f'{a//4 + 1}{b}' for a, b in enumerate( ['A', 'B', 'C', 'D']*4)] else: if 'raw' in channel: channel = channel.split('raw')[0].split('_')[1] channel = [channel] for c in channel: rv = get_pes_rv(run, c, mpod_mapper) if rv is not None: params[f'{c}_RV'] = rv elif 'SA3_XTD10_PES/MDL/DAQ_MPOD' in run.all_sources: voltages = get_pes_voltages(run) if voltages is not None: params.update(voltages) return params
def get_pes_rv(run, channel, mpod_mapper='SA3_XTD10_PES/MDL/MPOD_MAPPER'): channel_to_group = {'4D': 1, '4B': 1, '4C': 1, '4A': 1, '2A': 2, '1C': 2, '2C': 2, '2D': 2, '3D': 3, '3B': 3, '2B': 3, '3A': 3, '1A': 4, '3C': 4, '1B': 4, '1D': 4} group = channel_to_group.get(channel) if group is None: log.warning(f'Could not find group for channel {channel}.') return None key = f'Group{group}D.channelMeasurementSenseVoltage.value' voltage = run[mpod_mapper, key].ndarray().mean() return voltage def get_pes_voltages(run, device='SA3_XTD10_PES/MDL/DAQ_MPOD'): """ Extract PES voltages read by the MDL watchdog of the MPOD device. Parameters ---------- run: extra_data.DataCollection DataCollection containing PES data. device: string Name of the device containing the voltage data. Returns ------- voltages: dict dictionnary of voltages, empty if device not found. """ if device not in run.all_sources: log.warning(f'Could not find {device} in run. Skipping.') return None a = re.compile("[u]\d{3}.value") tid, da = run.train_from_index(0, devices=device) voltages = {} for k in da[device]: if len(a.findall(k)) == 1: voltages[k.split('.')[0]] = da[device][k] return voltages def calculate_average(run, name, mnemo): return run[mnemo['source'], mnemo['key']].xarray( name=name, extra_dims=mnemo['dim']).mean('trainId')
[docs]def save_pes_avg_traces(proposal, runNB, channels=None, subdir='usr/processed_runs'): ''' Save average traces of PES into an h5 file. Parameters ---------- proposal:int The proposal number. runNB: int The run number. channels: str or list The PES channels or mnemonics, e.g. '2A', ['2A', '3C'], ['PES_1Araw', 'PES_4Draw', '3B'] subdir: str subdirectory. The data is stored in <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-pes-data.h5' Output ------ xarray Dataset saved in a h5 file containing the PES average traces. ''' root = find_proposal(f'p{proposal:06d}') path = os.path.join(root, subdir + f'/r{runNB:04d}/') Path(path).mkdir(parents=True, exist_ok=True) fname = path + f'r{runNB:04d}-pes-data.h5' if channels is None: channels = [f'{a//4 + 1}{b}' for a, b in enumerate( ['A', 'B', 'C', 'D']*4)] if isinstance(channels, str): channels = [channels] run = ed.open_run(proposal, runNB, parallelize=False) all_mnemos = mnemonics_for_run(run) # use multiprocessing.Pool args = [] for c in channels: m = c if 'raw' not in c: m = f'PES_{c}raw' if m not in all_mnemos: continue args.append([run, m.replace('raw', 'avg'), all_mnemos[m]]) if len(args) == 0: log.warning('No pes average trace to save. Skipping') return with Pool(len(args)) as pool: avg_traces = pool.starmap(calculate_average, args) avg_traces = xr.merge(avg_traces) ds = xr.Dataset() if os.path.isfile(fname): ds = xr.load_dataset(fname) ds = ds.drop_vars(channels, errors='ignore') ds = ds.merge(avg_traces) ds.to_netcdf(fname, format='NETCDF4') return
[docs]def load_pes_avg_traces(proposal, runNB, channels=None, subdir='usr/processed_runs'): ''' Load existing PES average traces. Parameters ---------- proposal:int The proposal number. runNB: int The run number. channels: str or list The PES channels or mnemonics, e.g. '2A', ['2A', '3C'], ['PES_1Araw', 'PES_4Draw', '3B'] subdir: str subdirectory. The data is stored in <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-pes-data.h5' Output ------ ds: xarray Dataset dataset containing the PES average traces. ''' root = find_proposal(f'p{proposal:06d}') path = os.path.join(root, subdir + f'/r{runNB:04d}/') fname = path + f'r{runNB:04d}-pes-data.h5' if channels is None: channels = [f'PES_{a//4 + 1}{b}avg' for a, b in enumerate(['A', 'B', 'C', 'D']*4)] if isinstance(channels, str): channels = [channels] for i, c in enumerate(channels): if 'PES_' not in c and 'avg' not in c: channels[i] = f'PES_{c}avg' if os.path.isfile(fname): ds = xr.load_dataset(fname) channels = [c for c in channels if c in ds] ds = ds[channels] return ds else: log.warning(f'{fname} is not a valid file.')