""" Toolbox for SCS.
Various utilities function to quickly process data measured
at the SCS instrument.
Copyright (2019-) SCS Team.
"""
import matplotlib.pyplot as plt
import numpy as np
import xarray as xr
import re
from toolbox_scs.misc.laser_utils import positionToDelay as pTd
from toolbox_scs.routines.XAS import xas
__all__ = [
'reflectivity'
]
def prepare_reflectivity_ds(ds, Iokey, Irkey, alternateTrains,
pumpOnEven, pumpedOnly):
"""
Sorts the dataset according to the bunch pattern:
Alternating pumped/unpumped pulses, alternating pumped/unpumped
trains, or pumped only.
"""
assert ds[Iokey].dims == ds[Irkey].dims, \
f"{Iokey} and {Irkey} do not share dimensions."
if alternateTrains:
p = 0 if pumpOnEven else 1
pumped_tid = ds['trainId'].where(ds.trainId % 2 == p, drop=True)
unpumped_tid = ds['trainId'].where(ds.trainId % 2 == int(not p),
drop=True)
max_size = min(pumped_tid.size, unpumped_tid.size)
pumped = ds.sel(trainId=pumped_tid[:max_size])
unpumped = ds.sel(trainId=unpumped_tid[:max_size]
).assign_coords(trainId=pumped.trainId)
for v in [Iokey, Irkey]:
pumped[v+'_unpumped'] = unpumped[v].rename(v+'_unpumped')
ds = pumped
elif pumpedOnly is False:
# check that number of pulses is even with pumped / unpumped pattern
dim_name = [dim for dim in ds[Iokey].dims if dim != 'trainId'][0]
if ds[dim_name].size % 2 == 1:
ds = ds.isel({dim_name: slice(0, -1)})
print('The dataset contains an odd number of pulses '
'per train. Ignoring the last pulse.')
pumped = ds.isel({dim_name: slice(0, None, 2)})
unpumped = ds.isel({dim_name: slice(1, None, 2)}).assign_coords(
{dim_name: pumped[dim_name]})
for v in [Iokey, Irkey]:
pumped[v+'_unpumped'] = unpumped[v].rename(v+'_unpumped')
ds = pumped
return ds
[docs]def reflectivity(data, Iokey='FastADC5peaks', Irkey='FastADC3peaks',
delaykey='PP800_DelayLine', binWidth=0.05,
positionToDelay=True, origin=None, invert=False,
pumpedOnly=False, alternateTrains=False, pumpOnEven=True,
Ioweights=False, plot=True, plotErrors=True, units='mm'
):
"""
Computes the reflectivity R = 100*(Ir/Io[pumped] / Ir/Io[unpumped] - 1)
as a function of delay. Delay can be a motor position in mm or an
optical delay in ps, with possibility to convert from position to delay.
The default scheme is alternating pulses pumped/unpumped/... in each
train, also possible are alternating trains and pumped only.
If fitting is enabled, attempts a double exponential (default) or step
function fit.
Parameters
----------
data: xarray Dataset
Dataset containing the Io, Ir and delay data
Iokey: str
Name of the Io variable
Irkey: str
Name of the Ir variable
delaykey: str
Name of the delay variable (motor position in mm or
optical delay in ps)
binWidth: float
width of bin in units of delay variable
positionToDelay: bool
If True, adds a time axis converted from position axis according
to origin and invert parameters. Ignored if origin is None.
origin: float
Position of time overlap, shown as a vertical line.
Used if positionToDelay is True to convert position to time axis.
invert: bool
Used if positionToDelay is True to convert position to time axis.
pumpedOnly: bool
Assumes that all trains and pulses are pumped. In this case,
Delta R is defined as Ir/Io.
alternateTrains: bool
If True, assumes that trains alternate between pumped and
unpumped data.
pumpOnEven: bool
Only used if alternateTrains=True. If True, even trains are pumped,
if False, odd trains are pumped.
Ioweights: bool
If True, computes the ratio of the means instead of the mean of
the ratios Irkey/Iokey. Useful when dealing with large intensity
variations.
plot: bool
If True, plots the results.
plotErrors: bool
If True, plots the 95% confidence interval.
Output
------
xarray Dataset containing the binned Delta R, standard deviation,
standard error, counts and delays, and the fitting results if full
is True.
"""
# select relevant variables from dataset
variables = [Iokey, Irkey, delaykey]
ds = data[variables]
# prepare dataset according to pulse pattern
ds = prepare_reflectivity_ds(ds, Iokey, Irkey, alternateTrains,
pumpOnEven, pumpedOnly)
if (len(ds[delaykey].dims) > 1) and (ds[delaykey].dims !=
ds[Iokey].dims):
raise ValueError("Dimensions mismatch: delay variable has dims "
f"{ds[delaykey].dims} but (It, Io) variables have "
f"dims {ds[Iokey].dims}.")
bin_delays = binWidth * np.round(ds[delaykey] / binWidth)
ds[delaykey+'_binned'] = bin_delays
counts = xr.ones_like(ds[Iokey]).groupby(bin_delays).sum(...)
if Ioweights is False:
ds['deltaR'] = ds[Irkey]/ds[Iokey]
if pumpedOnly is False:
ds['deltaR'] = 100*(ds['deltaR'] /
(ds[Irkey+'_unpumped']/ds[Iokey+'_unpumped']) - 1)
groupBy = ds.groupby(bin_delays)
binned = groupBy.mean(...)
std = groupBy.std(...)
binned['deltaR_std'] = std['deltaR']
binned['deltaR_stderr'] = std['deltaR'] / np.sqrt(counts)
binned['counts'] = counts.astype(int)
else:
xas_pumped = xas(ds, Iokey=Iokey, Itkey=Irkey, nrjkey=delaykey,
fluorescence=True, bins=binWidth)
if pumpedOnly:
deltaR = xas_pumped['muA']
stddev = xas_pumped['sigmaA']
else:
xas_unpumped = xas(ds, Iokey=Iokey+'_unpumped',
Itkey=Irkey+'_unpumped', nrjkey=delaykey,
fluorescence=True, bins=binWidth)
deltaR = 100*(xas_pumped['muA'] / xas_unpumped['muA'])
stddev = np.abs(deltaR) * np.sqrt(
(xas_pumped['sigmaA']/xas_pumped['muA'])**2 +
(xas_unpumped['sigmaA']/xas_unpumped['muA'])**2)
deltaR -= 100
deltaR = xr.DataArray(deltaR, dims=delaykey, name='deltaR',
coords={delaykey: xas_pumped['nrj']})
stddev = xr.DataArray(stddev, dims=delaykey, name='deltaR_std',
coords={delaykey: xas_pumped['nrj']})
stderr = xr.DataArray(stddev / np.sqrt(xas_pumped['counts']),
dims=delaykey, name='deltaR_stderr',
coords={delaykey: xas_pumped['nrj']})
counts = xr.DataArray(xas_pumped['counts'], dims=delaykey,
name='counts',
coords={delaykey: xas_pumped['nrj']})
binned = xr.merge([deltaR, stddev, stderr, counts])
# copy attributes
for key, val in data.attrs.items():
binned.attrs[key] = val
binned = binned.rename({delaykey: 'delay'})
if plot:
plot_reflectivity(binned, delaykey, positionToDelay,
origin, invert, plotErrors, units)
return binned
def plot_reflectivity(data, delaykey, positionToDelay, origin,
invert, plotErrors, units):
fig, ax = plt.subplots(figsize=(6, 4), constrained_layout=True)
ax.plot(data['delay'], data['deltaR'], 'o-', color='C0')
xlabel = delaykey + f' [{units}]'
if plotErrors:
ax.fill_between(data['delay'],
data['deltaR'] - 1.96*data['deltaR_stderr'],
data['deltaR'] + 1.96*data['deltaR_stderr'],
color='C0', alpha=0.2)
ax2 = ax.twinx()
ax2.bar(data['delay'], data['counts'],
width=0.80*(data['delay'][1]-data['delay'][0]),
color='C1', alpha=0.2)
ax2.set_ylabel('counts', color='C1', fontsize=13)
ax2.set_ylim(0, data['counts'].max()*3)
if origin is not None:
ax.axvline(origin, color='grey', ls='--')
if positionToDelay:
ax3 = ax.twiny()
xmin, xmax = ax.get_xlim()
ax3.set_xlim(pTd(xmin, origin, invert),
pTd(xmax, origin, invert),)
ax3.set_xlabel('delay [ps]', fontsize=13)
try:
proposalNB = int(re.findall(r'p(\d{6})',
data.attrs['proposal'])[0])
runNB = data.attrs['runNB']
ax.set_title(f'run {runNB} p{proposalNB}', fontsize=14)
except Exception:
if 'plot_title' in data.attrs:
ax.set_title(data.attrs['plot_title'])
ax.set_xlabel(xlabel, fontsize=13)
ax.set_ylabel(r'$\Delta R$ [%]', color='C0', fontsize=13)
ax.grid()
return fig, ax