# Source code for cal_tools.agipdlib

import os
import posixpath
import zlib
from datetime import datetime
from multiprocessing import Manager
from multiprocessing.pool import ThreadPool
from typing import Any, Dict, List, Optional, Tuple

import h5py
import numpy as np
import sharedmem
from dateutil import parser
from extra_data import DataCollection, H5File, by_id, components
from iCalibrationDB import Conditions, Constants

from cal_tools import agipdalgs as calgs
from cal_tools.agipdutils import (
    assemble_constant_dict,
    baseline_correct_via_noise,
    baseline_correct_via_stripe,
    correct_baseline_via_hist,
    correct_baseline_via_hist_asic,
    make_noisy_adc_mask,
    match_asic_borders,
    melt_snowy_pixels,
    cast_array_inplace
)
from cal_tools.enums import AgipdGainMode, BadPixels, SnowResolution
from cal_tools.h5_copy_except import h5_copy_except_paths
from cal_tools.tools import get_from_db


class AgipdCtrl:
    def __init__(
        self,
        run_dc: DataCollection,
        image_src: str,
        ctrl_src: str,
        raise_error: bool = True,
    ):
        """ Initialize AgipdCondition class to read
        all required AGIPD parameters.

        :param run_dc: Run data collection with expected sources
        to read needed parameters.
        :param image_src: H5 source for image data.
        :param ctrl_src: H5 source for control (slow) data.
        :param raise_error: Boolean to raise errors for missing
        sources and keys.
        """
        self.run_dc = run_dc
        self.image_src = image_src
        self.ctrl_src = ctrl_src
        self.raise_error = raise_error

    def get_num_cells(self) -> Optional[int]:
        """Read number of memory cells from fast data.

        :return mem_cells: Number of memory cells.
            Returns None if no data is available.
        """
        cells = np.squeeze(
            self.run_dc[
                self.image_src, "image.cellId"].drop_empty_trains().ndarray()
        )
        if cells.shape[0] == 0:
            return None
        maxcell = np.max(cells)
        options = [4, 32, 64, 76, 128, 176, 202, 250, 352]
        dists = [abs(o - maxcell) for o in options]
        return options[np.argmin(dists)]
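
    # Usage sketch (illustrative; the run path and source names below are
    # hypothetical): the snapping above maps the highest observed cellId to
    # the nearest known operating mode, e.g. a maximum cellId of 349 yields
    # 352 memory cells.
    #
    #   from extra_data import RunDirectory
    #   run = RunDirectory("/gpfs/exfel/exp/SPB/.../raw/r0123")
    #   ctrl = AgipdCtrl(run, "SPB_DET_AGIPD1M-1/DET/0CH0:xtdf",
    #                    "SPB_IRU_AGIPD1M1/MDL/FPGA_COMP")
    #   ctrl.get_num_cells()  # e.g. 352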

    def _get_acq_rate_ctrl(self) -> Optional[float]:
        """Get acquisition (repetition) rate from CONTROL source."""
        # Attempt to look for acquisition rate in slow data
        rep_rate_src = (
            self.ctrl_src, "bunchStructure.repetitionRate.value")
        if (
            rep_rate_src[0] in self.run_dc.all_sources and
            rep_rate_src[1] in self.run_dc.keys_for_source(rep_rate_src[0])
        ):
            # Losing precision here is deliberate: the rate is only used
            # for bucketing metadata.
            return round(float(self.run_dc[rep_rate_src].as_single_value()), 1)

    def _get_acq_rate_instr(self) -> Optional[float]:
        """Get acquisition (repetition) rate from INSTRUMENT source."""

        train_pulses = np.squeeze(
            self.run_dc[
                self.image_src,
                "image.pulseId"].drop_empty_trains().train_from_index(0)[1]
        )

        # Compute acquisition rate from fast data
        diff = train_pulses[1] - train_pulses[0]
        options = {8: 0.5, 4: 1.1, 2: 2.2, 1: 4.5}
        return options.get(diff, None)
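
    # Minimal sketch of the mapping above (an assumption about its intent,
    # not part of the class API): pulse IDs count slots of the 4.5 MHz
    # bunch clock, so the spacing between the first two pulse IDs encodes
    # the repetition rate, e.g. a spacing of 4 means every 4th slot is
    # used, i.e. 4.5 / 4 ~ 1.1 MHz.
    #
    #   train_pulses = np.array([0, 4, 8, 12])
    #   {8: 0.5, 4: 1.1, 2: 2.2, 1: 4.5}.get(
    #       train_pulses[1] - train_pulses[0])  # -> 1.1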

    def get_acq_rate(self) -> Optional[float]:
        """Read the acquisition rate for the selected detector module.

        The value is read from CONTROL source e.g.`CONTROL/../MDL/FPGA_COMP`,
        If key is not available, the rate is calculated from
        two consecutive pulses of the same trainId.

        :return acq_rate: the acquisition rate.
                          return None, if not available.
        """
        acq_rate = self._get_acq_rate_ctrl()

        if acq_rate is not None:
            return acq_rate
        # For AGIPD500K this function would produce wrong value (always 4.5)
        # before W10/2022.
        # TODO: Confirm leaving this as it is.
        return self._get_acq_rate_instr()

    def _get_gain_setting_ctrl(self) -> Optional[int]:
        """Read gain_settings from CONTROL source and gain key."""
        return int(self.run_dc[self.ctrl_src, "gain"].as_single_value())

    def _get_gain_setting_ctrl_old(self) -> Optional[int]:
        """Read gain_settings from setupr and patterTypeIndex
        from old RAW data that is missing `gain.value`.

        If `gain.value` isn't available in MDL source,
        the setting is calculated from `setupr` and `patternTypeIndex`

        gain-setting 1: setupr@dark=8, setupr@slopespc=40
        gain-setting 0: setupr@dark=0, setupr@slopespc=32

        patternTypeIndex 1: High-gain
        patternTypeIndex 2: Medium-gain
        patternTypeIndex 3: Low-gain
        patternTypeIndex 4: SlopesPC
        Returns:
            int: gain_setting value.
        """
        setupr = self.run_dc[self.ctrl_src, "setupr"].as_single_value()
        pattern_type_idx = self.run_dc[
            self.ctrl_src, "patternTypeIndex"].as_single_value()

        if (setupr == 0 and pattern_type_idx < 4) or (
                setupr == 32 and pattern_type_idx == 4):
            return 0
        elif (setupr == 8 and pattern_type_idx < 4) or (
                setupr == 40 and pattern_type_idx == 4):
            return 1
        else:
            # TODO: Confirm that this can be removed.
            if self.raise_error:
                raise ValueError(
                    "Could not derive gain setting from"
                    " setupr and patternTypeIndex"
                )
            return
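
    # Worked example of the table above: (setupr=0, patternTypeIndex<4) or
    # (setupr=32, patternTypeIndex=4) derive gain setting 0, while
    # (setupr=8, patternTypeIndex<4) or (setupr=40, patternTypeIndex=4)
    # derive gain setting 1; e.g. a dark run with setupr=8 and
    # patternTypeIndex=2 (Medium-gain) yields gain setting 1.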

    def get_gain_setting(
        self,
        creation_time: Optional[datetime] = None,
    ) -> Optional[int]:
        """Read Gain setting from CONTROL sources.
        if key `gain.value` is not available, calculate gain_setting from
        setupr and patterTypeIndex. If it failed raise ValueError.

        :param creation_time: datetime object for the data creation time.
        :return: gain setting.
                 return 0, if not available.
        """
        # TODO: remove after fixing get_possible_conditions
        if (
            creation_time and
            creation_time.replace(tzinfo=None) < parser.parse('2020-01-31')
        ):
            print("Set gain-setting to None for runs taken before 2020-01-31")
            return

        if "gain.value" in self.run_dc.keys_for_source(self.ctrl_src):
            return self._get_gain_setting_ctrl()

        gain_setting = self._get_gain_setting_ctrl_old()

        if gain_setting is not None:
            return gain_setting
        else:
            # TODO: confirm that this can be removed.
            print(
                "ERROR: gain_setting is not available "
                f"at source {self.ctrl_src}.\nSet gain_setting to 0.")
            return 0

    def get_gain_mode(self) -> AgipdGainMode:
        """Return the gain mode (adaptive or fixed) from slow data."""

        if (
            self.ctrl_src in self.run_dc.all_sources and
            "gainModeIndex.value" in self.run_dc.keys_for_source(
                self.ctrl_src)
        ):
            return AgipdGainMode(int(self.run_dc[
                self.ctrl_src, "gainModeIndex"].as_single_value()))

        return AgipdGainMode.ADAPTIVE_GAIN

    def get_bias_voltage(
        self,
        karabo_id_control: str,
        module: Optional[int] = 0
    ) -> int:
        """Read the voltage information from the RUN source of module 0.

        Different modules may operate at different voltages.
        In practice, they all operate at the same voltage.
        As such, it is okay to read a single module's value.

        If the FPGA/PSC RUN source is not available, 300 will be returned.
        300 was the default bias_voltage value for
        MID_DET_AGIPD1M-1 and SPB_DET_AGIPD1M-1.

        :param karabo_id_control: The karabo deviceId for the CONTROL device.
        :param module: defaults to module 0
        :return: bias voltage
        """
        # TODO: Add a breaking fix by passing the source and key through
        # get_bias_voltage arguments.
        if "AGIPD1M" in karabo_id_control:
            voltage_src = (
                f"{karabo_id_control[:-1]}/PSC/HV",
                f"channels.U{module}.measurementSenseVoltage.value")
            # TODO: Consider removing this default and relying on the
            # voltage value from the notebook's first cell.
            default_voltage = 300
        else:  # AGIPD500K
            voltage_src = (
                f"{karabo_id_control}/FPGA/M_{module}",
                "highVoltage.actual.value")
            default_voltage = None

        if (
            voltage_src[0] in self.run_dc.all_sources and
            voltage_src[1] in self.run_dc.keys_for_source(voltage_src[0])
        ):
            # Use the RUN source to read the bias voltage, because
            # HED_DET_AGIPD500K2G has a hardware issue that leads to
            # storing arbitrary voltage values in the CONTROL source
            # array, e.g. /gpfs/exfel/exp/HED/202230/p900248/raw
            return int(self.run_dc.get_run_value(*voltage_src))
        else:
            # TODO: Validate removing this and using the notebook
            # value for old RAW data.
            error = ("ERROR: Unable to read bias_voltage from"
                     f" {voltage_src[0]}/{voltage_src[1].replace('.','/')}.")

            if default_voltage:
                print(f"{error} Returning {default_voltage} "
                      "as default bias voltage value.")
            else:
                raise ValueError(error)
            return default_voltage
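
    # Illustrative sketch of the source/key pairs built above (the device
    # names are hypothetical): for an AGIPD1M control device
    # "SPB_IRU_AGIPD1M1" and module=0 the RUN value is read from
    #   ("SPB_IRU_AGIPD1M/PSC/HV",
    #    "channels.U0.measurementSenseVoltage.value")
    # whereas for an AGIPD500K device "HED_EXP_AGIPD500K2G" it is
    #   ("HED_EXP_AGIPD500K2G/FPGA/M_0", "highVoltage.actual.value")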

    def get_integration_time(self) -> int:
        """Read integration time from the FPGA device.

        The integration time is specified as an integer number of clock
        cycles each spanning ~9ns. The default (and legacy) value is 12.

        :return: integration time
        """
        if (
            self.ctrl_src in self.run_dc.all_sources and
            'integrationTime.value' in self.run_dc.keys_for_source(
                self.ctrl_src)
        ):
            return int(self.run_dc[
                self.ctrl_src, 'integrationTime'].as_single_value())

        return 12
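
    # Unit sketch: the returned value is in clock cycles of ~9 ns each,
    # so the default of 12 corresponds to roughly 12 * 9 ns ~ 108 ns
    # (`ctrl` below is a hypothetical AgipdCtrl instance).
    #
    #   cycles = ctrl.get_integration_time()  # e.g. 12
    #   integration_ns = cycles * 9           # ~108 ns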


class CellSelection:
    """Selection of detector memory cells (abstract class)"""
    row_size = 32
    ncell_max = 352
    CM_NONE = 0
    CM_PRESEL = 1
    CM_FINSEL = 2

    def get_cells_on_trains(
        self, trains_sel: List[int], cm: int = 0
    ) -> np.ndarray:
        """Returns mask of cells selected for processing

        :param trains_sel: list of train ids selected for processing
        :param cm: flag indicating the final selection or an interim
            selection for common-mode correction
        """
        raise NotImplementedError

    def msg(self):
        """Return log message on initialization"""
        raise NotImplementedError

    @staticmethod
    def _sel_for_cm(flag, flag_cm, cm):
        if cm == CellSelection.CM_NONE:
            return flag
        elif cm == CellSelection.CM_PRESEL:
            return flag_cm
        elif cm == CellSelection.CM_FINSEL:
            return flag[flag_cm]
        else:
            raise ValueError("param 'cm' takes only 0,1,2")

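# Illustrative sketch of the `cm` flag semantics (toy arrays, not real
# data): `flag` marks the cells finally selected for output, `flag_cm`
# marks the wider selection kept for common-mode estimation (whole rows
# of `row_size` cells).
#
#   flag    = np.array([False, True, True, False])
#   flag_cm = np.array([True, True, True, True])
#   CellSelection._sel_for_cm(flag, flag_cm, CellSelection.CM_NONE)
#   # -> flag: final selection, common mode not used
#   CellSelection._sel_for_cm(flag, flag_cm, CellSelection.CM_PRESEL)
#   # -> flag_cm: interim selection for common-mode correction
#   CellSelection._sel_for_cm(flag, flag_cm, CellSelection.CM_FINSEL)
#   # -> flag[flag_cm]: final selection relative to the interim one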

class AgipdCorrections:

    def __init__(
        self,
        max_cells: int,
        cell_sel: CellSelection,
        h5_data_path: str = "SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/",
        h5_index_path: str = "SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/",
        corr_bools: Optional[dict] = None,
        gain_mode: AgipdGainMode = AgipdGainMode.ADAPTIVE_GAIN,
        comp_threads: int = 1,
        train_ids: Optional[np.ndarray] = None
    ):
        """
        Initialize an AgipdCorrections Class

        :param max_cells: maximum number of memory cells to handle, e.g.
            if calibration constants only exist for a subset of cells
        :param cell_sel: the CellSelection indicates cells selected for
            calibration
        :param h5_data_path: path in HDF5 file which is prefixed to the
            image/data section
        :param h5_index_path: path in HDF5 file which is prefixed to the
            index section
        :param corr_bools: A dict with all of the correction booleans
            requested or available
        :param comp_threads: Number of threads to use for compressing
            gain/mask
        :param train_ids: train IDs to process, all if omitted.

        The following example shows a typical use case:

        .. code-block:: python

            cell_sel = CellRange(max_pulses, max_cells)
            agipd_corr = AgipdCorrections(max_cells, cell_sel,
                                          h5_data_path=h5path,
                                          h5_index_path=h5path_idx,
                                          corr_bools=corr_bools)

            agipd_corr.allocate_constants(modules, (3, mem_cells_db, 512, 128))

            metadata = cal_tools.tools.CalibrationMetadata(out_folder)
            const_yaml = metadata["retrieved-constants"]

            for mod in modules:
                agipd_corr.initialize_from_yaml(karabo_da, const_yaml, mod)

            data_shape = (n_images_max, 512, 128)
            agipd_corr.allocate_images(data_shape, n_cores_files)

            i_proc = 0
            agipd_corr.read_file(i_proc, file_name, apply_sel_pulses)

            # Correct first 1000 images
            agipd_corr.correct_agipd(i_proc, first=0, last=1000)

            agipd_corr.write_file(i_proc, file_name, ofile_name)
        """
        if corr_bools is None:
            corr_bools = {}

        # Data description
        self.h5_data_path = h5_data_path
        self.h5_index_path = h5_index_path
        self.max_cells = max_cells
        self.gain_mode = gain_mode
        self.comp_threads = comp_threads
        self.train_ids = np.array(train_ids) if train_ids is not None else None

        self.cell_sel = cell_sel

        # Correction parameters
        self.baseline_corr_noise_threshold = -1000
        self.snow_resolution = SnowResolution.INTERPOLATE
        self.cm_dark_fraction = 0.66
        self.cm_dark_min = -25.
        self.cm_dark_max = 25.
        self.cm_n_itr = 4
        self.mg_hard_threshold = 100
        self.hg_hard_threshold = 100
        self.noisy_adc_threshold = 0.25
        self.ff_gain = 1
        self.photon_energy = 9.2

        # Output parameters
        self.compress_fields = ['gain', 'mask']
        self.recast_image_fields = {}

        # Shared variables for data and constants
        self.shared_dict = []
        self.offset = {}
        self.noise = {}
        self.thresholds = {}
        self.frac_high_med = {}
        self.md_additional_offset = {}
        self.rel_gain = {}
        self.mask = {}
        self.xray_cor = {}

        if corr_bools.get('round_photons'):
            # Variables related to histogramming.
            self.shared_hist_preround = None
            self.shared_hist_postround = None
            self.hist_bins_preround = np.linspace(-5.0, 60.0, 200)
            self.hist_bins_postround = np.arange(-5.5, 60.1)
            self.hist_lock = Manager().Lock()

        # check if given corr_bools are correct
        tot_corr_bools = ['only_offset', 'adjust_mg_baseline', 'rel_gain',
                          'xray_corr', 'blc_noise', 'blc_hmatch',
                          'blc_stripes', 'blc_set_min', 'match_asics',
                          'corr_asic_diag', 'zero_nans',
                          'zero_orange', 'force_hg_if_below',
                          'force_mg_if_below', 'mask_noisy_adc',
                          'melt_snow', 'common_mode', 'mask_zero_std',
                          'low_medium_gap', 'round_photons']

        if set(corr_bools).issubset(tot_corr_bools):
            self.corr_bools = corr_bools
        else:
            bools = list(set(corr_bools) - set(tot_corr_bools))
            raise Exception('Correction Booleans: '
                            f'{bools} are not available!')

        # Flags allowing for pulse capacitor constant retrieval.
        self.pc_bools = [self.corr_bools.get("rel_gain"),
                         self.corr_bools.get("adjust_mg_baseline"),
                         self.corr_bools.get('blc_noise'),
                         self.corr_bools.get('blc_hmatch'),
                         self.corr_bools.get('blc_stripes'),
                         self.corr_bools.get('melt_snow')]

        self.blc_bools = [self.corr_bools.get('blc_noise'),
                          self.corr_bools.get('blc_hmatch'),
                          self.corr_bools.get('blc_stripes')]
    def read_file(self, i_proc: int, file_name: str,
                  apply_sel_pulses: Optional[bool] = True
                  ) -> int:
        """Read file with raw data to shared memory

        :param file_name: Name of input file including path.
        :param i_proc: Index of shared memory array.
        :param apply_sel_pulses: apply selected pulses before
                                 all corrections.
        :return:
            - n_img: The number of images to correct.
        """
        module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
        agipd_base = self.h5_data_path.format(module_idx)
        data_dict = self.shared_dict[i_proc]
        data_dict['moduleIdx'][0] = module_idx

        h5_dc = H5File(file_name)

        # Exclude trains without data.
        im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)

        valid_train_ids = self.get_valid_image_idx(
            im_dc[agipd_base, "image.trainId"])

        if not valid_train_ids:
            # If there's not a single valid train, exit early.
            print(f"WARNING: No valid trains for {im_dc.files} to process.")
            data_dict['nImg'][0] = 0
            return 0

        # store valid trains in shared memory
        n_valid_trains = len(valid_train_ids)
        data_dict["n_valid_trains"][0] = n_valid_trains
        data_dict["valid_trains"][:n_valid_trains] = valid_train_ids

        # get cell selection for the images in this file
        cm = (self.cell_sel.CM_NONE if apply_sel_pulses
              else self.cell_sel.CM_PRESEL)
        img_selected = self.cell_sel.get_cells_on_trains(
            valid_train_ids, cm=cm)
        data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)

        # Exclude non_valid trains from the selected data collection.
        im_dc = im_dc.select_trains(by_id(valid_train_ids))

        if "AGIPD500K" in agipd_base:
            agipd_comp = components.AGIPD500K(im_dc)
        else:
            agipd_comp = components.AGIPD1M(im_dc)

        kw = {
            "unstack_pulses": False,
        }

        # [n_modules, n_imgs, 2, x, y]
        raw_data = agipd_comp.get_array("image.data", **kw)[0]
        frm_ix = np.flatnonzero(img_selected)
        n_img = frm_ix.size
        data_dict['nImg'][0] = n_img
        data_dict['data'][:n_img] = raw_data[frm_ix, 0]
        data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
        data_dict['cellId'][:n_img] = agipd_comp.get_array(
            "image.cellId", **kw)[0, frm_ix]
        data_dict['pulseId'][:n_img] = agipd_comp.get_array(
            "image.pulseId", **kw)[0, frm_ix]
        data_dict['trainId'][:n_img] = agipd_comp.get_array(
            "image.trainId", **kw)[0, frm_ix]
        return n_img
    def write_file(self, i_proc, file_name, ofile_name):
        """
        Create output file and write corrected data to it

        :param file_name: Name of input file including path
        :param ofile_name: Name of output file including path
        :param i_proc: Index of shared memory array
        """
        module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
        agipd_base = f'INSTRUMENT/{self.h5_data_path}/'.format(module_idx)
        idx_base = self.h5_index_path.format(module_idx)
        data_path = f'{agipd_base}/image'

        # Obtain a shallow copy of the pointer map to allow for local
        # changes in this method.
        data_dict = self.shared_dict[i_proc].copy()

        image_fields = [
            'trainId', 'pulseId', 'cellId', 'data', 'gain', 'mask', 'blShift',
        ]

        n_img = data_dict['nImg'][0]
        if n_img == 0:
            return
        trains = data_dict['trainId'][:n_img]

        # Re-cast fields in-place, i.e. using the same memory region.
        for field, dtype in self.recast_image_fields.items():
            data_dict[field] = cast_array_inplace(data_dict[field], dtype)

        with h5py.File(ofile_name, "w") as outfile:
            # Copy any other data from the input file.
            # This includes indexes, so it's important that the corrected
            # data we write is aligned with the raw data.
            with h5py.File(file_name, "r") as infile:
                self.copy_and_sanitize_non_cal_data(
                    infile, outfile, agipd_base, idx_base, trains
                )

            # All corrected data goes in a /INSTRUMENT/.../image group
            image_grp = outfile[data_path]

            # Set up all the datasets before filling them. This puts the
            # metadata about the datasets together at the start of the file,
            # so it's efficient to examine the file structure.
            for field in image_fields:
                arr = data_dict[field][:n_img]
                if field in self.compress_fields:
                    # gain/mask compressed with gzip level 1, but not
                    # checksummed as we would have to implement this.
                    kw = dict(
                        compression='gzip', compression_opts=1, shuffle=True
                    )
                else:
                    # Uncompressed data can easily be checksummed by HDF5's
                    # filter pipeline. This should be cheap to compute.
                    kw = {'fletcher32': True}
                if arr.ndim > 1:
                    kw['chunks'] = (1,) + arr.shape[1:]  # 1 chunk = 1 image

                image_grp.create_dataset(
                    field, shape=arr.shape, dtype=arr.dtype, **kw
                )

            # Write the corrected data
            for field in image_fields:
                if field in self.compress_fields:
                    self._write_compressed_frames(
                        image_grp[field], data_dict[field][:n_img],
                    )
                else:
                    image_grp[field][:] = data_dict[field][:n_img]
    def _write_compressed_frames(self, dataset, arr):
        """Compress gain/mask frames in multiple threads, and save their data

        This is significantly faster than letting HDF5 do the compression
        in a single thread.
        """
        def _compress_frame(i):
            # Equivalent to the HDF5 'shuffle' filter: transpose bytes for
            # better compression.
            shuffled = np.ascontiguousarray(
                arr[i].view(np.uint8).reshape((-1, arr.itemsize)).transpose()
            )
            return i, zlib.compress(shuffled, level=1)

        with ThreadPool(self.comp_threads) as pool:
            for i, compressed in pool.imap(_compress_frame, range(len(arr))):
                # Each frame is 1 complete chunk
                dataset.id.write_direct_chunk((i, 0, 0), compressed)
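
    # Minimal sketch of what a single chunk written above contains,
    # assuming a hypothetical uint32 frame: bytes are transposed so that
    # all first bytes come first (matching HDF5's shuffle filter), then
    # gzip-compressed at level 1. h5py reverses both filters transparently
    # on read, because the dataset was created with shuffle=True and
    # compression='gzip'.
    #
    #   frame = np.zeros((512, 128), dtype=np.uint32)
    #   shuffled = np.ascontiguousarray(
    #       frame.view(np.uint8).reshape((-1, frame.itemsize)).transpose())
    #   chunk_bytes = zlib.compress(shuffled, level=1)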
    def cm_correction(self, i_proc, asic):
        """
        Perform common-mode correction of data in shared memory

        In this method a complete file is loaded into memory.
        The ASIC common mode is calculated based on single images.
        The cell common mode is calculated across trains and groups of
        32 cells. Both corrections are iterative and require 4 iterations.

        Correction is performed in chunks of (e.g. 512 images).
        A complete array of data from one file
        (256 trains, 352 cells) will take
        256 * 352 * 128 * 512 * 4 // 1024**3 = 22 GiB in memory

        :param i_proc: Index of shared memory array to process
        :param asic: Asic number to process
        """
        if not self.corr_bools.get("common_mode"):
            return
        dark_min = self.cm_dark_min
        dark_max = self.cm_dark_max
        fraction = self.cm_dark_fraction
        n_itr = self.cm_n_itr
        n_img = self.shared_dict[i_proc]['nImg'][0]
        if n_img == 0:
            return
        cell_id = self.shared_dict[i_proc]['cellId'][:n_img]
        train_id = self.shared_dict[i_proc]['trainId'][:n_img]
        cell_ids = cell_id[train_id == train_id[0]]
        n_cells = cell_ids.size
        data = self.shared_dict[i_proc]['data'][:n_img].reshape(-1, n_cells,
                                                                8, 64, 2, 64)

        # Loop over iterations
        for _ in range(n_itr):
            # Loop over rows of cells
            # TODO: check what occurs in case of 64 memory cells
            #       as it will have less than 11 iterations
            first = 0
            for cell_row in range(11):
                last = first + cell_ids[(cell_ids // 32) == cell_row].size
                if first == last:
                    continue
                asic_data = data[:, first:last, asic % 8, :, asic // 8, :]

                # Cell common mode
                cell_cm_sum, cell_cm_count = \
                    calgs.sum_and_count_in_range_cell(asic_data, dark_min,
                                                      dark_max)
                cell_cm = cell_cm_sum / cell_cm_count

                # TODO: check files with less than 256 trains
                cell_cm[cell_cm_count < fraction * 32 * 256] = 0
                asic_data[...] -= cell_cm[None, None, :, :]

                # Asics common mode
                asic_cm_sum, asic_cm_count = \
                    calgs.sum_and_count_in_range_asic(asic_data, dark_min,
                                                      dark_max)
                asic_cm = asic_cm_sum / asic_cm_count
                asic_cm[asic_cm_count < fraction * 64 * 64] = 0
                asic_data[...] -= asic_cm[:, :, None, None]

                first = last
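
    # Sanity check of the memory estimate in cm_correction's docstring:
    # 256 trains * 352 cells * 512 * 128 pixels * 4 bytes (float32)
    # = 23_622_320_128 bytes, i.e. ~22 GiB.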
    def mask_zero_std(self, i_proc, cells):
        """
        Add bad pixel bit: DATA_STD_IS_ZERO to the mask of bad pixels

        Pixel is bad if standard deviation for a given pixel and
        given memory cell is zero

        :param i_proc: Index of shared memory array to process
        :param cells: List of cells to be considered
        """
        if not self.corr_bools.get("mask_zero_std"):
            return

        module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
        n_img = self.shared_dict[i_proc]['nImg'][0]
        data = self.shared_dict[i_proc]['data'][:n_img]
        cellid = self.shared_dict[i_proc]['cellId'][:n_img]
        mask_std = self.mask[module_idx]  # shape of n_cells, x, y

        for c in cells:
            std = np.nanstd(data[cellid == c, ...], axis=0)
            mask_std[:, c, std == 0] |= BadPixels.DATA_STD_IS_ZERO
    def offset_correction(self, i_proc: int, first: int, last: int):
        """
        Perform image-wise offset correction for data in shared memory

        :param first: Index of the first image to be corrected
        :param last: Index of the last image to be corrected
        :param i_proc: Index of shared memory array to process
        """
        module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
        data = self.shared_dict[i_proc]['data'][first:last]
        rawgain = self.shared_dict[i_proc]['rawgain'][first:last]
        gain = self.shared_dict[i_proc]['gain'][first:last]
        cellid = self.shared_dict[i_proc]['cellId'][first:last]

        if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
            # first evaluate the gain into 0, 1, 2 --> high, medium, low
            t0 = self.thresholds[module_idx][0]
            t1 = self.thresholds[module_idx][1]

            if self.corr_bools.get("melt_snow"):
                # load raw_data and rgain to be used during gain_correction
                self.shared_dict[i_proc]["t0_rgain"][first:last] = \
                    rawgain / t0[cellid, ...]
                self.shared_dict[i_proc]["raw_data"][first:last] = \
                    np.copy(data)

            # Often most pixels are in high-gain, so it's more efficient to
            # set the whole output block to zero than select the right
            # pixels.
            gain[:] = 0
            # exceeding first threshold means data is medium or low gain
            gain[rawgain > t0[cellid, ...]] = 1
            # exceeding also second threshold means data is low gain
            gain[rawgain > t1[cellid, ...]] = 2
        else:
            # the enum values map 1, 2, 3 to (fixed) gain modes
            gain[:] = self.gain_mode - 1

        offsetb = self.offset[module_idx][:, cellid]

        # force into high or medium gain if requested
        if self.corr_bools.get("force_mg_if_below"):
            gain[(gain == 2) & (
                (data - offsetb[1]) < self.mg_hard_threshold)] = 1

        if self.corr_bools.get("force_hg_if_below"):
            gain[(gain > 0) & (
                (data - offsetb[0]) < self.hg_hard_threshold)] = 0

        # choose constants according to gain setting
        off = calgs.gain_choose(gain, offsetb)
        del offsetb

        # subtract offset
        data -= off
        del off
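
    # Worked sketch of the adaptive-gain digitization above (toy numbers,
    # not real thresholds): with per-pixel thresholds t0=5000 and t1=7000,
    # raw gain 4000 stays 0 (high), 6000 becomes 1 (medium) and 8000
    # becomes 2 (low); the offset is then picked per pixel via
    # calgs.gain_choose(gain, offsetb).
    #
    #   rawgain = np.array([4000, 6000, 8000])
    #   gain = np.zeros(3, dtype=np.uint8)
    #   gain[rawgain > 5000] = 1
    #   gain[rawgain > 7000] = 2  # -> [0, 1, 2]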
    def baseline_correction(self, i_proc: int, first: int, last: int):
        """
        Perform image-wise base-line shift correction for
        data in shared memory via histogram or stripe

        :param first: Index of the first image to be corrected
        :param last: Index of the last image to be corrected
        :param i_proc: Index of shared memory array to process
        """
        # before doing relative gain correction we need to evaluate any
        # baseline shifts, as they are effectively an additional offset
        # in the data
        if not any(self.blc_bools):
            return

        module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
        data = self.shared_dict[i_proc]['data'][first:last]
        bl_shift = self.shared_dict[i_proc]['blShift'][first:last]
        gain = self.shared_dict[i_proc]['gain'][first:last]
        cellid = self.shared_dict[i_proc]['cellId'][first:last]

        # output is saved in sharedmem to pass for correct_agipd()
        # as this function takes about 3 seconds.
        self.shared_dict[i_proc]["msk"][first:last] = calgs.gain_choose(
            gain, self.mask[module_idx][:, cellid]
        )

        if hasattr(self, "rel_gain"):
            # Get the correct rel_gain depending on cell-id
            self.shared_dict[i_proc]['rel_corr'][first:last] = \
                calgs.gain_choose(gain, self.rel_gain[module_idx][:, cellid])

        # do this image wise, as the shift is per image
        for i in range(data.shape[0]):

            # first correction requested may be to evaluate shift via
            # noise peak
            if self.corr_bools.get('blc_noise'):
                mn_noise = np.nanmean(self.noise[module_idx][0, cellid[i]])
                dd, sh = baseline_correct_via_noise(
                    data[i], mn_noise, gain[i],
                    self.baseline_corr_noise_threshold)
            # if not we continue with initial data
            else:
                dd = data[i]
                sh = 0

            # if we have enough pixels in medium or low gain and
            # correction via hist matching is requested, do it now
            gcrit = np.count_nonzero(gain[i] > 0) > 1000
            if (gcrit and self.corr_bools.get('blc_hmatch') and
                    hasattr(self, "rel_gain")):
                dd2, sh2 = correct_baseline_via_hist(
                    data[i],
                    self.shared_dict[i_proc]['rel_corr'][first:last][i],
                    gain[i])
                data[i] = np.maximum(dd, dd2)
                sh = np.minimum(sh, sh2)
                # finally correct diagonal effects if requested
                if self.corr_bools.get('corr_asic_diag'):
                    ii = data[i, ...]
                    gg = gain[i, ...]
                    adim = correct_baseline_via_hist_asic(ii, gg)
                    data[i, ...] = adim
            # if there is not enough medium or low gain data to do an
            # evaluation, do nothing
            else:
                data[i, ...] = dd
                bl_shift[i] = sh

            if self.corr_bools.get('blc_stripes'):
                fmh = self.frac_high_med[module_idx][cellid[i]]
                dd, sh = baseline_correct_via_stripe(
                    data[i, ...], gain[i, ...],
                    self.shared_dict[i_proc]['msk'][first:last][i, ...],
                    fmh)
                data[i, ...] = dd
                bl_shift[i] = sh
    def gain_correction(self, i_proc: int, first: int, last: int):
        """
        Perform several image-wise corrections for data in shared memory,
        e.g. relative gain, flat-field X-ray correction, ...

        :param first: Index of the first image to be corrected
        :param last: Index of the last image to be corrected
        :param i_proc: Index of shared memory array to process
        """
        module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
        data = self.shared_dict[i_proc]['data'][first:last]
        gain = self.shared_dict[i_proc]['gain'][first:last]
        cellid = self.shared_dict[i_proc]['cellId'][first:last]
        mask = self.shared_dict[i_proc]['mask'][first:last]
        rel_corr = self.shared_dict[i_proc]['rel_corr'][first:last]
        msk = self.shared_dict[i_proc]['msk'][first:last]

        # if baseline correction was not requested
        # msk and rel_corr will still be empty shared_mem arrays
        if not any(self.blc_bools):
            msk = calgs.gain_choose(gain, self.mask[module_idx][:, cellid])

            # same for relative gain and then bad pixel mask
            if hasattr(self, "rel_gain"):
                # Get the correct rel_gain depending on cell-id
                rel_corr = calgs.gain_choose(
                    gain, self.rel_gain[module_idx][:, cellid])

        # Correct for relative gain
        if self.corr_bools.get("rel_gain") and hasattr(self, "rel_gain"):
            data *= rel_corr
            del rel_corr

        # Adjust medium gain baseline to match highest high gain value
        if self.corr_bools.get("adjust_mg_baseline"):
            mgbc = self.md_additional_offset[module_idx][cellid, ...]
            data[gain == 1] += mgbc[gain == 1]
            del mgbc

        # Set negative values for medium gain to 0
        # TODO: Probably it would be better to add it to badpixel maps,
        #       not just set to 0
        if self.corr_bools.get('blc_set_min'):
            data[(data < 0) & (gain == 1)] = 0

        # Do xray correction if requested
        # The slopes we have in our constants are already relative
        # slopeFF = slopeFFpix / average(slopeFFpix)
        # To apply them we have to / not *
        if self.corr_bools.get("xray_corr"):
            data /= self.xray_cor[module_idx][cellid, ...]

        # use sharedmem raw_data and t0_rgain
        # after calculating it while offset correcting.
        if self.corr_bools.get('melt_snow'):
            _ = melt_snowy_pixels(
                self.shared_dict[i_proc]['raw_data'][first:last],
                data, gain,
                self.shared_dict[i_proc]['t0_rgain'][first:last],
                self.snow_resolution)

        # Inner ASIC borders are matched to the same signal level
        if self.corr_bools.get("match_asics"):
            data = match_asic_borders(data, 8, module_idx)

        # Add any non-finite values to the mask, zero them
        if self.corr_bools.get("zero_nans"):
            bidx = ~np.isfinite(data)
            data[bidx] = 0
            msk[bidx] |= BadPixels.VALUE_IS_NAN
            del bidx

        # Add pixels with unrealistically high and low values to the mask.
        # Zero them.
        if self.corr_bools.get("zero_orange"):
            bidx = (data < -1e7) | (data > 1e7)
            data[bidx] = 0
            msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
            del bidx

        # Round keV-normalized intensity to photons.
        if self.corr_bools.get("round_photons"):
            data_hist_preround, _ = np.histogram(
                data, bins=self.hist_bins_preround)

            data /= self.photon_energy
            np.round(data, out=data)

            # This could also be done before and its mask inverted for
            # rounding, but the performance difference is negligible.
            bidx = data < 0
            data[bidx] = 0
            msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
            del bidx

            data_hist_postround, _ = np.histogram(
                data * self.photon_energy, bins=self.hist_bins_postround)

            with self.hist_lock:
                self.shared_hist_preround += data_hist_preround
                self.shared_hist_postround += data_hist_postround

        if np.issubdtype(self.recast_image_fields.get('image'), np.integer):
            # If the image data is meant to be recast to an integer
            # type, make sure its values are within its bounds.

            type_info = np.iinfo(self.recast_image_fields['image'])

            bidx = data < type_info.min
            data[bidx] = 0
            msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
            del bidx

            bidx = data > type_info.max
            data[bidx] = type_info.max
            msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
            del bidx

        # Mask entire ADCs if their noise is above a threshold
        # TODO: Needs clarification if needed,
        #       the returned arg is not used.
        if self.corr_bools.get("mask_noisy_adc"):
            _ = make_noisy_adc_mask(msk, self.noisy_adc_threshold)

        # Copy the data across into the existing shared-memory array
        mask[...] = msk[...]
    def get_valid_image_idx(self, im_dc: DataCollection) -> list:
        """Return a list of valid train ids.

        Exclude non-valid train ids from the past or future.
        """
        dc_trains = im_dc.train_ids
        if len(dc_trains) == 0:
            return []  # No trains to validate.

        # Check against train ID filter list, if any
        if self.train_ids is not None:
            valid = np.in1d(dc_trains, self.train_ids)
        else:
            valid = np.ones_like(dc_trains, dtype=bool)

        # Validate that train id values fall between medianTrain +- 1e4
        medianTrain = np.nanmedian(dc_trains)
        lowok = (dc_trains > medianTrain - 1e4)
        highok = (dc_trains < medianTrain + 1e4)
        valid &= lowok & highok

        # exclude non valid trains
        valid_trains = valid * dc_trains

        return list(valid_trains[valid_trains != 0])
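
    # Illustrative sketch of the median window above (toy train IDs):
    # a corrupted ID far from the run's median is dropped, everything
    # within +-1e4 of the median is kept.
    #
    #   dc_trains = np.array([100000, 100001, 999999999, 100003])
    #   med = np.nanmedian(dc_trains)  # 100002.0
    #   (dc_trains > med - 1e4) & (dc_trains < med + 1e4)
    #   # -> [True, True, False, True]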
    def apply_selected_pulses(self, i_proc: int) -> int:
        """Select sharedmem data indices to correct based on selected
        pulses indices.

        :param i_proc: the index of sharedmem for a given file/module
        :return n_img: number of images to correct
        """
        data_dict = self.shared_dict[i_proc]
        n_img = data_dict['nImg'][0]

        if not data_dict["cm_presel"][0]:
            return n_img

        ntrains = data_dict["n_valid_trains"][0]
        train_ids = data_dict["valid_trains"][:ntrains]

        # Initializing can_calibrate array
        can_calibrate = self.cell_sel.get_cells_on_trains(
            train_ids, cm=self.cell_sel.CM_FINSEL
        )
        if np.all(can_calibrate):
            return n_img

        # Get selected number of images based on
        # selected pulses to correct
        n_img_sel = np.count_nonzero(can_calibrate)

        # Only select data corresponding to selected pulses
        # and overwrite data in shared-memory leaving
        # the required indices to correct
        array_names = [
            "data", "rawgain", "cellId", "pulseId", "trainId", "gain"]

        # if AGIPD in fixed gain mode or melting snow was not requested
        # `t0_rgain` and `raw_data` will be empty shared_mem arrays
        is_adaptive = self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN
        melt_snow = self.corr_bools.get("melt_snow")
        if (is_adaptive and melt_snow):
            array_names += ["t0_rgain", "raw_data"]

        # if baseline correction was not requested
        # `msk` and `rel_corr` will still be empty shared_mem arrays
        if any(self.blc_bools):
            array_names += ["blShift", "msk"]
            if hasattr(self, "rel_gain"):
                array_names.append("rel_corr")

        for name in array_names:
            arr = data_dict[name][:n_img][can_calibrate]
            data_dict[name][:n_img_sel] = arr

        # Overwrite number of images
        data_dict['nImg'][0] = n_img_sel

        return n_img_sel

    def copy_and_sanitize_non_cal_data(self, infile, outfile, agipd_base,
                                       idx_base, trains):
        """ Copy and sanitize data in `infile` that is not touched by
        `correctAGIPD` """
        # these are touched in the correct function, do not copy them here
        dont_copy = ["data", "cellId", "trainId", "pulseId", "status",
                     "length"]
        dont_copy = [posixpath.join(agipd_base, "image", ds)
                     for ds in dont_copy]

        # don't copy index as we may need to adjust if we filter trains
        dont_copy.append(posixpath.join(idx_base, "image"))

        h5_copy_except_paths(infile, outfile, dont_copy)

        # sanitize indices
        for do in ["image", ]:
            # uq: INDEX/trainID
            # fidxv: INDEX/.../image/first idx values
            # cntsv: INDEX/.../image/counts values
            # Extract parameters through identifying
            # unique trains, index and numbers.
            uq, fidxv, cntsv = np.unique(trains, return_index=True,
                                         return_counts=True)

            # Validate calculated CORR INDEX contents by checking the
            # difference between the trainId stored in the RAW data and
            # the unique trains being written to the CORR file.
            train_diff = np.isin(np.array(infile["/INDEX/trainId"]), uq,
                                 invert=True)

            # Insert zeros for missing trains.
            # fidxv and cntsv should have same length as
            # raw INDEX/.../image/first and INDEX/.../image/count,
            # respectively

            # first_inc = first incrementation
            first_inc = True
            for i, diff in enumerate(train_diff):
                if diff:
                    if i < len(cntsv):
                        cntsv = np.insert(cntsv, i, 0)
                        fidxv = np.insert(fidxv, i, 0) if i == 0 \
                            else np.insert(fidxv, i, fidxv[i])
                    else:
                        # append if at the end of the array
                        cntsv = np.append(cntsv, 0)
                        # increment fidxv once with the
                        # no. of processed mem-cells.
                        if first_inc:
                            fidxv = np.append(
                                fidxv, (2 * fidxv[i-1]) - fidxv[i-2])
                            first_inc = False
                        else:
                            fidxv = np.append(fidxv, fidxv[i-1])

            # save INDEX contents (first, count) in CORR files
            outfile.create_dataset(idx_base + "{}/first".format(do),
                                   fidxv.shape,
                                   dtype=fidxv.dtype,
                                   data=fidxv,
                                   fletcher32=True)
            outfile.create_dataset(idx_base + "{}/count".format(do),
                                   cntsv.shape,
                                   dtype=cntsv.dtype,
                                   data=cntsv,
                                   fletcher32=True)

    def init_constants(self, cons_data, when, module_idx, variant):
        """
        For CI derived gain, a mean multiplication factor of 4.48 compared
        to medium gain is used, as no reliable CI data exists for all
        memory cells of the current AGIPD instances.

        Relative gain is derived both from pulse capacitor as well as low
        intensity flat field data. Information from flat field data is
        needed to 'calibrate' pulse capacitor data; if there is no
        available FF data, the relative gain for the high gain stage is
        set to 1:

        * Relative gain for the high gain stage - from the FF data we get
          the relative slopes of a given pixel and memory cell with
          respect to all memory cells and all pixels in the module.
          Please note: the current slopesFF available in calibration
          constants are created per pixel only, not per memory cell:

             rel_high_gain = 1 if only PC data is available

             rel_high_gain = rel_slopesFF if FF data is also available

        * Relative gain for the medium gain stage: we derive the factor
          between high and medium gain using slope information from
          fits to the linear part of high and medium gain:

             rfpc_high_medium = m_h / m_m

          where m_m is the medium gain slope of the given memory cell and
          pixel and m_h is the corresponding high gain slope

             rel_gain_medium = rel_high_gain * rfpc_high_medium

        With this data the relative gain for the three gain stages
        evaluates to:

            rel_high_gain = 1 or rel_slopesFF
            rel_medium_gain = rel_high_gain * rfpc_high_medium
            rel_low_gain = rel_medium_gain * 4.48

        :param cons_data: A dictionary for each retrieved constant value.
        :param when: A dictionary for the creation time
                     of each retrieved constant.
        :param module_idx: A module_idx index
        :param variant: A dictionary for the variant of each retrieved CCV.
        """
        # Distribute threads for transposition evenly across all modules
        # assuming this method runs in parallel.
        calgs_opts = dict(num_threads=os.cpu_count() // len(self.offset))

        calgs.transpose_constant(
            self.offset[module_idx], cons_data['Offset'], **calgs_opts)
        calgs.transpose_constant(
            self.noise[module_idx], cons_data['Noise'], **calgs_opts)
        if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
            calgs.transpose_constant(self.thresholds[module_idx],
                                     cons_data['ThresholdsDark'][..., :3],
                                     **calgs_opts)

        if self.corr_bools.get("low_medium_gap"):
            t0 = self.thresholds[module_idx][0]
            t1 = self.thresholds[module_idx][1]
            t1[t1 <= 1.05 * t0] = np.iinfo(np.uint16).max

        bpixels = cons_data["BadPixelsDark"].astype(np.uint32)

        if self.corr_bools.get("xray_corr"):
            if when["BadPixelsFF"]:
                bpixels |= cons_data["BadPixelsFF"].astype(
                    np.uint32)[..., :bpixels.shape[2], None]

            if when["SlopesFF"]:  # Checking if constant was retrieved

                slopesFF = cons_data["SlopesFF"]
                # This could be used for backward compatibility
                # for very old SlopesFF constants
                if len(slopesFF.shape) == 4:
                    slopesFF = slopesFF[..., 0]
                # This is for backward compatibility for old FF constants
                # (128, 512, mem_cells)
                if slopesFF.shape[-1] == 2:
                    xray_cor = np.squeeze(slopesFF[..., 0])
                    xray_cor_med = np.nanmedian(xray_cor)
                    xray_cor[np.isnan(xray_cor)] = xray_cor_med
                    xray_cor[(xray_cor < 0.8) | (
                        xray_cor > 1.2)] = xray_cor_med
                    xray_cor = np.dstack([xray_cor]*self.max_cells)
                else:
                    # Memory cell resolved xray_cor correction
                    xray_cor = slopesFF  # (128, 512, mem_cells)
                    if xray_cor.shape[-1] < self.max_cells:
                        # When working with a new constant with fewer
                        # memory cells, e.g. lacking enough FF data or
                        # during development, xray_cor must repeat its
                        # last memory cell to maintain a consistent shape.
                        xray_cor = np.dstack(
                            [xray_cor,
                             np.dstack([xray_cor[..., -1]]
                                       * (self.max_cells
                                          - xray_cor.shape[-1]))])
                    # This is already done for old constants,
                    # but new constant is absolute and we need to have
                    # global ADU output for the moment
                    xray_cor /= self.ff_gain
            else:
                xray_cor = np.ones((128, 512, self.max_cells), np.float32)

            self.xray_cor[module_idx][...] = xray_cor.transpose()[...]

        # add additional bad pixel information
        if any(self.pc_bools):
            if when["BadPixelsPC"]:
                bppc = np.moveaxis(
                    cons_data["BadPixelsPC"].astype(np.uint32), 0, 2)
                bpixels |= bppc[..., :bpixels.shape[2], None]

            # calculate relative gain from the constants
            rel_gain = np.ones((128, 512, self.max_cells, 3), np.float32)

            if when["SlopesPC"]:
                slopesPC = cons_data["SlopesPC"].astype(np.float32,
                                                        copy=False)

                # This will handle some historical data in a different
                # format: constant dimension injected first
                if slopesPC.shape[0] in [10, 11]:
                    slopesPC = np.moveaxis(slopesPC, 0, 3)
                    slopesPC = np.moveaxis(slopesPC, 0, 2)

                # high gain slope from pulse capacitor data
                pc_high_m = slopesPC[..., :self.max_cells, 0]
                pc_high_l = slopesPC[..., :self.max_cells, 1]
                # medium gain slope from pulse capacitor data
                pc_med_m = slopesPC[..., :self.max_cells, 3]
                pc_med_l = slopesPC[..., :self.max_cells, 4]

                # calculate median for slopes
                pc_high_med = np.nanmedian(pc_high_m, axis=(0, 1))
                pc_med_med = np.nanmedian(pc_med_m, axis=(0, 1))

                if variant == 0:
                    # calculate median for intercepts:
                    pc_high_l_med = np.nanmedian(pc_high_l, axis=(0, 1))
                    pc_med_l_med = np.nanmedian(pc_med_l, axis=(0, 1))

                    # Sanitize PC data with CCV variant = 0.
                    # Sanitization is already done for constants
                    # with CCV variant = 1.
                    # In the following loop,
                    # replace `nan`s across memory cells with
                    # the median value calculated previously.
                    # Then, values outside of the valid range (0.8 and 1.2
                    # times the median) are fixed to the median value.
                    # This is applied for high and medium gain stages.
                    for i in range(self.max_cells):
                        pc_high_m[
                            np.isnan(pc_high_m[..., i]), i] = pc_high_med[i]
                        pc_med_m[
                            np.isnan(pc_med_m[..., i]), i] = pc_med_med[i]

                        pc_high_l[
                            np.isnan(pc_high_l[..., i]), i] = pc_high_l_med[i]
                        pc_med_l[
                            np.isnan(pc_med_l[..., i]), i] = pc_med_l_med[i]

                        pc_high_m[
                            (pc_high_m[..., i] < 0.8 * pc_high_med[i]) |
                            (pc_high_m[..., i] > 1.2 * pc_high_med[i]), i] = pc_high_med[i]  # noqa
                        pc_med_m[
                            (pc_med_m[..., i] < 0.8 * pc_med_med[i]) |
                            (pc_med_m[..., i] > 1.2 * pc_med_med[i]), i] = pc_med_med[i]  # noqa

                        pc_high_l[
                            (pc_high_l[..., i] < 0.8 * pc_high_l_med[i]) |
                            (pc_high_l[..., i] > 1.2 * pc_high_l_med[i]), i] = pc_high_l_med[i]  # noqa
                        pc_med_l[
                            (pc_med_l[..., i] < 0.8 * pc_med_l_med[i]) |
                            (pc_med_l[..., i] > 1.2 * pc_med_l_med[i]), i] = pc_med_l_med[i]  # noqa

                # ratio between HG and MG per pixel per mem cell used
                # for rel gain calculation
                frac_high_med_pix = pc_high_m / pc_med_m
                # average ratio between HG and MG as a function of
                # mem cell (needed for blc_stripes)
                # TODO: Per pixel would be more optimal correction
                frac_high_med = pc_high_med / pc_med_med
                # calculate additional medium-gain offset
                md_additional_offset = \
                    pc_high_l - pc_med_l * pc_high_m / pc_med_m

                # Calculate relative gain. If FF constants are available,
                # use them for high gain;
                # if not, rel_gain is calculated using PC data only
                # if self.corr_bools.get("xray_corr"):
                #     rel_gain[..., :self.max_cells, 0] /= xray_corr

                # PC data should be calibrated with X-ray data;
                # if that is not done, it is better to use 1 instead of
                # biasing the results with PC artefacts.
                # rel_gain[..., 0] = 1./(pc_high_m / pc_high_ave)
                rel_gain[..., 1] = rel_gain[..., 0] * frac_high_med_pix
                rel_gain[..., 2] = rel_gain[..., 1] * 4.48
            else:
                # Initialize with fake calculated parameters of ones and
                # zeros
                md_additional_offset = np.zeros(
                    (128, 512, self.max_cells), np.float32)
                frac_high_med = np.ones((self.max_cells,), np.float32)

            self.md_additional_offset[module_idx][...] = \
                md_additional_offset.transpose()[...]
            calgs.transpose_constant(self.rel_gain[module_idx], rel_gain,
                                     **calgs_opts)
            self.frac_high_med[module_idx][...] = frac_high_med

        calgs.transpose_constant(self.mask[module_idx], bpixels,
                                 **calgs_opts)

        return

    def initialize_from_yaml(
        self, karabo_da: str, const_yaml: Dict[str, Any], module_idx: int
    ) -> Dict[str, Any]:
        """Initialize calibration constants from a yaml file

        :param karabo_da: a karabo data aggregator
        :param const_yaml: from the "retrieved-constants" part of a yaml
            file from the pre-notebook, which consists of metadata of
            either the constant file path or the empty constant shape,
            and the creation-time of the retrieved constants
        :param module_idx: Index of module
        :return when: dict of retrieved constants with their creation-time
        """
        # string of the device name.
        cons_data = dict()
        when = dict()
        variant = dict()
        db_module = const_yaml[karabo_da]["physical-detector-unit"]
        for cname, mdata in const_yaml[karabo_da]["constants"].items():
            base_key = f"{db_module}/{cname}/0"
            when[cname] = mdata["creation-time"]
            if when[cname]:
                with h5py.File(mdata["file-path"], "r") as cf:
                    cons_data[cname] = np.copy(cf[f"{base_key}/data"])
                    # Set variant to 0 if the attribute is missing,
                    # as for old constants.
                    if "variant" in cf[base_key].attrs.keys():
                        variant[cname] = cf[base_key].attrs["variant"]
                    else:
                        variant[cname] = 0
            else:
                # Create empty constant using the list elements
                cons_data[cname] = getattr(
                    np, mdata["file-path"][0])(mdata["file-path"][1])

        self.init_constants(cons_data, when, module_idx, variant)

        return when

    def initialize_from_db(self, karabo_id: str, karabo_da: str,
                           cal_db_interface: str,
                           creation_time: datetime,
                           memory_cells: float, bias_voltage: int,
                           photon_energy: float, gain_setting: float,
                           acquisition_rate: float, integration_time: int,
                           module_idx: int, only_dark: bool = False):
        """ Initialize calibration constants from the calibration database

        :param karabo_id: karabo identifier
        :param karabo_da: karabo data aggregator
        :param cal_db_interface: database interface port
        :param creation_time: time for desired calibration constant version
        :param memory_cells: number of memory cells used for CCV conditions
        :param bias_voltage: bias voltage used for CCV conditions
        :param photon_energy: photon energy used for CCV conditions
        :param gain_setting: gain setting used for CCV conditions
        :param acquisition_rate: acquisition rate used for CCV conditions
        :param integration_time: integration time used for CCV conditions
        :param module_idx: module index to save retrieved CCV in sharedmem
        :param only_dark: load only dark image derived constants. This
            implies that a `calfile` is used to load the remaining
            constants. Useful to reduce DB traffic and interactions
            for non-frequently changing constants, i.e. such which are
            not usually updated during a beamtime.

        The `cal_db_interface` parameter in the `dbparms` tuple may be in
        one of the following notations:

        * tcp://host:port to directly identify the host and port to
          connect to
        * tcp://host:port_low#port_high to specify a port range from
          which a random port will be picked. E.g. specifying

          tcp://max-exfl016:8015#8025

          will randomly pick an address in the range max-exfl016:8015 and
          max-exfl016:8025.

        The latter notation allows for load-balancing.

        This routine loads the following constants as given in
        `iCalibrationDB`:

        Dark Image Derived
        ------------------

        * Constants.AGIPD.Offset
        * Constants.AGIPD.Noise
        * Constants.AGIPD.BadPixelsDark
        * Constants.AGIPD.ThresholdsDark

        Pulse Capacitor Derived
        -----------------------

        * Constants.AGIPD.SlopesPC

        Flat-Field Derived
        ------------------

        * Constants.AGIPD.SlopesFF
        """
        const_dict = assemble_constant_dict(
            self.corr_bools,
            self.pc_bools,
            memory_cells,
            bias_voltage,
            gain_setting,
            acquisition_rate,
            photon_energy,
            beam_energy=None,
            only_dark=only_dark,
            integration_time=integration_time
        )

        when = {}
        cons_data = {}
        variant = {}

        for cname, cval in const_dict.items():
            condition = getattr(
                Conditions, cval[2][0]).AGIPD(**cval[2][1])
            cdata, md = get_from_db(
                karabo_id=karabo_id,
                karabo_da=karabo_da,
                constant=getattr(Constants.AGIPD, cname)(),
                condition=condition,
                empty_constant=getattr(np, cval[0])(cval[1]),
                cal_db_interface=cal_db_interface,
                creation_time=creation_time,
                verbosity=0,
            )
            cons_data[cname] = cdata
            variant[cname] = md.calibration_constant_version.variant

            when[cname] = None
            # Read the CCV begin_at if the constant was retrieved
            # successfully.
            if md and md.comm_db_success:
                when[cname] = md.calibration_constant_version.begin_at

        self.init_constants(cons_data, when, module_idx, variant)

        return when

    def allocate_constants(self, modules, constant_shape):
        """
        Allocate memory for correction constants

        :param modules: Module indices
        :param constant_shape: Shape of expected constants
            (gain, cells, x, y)
        """
        for module_idx in modules:
            self.offset[module_idx] = sharedmem.empty(constant_shape,
                                                      dtype="f4")
            if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
                self.thresholds[module_idx] = sharedmem.empty(constant_shape,
                                                              dtype="f4")
            self.noise[module_idx] = sharedmem.empty(constant_shape,
                                                     dtype="f4")

            self.md_additional_offset[module_idx] = sharedmem.empty(
                constant_shape[1:], dtype="f4")
            self.rel_gain[module_idx] = sharedmem.empty(constant_shape,
                                                        dtype="f4")
            self.frac_high_med[module_idx] = sharedmem.empty(
                constant_shape[1], dtype="f4")

            self.mask[module_idx] = sharedmem.empty(constant_shape,
                                                    dtype="i4")
            self.xray_cor[module_idx] = sharedmem.empty(constant_shape[1:],
                                                        dtype="f4")

    def allocate_images(self, shape, n_cores_files):
        """
        Allocate memory for image data and variables shared
        between correction functions

        :param shape: Shape of expected data (nImg, x, y)
        :param n_cores_files: Number of files, handled in parallel
        """
        self.shared_dict = []
        for i in range(n_cores_files):
            self.shared_dict.append({})
            self.shared_dict[i]["cellId"] = sharedmem.empty(shape[0],
                                                            dtype="u2")
            self.shared_dict[i]["pulseId"] = sharedmem.empty(shape[0],
                                                             dtype="u8")
            self.shared_dict[i]["trainId"] = sharedmem.empty(shape[0],
                                                             dtype="u8")
            self.shared_dict[i]["moduleIdx"] = sharedmem.empty(1, dtype="i4")
            self.shared_dict[i]["nImg"] = sharedmem.empty(1, dtype="i4")
            self.shared_dict[i]["mask"] = sharedmem.empty(shape, dtype="u4")
            self.shared_dict[i]["data"] = sharedmem.empty(shape, dtype="f4")
            self.shared_dict[i]["rawgain"] = sharedmem.empty(shape,
                                                             dtype="u2")
            self.shared_dict[i]["gain"] = sharedmem.empty(shape, dtype="u1")
            self.shared_dict[i]["blShift"] = sharedmem.empty(shape[0],
                                                             dtype="f4")
            # Parameters shared between image-wise correction functions
            self.shared_dict[i]["msk"] = sharedmem.empty(shape, dtype="i4")
            self.shared_dict[i]["raw_data"] = sharedmem.empty(shape,
                                                              dtype="f4")
            self.shared_dict[i]["rel_corr"] = sharedmem.empty(shape,
                                                              dtype="f4")
            self.shared_dict[i]["t0_rgain"] = sharedmem.empty(shape,
                                                              dtype="u2")
            # Valid trains
            self.shared_dict[i]["cm_presel"] = sharedmem.empty(1, dtype="b")
            self.shared_dict[i]["n_valid_trains"] = sharedmem.empty(
                1, dtype="i4")
            self.shared_dict[i]["valid_trains"] = sharedmem.empty(
                1024, dtype="u8")

        if self.corr_bools.get("round_photons"):
            self.shared_hist_preround = sharedmem.empty(
                len(self.hist_bins_preround) - 1, dtype="i8")
            self.shared_hist_postround = sharedmem.empty(
                len(self.hist_bins_postround) - 1, dtype="i8")
def validate_selected_pulses(
    max_pulses: List[int], max_cells: int
) -> List[int]:
    """Validate the selected pulses given from the notebook

    Validate the selected range of pulses to correct raw data
    of at least one image.

    1) Pulse indices within one train can't be greater
    than the operating memory cells.
    2) Validate the order of the given range of pulses.
    3) Raise a ValueError if the generated list of pulses is empty.

    :param max_pulses: a list of at most 3 elements defining the
    range of pulses to calibrate.
    :param max_cells: operating memory cells.

    :return adjusted_range: An adjusted range of pulse indices to correct.
    """

    # Validate selected pulses range:
    # 1) A pulseId can't be greater than the operating memory cells.
    pulses_range = [max_cells if p > max_cells else p for p in max_pulses]

    if pulses_range != max_pulses:
        print(
            "WARNING: \"max_pulses\" list has been modified from "
            f"{max_pulses} to {pulses_range}, because the number of "
            "operating memory cells is less than the selected "
            "maximum pulse."
        )

    if len(pulses_range) == 1:
        adjusted_range = (0, pulses_range[0], 1)
    elif len(pulses_range) == 2:
        adjusted_range = (pulses_range[0], pulses_range[1], 1)
    elif len(pulses_range) == 3:
        adjusted_range = tuple(pulses_range)
    else:
        raise ValueError(
            "ERROR: Wrong length for the list of pulses indices range. "
            "Please check the given range for \"max_pulses\":"
            f"{max_pulses}. \"max_pulses\" needs to be a list of "
            "3 elements, [start, last, step]")

    if adjusted_range[0] > adjusted_range[1]:
        raise ValueError(
            "ERROR: Pulse range start is greater than range end. "
            "Please check the given range for \"max_pulses\":"
            f"{max_pulses}. \"max_pulses\" needs to be a list of "
            "3 elements, [start, last, step]")

    if not np.all([isinstance(p, int) for p in max_pulses]):
        raise TypeError(
            "ERROR: \"max_pulses\" elements needs to be integers:"
            f" {max_pulses}.")

    print(
        "A range of pulse indices is selected to correct:"
        f" {pulses_range}")

    return adjusted_range


class CellRange(CellSelection):
    """Selection of detector memory cells given as a range"""

    def __init__(self, crange: List[int], max_cells: int):
        """Initialize range selection

        :param crange: range parameters of selected cells,
            list up to 3 elements
        :param max_cells: number of exposed cells
        """
        self.max_cells = max_cells
        self.crange = validate_selected_pulses(crange, max_cells)
        self.flag = np.zeros(self.max_cells, bool)
        self.flag_cm = np.zeros(self.ncell_max, bool)
        self.flag[slice(*crange)] = True
        self.flag_cm[:self.max_cells] = self.flag
        self.flag_cm = (self.flag_cm.reshape(-1, self.row_size).any(1)
                        .repeat(self.row_size)[:self.max_cells])

    def msg(self):
        return (
            f"Use range selection with crange={self.crange}, "
            f"max_cells={self.max_cells}\n"
            f"Frames per train: {self.flag.sum()}"
        )

    def get_cells_on_trains(
        self, train_sel: List[int], cm: int = 0
    ) -> np.ndarray:
        return np.tile(self._sel_for_cm(self.flag, self.flag_cm, cm),
                       len(train_sel))


class LitFrameSelection(CellSelection):
    """Selection of detector memory cells indicated as lit frames
    by the AgipdLitFrameFinder
    """
    def __init__(self,
                 litfrmdata: 'AgipdLitFrameFinderOffline',
                 train_ids: List[int],
                 crange: Optional[List[int]] = None,
                 energy_threshold: float = -1000):
        """Initialize lit frame selection

        :param litfrmdata: AgipdLitFrameFinder output data
        :param train_ids: the list of selected trains
        :param crange: range parameters of selected cells,
            list up to 3 elements
        """
        # read AgipdLitFrameFinder data
        self.dev = litfrmdata.meta.litFrmDev
        self.crange = validate_selected_pulses(crange, self.ncell_max)
        self.energy_threshold = energy_threshold

        nfrm = litfrmdata.output.nFrame
        litfrm_train_ids = litfrmdata.meta.trainId
        litfrm = litfrmdata.output.nPulsePerFrame > 0
        if energy_threshold != -1000:
            litfrm &= litfrmdata.output.energyPerFrame > energy_threshold

        # apply range selection
        if crange is None:
            cellsel = np.ones(self.ncell_max, bool)
        else:
            cellsel = np.zeros(self.ncell_max, bool)
            cellsel[slice(*crange)] = True

        # update train selection, removing trains without litframe data
        if train_ids is None:
            train_ids = np.unique(litfrm_train_ids)
            ntrain = len(train_ids)
        else:
            ntrain_orig = len(train_ids)
            train_ids, _, litfrm_train_idx = np.intersect1d(
                train_ids, litfrm_train_ids, return_indices=True
            )
            litfrm = litfrm[litfrm_train_idx]
            nfrm = nfrm[litfrm_train_idx]
            ntrain = len(train_ids)
            if ntrain != ntrain_orig:
                print(f"Lit frame data is missing for "
                      f"{ntrain_orig - ntrain} trains. Skip them.")

        # initiate instance attributes
        nimg = np.sum(nfrm)
        self.train_ids = train_ids
        self.count = nfrm
        self.first = np.zeros(ntrain, int)
        self.flag = np.zeros(nimg, bool)
        self.flag_cm = np.zeros(nimg, bool)
        self.min_sel = nfrm.max()
        self.max_sel = 0
        # compress frame pattern
        i0 = 0
        for i in range(ntrain):
            n = nfrm[i]
            iN = i0 + n
            trnflg = litfrm[i] & cellsel
            trnflg[n:] = False
            self.first[i] = i0
            self.flag[i0:iN] = trnflg[:n]
            self.flag_cm[i0:iN] = (trnflg.reshape(-1, self.row_size).any(1)
                                   .repeat(self.row_size)[:n])
            nlit = trnflg[:n].sum()
            self.min_sel = min(self.min_sel, nlit)
            self.max_sel = max(self.max_sel, nlit)
            i0 = iN

    def msg(self):
        srng = (f"{self.min_sel}" if self.min_sel == self.max_sel
                else f"{self.min_sel}~{self.max_sel}")
        return (
            f"Use lit frame selection from {self.dev}, "
            f"crange={self.crange}\n"
            f"Frames per train: {srng}"
        )

    def get_cells_on_trains(
        self, train_sel: List[int], cm: int = 0
    ) -> np.ndarray:
        train_idx = np.flatnonzero(np.in1d(self.train_ids, train_sel))
        i0 = self.first[train_idx]
        iN = i0 + self.count[train_idx]
        idx = np.concatenate(
            [np.arange(i0[i], iN[i], dtype=int)
             for i in range(train_idx.size)]
        )
        return self._sel_for_cm(self.flag[idx], self.flag_cm[idx], cm)
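
# Usage sketch (illustrative, with made-up numbers): select every second
# frame of the first 128 cells in a 352-cell run.
#
#   cell_sel = CellRange([0, 128, 2], max_cells=352)
#   print(cell_sel.msg())  # reports 64 frames per train
#   sel = cell_sel.get_cells_on_trains([10001, 10002])
#   sel.size  # 2 trains * 352 cells = 704 boolean flags
#
# validate_selected_pulses clamps out-of-range indices: e.g. for a run
# operating 202 cells, validate_selected_pulses([0, 352, 1], 202) warns
# and returns (0, 202, 1).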