Source code for toolbox_scs.detectors.dssc

"""
    DSSC-detector class module

    The dssc detector class. It represents a namespace for frequent evaluation
    while implicitly applying/requiring certain structure/naming conventions to
    its objects.

    comments:
        - contributions should comply with pep8 code structure guidelines.
        - Plot routines don't fit into objects since they are rather fluent.
          They have been outsourced to dssc_plot.py. They can now be accessed
          as toolbox_scs member functions.
"""
import os
import logging
import joblib

import numpy as np
import xarray as xr

import toolbox_scs as tb
from ..util.exceptions import ToolBoxValueError, ToolBoxFileError
from .dssc_data import (
    save_xarray, load_xarray, save_attributes_h5,
    search_files, get_data_formatted)
from .dssc_misc import (
        load_dssc_info, get_xgm_formatted, get_tim_formatted)
from .dssc_processing import (
        process_dssc_data, create_empty_dataset)

__all__ = [
    "DSSCBinner",
    "DSSCFormatter"]

log = logging.getLogger(__name__)


class DSSCBinner:
    def __init__(self, proposal_nr, run_nr,
                 binners={},
                 xgm_name='SCS_SA3',
                 tim_names=['MCP1apd', 'MCP2apd', 'MCP3apd'],
                 dssc_coords_stride=2,
                 ):
        """
        A dssc binner object. Loads and bins the dssc data according to the
        bins specified in 'binners'. The data can be reduced further through
        masking using XGM or TIM data.

        Parameters
        ----------
        proposal_nr: int, str
            proposal number containing run folders
        run_nr: int, str
            run number
        binners: dictionary
            dictionary containing binners constructed using the
            'create_dssc_bins' toolbox_scs.detectors-method.
        xgm_name: str
            a valid mnemonic key of the XGM data to be used to mask the
            dssc frames. Since the xgm is used in several methods, its name
            can be set here globally.
        tim_names: list of strings
            a list of valid mnemonic keys for an mcp in the tim. Once the
            corresponding data is loaded, the different sources will be
            averaged.
        dssc_coords_stride: int, list
            defines which dssc frames should be normalized using data from
            the xgm. The parameter may be an integer (stride parameter) or
            a list that assigns each pulse to its corresponding dssc frame
            number.

        Returns
        -------
        DSSCBinner: object

        Example
        -------
        1.) quick -> generic bins, no xgm:
            >>> import toolbox_scs as tb
            >>> run235 = tb.DSSCBinner(proposal_nr=2212, run_nr=235)
        2.) detailed -> see the docs and the hypothetical sketch below.
        """
        # ---------------------------------------------------------------
        # object (run) properties
        # ---------------------------------------------------------------
        self.proposal = proposal_nr
        self.runnr = run_nr
        self.info = load_dssc_info(proposal_nr, run_nr)
        self.run, _ = tb.load(proposal_nr, run_nr)
        self.binners = {}
        for b in binners:
            self.add_binner(b, binners[b])
        self.xgm_name = xgm_name
        self.tim_names = tim_names
        self.dssc_coords_stride = dssc_coords_stride

        self.xgm = None
        self.tim = None
        self.pulsemask = None
        log.debug("Constructed DSSC object")

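    # Hypothetical usage sketch for case 2.) above. The binner name,
    # coordinates and bin map are illustrative; the signature
    # create_dssc_bins(name, coordinates, bins) is an assumption based on
    # the parameter documentation above, not a verified call.
    #
    #   >>> import numpy as np
    #   >>> import toolbox_scs as tb
    #   >>> import toolbox_scs.detectors as tbdet
    #   >>> fpt = 20  # frames per train, run dependent
    #   >>> # map the dssc frames alternately onto two bins 'on'/'off'
    #   >>> binner = tbdet.create_dssc_bins('pulse', np.arange(fpt),
    #   ...                                 ['on', 'off'] * (fpt // 2))
    #   >>> run235 = tb.DSSCBinner(2212, 235, binners={'pulse': binner})
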
    def __del__(self):
        pass

    def add_binner(self, name, binner):
        """
        Add an additional binner to the internal dictionary.

        Parameters
        ----------
        name: str
            name of the binner to be created
        binner: xarray.DataArray
            An array that represents a map of how the respective coordinate
            should be binned.

        Raises
        ------
        ToolBoxValueError: Exception
            Raises an exception in case the name does not correspond to a
            valid binner name. To be generalized.
        """
        if name in ['trainId', 'pulse', 'x', 'y']:
            self.binners[name] = binner
        else:
            msg = "Invalid binner name"
            log.info(msg + ", no binner created")
            raise ToolBoxValueError(msg, name)

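    # Hypothetical usage sketch: add a 'trainId' binner to an existing
    # binner object, grouping trains into coarse scan steps. The grouping
    # is illustrative; create_dssc_bins(name, coordinates, bins) is again
    # an assumed signature.
    #
    #   >>> trainIds = run235.info['trainIds']
    #   >>> steps = trainIds // 100  # dummy grouping of trains
    #   >>> binner_t = tbdet.create_dssc_bins('trainId', trainIds, steps)
    #   >>> run235.add_binner('trainId', binner_t)
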
    def load_xgm(self):
        """
        load xgm data and construct coordinate array according to
        corresponding dssc frame number.
        """
        self.xgm = get_xgm_formatted(self.run,
                                     self.xgm_name,
                                     self.dssc_coords_stride)

    def load_tim(self):
        """
        load tim data and construct coordinate array according to
        corresponding dssc frame number.
        """
        self.tim = get_tim_formatted(self.run,
                                     self.tim_names,
                                     self.dssc_coords_stride)

    def create_pulsemask(self, use_data='xgm', threshold=(0, np.inf)):
        """
        Creates a mask for dssc frames according to the measured xgm (or
        tim) intensity. Once such a mask has been constructed, it will be
        used in the data reduction process to drop out-of-bounds pulses.
        """
        fpt = self.info['frames_per_train']
        n_trains = self.info['number_of_trains']
        trainIds = self.info['trainIds']
        data = np.ones([n_trains, fpt], dtype=bool)
        self.pulsemask = xr.DataArray(data,
                                      dims=['trainId', 'pulse'],
                                      coords={'trainId': trainIds,
                                              'pulse': range(fpt)})

        if use_data == 'xgm':
            if self.xgm is None:
                self.load_xgm()
            valid = (self.xgm > threshold[0]) * \
                    (self.xgm < threshold[1])
        elif use_data == 'tim':
            if self.tim is None:
                self.load_tim()
            valid = (self.tim > threshold[0]) * \
                    (self.tim < threshold[1])
        else:
            # guard against 'valid' being undefined further down
            raise ToolBoxValueError("use_data must be 'xgm' or 'tim'",
                                    use_data)

        self.pulsemask = \
            (valid.combine_first(self.pulsemask).astype(bool))[:, 0:fpt]
        log.info('created pulse mask used during processing')

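    # Hypothetical usage sketch: keep only frames whose xgm intensity lies
    # within a window. The threshold values are illustrative and depend on
    # the beam conditions of the run.
    #
    #   >>> run235.create_pulsemask(use_data='xgm', threshold=(300, 8000))
    #   >>> int(run235.pulsemask.sum())  # number of frames kept
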
    def get_info(self):
        """
        Returns the expected shape of the binned dataset, in case binners
        have been defined.
        """
        if self.binners:
            empty = create_empty_dataset(self.info, self.binners)
            return empty.dims
        else:
            log.info("no binner defined yet.")

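    # Hypothetical usage sketch: with binners defined, get_info() returns
    # the dimensions (names and sizes) the binned dataset will have.
    #
    #   >>> run235.get_info()  # e.g. dims over trainId, pulse, x, y
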
    def _bin_metadata(self, data):
        if self.pulsemask is not None:
            data = data.where(self.pulsemask)

        for b in self.binners:
            if b in ['trainId', 'pulse']:
                data[b + "_binned"] = self.binners[b]
                data = data.groupby(b + "_binned").mean(b)
                data = data.rename(name_dict={b + "_binned": b})
        log.debug('binned metadata according to dssc binners.')
        return data.transpose('trainId', 'pulse')

    def get_xgm_binned(self):
        """
        Bin the xgm data according to the binners of the dssc data. The
        result can eventually be merged into the final dataset by the
        DSSCFormatter.

        Returns
        -------
        xgm_data: xarray.DataSet
            xarray dataset containing the binned xgm data
        """
        if self.xgm is not None:
            xgm_data = self.xgm.to_dataset(name='xgm')
            xgm_binned = self._bin_metadata(xgm_data)
            log.info('binned xgm data according to dssc binners.')
            return xgm_binned
        else:
            log.warning("no xgm data. Use load_xgm() to load the xgm data.")

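    # Hypothetical usage sketch: bin the xgm data so that it can later be
    # merged with the processed dssc data by the DSSCFormatter.
    #
    #   >>> run235.load_xgm()
    #   >>> xgm_binned = run235.get_xgm_binned()
    #   >>> xgm_binned['xgm']  # DataArray with dims ('trainId', 'pulse')
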
    def get_tim_binned(self):
        """
        Bin the tim data according to the binners of the dssc data. The
        result can eventually be merged into the final dataset by the
        DSSCFormatter.

        Returns
        -------
        tim_data: xarray.DataSet
            xarray dataset containing the binned tim data
        """
        if self.tim is not None:
            tim_data = self.tim.to_dataset(name='tim')
            tim_binned = self._bin_metadata(tim_data)
            log.info('binned tim data according to dssc binners.')
            return tim_binned
        else:
            log.warning("no tim data. Use load_tim() to load the tim data.")

    # -------------------------------------------------------------------
    # Data processing
    # -------------------------------------------------------------------

    def process_data(self, modules=[], filepath='./',
                     chunksize=512, backend='loky', n_jobs=None,
                     dark_image=None,
                     xgm_normalization=False,
                     normevery=1
                     ):
        """
        Load and bin dssc data according to self.bins. No data is returned
        by this method. The condensed data is written to file by the worker
        processes directly.

        Parameters
        ----------
        modules: list of ints
            a list containing the module numbers that should be processed.
            If empty, all modules are processed.
        filepath: str
            the path where the files containing the reduced data should be
            stored.
        chunksize: int
            The number of trains that should be read in one iterative step.
        backend: str
            joblib multiprocessing backend to be used. At the moment it can
            be any of joblib's standard backends: 'loky' (default),
            'multiprocessing', 'threading'. Anything other than the default
            is experimental and not appropriately implemented in this
            member function.
        n_jobs: int
            inversely proportional to the number of CPUs available per job.
            Tasks within one job can grab a maximum of n_CPU_tot/n_jobs
            CPUs. Note that when using the default backend there is no need
            to adjust this parameter with the current implementation.
        dark_image: xarray.DataArray
            DataArray with dimensions compatible with the loaded dssc data.
            If given, it will be subtracted from the dssc data before the
            binning. The dark image needs to be of dimension module,
            trainId, pulse, x and y.
        xgm_normalization: boolean
            if true, the dssc data is normalized by the xgm data before the
            binning.
        normevery: int
            integer indicating which out of normevery frames will be
            normalized.
        """
        log.info("Bin data according to binners")
        log.info(f'Process {chunksize} trains per chunk')

        mod_list = modules
        if len(mod_list) == 0:
            mod_list = [i for i in range(16)]
        log.info(f'Process modules {mod_list}')

        njobs = n_jobs
        if njobs is None:
            njobs = len(mod_list)

        module_jobs = []
        for m in mod_list:
            dark = dark_image
            if dark_image is not None:
                dark = dark_image.sel(module=m)
            module_jobs.append(dict(
                proposal=self.proposal,
                run_nr=self.runnr,
                module=m,
                chunksize=chunksize,
                path=filepath,
                info=self.info,
                dssc_binners=self.binners,
                pulsemask=self.pulsemask,
                dark_image=dark,
                xgm_normalization=xgm_normalization,
                xgm_mnemonic=self.xgm_name,
                normevery=normevery,
            ))

        log.info(f'using parallelization backend {backend}')
        joblib.Parallel(n_jobs=njobs, backend=backend)(
            joblib.delayed(process_dssc_data)(**module_jobs[i])
            for i in range(len(mod_list)))
        log.info('Binning done')

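    # Hypothetical usage sketch: process all 16 modules with dark-image
    # subtraction and xgm normalization. 'dark_235' stands for a dark
    # image obtained elsewhere and is not defined in this module; the
    # folder name is illustrative.
    #
    #   >>> run235.process_data(modules=[], filepath='./processed/',
    #   ...                     dark_image=dark_235,
    #   ...                     xgm_normalization=True, normevery=2)
    #   >>> # the reduced data is written to './processed/' by the workers
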

class DSSCFormatter:
    def __init__(self, filepath):
        """
        Class that handles formatting-related aspects before handing the
        data over for analysis.

        Parameters
        ----------
        filepath: str
            location of processed files.

        Raises
        ------
        ToolBoxFileError: Exception
            Throws an error in case the given path does not exist.
        """
        self._filenames = []
        self._filename = ''
        if os.path.exists(filepath):
            try:
                self._filenames = search_files(filepath)
            except ToolBoxFileError:
                log.info("path did not contain any files")
        else:
            log.info("path did not exist")

        self.data = None
        self.data_xarray = {}
        self.attributes = {}

    def combine_files(self, filenames=[]):
        """
        Read the files given in 'filenames' and store the data in the class
        variable 'data'. If no filenames are given, it tries to read the
        files stored in the class-internal variable '_filenames'.

        Parameters
        ----------
        filenames: list
            list of strings containing the names of the files to be
            combined.
        """
        if filenames:
            self._filenames = filenames
        if self._filenames:
            self.data = get_data_formatted(self._filenames)
        else:
            log.info("No matching data found.")

    def add_dataArray(self, groups=[]):
        """
        Reads additional xarray data from the first file given in the list
        of filenames. This assumes that all the files in the folder contain
        the same additional data. To be generalized.

        Parameters
        ----------
        groups: list
            list of strings with the names of the groups in the h5 file
            containing additional xarray data.
        """
        if self._filenames:
            for group in groups:
                self.data_xarray[group] = load_xarray(self._filenames[0],
                                                      group=group,
                                                      form='array')
        else:
            log.info("No files found in specified folder.")

    def add_attributes(self, attributes={}):
        """
        Add additional information, such as the run-type, as attributes to
        the formatted .h5 file.

        Parameters
        ----------
        attributes: dictionary
            a dictionary containing information or data of any kind that
            will be added to the formatted .h5 file as attributes.
        """
        for key in attributes.keys():
            self.attributes[key] = attributes[key]

    def save_formatted_data(self, filename):
        """
        Create a .h5 file containing the main dataset in the group called
        'data'. Additional groups will be created for the content of the
        variable 'data_array'. Metadata about the file is added in the form
        of attributes.

        Parameters
        ----------
        filename: str
            the name of the file to be created
        """
        save_xarray(filename, self.data, group='data', mode='w')
        for arr in self.data_xarray.keys():
            save_xarray(filename, self.data_xarray[arr], group=arr,
                        mode='a')
        save_attributes_h5(filename, self.attributes)
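
# Hypothetical end-to-end formatting sketch. The folder and file names are
# illustrative, and the h5 group name 'xgm' passed to add_dataArray is an
# assumption about how the binned xgm data was stored.
#
#   >>> import toolbox_scs.detectors as tbdet
#   >>> formatter = tbdet.DSSCFormatter('./processed/')
#   >>> formatter.combine_files()
#   >>> formatter.add_dataArray(groups=['xgm'])
#   >>> formatter.add_attributes({'run_type': 'pump-probe'})
#   >>> formatter.save_formatted_data('./run235_formatted.h5')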