Source code for toolbox_scs.detectors.dssc_processing

"""
    DSSC-related sub-routines.

    comment: contributions should comply with pep8 code structure guidelines.
"""
import logging
import os

import numpy as np
import xarray as xr
import pandas as pd

import extra_data as ed

from ..mnemonics_machinery import mnemonics_for_run
from .dssc_data import save_xarray

__all__ = [
    'process_dssc_data'
]

log = logging.getLogger(__name__)


def create_empty_dataset(run_info, binners={}):
    """
    Create empty (zero-valued) DataSet for a single DSSC module
    to iteratively add data to.

    Copyright (c) 2020, SCS-team

    Parameters
    ----------
    run_info: dictionary
        dictionary containing the keys 'dims', 'frames_per_train' and
        'trainIds' (a subset of the run-information dictionary passed to
        process_dssc_data()).
    binners: dictionary
        dictionary of binner objects created by the ToolBox member function
        "create_binner()". Dimensions that appear as keys are binned
        accordingly; all other dimensions keep their default coordinates.

    Returns
    -------
    ds: xarray.Dataset
        zero-valued dataset with variables 'data' (trainId, pulse, x, y) and
        'hist' (trainId, pulse), to be filled chunk by chunk.

    """
    fpt = run_info['frames_per_train']
    len_x = run_info['dims'][0]
    len_y = run_info['dims'][1]
    defaults = {"trainId": run_info["trainIds"],
                "pulse": np.linspace(0, fpt-1, fpt, dtype=int),
                "x": np.linspace(1, len_x, len_x, dtype=int),
                "y": np.linspace(1, len_y, len_y, dtype=int)}

    dims = []
    coords = {}
    shape = []

    for key in defaults:
        dims.append(key)
        df = pd.DataFrame({'data': defaults[key]})
        if key in binners:
            df = pd.DataFrame({'data': binners[key].values})
        grouped = df.groupby(['data'])
        groups = list(grouped.groups.keys())
        coords[key] = groups
        shape.append(len(groups))

    da_data = xr.DataArray(np.zeros(shape, dtype=np.float64),
                           coords=coords,
                           dims=dims)
    ds = da_data.to_dataset(name="data")
    ds['hist'] = xr.full_like(ds.data[:, :, 0, 0], fill_value=0)

    log.debug("Prepared empty dataset for single dssc module")
    return ds
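
# Illustrative example (made-up numbers): for a run with 20 frames per train,
# 128x512 pixel modules and a 'pulse' binner that maps the 20 frames onto two
# bins, create_empty_dataset() returns a dataset whose 'data' variable has
# dimensions (trainId: 10, pulse: 2, x: 128, y: 512) and whose 'hist'
# variable has dimensions (trainId: 10, pulse: 2), both filled with zeros:
#
#     run_info = {'frames_per_train': 20,
#                 'dims': (128, 512),
#                 'trainIds': np.arange(1000000, 1000010)}
#     pulse_binner = xr.DataArray(np.repeat([0, 1], 10), dims=['pulse'],
#                                 coords={'pulse': np.arange(20)})
#     empty = create_empty_dataset(run_info, {'pulse': pulse_binner})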


def load_chunk_data(sel, sourcename, maxframes=None):
    """
    Load selected DSSC data. The flattened multi-index (trains+pulses) is
    unraveled before returning the data.

    Copyright (c) 2019, Michael Schneider
    Copyright (c) 2020, SCS-team
    license: BSD 3-Clause License (see LICENSE_BSD for more info)

    Parameters
    ----------
    sel: extra_data.DataCollection
        a DataCollection or a subset of a DataCollection obtained by its
        select_trains() method
    sourcename: str
        name of the DSSC data source,
        e.g. 'SCS_DET_DSSC1M-1/DET/0CH0:xtdf'
    maxframes: int, optional
        load at most this many frames (pulses) per train.

    Returns
    -------
    xarray.DataArray
        chunk of DSSC data with dimensions ('trainId', 'pulse', 'x', 'y')
    """

    info = sel.detector_info(sourcename)
    fpt = info['frames_per_train']
    data = sel.get_array(sourcename, 'image.data',
                         extra_dims=['_empty_', 'x', 'y']
                         ).squeeze()

    tids = np.unique(data.trainId)
    data = data.rename(dict(trainId='trainId_pulse'))

    midx = pd.MultiIndex.from_product([sorted(tids), range(fpt)],
                                      names=('trainId', 'pulse'))
    data = xr.DataArray(data,
                        dict(trainId_pulse=midx)
                        ).unstack('trainId_pulse')
    data = data.transpose('trainId', 'pulse', 'x', 'y')

    log.debug(f"loaded and formatted chunk of dssc data")
    return data.loc[{'pulse': np.s_[:maxframes]}]
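
# Note: extra_data returns the DSSC image data with a single flat
# train+pulse axis of length n_trains * frames_per_train. load_chunk_data()
# re-labels that axis with a pandas MultiIndex and unstacks it, so that a
# chunk of e.g. 64 trains with 20 frames per train and 128x512 pixels comes
# back as a DataArray of shape (trainId: 64, pulse: 20, x: 128, y: 512).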


def _load_chunk_xgm(sel, xgm_mnemonic, stride):
    """
    Load selected xgm data.

    Copyright (c) 2020, SCS-team

    Parameters
    ----------
    sel: extra_data.DataCollection
        a DataCollection or a subset of a DataCollection obtained by its
        select_trains() method
    xgm_mnemonic: str
        a valid mnemonic for an xgm source from the tb.mnemonic dictionary
    stride: int
        stride between the dssc frames that have a corresponding xgm value.
        The returned 'pulse' coordinates are 0, stride, 2*stride, ..., so
        that the xgm data lines up with the dssc frames to be normalized.

    Returns
    -------
    xarray.DataArray
        xgm data with dimensions ('trainId', 'pulse')
    """
    mnemonics = mnemonics_for_run(sel)
    d = sel.get_array(*mnemonics[xgm_mnemonic].values())
    d = d.rename(new_name_or_name_dict={'XGMbunchId': 'pulse'})
    frame_coords = np.linspace(0, (d.shape[1]-1)*stride, d.shape[1], dtype=int)
    d = d.assign_coords({'pulse': frame_coords})
    log.debug(f"loaded and formatted chunk of xgm data")
    return d
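
# Illustrative example: with stride=2 the xgm values are assigned to pulse
# coordinates 0, 2, 4, ..., so that they line up with every second dssc
# frame; process_dssc_data() passes its `normevery` argument here for that
# purpose (sel_DA1 being a train selection of the '*DA01*' collection, as in
# process_dssc_data()):
#
#     chunk_xgm = _load_chunk_xgm(sel_DA1, 'SCS_SA3', stride=2)
#     chunk_xgm.coords['pulse']    # -> 0, 2, 4, ...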


def process_dssc_data(proposal, run_nr, module, chunksize, info, dssc_binners,
                      path='./',
                      pulsemask=None,
                      dark_image=None,
                      xgm_mnemonic='SCS_SA3',
                      xgm_normalization=False,
                      normevery=1
                      ):
    """
    Collects and reduces DSSC data for a single module.

    Copyright (c) 2020, SCS-team

    Parameters
    ----------
    proposal : int
        proposal number
    run_nr : int
        run number
    module : int
        DSSC module to process
    chunksize : int
        number of trains to load simultaneously
    info: dictionary
        dictionary containing keys 'dims', 'frames_per_train', 'total_frames',
        'trainIds', 'number_of_trains'.
    dssc_binners: dictionary
        a dictionary containing binner objects created by the ToolBox member
        function "create_binner()"
    path : str
        location in which the .h5 files, containing the binned data, should
        be stored.
    pulsemask : numpy.ndarray
        array of booleans to be used to mask dssc data according to xgm data.
    dark_image: xarray.DataArray
        an xarray DataArray with coordinates matching the loaded data. If
        dark_image is not None, it will be subtracted from each individual
        dssc frame.
    xgm_normalization: bool
        True if the data should be divided by the corresponding xgm value.
    xgm_mnemonic: str
        Mnemonic of the xgm data to be used for normalization.
    normevery: int
        One out of normevery dssc frames will be normalized.

    Returns
    -------
    module_data: xarray.Dataset
        xarray datastructure containing data binned according to bins.
    """
    # metadata definition
    ntrains = info['number_of_trains']
    chunks = np.arange(ntrains, step=chunksize)
    sourcename_dssc = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'

    # open extra_data run objects
    collection_DSSC = ed.open_run(proposal, run_nr,
                                  include=f'*DSSC{module:02d}*')
    collection_DA1 = ed.open_run(proposal, run_nr, include='*DA01*')
    log.info(f"Processing dssc module {module}: start")

    # create empty dataset for dssc data to be filled iteratively
    module_data = create_empty_dataset(info, dssc_binners)

    # load data chunk by chunk and merge result into prepared empty dataset
    for chunk in chunks:
        log.debug(f"Module {module}: "
                  f"load trains {chunk}:{chunk + chunksize}")
        sel = collection_DSSC.select_trains(
            ed.by_index[chunk:chunk + chunksize])
        chunk_data = load_chunk_data(sel, sourcename_dssc)
        chunk_hist = xr.full_like(chunk_data[:, :, 0, 0], fill_value=1)

        # -----------------------------------------------------------------
        # optional blocks -> ToDo: see merge request !87
        # -----------------------------------------------------------------
        # option 1: prefiltering -> xgm pulse masking
        if pulsemask is not None:
            log.debug(f'Module {module}: drop out-of-bounds frames')
            # relatively slow. ToDo -> wrap using np.ufunc
            chunk_data = chunk_data.where(pulsemask)
            chunk_hist = chunk_hist.where(pulsemask)

        # option 2: subtraction of dark image(s)
        if dark_image is not None:
            log.debug(f'Module {module}: subtract dark')
            chunk_data.values = chunk_data.values - dark_image.values
            # slower: using xarray directly
            # chunk_data = chunk_data - dark_image

        # option 3: normalize dssc frames by their xgm values
        if xgm_normalization:
            log.debug(f'Module {module}: load and format xgm data')
            sel_DA1 = collection_DA1.select_trains(
                ed.by_index[chunk:chunk + chunksize])
            chunk_xgm = _load_chunk_xgm(sel_DA1, xgm_mnemonic, normevery)
            log.debug(f'Module {module}: normalize chunk_data using xgm')
            # the following line uses numpy's fast vectorized array
            # operation; there is overhead coming from rewriting the
            # xarray.DataArray
            chunk_data.values[:, 0::normevery, :, :] = \
                np.divide(chunk_data[:, 0::normevery, :, :], chunk_xgm)
            # slow code using xarray directly
            # chunk_data = chunk_data / chunk_xgm

        # -----------------------------------------------------------------
        # end optional blocks: xarray operations from here on.
        # -----------------------------------------------------------------
        # data reduction -> apply binners
        log.debug(f'Module {module}: apply binning to chunk_data')
        chunk_data = chunk_data.to_dataset(name='data')
        chunk_data['hist'] = chunk_hist
        for b in dssc_binners:
            # assign the binner as a coordinate, sum within each bin, then
            # restore the original dimension name
            chunk_data[b+"_binned"] = dssc_binners[b]
            chunk_data = chunk_data.groupby(b+"_binned").sum(b)
            chunk_data = chunk_data.rename(name_dict={b+"_binned": b})

        # add chunk data to loaded data
        log.debug(f'Module {module}: merge chunk data')
        for var in ['data', 'hist']:
            module_data[var] = xr.concat([module_data[var],
                                          chunk_data[var]],
                                         dim='tmp').sum('tmp')

        log.debug(f"Module {module}: "
                  f"processed trains {chunk}:{chunk + chunksize}")

    log.debug(f'Module {module}: calculate mean')
    module_data['data'] = module_data['data'] / module_data['hist']
    module_data = module_data.transpose('trainId', 'pulse', 'x', 'y')
    module_data.attrs['module'] = module

    log.debug(f'saving module {module}')
    if not os.path.isdir(path):
        os.mkdir(path)
    fname = f'run_{run_nr}_module{module}.h5'
    save_xarray(path+fname, module_data, mode='a')
    log.info(f"processing module {module}: done")
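

if __name__ == '__main__':
    # Minimal usage sketch. Proposal, run and module numbers are
    # hypothetical; in practice `info` and `dssc_binners` are prepared by
    # the ToolBox (see the "create_binner()" member function mentioned
    # above). Here a trivial binner putting every train into a single bin
    # is built by hand to keep the sketch self-contained.
    proposal, run_nr, module = 2212, 235, 0
    source = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
    run = ed.open_run(proposal, run_nr, include=f'*DSSC{module:02d}*')
    det = run.detector_info(source)
    info = {'dims': det['dims'],
            'frames_per_train': det['frames_per_train'],
            'total_frames': det['total_frames'],
            'trainIds': np.array(run.train_ids),
            'number_of_trains': len(run.train_ids)}
    train_binner = xr.DataArray(np.zeros(len(run.train_ids), dtype=int),
                                dims=['trainId'],
                                coords={'trainId': np.array(run.train_ids)})
    process_dssc_data(proposal, run_nr, module, chunksize=128, info=info,
                      dssc_binners={'trainId': train_binner},
                      path='./processed/')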