Source code for toolbox_scs.routines.Reflectivity

""" Toolbox for SCS.

    Various utilities function to quickly process data measured
    at the SCS instrument.

    Copyright (2019-) SCS Team.
"""
import matplotlib.pyplot as plt
import numpy as np
import xarray as xr
import re
from toolbox_scs.misc.laser_utils import positionToDelay as pTd
from toolbox_scs.routines.XAS import xas

__all__ = [
    'reflectivity'
]


def prepare_reflectivity_ds(ds, Iokey, Irkey, alternateTrains,
                            pumpOnEven, pumpedOnly):
    """
    Sorts the dataset according to the bunch pattern:
    Alternating pumped/unpumped pulses, alternating pumped/unpumped
    trains, or pumped only.
    """
    assert ds[Iokey].dims == ds[Irkey].dims, \
    f"{Iokey} and {Irkey} do not share dimensions."
    if alternateTrains:
        p = 0 if pumpOnEven else 1
        pumped_tid = ds['trainId'].where(ds.trainId % 2 == p, drop=True)
        unpumped_tid = ds['trainId'].where(ds.trainId % 2 == int(not p),
                                           drop=True)
        max_size = min(pumped_tid.size, unpumped_tid.size)
        pumped = ds.sel(trainId=pumped_tid[:max_size])
        unpumped = ds.sel(trainId=unpumped_tid[:max_size]
                          ).assign_coords(trainId=pumped.trainId)
        for v in [Iokey, Irkey]:
            pumped[v+'_unpumped'] = unpumped[v].rename(v+'_unpumped')
        ds = pumped
    elif pumpedOnly is False:
        # check that number of pulses is even with pumped / unpumped pattern
        dim_name = [dim for dim in ds[Iokey].dims if dim != 'trainId'][0]
        if ds[dim_name].size % 2 == 1:
            ds = ds.isel({dim_name: slice(0, -1)})
            print('The dataset contains an odd number of pulses '
                  'per train. Ignoring the last pulse.')
        pumped = ds.isel({dim_name: slice(0, None, 2)})
        unpumped = ds.isel({dim_name: slice(1, None, 2)}).assign_coords(
            {dim_name: pumped[dim_name]})
        for v in [Iokey, Irkey]:
            pumped[v+'_unpumped'] = unpumped[v].rename(v+'_unpumped')
        ds = pumped
    return ds


