"""
DSSC-related sub-routines.
comment: contributions should comply with pep8 code structure guidelines.
"""
import logging
import os
import numpy as np
import xarray as xr
import pandas as pd
import extra_data as ed
from ..mnemonics_machinery import mnemonics_for_run
from .dssc_data import save_xarray
__all__ = [
'process_dssc_data'
]
log = logging.getLogger(__name__)


def create_empty_dataset(run_info, binners=None):
    """
    Create an empty (zero-valued) Dataset for a single DSSC module to
    iteratively add data to.

    Copyright (c) 2020, SCS-team

    Parameters
    ----------
    run_info: dictionary
        dictionary containing the keys 'dims', 'frames_per_train' and
        'trainIds' (see process_dssc_data).
    binners: dictionary
        dictionary of binner xarray.DataArrays, keyed by the dimension
        ('trainId', 'pulse', 'x' or 'y') along which they bin.

    Returns
    -------
    xarray.Dataset
        zero-valued dataset with variables 'data' and 'hist', shaped
        according to the given binners.
    """
    if binners is None:
        binners = {}
fpt = run_info['frames_per_train']
len_x = run_info['dims'][0]
len_y = run_info['dims'][1]
defaults = {"trainId": run_info["trainIds"],
"pulse": np.linspace(0, fpt-1, fpt, dtype=int),
"x": np.linspace(1, len_x, len_x, dtype=int),
"y": np.linspace(1, len_y, len_y, dtype=int)}
dims = []
coords = {}
shape = []
    for key in defaults:
        dims.append(key)
        if key in binners:
            df = pd.DataFrame({'data': binners[key].values})
        else:
            df = pd.DataFrame({'data': defaults[key]})
        grouped = df.groupby(['data'])
        groups = list(grouped.groups.keys())
        coords[key] = groups
        shape.append(len(groups))
da_data = xr.DataArray(np.zeros(shape, dtype=np.float64),
coords=coords,
dims=dims)
ds = da_data.to_dataset(name="data")
ds['hist'] = xr.full_like(ds.data[:, :, 0, 0], fill_value=0)
log.debug("Prepared empty dataset for single dssc module")
return ds
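

# Hypothetical usage sketch of create_empty_dataset (all values below are
# illustrative, not taken from a real run):
#
#     run_info = {'frames_per_train': 2, 'dims': (128, 512),
#                 'trainIds': np.arange(1000, 1010)}
#     # bin the ten trains into two groups of five trains each
#     train_binner = xr.DataArray(np.repeat([0, 1], 5), dims=['trainId'],
#                                 coords={'trainId': run_info['trainIds']})
#     ds = create_empty_dataset(run_info, binners={'trainId': train_binner})
#     # ds.data.shape == (2, 2, 128, 512): 2 train bins, 2 pulses, x, y
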
def load_chunk_data(sel, sourcename, maxframes=None):
"""
Load selected DSSC data. The flattened multi-index (trains+pulses) is
unraveled before returning the data.
Copyright (c) 2019, Michael Schneider
Copyright (c) 2020, SCS-team
license: BSD 3-Clause License (see LICENSE_BSD for more info)
Parameters
----------
sel: extra_data.DataCollection
a DataCollection or a subset of a DataCollection obtained by its
select_trains() method
    sourcename: str
        full name of the DSSC module data source, e.g.
        'SCS_DET_DSSC1M-1/DET/0CH0:xtdf'
    maxframes: int, optional
        if given, limit the number of pulses per train that are returned.
Returns
-------
    xarray.DataArray
        the loaded DSSC chunk with dimensions ('trainId', 'pulse', 'x', 'y')
"""
info = sel.detector_info(sourcename)
fpt = info['frames_per_train']
data = sel.get_array(sourcename, 'image.data',
extra_dims=['_empty_', 'x', 'y']
).squeeze()
tids = np.unique(data.trainId)
data = data.rename(dict(trainId='trainId_pulse'))
midx = pd.MultiIndex.from_product([sorted(tids), range(fpt)],
names=('trainId', 'pulse'))
data = xr.DataArray(data,
dict(trainId_pulse=midx)
).unstack('trainId_pulse')
data = data.transpose('trainId', 'pulse', 'x', 'y')
log.debug(f"loaded and formatted chunk of dssc data")
return data.loc[{'pulse': np.s_[:maxframes]}]
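

# Minimal sketch of the unstack pattern used in load_chunk_data
# (self-contained, illustrative data):
#
#     tids, fpt = [10, 11], 3
#     midx = pd.MultiIndex.from_product([tids, range(fpt)],
#                                       names=('trainId', 'pulse'))
#     flat = xr.DataArray(np.arange(6), dims=['trainId_pulse'],
#                         coords={'trainId_pulse': midx})
#     table = flat.unstack('trainId_pulse')  # dims ('trainId', 'pulse')
#     # table.sel(trainId=11, pulse=2) == flat[5]
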
def _load_chunk_xgm(sel, xgm_mnemonic, stride):
"""
Load selected xgm data.
Copyright (c) 2020, SCS-team
Parameters
----------
sel: extra_data.DataCollection
a DataCollection or a subset of a DataCollection obtained by its
select_trains() method
xgm_mnemonic: str
a valid mnemonic for an xgm source from the tb.mnemonic dictionary
    stride: int
        stride between the DSSC frames that are normalized, i.e. each XGM
        value is assigned to every stride-th DSSC frame.
Returns
-------
xarray.DataArray
"""
mnemonics = mnemonics_for_run(sel)
d = sel.get_array(*mnemonics[xgm_mnemonic].values())
    d = d.rename({'XGMbunchId': 'pulse'})
frame_coords = np.linspace(0, (d.shape[1]-1)*stride, d.shape[1], dtype=int)
d = d.assign_coords({'pulse': frame_coords})
log.debug(f"loaded and formatted chunk of xgm data")
return d
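

# Sketch of the pulse-coordinate alignment done in _load_chunk_xgm
# (illustrative values): with stride=2 and three XGM values per train, the
# 'pulse' coordinate becomes [0, 2, 4], so each XGM value lines up with
# every second DSSC frame:
#
#     np.linspace(0, (3 - 1) * 2, 3, dtype=int)  # -> array([0, 2, 4])
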
def process_dssc_data(proposal, run_nr, module, chunksize, info, dssc_binners,
path='./',
pulsemask=None,
dark_image=None,
xgm_mnemonic='SCS_SA3',
xgm_normalization=False,
normevery=1
):
"""
Collects and reduces DSSC data for a single module.
Copyright (c) 2020, SCS-team
Parameters
----------
proposal : int
proposal number
run_nr : int
run number
module : int
DSSC module to process
chunksize : int
number of trains to load simultaneously
info: dictionary
dictionary containing keys 'dims', 'frames_per_train', 'total_frames',
'trainIds', 'number_of_trains'.
dssc_binners: dictionary
a dictionary containing binner objects created by the ToolBox member
function "create_binner()"
path : str
location in which the .h5 files, containing the binned data, should
be stored.
pulsemask : numpy.ndarray
array of booleans to be used to mask dssc data according to xgm data.
dark_image: xarray.DataArray
an xarray dataarray with matching coordinates with the loaded data. If
dark_image is not None it will be subtracted from each individual dssc
frame.
    xgm_mnemonic: str
        mnemonic of the XGM data to be used for normalization.
    xgm_normalization: bool
        True if the data should be divided by the corresponding XGM value.
    normevery: int
        one out of every normevery DSSC frames will be normalized.

    Returns
    -------
    module_data: xarray.Dataset
        xarray dataset containing the DSSC data binned according to the
        given binners.
"""
# metadata definition
ntrains = info['number_of_trains']
chunks = np.arange(ntrains, step=chunksize)
sourcename_dssc = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
# open extra_data run objects
collection_DSSC = ed.open_run(proposal, run_nr,
include=f'*DSSC{module:02d}*')
collection_DA1 = ed.open_run(proposal, run_nr, include='*DA01*')
log.info(f"Processing dssc module {module}: start")
# create empty dataset for dssc data to be filled iteratively
module_data = create_empty_dataset(info, dssc_binners)
# load data chunk by chunk and merge result into prepared empty dataset
for chunk in chunks:
log.debug(f"Module {module}: "
f"load trains {chunk}:{chunk + chunksize}")
sel = collection_DSSC.select_trains(
ed.by_index[chunk:chunk + chunksize])
chunk_data = load_chunk_data(sel, sourcename_dssc)
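        # per-frame counter of contributing entries: each (trainId, pulse)
        # starts at 1 and is masked together with the data, so the final
        # division of 'data' by 'hist' yields a mean over the frames that
        # actually entered each bin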
chunk_hist = xr.full_like(chunk_data[:, :, 0, 0], fill_value=1)
# ---------------------------------------------------------------------
# optional blocks -> ToDo: see merge request !87
# ---------------------------------------------------------------------
# option 1: prefiltering -> xgm pulse masking
if pulsemask is not None:
log.debug(f'Module {module}: drop out-of-bounds frames')
# relatively slow. ToDo -> wrap using np.ufunc
chunk_data = chunk_data.where(pulsemask)
chunk_hist = chunk_hist.where(pulsemask)
# option 2: subtraction of dark image/s
if dark_image is not None:
log.debug(f'Module {module}: subtract dark')
chunk_data.values = chunk_data.values - dark_image.values
# slower: using xarray directly
# chunk_data = chunk_data - dark_image
# option 3: normalize dssc frames by their xgm values
if xgm_normalization:
log.debug(f'Module {module}: load and format xgm data')
sel_DA1 = collection_DA1.select_trains(
ed.by_index[chunk:chunk + chunksize])
chunk_xgm = _load_chunk_xgm(sel_DA1, xgm_mnemonic, normevery)
log.debug(f'Module {module}: normalize chunk_data using xgm')
            # the following line uses numpy's fast vectorized array
            # operations; the remaining overhead comes from rewriting the
            # values of the xarray.DataArray
chunk_data.values[:, 0::normevery, :, :] = \
np.divide(chunk_data[:, 0::normevery, :, :], chunk_xgm)
# slow code using xarray directly
# chunk_data = chunk_data / chunk_xgm
# ---------------------------------------------------------------------
# end optional blocks: xarray operations from here on.
# ---------------------------------------------------------------------
# data reduction -> apply binners
log.debug(f'Module {module}: apply binning to chunk_data')
chunk_data = chunk_data.to_dataset(name='data')
chunk_data['hist'] = chunk_hist
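        # binning sketch (illustrative values): for a 'pulse' binner with
        # values [0, 0, 1, 1], groupby('pulse_binned').sum('pulse') below
        # collapses four pulses into two bins by summing frames (0, 1)
        # and (2, 3)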
for b in dssc_binners:
chunk_data[b+"_binned"] = dssc_binners[b]
chunk_data = chunk_data.groupby(b+"_binned").sum(b)
chunk_data = chunk_data.rename(name_dict={b+"_binned": b})
# add chunk data to loaded data
        log.debug(f'Module {module}: merge chunk data')
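        # concatenating along a temporary dimension and summing accumulates
        # the chunk into the full dataset: concat outer-joins the
        # coordinates (a chunk only covers a subset of the trainId bins),
        # and sum() then ignores the resulting NaN padding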
for var in ['data', 'hist']:
module_data[var] = xr.concat([module_data[var],
chunk_data[var]],
dim='tmp').sum('tmp')
log.debug(f"Module {module}: "
f"processed trains {chunk}:{chunk + chunksize}")
log.debug(f'Module {module}: calculate mean')
module_data['data'] = module_data['data'] / module_data['hist']
module_data = module_data.transpose('trainId', 'pulse', 'x', 'y')
module_data.attrs['module'] = module
log.debug(f'saving module {module}')
    if not os.path.isdir(path):
        os.mkdir(path)
    fname = f'run_{run_nr}_module{module}.h5'
    save_xarray(os.path.join(path, fname), module_data, mode='a')
log.info(f"processing module {module}: done")