[docs]def reflectivity(data, Iokey='FastADC5peaks', Irkey='FastADC3peaks', delaykey='PP800_DelayLine', binWidth=0.05, positionToDelay=True, origin=None, invert=False, pumpedOnly=False, alternateTrains=False, pumpOnEven=True, Ioweights=False, plot=True, plotErrors=True, units='mm' ): """ Computes the reflectivity R = 100*(Ir/Io[pumped] / Ir/Io[unpumped] - 1) as a function of delay. Delay can be a motor position in mm or an optical delay in ps, with possibility to convert from position to delay. The default scheme is alternating pulses pumped/unpumped/... in each train, also possible are alternating trains and pumped only. If fitting is enabled, attempts a double exponential (default) or step function fit. Parameters ---------- data: xarray Dataset Dataset containing the Io, Ir and delay data Iokey: str Name of the Io variable Irkey: str Name of the Ir variable delaykey: str Name of the delay variable (motor position in mm or optical delay in ps) binWidth: float width of bin in units of delay variable positionToDelay: bool If True, adds a time axis converted from position axis according to origin and invert parameters. Ignored if origin is None. origin: float Position of time overlap, shown as a vertical line. Used if positionToDelay is True to convert position to time axis. invert: bool Used if positionToDelay is True to convert position to time axis. pumpedOnly: bool Assumes that all trains and pulses are pumped. In this case, Delta R is defined as Ir/Io. alternateTrains: bool If True, assumes that trains alternate between pumped and unpumped data. pumpOnEven: bool Only used if alternateTrains=True. If True, even trains are pumped, if False, odd trains are pumped. Ioweights: bool If True, computes the ratio of the means instead of the mean of the ratios Irkey/Iokey. Useful when dealing with large intensity variations. plot: bool If True, plots the results. plotErrors: bool If True, plots the 95% confidence interval. Output ------ xarray Dataset containing the binned Delta R, standard deviation, standard error, counts and delays, and the fitting results if full is True. """ # select relevant variables from dataset variables = [Iokey, Irkey, delaykey] ds = data[variables] # prepare dataset according to pulse pattern ds = prepare_reflectivity_ds(ds, Iokey, Irkey, alternateTrains, pumpOnEven, pumpedOnly) if (len(ds[delaykey].dims) > 1) and (ds[delaykey].dims != ds[Iokey].dims): raise ValueError("Dimensions mismatch: delay variable has dims " f"{ds[delaykey].dims} but (It, Io) variables have " f"dims {ds[Iokey].dims}.") bin_delays = binWidth * np.round(ds[delaykey] / binWidth) ds[delaykey+'_binned'] = bin_delays counts = xr.ones_like(ds[Iokey]).groupby(bin_delays).sum(...) if Ioweights is False: ds['deltaR'] = ds[Irkey]/ds[Iokey] if pumpedOnly is False: ds['deltaR'] = 100*(ds['deltaR'] / (ds[Irkey+'_unpumped']/ds[Iokey+'_unpumped']) - 1) groupBy = ds.groupby(bin_delays) binned = groupBy.mean(...) std = groupBy.std(...) binned['deltaR_std'] = std['deltaR'] binned['deltaR_stderr'] = std['deltaR'] / np.sqrt(counts) binned['counts'] = counts.astype(int) else: xas_pumped = xas(ds, Iokey=Iokey, Itkey=Irkey, nrjkey=delaykey, fluorescence=True, bins=binWidth) if pumpedOnly: deltaR = xas_pumped['muA'] stddev = xas_pumped['sigmaA'] else: xas_unpumped = xas(ds, Iokey=Iokey+'_unpumped', Itkey=Irkey+'_unpumped', nrjkey=delaykey, fluorescence=True, bins=binWidth) deltaR = 100*(xas_pumped['muA'] / xas_unpumped['muA']) stddev = np.abs(deltaR) * np.sqrt( (xas_pumped['sigmaA']/xas_pumped['muA'])**2 + (xas_unpumped['sigmaA']/xas_unpumped['muA'])**2) deltaR -= 100 deltaR = xr.DataArray(deltaR, dims=delaykey, name='deltaR', coords={delaykey: xas_pumped['nrj']}) stddev = xr.DataArray(stddev, dims=delaykey, name='deltaR_std', coords={delaykey: xas_pumped['nrj']}) stderr = xr.DataArray(stddev / np.sqrt(xas_pumped['counts']), dims=delaykey, name='deltaR_stderr', coords={delaykey: xas_pumped['nrj']}) counts = xr.DataArray(xas_pumped['counts'], dims=delaykey, name='counts', coords={delaykey: xas_pumped['nrj']}) binned = xr.merge([deltaR, stddev, stderr, counts]) # copy attributes for key, val in data.attrs.items(): binned.attrs[key] = val binned = binned.rename({delaykey: 'delay'}) if plot: plot_reflectivity(binned, delaykey, positionToDelay, origin, invert, plotErrors, units) return binned
def plot_reflectivity(data, delaykey, positionToDelay, origin, invert, plotErrors, units): fig, ax = plt.subplots(figsize=(6, 4), constrained_layout=True) ax.plot(data['delay'], data['deltaR'], 'o-', color='C0') xlabel = delaykey + f' [{units}]' if plotErrors: ax.fill_between(data['delay'], data['deltaR'] - 1.96*data['deltaR_stderr'], data['deltaR'] + 1.96*data['deltaR_stderr'], color='C0', alpha=0.2) ax2 = ax.twinx() ax2.bar(data['delay'], data['counts'], width=0.80*(data['delay'][1]-data['delay'][0]), color='C1', alpha=0.2) ax2.set_ylabel('counts', color='C1', fontsize=13) ax2.set_ylim(0, data['counts'].max()*3) if origin is not None: ax.axvline(origin, color='grey', ls='--') if positionToDelay: ax3 = ax.twiny() xmin, xmax = ax.get_xlim() ax3.set_xlim(pTd(xmin, origin, invert), pTd(xmax, origin, invert),) ax3.set_xlabel('delay [ps]', fontsize=13) try: proposalNB = int(re.findall(r'p(\d{6})', data.attrs['proposal'])[0]) runNB = data.attrs['runNB'] ax.set_title(f'run {runNB} p{proposalNB}', fontsize=14) except Exception: if 'plot_title' in data.attrs: ax.set_title(data.attrs['plot_title']) ax.set_xlabel(xlabel, fontsize=13) ax.set_ylabel(r'$\Delta R$ [%]', color='C0', fontsize=13) ax.grid() return fig, ax