Skip to content

agipdlib

AgipdCorrections

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
class AgipdCorrections:

    def __init__(
        self,
        max_cells: int,
        cell_sel: CellSelection,
        h5_data_path: str = "SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/",
        h5_index_path: str = "SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/",
        corr_bools: Optional[dict] = None,
        gain_mode: AgipdGainMode = AgipdGainMode.ADAPTIVE_GAIN,
        comp_threads: int = 1,
        train_ids: Optional[np.ndarray] = None
    ):
        """
        Initialize an AgipdCorrections Class

        :param max_cells: maximum number of memory cells to handle, e.g. if
                         calibration constants only exist for a subset of cells
        :param cell_sel: the CellSelection indicates cells selected for
            calibration
        :param h5_data_path: path in HDF5 file which is prefixed to the
            image/data section
        :param h5_index_path: path in HDF5 file which is prefixed to the
            index section
        :param corr_bools: A dict with all of the correction booleans requested
                           or available
        :param comp_threads: Number of threads to use for compressing gain/mask
        :param train_ids: train IDs to process, all if omitted.

        The following example shows a typical use case:
        .. code-block:: python

            cell_sel = CellRange(max_pulses, max_cells)
            agipd_corr = AgipdCorrections(max_cells, cell_sel,
                                          h5_data_path=h5path,
                                          h5_index_path=h5path_idx,
                                          corr_bools=corr_bools)

            agipd_corr.allocate_constants(modules, (3, mem_cells_db, 512, 128))

            metadata = cal_tools.tools.CalibrationMetadata(out_folder)
            const_yaml = metadata["retrieved-constants"]

            for mod in modules:
                agipd_corr.initialize_from_yaml(karabo_da, const_yaml, mod)

            data_shape = (n_images_max, 512, 128)
            agipd_corr.allocate_images(data_shape, n_cores_files)

            i_proc=0
            agipd_corr.read_file(i_proc, file_name, apply_sel_pulses)

            # Correct first 1000 images
            agipd_corr.correct_agipd(i_proc, first=0, last=1000)

            agipd_corr.write_file(i_proc, file_name, ofile_name)

        """

        if corr_bools is None:
            corr_bools = {}

        # Data description
        self.h5_data_path = h5_data_path
        self.h5_index_path = h5_index_path

        self.max_cells = max_cells
        self.gain_mode = gain_mode
        self.comp_threads = comp_threads
        self.train_ids = np.array(train_ids) if train_ids is not None else None

        self.cell_sel = cell_sel

        # Correction parameters
        self.baseline_corr_noise_threshold = -1000
        self.snow_resolution = SnowResolution.INTERPOLATE
        self.cm_dark_fraction = 0.66
        self.cm_dark_min = -25.
        self.cm_dark_max = 25.
        self.cm_n_itr = 4
        self.mg_hard_threshold = 100
        self.hg_hard_threshold = 100
        self.noisy_adc_threshold = 0.25
        self.ff_gain = 1
        self.photon_energy = 9.2
        self.rounding_threshold = 0.5
        self.cs_mg_adjust = 7e3

        # Output parameters
        self.compress_fields = ['gain', 'mask']
        self.recast_image_fields = {}

        # Shared variables for data and constants
        self.shared_dict = []
        self.offset = {}
        self.noise = {}
        self.thresholds = {}
        self.frac_high_med = {}
        self.md_additional_offset = {}
        self.rel_gain = {}
        self.mask = {}
        self.xray_cor = {}

        if corr_bools.get('round_photons'):
            # Variables related to histogramming.
            self.shared_hist_preround = None
            self.shared_hist_postround = None
            self.hist_bins_preround = np.linspace(-5.0, 60.0, 200)
            self.hist_bins_postround = np.arange(-5.5, 60.1)
            self.hist_lock = Manager().Lock()

        # check if given corr_bools are correct
        tot_corr_bools = ['only_offset', 'adjust_mg_baseline', 'pc_corr',
                          'cs_corr', 'xray_corr', 'blc_noise', 'blc_hmatch',
                          'blc_stripes', 'blc_set_min', 'match_asics',
                          'corr_asic_diag', 'zero_nans',
                          'zero_orange', 'force_hg_if_below',
                          'force_mg_if_below', 'mask_noisy_adc',
                          'melt_snow', 'common_mode', 'mask_zero_std',
                          'low_medium_gap', 'round_photons']

        if set(corr_bools).issubset(tot_corr_bools):
            self.corr_bools = corr_bools
        else:
            bools = list(set(corr_bools) - set(tot_corr_bools))
            raise Exception('Correction Booleans: '
                            f'{bools} are not available!')

        # Flags allowing for pulse capacitor / current source constant retrieval.
        self.relgain_bools = [
            self.corr_bools.get("pc_corr"),
            self.corr_bools.get("cs_corr"),
            # This correction is disabled if cs_corr == True
            self.corr_bools.get("adjust_mg_baseline"),
            self.corr_bools.get('blc_noise'),
            self.corr_bools.get('blc_hmatch'),
            self.corr_bools.get('blc_stripes'),
            self.corr_bools.get('melt_snow')
        ]

        self.blc_bools = [self.corr_bools.get('blc_noise'),
                          self.corr_bools.get('blc_hmatch'),
                          self.corr_bools.get('blc_stripes')]

    def read_file(self, i_proc: int, file_name: str,
                  apply_sel_pulses: Optional[bool] = True
                  ) -> int:
        """Read file with raw data to shared memory

        :param file_name: Name of input file including path.
        :param i_proc: Index of shared memory array.
        :param apply_sel_pulses: apply selected pulses before
                                 all corrections.
        :return:
            - n_img: The number of images to correct.
        """
        module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
        agipd_base = self.h5_data_path.format(module_idx)
        data_dict = self.shared_dict[i_proc]
        data_dict['moduleIdx'][0] = module_idx

        h5_dc = H5File(file_name)

        # Exclude trains without data.
        im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)

        valid_train_ids = self.get_valid_image_idx(
            im_dc[agipd_base, "image.trainId"])

        # filter out trains which will not be selected
        valid_train_ids = self.cell_sel.filter_trains(
            np.array(valid_train_ids)).tolist()

        if not valid_train_ids:
            # If there's not a single valid train, exit early.
            print(f"WARNING: No valid trains for {im_dc.files} to process.")
            data_dict['nImg'][0] = 0
            return 0

        # Exclude non_valid trains from the selected data collection.
        im_dc = im_dc.select_trains(by_id(valid_train_ids))

        # Just want to be sure that order is correct
        valid_train_ids = im_dc.train_ids
        # Get a count of images in each train
        nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
        nimg_in_trains = nimg_in_trains.astype(np.int64)

        # store valid trains in shared memory
        n_valid_trains = len(valid_train_ids)
        data_dict["n_valid_trains"][0] = n_valid_trains
        data_dict["valid_trains"][:n_valid_trains] = valid_train_ids

        # get selection for the images in this file
        cm = (self.cell_sel.CM_NONE if apply_sel_pulses
              else self.cell_sel.CM_PRESEL)

        agipd_src = im_dc[agipd_base]

        cellid = agipd_src["image.cellId"].ndarray()[:, 0]

        img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
            np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)

        data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
        data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)

        n_img = img_selected.sum()
        if img_selected.all():
            # All frames selected - use slice to skip unnecessary copy
            frm_ix = np.s_[:]
        else:
            frm_ix = np.flatnonzero(img_selected)

        # read raw data
        # [n_imgs, 2, x, y]
        raw_data = agipd_src['image.data'].ndarray()

        # store in shmem only selected images
        data_dict['nImg'][0] = n_img
        data_dict['data'][:n_img] = raw_data[frm_ix, 0]
        data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
        data_dict['cellId'][:n_img] = cellid[frm_ix]
        data_dict['pulseId'][:n_img] = agipd_src['image.pulseId'].ndarray()[frm_ix, 0]
        data_dict['trainId'][:n_img] = agipd_src['image.trainId'].ndarray()[frm_ix, 0]

        return n_img

    def write_file(self, i_proc, file_name, ofile_name):
        """
        Create output file and write corrected data to it

        :param file_name: Name of input file including path
        :param ofile_name: Name of output file including path
        :param i_proc: Index of shared memory array
        """

        module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
        agipd_base = f'INSTRUMENT/{self.h5_data_path}/'.format(module_idx)
        idx_base = self.h5_index_path.format(module_idx)
        data_path = f'{agipd_base}/image'

        # Obtain a shallow copy of the pointer map to allow for local
        # changes in this method.
        data_dict = self.shared_dict[i_proc].copy()

        image_fields = [
            'trainId', 'pulseId', 'cellId', 'data', 'gain', 'mask', 'blShift',
        ]

        n_img = data_dict['nImg'][0]
        if n_img == 0:
            return
        trains = data_dict['trainId'][:n_img]

        # Re-cast fields in-place, i.e. using the same memory region.
        for field, dtype in self.recast_image_fields.items():
            data_dict[field] = cast_array_inplace(data_dict[field], dtype)

        with h5py.File(ofile_name, "w") as outfile:
            # Copy any other data from the input file.
            # This includes indexes, so it's important that the corrected data
            # we write is aligned with the raw data.
            with h5py.File(file_name, "r") as infile:
                self.copy_and_sanitize_non_cal_data(
                    infile, outfile, agipd_base, idx_base, trains
                )

            # All corrected data goes in a /INSTRUMENT/.../image group
            image_grp = outfile[data_path]

            # Set up all the datasets before filling them. This puts the
            # metadata about the datasets together at the start of the file,
            # so it's efficient to examine the file structure.
            for field in image_fields:
                arr = data_dict[field][:n_img]
                if field in self.compress_fields:
                    # gain/mask compressed with gzip level 1, but not
                    # checksummed as we would have to implement this.
                    kw = dict(
                        compression='gzip', compression_opts=1, shuffle=True
                    )
                else:
                    # Uncompressed data can easily be checksummed by HDF5's
                    # filter pipeline. This should be cheap to compute.
                    kw = {'fletcher32': True}
                if arr.ndim > 1:
                    kw['chunks'] = (1,) + arr.shape[1:]  # 1 chunk = 1 image

                image_grp.create_dataset(
                    field, shape=arr.shape, dtype=arr.dtype, **kw
                )

            # Write the corrected data
            for field in image_fields:
                if field in self.compress_fields:
                    self._write_compressed_frames(
                        image_grp[field], data_dict[field][:n_img],
                    )
                else:
                    image_grp[field][:] = data_dict[field][:n_img]

    def _write_compressed_frames(self, dataset, arr):
        """Compress gain/mask frames in multiple threads, and save their data

        This is significantly faster than letting HDF5 do the compression
        in a single thread.
        """
        def _compress_frame(i):
            # Equivalent to the HDF5 'shuffle' filter: transpose bytes for
            # better compression.
            shuffled = np.ascontiguousarray(
                arr[i].view(np.uint8).reshape((-1, arr.itemsize)).transpose()
            )
            return i, zlib.compress(shuffled, level=1)

        with ThreadPool(self.comp_threads) as pool:
            for i, compressed in pool.imap(_compress_frame, range(len(arr))):
                # Each frame is 1 complete chunk
                dataset.id.write_direct_chunk((i, 0, 0), compressed)

    def cm_correction(self, i_proc, asic):
        """
        Perform common-mode correction of data in shared memory

        In a given code a complete file is loaded to the memory.
        Asics common mode correction is calculated based on single image.
        Cell common mode is calculated across trains and groups of 32 cells.
        Both corrections are iterative and requires 4 iterations.

        Correction is performed in chunks of (e.g. 512 images).
        A complete array of data from one file
        (256 trains, 352 cells) will take
        256 * 352 * 128 * 512 * 4 // 1024**3 = 22 Gb in memory

        :param i_proc: Index of shared memory array to process
        :param asic: Asic number to process
        """
        if not self.corr_bools.get("common_mode"):
            return
        dark_min = self.cm_dark_min
        dark_max = self.cm_dark_max
        fraction = self.cm_dark_fraction
        n_itr = self.cm_n_itr
        n_img = self.shared_dict[i_proc]['nImg'][0]
        if n_img == 0:
            return
        cell_id = self.shared_dict[i_proc]['cellId'][:n_img]
        data = self.shared_dict[i_proc]['data'][:n_img]
        data = data.reshape(-1, 8, 64, 2, 64)

        asic_data = data[:, asic % 8, :, asic // 8, :]
        for _ in range(n_itr):
            calgs.cm_correction(
                asic_data, cell_id, dark_min, dark_max, fraction)

    def mask_zero_std(self, i_proc, cells):
        """
        Add bad pixel bit: DATA_STD_IS_ZERO to the mask of bad pixels

        Pixel is bad if standard deviation for a given pixel and
        given memory cell is zero

        :param i_proc: Index of shared memory array to process
        :param cells: List of cells to be considered
        """
        if not self.corr_bools.get("mask_zero_std"):
            return

        module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
        n_img = self.shared_dict[i_proc]['nImg'][0]
        data = self.shared_dict[i_proc]['data'][:n_img]
        cellid = self.shared_dict[i_proc]['cellId'][:n_img]
        mask_std = self.mask[module_idx]  # shape of n_cells, x, y

        for c in cells:
            std = np.nanstd(data[cellid == c, ...], axis=0)
            mask_std[:, c, std == 0] |= BadPixels.DATA_STD_IS_ZERO

    def offset_correction(self, i_proc: int, first: int, last: int):
        """
        Perform image-wise offset correction for data in shared memory

        :param first: Index of the first image to be corrected
        :param last: Index of the last image to be corrected
        :param i_proc: Index of shared memory array to process
        """
        module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
        data = self.shared_dict[i_proc]['data'][first:last]
        rawgain = self.shared_dict[i_proc]['rawgain'][first:last]
        gain = self.shared_dict[i_proc]['gain'][first:last]
        cellid = self.shared_dict[i_proc]['cellId'][first:last]

        if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
            # first evaluate the gain into 0, 1, 2 --> high, medium, low
            t0 = self.thresholds[module_idx][0]
            t1 = self.thresholds[module_idx][1]

            # This correction has not been used for years.
            # TODO: decide about removing it.
            if self.corr_bools.get("melt_snow"):
                # load raw_data and rgain to be used during gain_correction
                self.shared_dict[i_proc]["t0_rgain"][first:last] = rawgain / t0[cellid, ...]  # noqa
                self.shared_dict[i_proc]["raw_data"][first:last] = np.copy(data)  # noqa

            # Often most pixels are in high-gain, so it's more efficient to
            # set the whole output block to zero than select the right pixels.
            gain[:] = 0
            # exceeding first threshold means data is medium or low gain
            gain[rawgain > t0[cellid, ...]] = 1
            # exceeding also second threshold means data is low gain
            gain[rawgain > t1[cellid, ...]] = 2
        else:
            # the enum values map 1, 2, 3 to (fixed) gain modes
            gain[:] = self.gain_mode - 1

        offsetb = self.offset[module_idx][:, cellid]

        # force into high or medium gain if requested
        if self.corr_bools.get("force_mg_if_below"):
            gain[(gain == 2) & ((data - offsetb[1]) < self.mg_hard_threshold)] = 1  # noqa

        if self.corr_bools.get("force_hg_if_below"):
            gain[(gain > 0) & ((data - offsetb[0]) < self.hg_hard_threshold)] = 0  # noqa

        # choose constants according to gain setting
        off = calgs.gain_choose(gain, offsetb)
        del offsetb

        # subtract offset
        data -= off
        del off

    def baseline_correction(self, i_proc: int, first: int, last: int):
        """
        Perform image-wise base-line shift correction for
        data in shared memory via histogram or stripe

        :param first: Index of the first image to be corrected
        :param last: Index of the last image to be corrected
        :param i_proc: Index of shared memory array to process
        """

        # before doing relative gain correction we need to evaluate any
        # baseline shifts
        # as they are effectively and additional offset in the data
        if not any(self.blc_bools):
            return

        module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
        data = self.shared_dict[i_proc]['data'][first:last]
        bl_shift = self.shared_dict[i_proc]['blShift'][first:last]
        gain = self.shared_dict[i_proc]['gain'][first:last]
        cellid = self.shared_dict[i_proc]['cellId'][first:last]
        # output is saved in sharedmem to pass for correct_agipd()
        # as this function takes about 3 seconds.
        self.shared_dict[i_proc]["msk"][first:last] = calgs.gain_choose(
            gain, self.mask[module_idx][:, cellid]
        )

        if hasattr(self, "rel_gain"):
            # Get the correct rel_gain depending on cell-id
            self.shared_dict[i_proc]['rel_corr'][first:last] = \
                calgs.gain_choose(gain, self.rel_gain[module_idx][:, cellid])

        # do this image wise, as the shift is per image
        for i in range(data.shape[0]):

            # first correction requested may be to evaluate shift via
            # noise peak
            if self.corr_bools.get('blc_noise'):
                mn_noise = np.nanmean(self.noise[module_idx][0, cellid[i]])
                dd, sh = baseline_correct_via_noise(data[i], mn_noise,
                                                    gain[i],
                                                    self.baseline_corr_noise_threshold)  # noqa
            # if not we continue with initial data
            else:
                dd = data[i]
                sh = 0

            # if we have enough pixels in medium or low gain and
            # correction via hist matching is requested to this now
            gcrit = np.count_nonzero(gain[i] > 0) > 1000
            # blc_hmatch correction has not been used for years.
            # TODO: decide about removing it.
            if (gcrit and self.corr_bools.get('blc_hmatch') and
                    hasattr(self, "rel_gain")):
                dd2, sh2 = correct_baseline_via_hist(data[i],
                                                     self.shared_dict[i_proc]['rel_corr'][first:last][i],  # noqa
                                                     gain[i])
                data[i] = np.maximum(dd, dd2)
                sh = np.minimum(sh, sh2)
                # finally correct diagonal effects if requested
                # This correction has not been used for years.
                # TODO: decide about removing it.
                if self.corr_bools.get('corr_asic_diag'):
                    ii = data[i, ...]
                    gg = gain[i, ...]
                    adim = correct_baseline_via_hist_asic(ii, gg)
                    data[i, ...] = adim
            # if there is not enough medium or low gain data to do an
            # evaluation, do nothing
            else:
                data[i, ...] = dd
                bl_shift[i] = sh

            if self.corr_bools.get('blc_stripes'):
                fmh = self.frac_high_med[module_idx][cellid[i]]
                dd, sh = baseline_correct_via_stripe(data[i, ...],
                                                     gain[i, ...],
                                                     self.shared_dict[i_proc]['msk'][first:last][i, ...],  # noqa
                                                     fmh)
                data[i, ...] = dd
                bl_shift[i] = sh

    def gain_correction(self, i_proc: int, first: int, last: int):
        """
        Perform several image-wise corrections for data in shared memory
        e.g. Relative gain, FlatField xray correction, .....

        :param first: Index of the first image to be corrected
        :param last: Index of the last image to be corrected
        :param i_proc: Index of shared memory array to process
        """
        module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
        data = self.shared_dict[i_proc]['data'][first:last]
        gain = self.shared_dict[i_proc]['gain'][first:last]
        cellid = self.shared_dict[i_proc]['cellId'][first:last]
        mask = self.shared_dict[i_proc]['mask'][first:last]
        rel_corr = self.shared_dict[i_proc]['rel_corr'][first:last]
        msk = self.shared_dict[i_proc]['msk'][first:last]
        # if baseline correction was not requested
        # msk and rel_corr will still be empty shared_mem arrays
        if not any(self.blc_bools):
            msk = calgs.gain_choose(gain, self.mask[module_idx][:, cellid])

            # same for relative gain and then bad pixel mask
            if hasattr(self, "rel_gain"):
                # Get the correct rel_gain depending on cell-id
                rel_corr = calgs.gain_choose(gain, self.rel_gain[module_idx][:, cellid])  # noqa

        # Correct for relative gain
        if (
            self.corr_bools.get("pc_corr") or self.corr_bools.get("cs_corr")
            ) and hasattr(self, "rel_gain"):
            data *= rel_corr
            del rel_corr

        # Adjust medium gain baseline to match highest high gain value
        if self.corr_bools.get("adjust_mg_baseline"):
                mgbc = self.md_additional_offset[module_idx][cellid, ...]
                data[gain == 1] += mgbc[gain == 1]
                del mgbc

        # Set negative values for medium gain to 0
        # TODO: Probably it would be better to add it to badpixel maps,
        # not just set to 0
        if self.corr_bools.get('blc_set_min'):
            data[(data < 0) & (gain == 1)] = 0

        # Do xray correction if requested
        # The slopes we have in our constants are already relative
        # slopeFF = slopeFFpix/avarege(slopeFFpix)
        # To apply them we have to / not *
        if self.corr_bools.get("xray_corr"):
            data /= self.xray_cor[module_idx][cellid, ...]

        # use sharedmem raw_data and t0_rgain
        # after calculating it while offset correcting.
        if self.corr_bools.get('melt_snow'):
            _ = melt_snowy_pixels(self.shared_dict[i_proc]['raw_data'][first:last],  # noqa
                                  data, gain,
                                  self.shared_dict[i_proc]['t0_rgain'][first:last],  # noqa
                                  self.snow_resolution)

        # Inner ASIC borders are matched to the same signal level
        if self.corr_bools.get("match_asics"):
            data = match_asic_borders(data, 8, module_idx)

        # Add any non-finite values to the mask, zero them
        if self.corr_bools.get("zero_nans"):
            bidx = ~np.isfinite(data)
            data[bidx] = 0
            msk[bidx] |= BadPixels.VALUE_IS_NAN
            del bidx

        # Add pixels with unrealistically high and low values to the mask.
        # Zero them.
        if self.corr_bools.get("zero_orange"):
            bidx = (data < -1e7) | (data > 1e7)
            data[bidx] = 0
            msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
            del bidx

        # Round keV-normalized intensity to photons.
        if self.corr_bools.get("round_photons"):
            data_hist_preround, _ = np.histogram(data, bins=self.hist_bins_preround)

            data /= self.photon_energy

            # keep the noise peak symmetrical so that
            # the expected value of zero remains unshifted
            bidx = data < -self.rounding_threshold
            msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE

            np.subtract(data, self.rounding_threshold - 0.5, out=data, where=~bidx)
            np.round(data, out=data)

            # the interval of the noise peak may be greater than one,
            # which is why some of the noise values may be negative after rounding,
            # but should be not masked
            data[data < 0.0] = 0.0
            del bidx

            data_hist_postround, _ = np.histogram(data * self.photon_energy,
                                                  bins=self.hist_bins_postround)

            with self.hist_lock:
                self.shared_hist_preround += data_hist_preround
                self.shared_hist_postround += data_hist_postround

        if np.issubdtype(self.recast_image_fields.get('image'), np.integer):
            # If the image data is meant to be recast to an integer
            # type, make sure its values are within its bounds.

            type_info = np.iinfo(self.recast_image_data['image'])

            bidx = data < type_info.min
            data[bidx] = 0
            msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
            del bidx

            bidx = data > type_info.max
            data[bidx] = type_info.max
            msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
            del bidx

        # Mask entire ADC if they are noise above a threshold
        # TODO: Needs clarification if needed,
        # the returned arg is not used.
        if self.corr_bools.get("mask_noisy_adc"):
            _ = make_noisy_adc_mask(msk,
                                    self.noisy_adc_threshold)

        # Copy the data across into the existing shared-memory array
        mask[...] = msk[...]

    def get_valid_image_idx(self, im_dc: DataCollection) -> list:  # noqa
        """Return a list of valid train ids.

        Exclude non-valid train ids from past or future.
        """
        dc_trains = im_dc.train_ids
        if len(dc_trains) == 0:
            return []  # No trains to validate.
        # Check against train ID filter list, if any
        if self.train_ids is not None:
            valid = np.in1d(dc_trains, self.train_ids)
        else:
            valid = np.ones_like(dc_trains, dtype=bool)

        # Train indices are of type=f32
        # Validate that train indices values fall
        # between medianTrain +- 1e4
        medianTrain = np.nanmedian(dc_trains)
        lowok = (dc_trains > medianTrain - 1e4)
        highok = (dc_trains < medianTrain + 1e4)
        valid &= lowok & highok

        # exclude non valid trains
        valid_trains = valid * dc_trains

        return list(valid_trains[valid_trains != 0])

    def apply_selected_pulses(self, i_proc: int) -> int:
        """Select sharedmem data indices to correct based on selected
        pulses indices.

        :param i_proc: the index of sharedmem for a given file/module
        :return n_img: number of images to correct
        """
        data_dict = self.shared_dict[i_proc]
        n_img = data_dict['nImg'][0]

        if not data_dict["cm_presel"][0] or n_img == 0:
            return n_img

        ntrains = data_dict["n_valid_trains"][0]
        train_ids = data_dict["valid_trains"][:ntrains]
        nimg_in_trains = data_dict["nimg_in_trains"][:ntrains]
        cellid = data_dict["cellId"][:n_img]

        # Initializing can_calibrate array
        can_calibrate, nimg_in_trains = self.cell_sel.get_cells_on_trains(
            train_ids, nimg_in_trains, cellid, cm=self.cell_sel.CM_FINSEL
        )
        data_dict["nimg_in_trains"][:ntrains] = nimg_in_trains
        if np.all(can_calibrate):
            return n_img

        # Get selected number of images based on
        # selected pulses to correct
        n_img_sel = np.count_nonzero(can_calibrate)

        # Only select data corresponding to selected pulses
        # and overwrite data in shared-memory leaving
        # the required indices to correct
        array_names = [
            "data", "rawgain", "cellId", "pulseId", "trainId", "gain"]

        # if AGIPD in fixed gain mode or melting snow was not requested
        # `t0_rgain` and `raw_data` will be empty shared_mem arrays
        is_adaptive = self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN
        melt_snow = self.corr_bools.get("melt_snow")
        if (is_adaptive and melt_snow):
            array_names += ["t0_rgain", "raw_data"]

        # if baseline correction was not requested
        # `msk` and `rel_corr` will still be empty shared_mem arrays
        if any(self.blc_bools):
            array_names += ["blShift", "msk"]
            if hasattr(self, "rel_gain"):
                array_names.append("rel_corr")

        for name in array_names:
            arr = data_dict[name][:n_img][can_calibrate]
            data_dict[name][:n_img_sel] = arr

        # Overwrite number of images
        data_dict['nImg'][0] = n_img_sel

        return n_img_sel

    def copy_and_sanitize_non_cal_data(self, infile, outfile, agipd_base,
                                       idx_base, trains):
        """ Copy and sanitize data in `infile` that is not touched by
        `correctAGIPD`
        """
        # these are touched in the correct function, do not copy them here
        dont_copy = ["data", "cellId", "trainId", "pulseId", "status",
                     "length"]
        dont_copy = [posixpath.join(agipd_base, "image", ds)
                     for ds in dont_copy]

        # don't copy index as we may need to adjust if we filter trains
        dont_copy.append(posixpath.join(idx_base, "image"))

        h5_copy_except_paths(infile, outfile, dont_copy)

        # sanitize indices
        for do in ["image", ]:
            # uq: INDEX/trainID
            # fidxv: INDEX/.../image/first idx values
            # cntsv: INDEX/.../image/counts values

            # Extract parameters through identifying
            # unique trains, index and numbers.
            uq, fidxv, cntsv = np.unique(trains, return_index=True, return_counts=True)  # noqa

            # Validate calculated CORR INDEX contents by checking
            # difference between trainId stored in RAW data and trains from
            train_diff = np.isin(np.array(infile["/INDEX/trainId"]), uq, invert=True)  # noqa

            # Insert zeros for missing trains.
            # fidxv and cntsv should have same length as
            # raw INDEX/.../image/first and INDEX/.../image/count,
            # respectively

            # first_inc = first incrementation
            first_inc = True
            for i, diff in enumerate(train_diff):
                if diff:
                    if i < len(cntsv):
                        cntsv = np.insert(cntsv, i, 0)
                        fidxv = np.insert(fidxv, i, 0) if i == 0 else np.insert(fidxv, i, fidxv[i])
                    else:
                        # append if at the end of the array
                        cntsv = np.append(cntsv, 0)
                        # increment fidxv once with the
                        # no. of processed mem-cells.
                        if first_inc:
                            fidxv = np.append(fidxv,
                                              (2 * fidxv[i-1]) - fidxv[i-2])
                            first_inc = False
                        else:
                            fidxv = np.append(fidxv, fidxv[i-1])

            # save INDEX contents (first, count) in CORR files
            outfile.create_dataset(idx_base + "{}/first".format(do),
                                   fidxv.shape,
                                   dtype=fidxv.dtype,
                                   data=fidxv,
                                   fletcher32=True)
            outfile.create_dataset(idx_base + "{}/count".format(do),
                                   cntsv.shape,
                                   dtype=cntsv.dtype,
                                   data=cntsv,
                                   fletcher32=True)

    def init_constants(
        self, cons_data: dict, module_idx: int, variant: dict):
        """
        For CI derived gain, a mean multiplication factor of 4.48 compared
        to medium gain is used, as no reliable CI data for all memory cells
        exists of the current AGIPD instances.

        Relative gain is derived both from pulse capacitor as well as low
        intensity flat field data, information from flat field data is
        needed to 'calibrate' pulse capacitor data, if there is no
        available FF data, relative gain for High Gain stage is set to 1:

        * Relative gain for High gain stage - from the FF data we get
          the relative slopes of a given pixel and memory cells with
          respect to all memory cells and all pixels in the module,
          Please note: Current slopesFF avaialble in calibibration
          constants are created per pixel only, not per memory cell:


             rel_high_gain = 1 if only PC data is available
             rel_high_gain = rel_slopesFF if FF data is also available

        * Relative gain for Medium gain stage: we derive the factor
          between high and medium gain using slope information from
          fits to the linear part of high and medium gain:

             rfpc_high_medium = m_h/m_m

          where m_h and m_m is the medium gain slope of given memory cells
          and pixel and m_h is the high gain slope as above
           rel_gain_medium = rel_high_gain * rfpc_high_medium

        With this data the relative gain for the three gain stages evaluates
        to:

            rel_high gain = 1 or rel_slopesFF
            rel_medium gain = rel_high_gain * rfpc_high_medium
            rel_low gain = _rel_medium gain * 4.48

        :param cons_data: A dictionary for each retrieved constant value.
        :param module_idx: A module_idx index
        :param variant: A dictionary for the variant of each retrieved CCV.
        :return:
        """

        # Distribute threads for transposition evenly across all modules
        # assuming this method runs in parallel.
        calgs_opts = dict(num_threads=os.cpu_count() // len(self.offset))

        calgs.transpose_constant(
            self.offset[module_idx], cons_data["Offset"], **calgs_opts)

        # In case noise wasn't retrieved no need for transposing.
        if "Noise" in cons_data:
            calgs.transpose_constant(
                self.noise[module_idx], cons_data["Noise"], **calgs_opts)

        if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
            calgs.transpose_constant(self.thresholds[module_idx],
                                     cons_data["ThresholdsDark"][..., :3],
                                     **calgs_opts)

        if self.corr_bools.get("low_medium_gap"):
            t0 = self.thresholds[module_idx][0]
            t1 = self.thresholds[module_idx][1]
            t1[t1 <= 1.05 * t0] = np.iinfo(np.uint16).max

        bpixels = cons_data["BadPixelsDark"].astype(np.uint32)

        if self.corr_bools.get("xray_corr"):
            if "BadPixelsFF" in cons_data:
                bpixels |= cons_data["BadPixelsFF"].astype(np.uint32)[...,
                                                                      :bpixels.shape[2],  # noqa
                                                                      None]

            if "SlopesFF" in cons_data:  # Checking if constant was retrieved

                slopesFF = cons_data["SlopesFF"]
                # This could be used for backward compatibility
                # for very old SlopesFF constants
                if len(slopesFF.shape) == 4:
                    slopesFF = slopesFF[..., 0]
                # This is for backward compatability for old FF constants
                # (128, 512, mem_cells)
                if slopesFF.shape[-1] == 2:
                    xray_cor = np.squeeze(slopesFF[..., 0])
                    xray_cor_med = np.nanmedian(xray_cor)
                    xray_cor[np.isnan(xray_cor)] = xray_cor_med
                    xray_cor[(xray_cor < 0.8) | (
                        xray_cor > 1.2)] = xray_cor_med
                    xray_cor = np.dstack([xray_cor]*self.max_cells)
                else:
                    # Memory cell resolved xray_cor correction
                    xray_cor = slopesFF  # (128, 512, mem_cells)
                    if xray_cor.shape[-1] < self.max_cells:
                        # When working with new constant with fewer memory
                        # cells, eg. lacking enough FF data or during
                        # development, xray_cor must be expand its last memory
                        # cell to maintain a consistent shape.
                        xray_cor = np.dstack(xray_cor,
                                             np.dstack([xray_cor[..., -1]]
                                             * (self.max_cells - xray_cor.shape[-1])))  # noqa
                    elif xray_cor.shape[-1] > self.max_cells:
                        xray_cor = xray_cor[..., :self.max_cells]
                    # This is already done for old constants,
                    # but new constant is absolute and we need to have
                    # global ADU output for the moment
                    xray_cor /= self.ff_gain
            else:
                xray_cor = np.ones((128, 512, self.max_cells), np.float32)

            self.xray_cor[module_idx][...] = xray_cor.transpose()[...]

        # add additional bad pixel information
        if any(self.relgain_bools):
            for rg in ["CS", "PC"]:
                if f"BadPixels{rg}" in cons_data:
                    bp_relgain = np.moveaxis(
                        cons_data[f"BadPixels{rg}"].astype(np.uint32), 0, 2)
                    bpixels |= bp_relgain[..., :bpixels.shape[2], None]

            # calculate relative gain from the constants
            rel_gain = np.ones((128, 512, self.max_cells, 3), np.float32)

            # Either SlopesCS or SlopesPC is applied.
            if "SlopesCS" in cons_data:
                rel_gain[..., 1] = rel_gain[..., 0] * cons_data["SlopesCS"][..., :self.max_cells, 6]  # noqa
                rel_gain[..., 2] = rel_gain[..., 1] * cons_data["SlopesCS"][..., :self.max_cells, 7]  # noqa
                frac_high_med = np.median(
                    cons_data["SlopesCS"][..., :self.max_cells, 6])

                md_additional_offset = np.full(
                    (128, 512, self.max_cells),
                    fill_value=self.cs_mg_adjust,
                    dtype=np.float32)

            elif "SlopesPC" in cons_data:
                slopesPC = cons_data["SlopesPC"].astype(np.float32, copy=False)

                # This will handle some historical data in a different format
                # constant dimension injected first
                if slopesPC.shape[0] in [10, 11]:
                    slopesPC = np.moveaxis(slopesPC, 0, 3)
                    slopesPC = np.moveaxis(slopesPC, 0, 2)

                (
                    pc_high_m, pc_med_m, pc_high_l,
                    pc_med_l, pc_high_med, pc_med_med
                ) = get_gain_pc_slopes(
                    slopes_pc=slopesPC,
                    mem_cells=self.max_cells,
                    variant=variant
                )

                # ration between HG and MG per pixel per mem cell used
                # for rel gain calculation
                frac_high_med_pix = pc_high_m / pc_med_m
                # average ratio between HG and MG as a function of
                # mem cell (needed for bls_stripes)
                # TODO: Per pixel would be more optimal correction
                frac_high_med = pc_high_med / pc_med_med
                # calculate additional medium-gain offset
                md_additional_offset = pc_high_l - pc_med_l * pc_high_m / pc_med_m  # noqa

                # Calculate relative gain. If FF constants are available,
                # use them for high gain
                # if not rel_gain is calculated using PC data only
                # if self.corr_bools.get("xray_corr"):
                #     rel_gain[..., :self.max_cells, 0] /= xray_corr

                # PC data should be 'calibrated with X-ray data,
                # if it is not done, it is better to use 1 instead of bias
                # the results with PC arteffacts.
                # rel_gain[..., 0] = 1./(pc_high_m / pc_high_ave)
                rel_gain[..., 1] = rel_gain[..., 0] * frac_high_med_pix
                rel_gain[..., 2] = rel_gain[..., 1] * 4.48
            else:
                # Intialize with fake calculated parameters of Ones and Zeros
                md_additional_offset = np.zeros((128, 512, self.max_cells), np.float32)
                frac_high_med = np.ones((self.max_cells,), np.float32)

            self.md_additional_offset[module_idx][...] = md_additional_offset.transpose()[...]  # noqa
            calgs.transpose_constant(self.rel_gain[module_idx], rel_gain, **calgs_opts)
            self.frac_high_med[module_idx][...] = frac_high_med

        calgs.transpose_constant(self.mask[module_idx], bpixels, **calgs_opts)

        return

    def allocate_constants(self, modules, constant_shape):
        """
        Allocate memory for correction constants

        :param modules: Module indices
        :param constant_shape: Shape of expected constants (gain, cells, x, y)
        """
        for module_idx in modules:
            self.offset[module_idx] = sharedmem.empty(constant_shape, dtype="f4")  # noqa
            if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
                self.thresholds[module_idx] = sharedmem.empty(constant_shape, dtype="f4")  # noqa
            self.noise[module_idx] = sharedmem.empty(constant_shape, dtype="f4")  # noqa

            self.md_additional_offset[module_idx] = sharedmem.empty(constant_shape[1:], dtype="f4")  # noqa
            self.rel_gain[module_idx] = sharedmem.empty(constant_shape, dtype="f4")  # noqa
            self.frac_high_med[module_idx] = sharedmem.empty(constant_shape[1], dtype="f4")  # noqa

            self.mask[module_idx] = sharedmem.empty(constant_shape, dtype="i4")
            self.xray_cor[module_idx] = sharedmem.empty(constant_shape[1:], dtype="f4")  # noqa

    def allocate_images(self, shape, n_cores_files):
        """
        Allocate memory for image data and variable shared
        between correction functions

        :param shape: Shape of expected data (nImg, x, y)
        :param n_cores_files: Number of files, handled in parallel
        """
        self.shared_dict = []
        for i in range(n_cores_files):
            self.shared_dict.append({})
            self.shared_dict[i]["cellId"] = sharedmem.empty(shape[0], dtype="u2")  # noqa
            self.shared_dict[i]["pulseId"] = sharedmem.empty(shape[0], dtype="u8")  # noqa
            self.shared_dict[i]["trainId"] = sharedmem.empty(shape[0], dtype="u8")  # noqa
            self.shared_dict[i]["moduleIdx"] = sharedmem.empty(1, dtype="i4")
            self.shared_dict[i]["nImg"] = sharedmem.empty(1, dtype="i4")
            self.shared_dict[i]["mask"] = sharedmem.empty(shape, dtype="u4")
            self.shared_dict[i]["data"] = sharedmem.empty(shape, dtype="f4")
            self.shared_dict[i]["rawgain"] = sharedmem.empty(shape, dtype="u2")
            self.shared_dict[i]["gain"] = sharedmem.empty(shape, dtype="u1")
            self.shared_dict[i]["blShift"] = sharedmem.empty(shape[0], dtype="f4")  # noqa
            # Parameters shared between image-wise correction functions
            self.shared_dict[i]["msk"] = sharedmem.empty(shape, dtype="i4")
            self.shared_dict[i]["raw_data"] = sharedmem.empty(shape, dtype="f4")  # noqa
            self.shared_dict[i]["rel_corr"] = sharedmem.empty(shape, dtype="f4")  # noqa
            self.shared_dict[i]["t0_rgain"] = sharedmem.empty(shape, dtype="u2")  # noqa
            # Valid trains
            self.shared_dict[i]["cm_presel"] = sharedmem.empty(1, dtype="b")
            self.shared_dict[i]["n_valid_trains"] = sharedmem.empty(1, dtype="i4")  # noqa
            self.shared_dict[i]["valid_trains"] = sharedmem.empty(1024, dtype="u8")  # noqa
            self.shared_dict[i]["nimg_in_trains"] = sharedmem.empty(1024, dtype="i8")  # noqa

        if self.corr_bools.get("round_photons"):
            self.shared_hist_preround = sharedmem.empty(len(self.hist_bins_preround) - 1, dtype="i8")
            self.shared_hist_postround = sharedmem.empty(len(self.hist_bins_postround) - 1, dtype="i8")

__init__(max_cells, cell_sel, h5_data_path='SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/', h5_index_path='SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/', corr_bools=None, gain_mode=AgipdGainMode.ADAPTIVE_GAIN, comp_threads=1, train_ids=None)

Initialize an AgipdCorrections Class

:param max_cells: maximum number of memory cells to handle, e.g. if calibration constants only exist for a subset of cells :param cell_sel: the CellSelection indicates cells selected for calibration :param h5_data_path: path in HDF5 file which is prefixed to the image/data section :param h5_index_path: path in HDF5 file which is prefixed to the index section :param corr_bools: A dict with all of the correction booleans requested or available :param comp_threads: Number of threads to use for compressing gain/mask :param train_ids: train IDs to process, all if omitted.

The following example shows a typical use case: .. code-block:: python

cell_sel = CellRange(max_pulses, max_cells)
agipd_corr = AgipdCorrections(max_cells, cell_sel,
                              h5_data_path=h5path,
                              h5_index_path=h5path_idx,
                              corr_bools=corr_bools)

agipd_corr.allocate_constants(modules, (3, mem_cells_db, 512, 128))

metadata = cal_tools.tools.CalibrationMetadata(out_folder)
const_yaml = metadata["retrieved-constants"]

for mod in modules:
    agipd_corr.initialize_from_yaml(karabo_da, const_yaml, mod)

data_shape = (n_images_max, 512, 128)
agipd_corr.allocate_images(data_shape, n_cores_files)

i_proc=0
agipd_corr.read_file(i_proc, file_name, apply_sel_pulses)

# Correct first 1000 images
agipd_corr.correct_agipd(i_proc, first=0, last=1000)

agipd_corr.write_file(i_proc, file_name, ofile_name)
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def __init__(
    self,
    max_cells: int,
    cell_sel: CellSelection,
    h5_data_path: str = "SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/",
    h5_index_path: str = "SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/",
    corr_bools: Optional[dict] = None,
    gain_mode: AgipdGainMode = AgipdGainMode.ADAPTIVE_GAIN,
    comp_threads: int = 1,
    train_ids: Optional[np.ndarray] = None
):
    """
    Initialize an AgipdCorrections Class

    :param max_cells: maximum number of memory cells to handle, e.g. if
                     calibration constants only exist for a subset of cells
    :param cell_sel: the CellSelection indicates cells selected for
        calibration
    :param h5_data_path: path in HDF5 file which is prefixed to the
        image/data section
    :param h5_index_path: path in HDF5 file which is prefixed to the
        index section
    :param corr_bools: A dict with all of the correction booleans requested
                       or available
    :param comp_threads: Number of threads to use for compressing gain/mask
    :param train_ids: train IDs to process, all if omitted.

    The following example shows a typical use case:
    .. code-block:: python

        cell_sel = CellRange(max_pulses, max_cells)
        agipd_corr = AgipdCorrections(max_cells, cell_sel,
                                      h5_data_path=h5path,
                                      h5_index_path=h5path_idx,
                                      corr_bools=corr_bools)

        agipd_corr.allocate_constants(modules, (3, mem_cells_db, 512, 128))

        metadata = cal_tools.tools.CalibrationMetadata(out_folder)
        const_yaml = metadata["retrieved-constants"]

        for mod in modules:
            agipd_corr.initialize_from_yaml(karabo_da, const_yaml, mod)

        data_shape = (n_images_max, 512, 128)
        agipd_corr.allocate_images(data_shape, n_cores_files)

        i_proc=0
        agipd_corr.read_file(i_proc, file_name, apply_sel_pulses)

        # Correct first 1000 images
        agipd_corr.correct_agipd(i_proc, first=0, last=1000)

        agipd_corr.write_file(i_proc, file_name, ofile_name)

    """

    if corr_bools is None:
        corr_bools = {}

    # Data description
    self.h5_data_path = h5_data_path
    self.h5_index_path = h5_index_path

    self.max_cells = max_cells
    self.gain_mode = gain_mode
    self.comp_threads = comp_threads
    self.train_ids = np.array(train_ids) if train_ids is not None else None

    self.cell_sel = cell_sel

    # Correction parameters
    self.baseline_corr_noise_threshold = -1000
    self.snow_resolution = SnowResolution.INTERPOLATE
    self.cm_dark_fraction = 0.66
    self.cm_dark_min = -25.
    self.cm_dark_max = 25.
    self.cm_n_itr = 4
    self.mg_hard_threshold = 100
    self.hg_hard_threshold = 100
    self.noisy_adc_threshold = 0.25
    self.ff_gain = 1
    self.photon_energy = 9.2
    self.rounding_threshold = 0.5
    self.cs_mg_adjust = 7e3

    # Output parameters
    self.compress_fields = ['gain', 'mask']
    self.recast_image_fields = {}

    # Shared variables for data and constants
    self.shared_dict = []
    self.offset = {}
    self.noise = {}
    self.thresholds = {}
    self.frac_high_med = {}
    self.md_additional_offset = {}
    self.rel_gain = {}
    self.mask = {}
    self.xray_cor = {}

    if corr_bools.get('round_photons'):
        # Variables related to histogramming.
        self.shared_hist_preround = None
        self.shared_hist_postround = None
        self.hist_bins_preround = np.linspace(-5.0, 60.0, 200)
        self.hist_bins_postround = np.arange(-5.5, 60.1)
        self.hist_lock = Manager().Lock()

    # check if given corr_bools are correct
    tot_corr_bools = ['only_offset', 'adjust_mg_baseline', 'pc_corr',
                      'cs_corr', 'xray_corr', 'blc_noise', 'blc_hmatch',
                      'blc_stripes', 'blc_set_min', 'match_asics',
                      'corr_asic_diag', 'zero_nans',
                      'zero_orange', 'force_hg_if_below',
                      'force_mg_if_below', 'mask_noisy_adc',
                      'melt_snow', 'common_mode', 'mask_zero_std',
                      'low_medium_gap', 'round_photons']

    if set(corr_bools).issubset(tot_corr_bools):
        self.corr_bools = corr_bools
    else:
        bools = list(set(corr_bools) - set(tot_corr_bools))
        raise Exception('Correction Booleans: '
                        f'{bools} are not available!')

    # Flags allowing for pulse capacitor / current source constant retrieval.
    self.relgain_bools = [
        self.corr_bools.get("pc_corr"),
        self.corr_bools.get("cs_corr"),
        # This correction is disabled if cs_corr == True
        self.corr_bools.get("adjust_mg_baseline"),
        self.corr_bools.get('blc_noise'),
        self.corr_bools.get('blc_hmatch'),
        self.corr_bools.get('blc_stripes'),
        self.corr_bools.get('melt_snow')
    ]

    self.blc_bools = [self.corr_bools.get('blc_noise'),
                      self.corr_bools.get('blc_hmatch'),
                      self.corr_bools.get('blc_stripes')]

allocate_constants(modules, constant_shape)

Allocate memory for correction constants

:param modules: Module indices :param constant_shape: Shape of expected constants (gain, cells, x, y)

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def allocate_constants(self, modules, constant_shape):
    """
    Allocate memory for correction constants

    :param modules: Module indices
    :param constant_shape: Shape of expected constants (gain, cells, x, y)
    """
    for module_idx in modules:
        self.offset[module_idx] = sharedmem.empty(constant_shape, dtype="f4")  # noqa
        if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
            self.thresholds[module_idx] = sharedmem.empty(constant_shape, dtype="f4")  # noqa
        self.noise[module_idx] = sharedmem.empty(constant_shape, dtype="f4")  # noqa

        self.md_additional_offset[module_idx] = sharedmem.empty(constant_shape[1:], dtype="f4")  # noqa
        self.rel_gain[module_idx] = sharedmem.empty(constant_shape, dtype="f4")  # noqa
        self.frac_high_med[module_idx] = sharedmem.empty(constant_shape[1], dtype="f4")  # noqa

        self.mask[module_idx] = sharedmem.empty(constant_shape, dtype="i4")
        self.xray_cor[module_idx] = sharedmem.empty(constant_shape[1:], dtype="f4")  # noqa

allocate_images(shape, n_cores_files)

Allocate memory for image data and variable shared between correction functions

:param shape: Shape of expected data (nImg, x, y) :param n_cores_files: Number of files, handled in parallel

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def allocate_images(self, shape, n_cores_files):
    """
    Allocate memory for image data and variable shared
    between correction functions

    :param shape: Shape of expected data (nImg, x, y)
    :param n_cores_files: Number of files, handled in parallel
    """
    self.shared_dict = []
    for i in range(n_cores_files):
        self.shared_dict.append({})
        self.shared_dict[i]["cellId"] = sharedmem.empty(shape[0], dtype="u2")  # noqa
        self.shared_dict[i]["pulseId"] = sharedmem.empty(shape[0], dtype="u8")  # noqa
        self.shared_dict[i]["trainId"] = sharedmem.empty(shape[0], dtype="u8")  # noqa
        self.shared_dict[i]["moduleIdx"] = sharedmem.empty(1, dtype="i4")
        self.shared_dict[i]["nImg"] = sharedmem.empty(1, dtype="i4")
        self.shared_dict[i]["mask"] = sharedmem.empty(shape, dtype="u4")
        self.shared_dict[i]["data"] = sharedmem.empty(shape, dtype="f4")
        self.shared_dict[i]["rawgain"] = sharedmem.empty(shape, dtype="u2")
        self.shared_dict[i]["gain"] = sharedmem.empty(shape, dtype="u1")
        self.shared_dict[i]["blShift"] = sharedmem.empty(shape[0], dtype="f4")  # noqa
        # Parameters shared between image-wise correction functions
        self.shared_dict[i]["msk"] = sharedmem.empty(shape, dtype="i4")
        self.shared_dict[i]["raw_data"] = sharedmem.empty(shape, dtype="f4")  # noqa
        self.shared_dict[i]["rel_corr"] = sharedmem.empty(shape, dtype="f4")  # noqa
        self.shared_dict[i]["t0_rgain"] = sharedmem.empty(shape, dtype="u2")  # noqa
        # Valid trains
        self.shared_dict[i]["cm_presel"] = sharedmem.empty(1, dtype="b")
        self.shared_dict[i]["n_valid_trains"] = sharedmem.empty(1, dtype="i4")  # noqa
        self.shared_dict[i]["valid_trains"] = sharedmem.empty(1024, dtype="u8")  # noqa
        self.shared_dict[i]["nimg_in_trains"] = sharedmem.empty(1024, dtype="i8")  # noqa

    if self.corr_bools.get("round_photons"):
        self.shared_hist_preround = sharedmem.empty(len(self.hist_bins_preround) - 1, dtype="i8")
        self.shared_hist_postround = sharedmem.empty(len(self.hist_bins_postround) - 1, dtype="i8")

apply_selected_pulses(i_proc)

Select sharedmem data indices to correct based on selected pulses indices.

:param i_proc: the index of sharedmem for a given file/module :return n_img: number of images to correct

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def apply_selected_pulses(self, i_proc: int) -> int:
    """Select sharedmem data indices to correct based on selected
    pulses indices.

    :param i_proc: the index of sharedmem for a given file/module
    :return n_img: number of images to correct
    """
    data_dict = self.shared_dict[i_proc]
    n_img = data_dict['nImg'][0]

    if not data_dict["cm_presel"][0] or n_img == 0:
        return n_img

    ntrains = data_dict["n_valid_trains"][0]
    train_ids = data_dict["valid_trains"][:ntrains]
    nimg_in_trains = data_dict["nimg_in_trains"][:ntrains]
    cellid = data_dict["cellId"][:n_img]

    # Initializing can_calibrate array
    can_calibrate, nimg_in_trains = self.cell_sel.get_cells_on_trains(
        train_ids, nimg_in_trains, cellid, cm=self.cell_sel.CM_FINSEL
    )
    data_dict["nimg_in_trains"][:ntrains] = nimg_in_trains
    if np.all(can_calibrate):
        return n_img

    # Get selected number of images based on
    # selected pulses to correct
    n_img_sel = np.count_nonzero(can_calibrate)

    # Only select data corresponding to selected pulses
    # and overwrite data in shared-memory leaving
    # the required indices to correct
    array_names = [
        "data", "rawgain", "cellId", "pulseId", "trainId", "gain"]

    # if AGIPD in fixed gain mode or melting snow was not requested
    # `t0_rgain` and `raw_data` will be empty shared_mem arrays
    is_adaptive = self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN
    melt_snow = self.corr_bools.get("melt_snow")
    if (is_adaptive and melt_snow):
        array_names += ["t0_rgain", "raw_data"]

    # if baseline correction was not requested
    # `msk` and `rel_corr` will still be empty shared_mem arrays
    if any(self.blc_bools):
        array_names += ["blShift", "msk"]
        if hasattr(self, "rel_gain"):
            array_names.append("rel_corr")

    for name in array_names:
        arr = data_dict[name][:n_img][can_calibrate]
        data_dict[name][:n_img_sel] = arr

    # Overwrite number of images
    data_dict['nImg'][0] = n_img_sel

    return n_img_sel

baseline_correction(i_proc, first, last)

Perform image-wise base-line shift correction for data in shared memory via histogram or stripe

:param first: Index of the first image to be corrected :param last: Index of the last image to be corrected :param i_proc: Index of shared memory array to process

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def baseline_correction(self, i_proc: int, first: int, last: int):
    """
    Perform image-wise base-line shift correction for
    data in shared memory via histogram or stripe

    :param first: Index of the first image to be corrected
    :param last: Index of the last image to be corrected
    :param i_proc: Index of shared memory array to process
    """

    # before doing relative gain correction we need to evaluate any
    # baseline shifts
    # as they are effectively and additional offset in the data
    if not any(self.blc_bools):
        return

    module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
    data = self.shared_dict[i_proc]['data'][first:last]
    bl_shift = self.shared_dict[i_proc]['blShift'][first:last]
    gain = self.shared_dict[i_proc]['gain'][first:last]
    cellid = self.shared_dict[i_proc]['cellId'][first:last]
    # output is saved in sharedmem to pass for correct_agipd()
    # as this function takes about 3 seconds.
    self.shared_dict[i_proc]["msk"][first:last] = calgs.gain_choose(
        gain, self.mask[module_idx][:, cellid]
    )

    if hasattr(self, "rel_gain"):
        # Get the correct rel_gain depending on cell-id
        self.shared_dict[i_proc]['rel_corr'][first:last] = \
            calgs.gain_choose(gain, self.rel_gain[module_idx][:, cellid])

    # do this image wise, as the shift is per image
    for i in range(data.shape[0]):

        # first correction requested may be to evaluate shift via
        # noise peak
        if self.corr_bools.get('blc_noise'):
            mn_noise = np.nanmean(self.noise[module_idx][0, cellid[i]])
            dd, sh = baseline_correct_via_noise(data[i], mn_noise,
                                                gain[i],
                                                self.baseline_corr_noise_threshold)  # noqa
        # if not we continue with initial data
        else:
            dd = data[i]
            sh = 0

        # if we have enough pixels in medium or low gain and
        # correction via hist matching is requested to this now
        gcrit = np.count_nonzero(gain[i] > 0) > 1000
        # blc_hmatch correction has not been used for years.
        # TODO: decide about removing it.
        if (gcrit and self.corr_bools.get('blc_hmatch') and
                hasattr(self, "rel_gain")):
            dd2, sh2 = correct_baseline_via_hist(data[i],
                                                 self.shared_dict[i_proc]['rel_corr'][first:last][i],  # noqa
                                                 gain[i])
            data[i] = np.maximum(dd, dd2)
            sh = np.minimum(sh, sh2)
            # finally correct diagonal effects if requested
            # This correction has not been used for years.
            # TODO: decide about removing it.
            if self.corr_bools.get('corr_asic_diag'):
                ii = data[i, ...]
                gg = gain[i, ...]
                adim = correct_baseline_via_hist_asic(ii, gg)
                data[i, ...] = adim
        # if there is not enough medium or low gain data to do an
        # evaluation, do nothing
        else:
            data[i, ...] = dd
            bl_shift[i] = sh

        if self.corr_bools.get('blc_stripes'):
            fmh = self.frac_high_med[module_idx][cellid[i]]
            dd, sh = baseline_correct_via_stripe(data[i, ...],
                                                 gain[i, ...],
                                                 self.shared_dict[i_proc]['msk'][first:last][i, ...],  # noqa
                                                 fmh)
            data[i, ...] = dd
            bl_shift[i] = sh

cm_correction(i_proc, asic)

Perform common-mode correction of data in shared memory

In a given code a complete file is loaded to the memory. Asics common mode correction is calculated based on single image. Cell common mode is calculated across trains and groups of 32 cells. Both corrections are iterative and requires 4 iterations.

Correction is performed in chunks of (e.g. 512 images). A complete array of data from one file (256 trains, 352 cells) will take 256 * 352 * 128 * 512 * 4 // 1024**3 = 22 Gb in memory

:param i_proc: Index of shared memory array to process :param asic: Asic number to process

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def cm_correction(self, i_proc, asic):
    """
    Perform common-mode correction of data in shared memory

    In a given code a complete file is loaded to the memory.
    Asics common mode correction is calculated based on single image.
    Cell common mode is calculated across trains and groups of 32 cells.
    Both corrections are iterative and requires 4 iterations.

    Correction is performed in chunks of (e.g. 512 images).
    A complete array of data from one file
    (256 trains, 352 cells) will take
    256 * 352 * 128 * 512 * 4 // 1024**3 = 22 Gb in memory

    :param i_proc: Index of shared memory array to process
    :param asic: Asic number to process
    """
    if not self.corr_bools.get("common_mode"):
        return
    dark_min = self.cm_dark_min
    dark_max = self.cm_dark_max
    fraction = self.cm_dark_fraction
    n_itr = self.cm_n_itr
    n_img = self.shared_dict[i_proc]['nImg'][0]
    if n_img == 0:
        return
    cell_id = self.shared_dict[i_proc]['cellId'][:n_img]
    data = self.shared_dict[i_proc]['data'][:n_img]
    data = data.reshape(-1, 8, 64, 2, 64)

    asic_data = data[:, asic % 8, :, asic // 8, :]
    for _ in range(n_itr):
        calgs.cm_correction(
            asic_data, cell_id, dark_min, dark_max, fraction)

copy_and_sanitize_non_cal_data(infile, outfile, agipd_base, idx_base, trains)

Copy and sanitize data in infile that is not touched by correctAGIPD

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def copy_and_sanitize_non_cal_data(self, infile, outfile, agipd_base,
                                   idx_base, trains):
    """ Copy and sanitize data in `infile` that is not touched by
    `correctAGIPD`
    """
    # these are touched in the correct function, do not copy them here
    dont_copy = ["data", "cellId", "trainId", "pulseId", "status",
                 "length"]
    dont_copy = [posixpath.join(agipd_base, "image", ds)
                 for ds in dont_copy]

    # don't copy index as we may need to adjust if we filter trains
    dont_copy.append(posixpath.join(idx_base, "image"))

    h5_copy_except_paths(infile, outfile, dont_copy)

    # sanitize indices
    for do in ["image", ]:
        # uq: INDEX/trainID
        # fidxv: INDEX/.../image/first idx values
        # cntsv: INDEX/.../image/counts values

        # Extract parameters through identifying
        # unique trains, index and numbers.
        uq, fidxv, cntsv = np.unique(trains, return_index=True, return_counts=True)  # noqa

        # Validate calculated CORR INDEX contents by checking
        # difference between trainId stored in RAW data and trains from
        train_diff = np.isin(np.array(infile["/INDEX/trainId"]), uq, invert=True)  # noqa

        # Insert zeros for missing trains.
        # fidxv and cntsv should have same length as
        # raw INDEX/.../image/first and INDEX/.../image/count,
        # respectively

        # first_inc = first incrementation
        first_inc = True
        for i, diff in enumerate(train_diff):
            if diff:
                if i < len(cntsv):
                    cntsv = np.insert(cntsv, i, 0)
                    fidxv = np.insert(fidxv, i, 0) if i == 0 else np.insert(fidxv, i, fidxv[i])
                else:
                    # append if at the end of the array
                    cntsv = np.append(cntsv, 0)
                    # increment fidxv once with the
                    # no. of processed mem-cells.
                    if first_inc:
                        fidxv = np.append(fidxv,
                                          (2 * fidxv[i-1]) - fidxv[i-2])
                        first_inc = False
                    else:
                        fidxv = np.append(fidxv, fidxv[i-1])

        # save INDEX contents (first, count) in CORR files
        outfile.create_dataset(idx_base + "{}/first".format(do),
                               fidxv.shape,
                               dtype=fidxv.dtype,
                               data=fidxv,
                               fletcher32=True)
        outfile.create_dataset(idx_base + "{}/count".format(do),
                               cntsv.shape,
                               dtype=cntsv.dtype,
                               data=cntsv,
                               fletcher32=True)

gain_correction(i_proc, first, last)

Perform several image-wise corrections for data in shared memory e.g. Relative gain, FlatField xray correction, .....

:param first: Index of the first image to be corrected :param last: Index of the last image to be corrected :param i_proc: Index of shared memory array to process

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def gain_correction(self, i_proc: int, first: int, last: int):
    """
    Perform several image-wise corrections for data in shared memory
    e.g. Relative gain, FlatField xray correction, .....

    :param first: Index of the first image to be corrected
    :param last: Index of the last image to be corrected
    :param i_proc: Index of shared memory array to process
    """
    module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
    data = self.shared_dict[i_proc]['data'][first:last]
    gain = self.shared_dict[i_proc]['gain'][first:last]
    cellid = self.shared_dict[i_proc]['cellId'][first:last]
    mask = self.shared_dict[i_proc]['mask'][first:last]
    rel_corr = self.shared_dict[i_proc]['rel_corr'][first:last]
    msk = self.shared_dict[i_proc]['msk'][first:last]
    # if baseline correction was not requested
    # msk and rel_corr will still be empty shared_mem arrays
    if not any(self.blc_bools):
        msk = calgs.gain_choose(gain, self.mask[module_idx][:, cellid])

        # same for relative gain and then bad pixel mask
        if hasattr(self, "rel_gain"):
            # Get the correct rel_gain depending on cell-id
            rel_corr = calgs.gain_choose(gain, self.rel_gain[module_idx][:, cellid])  # noqa

    # Correct for relative gain
    if (
        self.corr_bools.get("pc_corr") or self.corr_bools.get("cs_corr")
        ) and hasattr(self, "rel_gain"):
        data *= rel_corr
        del rel_corr

    # Adjust medium gain baseline to match highest high gain value
    if self.corr_bools.get("adjust_mg_baseline"):
            mgbc = self.md_additional_offset[module_idx][cellid, ...]
            data[gain == 1] += mgbc[gain == 1]
            del mgbc

    # Set negative values for medium gain to 0
    # TODO: Probably it would be better to add it to badpixel maps,
    # not just set to 0
    if self.corr_bools.get('blc_set_min'):
        data[(data < 0) & (gain == 1)] = 0

    # Do xray correction if requested
    # The slopes we have in our constants are already relative
    # slopeFF = slopeFFpix/avarege(slopeFFpix)
    # To apply them we have to / not *
    if self.corr_bools.get("xray_corr"):
        data /= self.xray_cor[module_idx][cellid, ...]

    # use sharedmem raw_data and t0_rgain
    # after calculating it while offset correcting.
    if self.corr_bools.get('melt_snow'):
        _ = melt_snowy_pixels(self.shared_dict[i_proc]['raw_data'][first:last],  # noqa
                              data, gain,
                              self.shared_dict[i_proc]['t0_rgain'][first:last],  # noqa
                              self.snow_resolution)

    # Inner ASIC borders are matched to the same signal level
    if self.corr_bools.get("match_asics"):
        data = match_asic_borders(data, 8, module_idx)

    # Add any non-finite values to the mask, zero them
    if self.corr_bools.get("zero_nans"):
        bidx = ~np.isfinite(data)
        data[bidx] = 0
        msk[bidx] |= BadPixels.VALUE_IS_NAN
        del bidx

    # Add pixels with unrealistically high and low values to the mask.
    # Zero them.
    if self.corr_bools.get("zero_orange"):
        bidx = (data < -1e7) | (data > 1e7)
        data[bidx] = 0
        msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
        del bidx

    # Round keV-normalized intensity to photons.
    if self.corr_bools.get("round_photons"):
        data_hist_preround, _ = np.histogram(data, bins=self.hist_bins_preround)

        data /= self.photon_energy

        # keep the noise peak symmetrical so that
        # the expected value of zero remains unshifted
        bidx = data < -self.rounding_threshold
        msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE

        np.subtract(data, self.rounding_threshold - 0.5, out=data, where=~bidx)
        np.round(data, out=data)

        # the interval of the noise peak may be greater than one,
        # which is why some of the noise values may be negative after rounding,
        # but should be not masked
        data[data < 0.0] = 0.0
        del bidx

        data_hist_postround, _ = np.histogram(data * self.photon_energy,
                                              bins=self.hist_bins_postround)

        with self.hist_lock:
            self.shared_hist_preround += data_hist_preround
            self.shared_hist_postround += data_hist_postround

    if np.issubdtype(self.recast_image_fields.get('image'), np.integer):
        # If the image data is meant to be recast to an integer
        # type, make sure its values are within its bounds.

        type_info = np.iinfo(self.recast_image_data['image'])

        bidx = data < type_info.min
        data[bidx] = 0
        msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
        del bidx

        bidx = data > type_info.max
        data[bidx] = type_info.max
        msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
        del bidx

    # Mask entire ADC if they are noise above a threshold
    # TODO: Needs clarification if needed,
    # the returned arg is not used.
    if self.corr_bools.get("mask_noisy_adc"):
        _ = make_noisy_adc_mask(msk,
                                self.noisy_adc_threshold)

    # Copy the data across into the existing shared-memory array
    mask[...] = msk[...]

get_valid_image_idx(im_dc)

Return a list of valid train ids.

Exclude non-valid train ids from past or future.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_valid_image_idx(self, im_dc: DataCollection) -> list:  # noqa
    """Return a list of valid train ids.

    Exclude non-valid train ids from past or future.
    """
    dc_trains = im_dc.train_ids
    if len(dc_trains) == 0:
        return []  # No trains to validate.
    # Check against train ID filter list, if any
    if self.train_ids is not None:
        valid = np.in1d(dc_trains, self.train_ids)
    else:
        valid = np.ones_like(dc_trains, dtype=bool)

    # Train indices are of type=f32
    # Validate that train indices values fall
    # between medianTrain +- 1e4
    medianTrain = np.nanmedian(dc_trains)
    lowok = (dc_trains > medianTrain - 1e4)
    highok = (dc_trains < medianTrain + 1e4)
    valid &= lowok & highok

    # exclude non valid trains
    valid_trains = valid * dc_trains

    return list(valid_trains[valid_trains != 0])

init_constants(cons_data, module_idx, variant)

For CI derived gain, a mean multiplication factor of 4.48 compared to medium gain is used, as no reliable CI data for all memory cells exists of the current AGIPD instances.

Relative gain is derived both from pulse capacitor as well as low intensity flat field data, information from flat field data is needed to 'calibrate' pulse capacitor data, if there is no available FF data, relative gain for High Gain stage is set to 1:

  • Relative gain for High gain stage - from the FF data we get the relative slopes of a given pixel and memory cells with respect to all memory cells and all pixels in the module, Please note: Current slopesFF avaialble in calibibration constants are created per pixel only, not per memory cell:

    rel_high_gain = 1 if only PC data is available rel_high_gain = rel_slopesFF if FF data is also available

  • Relative gain for Medium gain stage: we derive the factor between high and medium gain using slope information from fits to the linear part of high and medium gain:

    rfpc_high_medium = m_h/m_m

where m_h and m_m is the medium gain slope of given memory cells and pixel and m_h is the high gain slope as above rel_gain_medium = rel_high_gain * rfpc_high_medium

With this data the relative gain for the three gain stages evaluates

to

rel_high gain = 1 or rel_slopesFF rel_medium gain = rel_high_gain * rfpc_high_medium rel_low gain = _rel_medium gain * 4.48

:param cons_data: A dictionary for each retrieved constant value. :param module_idx: A module_idx index :param variant: A dictionary for the variant of each retrieved CCV. :return:

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def init_constants(
    self, cons_data: dict, module_idx: int, variant: dict):
    """
    For CI derived gain, a mean multiplication factor of 4.48 compared
    to medium gain is used, as no reliable CI data for all memory cells
    exists of the current AGIPD instances.

    Relative gain is derived both from pulse capacitor as well as low
    intensity flat field data, information from flat field data is
    needed to 'calibrate' pulse capacitor data, if there is no
    available FF data, relative gain for High Gain stage is set to 1:

    * Relative gain for High gain stage - from the FF data we get
      the relative slopes of a given pixel and memory cells with
      respect to all memory cells and all pixels in the module,
      Please note: Current slopesFF avaialble in calibibration
      constants are created per pixel only, not per memory cell:


         rel_high_gain = 1 if only PC data is available
         rel_high_gain = rel_slopesFF if FF data is also available

    * Relative gain for Medium gain stage: we derive the factor
      between high and medium gain using slope information from
      fits to the linear part of high and medium gain:

         rfpc_high_medium = m_h/m_m

      where m_h and m_m is the medium gain slope of given memory cells
      and pixel and m_h is the high gain slope as above
       rel_gain_medium = rel_high_gain * rfpc_high_medium

    With this data the relative gain for the three gain stages evaluates
    to:

        rel_high gain = 1 or rel_slopesFF
        rel_medium gain = rel_high_gain * rfpc_high_medium
        rel_low gain = _rel_medium gain * 4.48

    :param cons_data: A dictionary for each retrieved constant value.
    :param module_idx: A module_idx index
    :param variant: A dictionary for the variant of each retrieved CCV.
    :return:
    """

    # Distribute threads for transposition evenly across all modules
    # assuming this method runs in parallel.
    calgs_opts = dict(num_threads=os.cpu_count() // len(self.offset))

    calgs.transpose_constant(
        self.offset[module_idx], cons_data["Offset"], **calgs_opts)

    # In case noise wasn't retrieved no need for transposing.
    if "Noise" in cons_data:
        calgs.transpose_constant(
            self.noise[module_idx], cons_data["Noise"], **calgs_opts)

    if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
        calgs.transpose_constant(self.thresholds[module_idx],
                                 cons_data["ThresholdsDark"][..., :3],
                                 **calgs_opts)

    if self.corr_bools.get("low_medium_gap"):
        t0 = self.thresholds[module_idx][0]
        t1 = self.thresholds[module_idx][1]
        t1[t1 <= 1.05 * t0] = np.iinfo(np.uint16).max

    bpixels = cons_data["BadPixelsDark"].astype(np.uint32)

    if self.corr_bools.get("xray_corr"):
        if "BadPixelsFF" in cons_data:
            bpixels |= cons_data["BadPixelsFF"].astype(np.uint32)[...,
                                                                  :bpixels.shape[2],  # noqa
                                                                  None]

        if "SlopesFF" in cons_data:  # Checking if constant was retrieved

            slopesFF = cons_data["SlopesFF"]
            # This could be used for backward compatibility
            # for very old SlopesFF constants
            if len(slopesFF.shape) == 4:
                slopesFF = slopesFF[..., 0]
            # This is for backward compatability for old FF constants
            # (128, 512, mem_cells)
            if slopesFF.shape[-1] == 2:
                xray_cor = np.squeeze(slopesFF[..., 0])
                xray_cor_med = np.nanmedian(xray_cor)
                xray_cor[np.isnan(xray_cor)] = xray_cor_med
                xray_cor[(xray_cor < 0.8) | (
                    xray_cor > 1.2)] = xray_cor_med
                xray_cor = np.dstack([xray_cor]*self.max_cells)
            else:
                # Memory cell resolved xray_cor correction
                xray_cor = slopesFF  # (128, 512, mem_cells)
                if xray_cor.shape[-1] < self.max_cells:
                    # When working with new constant with fewer memory
                    # cells, eg. lacking enough FF data or during
                    # development, xray_cor must be expand its last memory
                    # cell to maintain a consistent shape.
                    xray_cor = np.dstack(xray_cor,
                                         np.dstack([xray_cor[..., -1]]
                                         * (self.max_cells - xray_cor.shape[-1])))  # noqa
                elif xray_cor.shape[-1] > self.max_cells:
                    xray_cor = xray_cor[..., :self.max_cells]
                # This is already done for old constants,
                # but new constant is absolute and we need to have
                # global ADU output for the moment
                xray_cor /= self.ff_gain
        else:
            xray_cor = np.ones((128, 512, self.max_cells), np.float32)

        self.xray_cor[module_idx][...] = xray_cor.transpose()[...]

    # add additional bad pixel information
    if any(self.relgain_bools):
        for rg in ["CS", "PC"]:
            if f"BadPixels{rg}" in cons_data:
                bp_relgain = np.moveaxis(
                    cons_data[f"BadPixels{rg}"].astype(np.uint32), 0, 2)
                bpixels |= bp_relgain[..., :bpixels.shape[2], None]

        # calculate relative gain from the constants
        rel_gain = np.ones((128, 512, self.max_cells, 3), np.float32)

        # Either SlopesCS or SlopesPC is applied.
        if "SlopesCS" in cons_data:
            rel_gain[..., 1] = rel_gain[..., 0] * cons_data["SlopesCS"][..., :self.max_cells, 6]  # noqa
            rel_gain[..., 2] = rel_gain[..., 1] * cons_data["SlopesCS"][..., :self.max_cells, 7]  # noqa
            frac_high_med = np.median(
                cons_data["SlopesCS"][..., :self.max_cells, 6])

            md_additional_offset = np.full(
                (128, 512, self.max_cells),
                fill_value=self.cs_mg_adjust,
                dtype=np.float32)

        elif "SlopesPC" in cons_data:
            slopesPC = cons_data["SlopesPC"].astype(np.float32, copy=False)

            # This will handle some historical data in a different format
            # constant dimension injected first
            if slopesPC.shape[0] in [10, 11]:
                slopesPC = np.moveaxis(slopesPC, 0, 3)
                slopesPC = np.moveaxis(slopesPC, 0, 2)

            (
                pc_high_m, pc_med_m, pc_high_l,
                pc_med_l, pc_high_med, pc_med_med
            ) = get_gain_pc_slopes(
                slopes_pc=slopesPC,
                mem_cells=self.max_cells,
                variant=variant
            )

            # ration between HG and MG per pixel per mem cell used
            # for rel gain calculation
            frac_high_med_pix = pc_high_m / pc_med_m
            # average ratio between HG and MG as a function of
            # mem cell (needed for bls_stripes)
            # TODO: Per pixel would be more optimal correction
            frac_high_med = pc_high_med / pc_med_med
            # calculate additional medium-gain offset
            md_additional_offset = pc_high_l - pc_med_l * pc_high_m / pc_med_m  # noqa

            # Calculate relative gain. If FF constants are available,
            # use them for high gain
            # if not rel_gain is calculated using PC data only
            # if self.corr_bools.get("xray_corr"):
            #     rel_gain[..., :self.max_cells, 0] /= xray_corr

            # PC data should be 'calibrated with X-ray data,
            # if it is not done, it is better to use 1 instead of bias
            # the results with PC arteffacts.
            # rel_gain[..., 0] = 1./(pc_high_m / pc_high_ave)
            rel_gain[..., 1] = rel_gain[..., 0] * frac_high_med_pix
            rel_gain[..., 2] = rel_gain[..., 1] * 4.48
        else:
            # Intialize with fake calculated parameters of Ones and Zeros
            md_additional_offset = np.zeros((128, 512, self.max_cells), np.float32)
            frac_high_med = np.ones((self.max_cells,), np.float32)

        self.md_additional_offset[module_idx][...] = md_additional_offset.transpose()[...]  # noqa
        calgs.transpose_constant(self.rel_gain[module_idx], rel_gain, **calgs_opts)
        self.frac_high_med[module_idx][...] = frac_high_med

    calgs.transpose_constant(self.mask[module_idx], bpixels, **calgs_opts)

    return

mask_zero_std(i_proc, cells)

Add bad pixel bit: DATA_STD_IS_ZERO to the mask of bad pixels

Pixel is bad if standard deviation for a given pixel and given memory cell is zero

:param i_proc: Index of shared memory array to process :param cells: List of cells to be considered

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def mask_zero_std(self, i_proc, cells):
    """
    Add bad pixel bit: DATA_STD_IS_ZERO to the mask of bad pixels

    Pixel is bad if standard deviation for a given pixel and
    given memory cell is zero

    :param i_proc: Index of shared memory array to process
    :param cells: List of cells to be considered
    """
    if not self.corr_bools.get("mask_zero_std"):
        return

    module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
    n_img = self.shared_dict[i_proc]['nImg'][0]
    data = self.shared_dict[i_proc]['data'][:n_img]
    cellid = self.shared_dict[i_proc]['cellId'][:n_img]
    mask_std = self.mask[module_idx]  # shape of n_cells, x, y

    for c in cells:
        std = np.nanstd(data[cellid == c, ...], axis=0)
        mask_std[:, c, std == 0] |= BadPixels.DATA_STD_IS_ZERO

offset_correction(i_proc, first, last)

Perform image-wise offset correction for data in shared memory

:param first: Index of the first image to be corrected :param last: Index of the last image to be corrected :param i_proc: Index of shared memory array to process

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def offset_correction(self, i_proc: int, first: int, last: int):
    """
    Perform image-wise offset correction for data in shared memory

    :param first: Index of the first image to be corrected
    :param last: Index of the last image to be corrected
    :param i_proc: Index of shared memory array to process
    """
    module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
    data = self.shared_dict[i_proc]['data'][first:last]
    rawgain = self.shared_dict[i_proc]['rawgain'][first:last]
    gain = self.shared_dict[i_proc]['gain'][first:last]
    cellid = self.shared_dict[i_proc]['cellId'][first:last]

    if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
        # first evaluate the gain into 0, 1, 2 --> high, medium, low
        t0 = self.thresholds[module_idx][0]
        t1 = self.thresholds[module_idx][1]

        # This correction has not been used for years.
        # TODO: decide about removing it.
        if self.corr_bools.get("melt_snow"):
            # load raw_data and rgain to be used during gain_correction
            self.shared_dict[i_proc]["t0_rgain"][first:last] = rawgain / t0[cellid, ...]  # noqa
            self.shared_dict[i_proc]["raw_data"][first:last] = np.copy(data)  # noqa

        # Often most pixels are in high-gain, so it's more efficient to
        # set the whole output block to zero than select the right pixels.
        gain[:] = 0
        # exceeding first threshold means data is medium or low gain
        gain[rawgain > t0[cellid, ...]] = 1
        # exceeding also second threshold means data is low gain
        gain[rawgain > t1[cellid, ...]] = 2
    else:
        # the enum values map 1, 2, 3 to (fixed) gain modes
        gain[:] = self.gain_mode - 1

    offsetb = self.offset[module_idx][:, cellid]

    # force into high or medium gain if requested
    if self.corr_bools.get("force_mg_if_below"):
        gain[(gain == 2) & ((data - offsetb[1]) < self.mg_hard_threshold)] = 1  # noqa

    if self.corr_bools.get("force_hg_if_below"):
        gain[(gain > 0) & ((data - offsetb[0]) < self.hg_hard_threshold)] = 0  # noqa

    # choose constants according to gain setting
    off = calgs.gain_choose(gain, offsetb)
    del offsetb

    # subtract offset
    data -= off
    del off

read_file(i_proc, file_name, apply_sel_pulses=True)

Read file with raw data to shared memory

:param file_name: Name of input file including path. :param i_proc: Index of shared memory array. :param apply_sel_pulses: apply selected pulses before all corrections. :return: - n_img: The number of images to correct.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def read_file(self, i_proc: int, file_name: str,
              apply_sel_pulses: Optional[bool] = True
              ) -> int:
    """Read file with raw data to shared memory

    :param file_name: Name of input file including path.
    :param i_proc: Index of shared memory array.
    :param apply_sel_pulses: apply selected pulses before
                             all corrections.
    :return:
        - n_img: The number of images to correct.
    """
    module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
    agipd_base = self.h5_data_path.format(module_idx)
    data_dict = self.shared_dict[i_proc]
    data_dict['moduleIdx'][0] = module_idx

    h5_dc = H5File(file_name)

    # Exclude trains without data.
    im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)

    valid_train_ids = self.get_valid_image_idx(
        im_dc[agipd_base, "image.trainId"])

    # filter out trains which will not be selected
    valid_train_ids = self.cell_sel.filter_trains(
        np.array(valid_train_ids)).tolist()

    if not valid_train_ids:
        # If there's not a single valid train, exit early.
        print(f"WARNING: No valid trains for {im_dc.files} to process.")
        data_dict['nImg'][0] = 0
        return 0

    # Exclude non_valid trains from the selected data collection.
    im_dc = im_dc.select_trains(by_id(valid_train_ids))

    # Just want to be sure that order is correct
    valid_train_ids = im_dc.train_ids
    # Get a count of images in each train
    nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
    nimg_in_trains = nimg_in_trains.astype(np.int64)

    # store valid trains in shared memory
    n_valid_trains = len(valid_train_ids)
    data_dict["n_valid_trains"][0] = n_valid_trains
    data_dict["valid_trains"][:n_valid_trains] = valid_train_ids

    # get selection for the images in this file
    cm = (self.cell_sel.CM_NONE if apply_sel_pulses
          else self.cell_sel.CM_PRESEL)

    agipd_src = im_dc[agipd_base]

    cellid = agipd_src["image.cellId"].ndarray()[:, 0]

    img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
        np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)

    data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
    data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)

    n_img = img_selected.sum()
    if img_selected.all():
        # All frames selected - use slice to skip unnecessary copy
        frm_ix = np.s_[:]
    else:
        frm_ix = np.flatnonzero(img_selected)

    # read raw data
    # [n_imgs, 2, x, y]
    raw_data = agipd_src['image.data'].ndarray()

    # store in shmem only selected images
    data_dict['nImg'][0] = n_img
    data_dict['data'][:n_img] = raw_data[frm_ix, 0]
    data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
    data_dict['cellId'][:n_img] = cellid[frm_ix]
    data_dict['pulseId'][:n_img] = agipd_src['image.pulseId'].ndarray()[frm_ix, 0]
    data_dict['trainId'][:n_img] = agipd_src['image.trainId'].ndarray()[frm_ix, 0]

    return n_img

write_file(i_proc, file_name, ofile_name)

Create output file and write corrected data to it

:param file_name: Name of input file including path :param ofile_name: Name of output file including path :param i_proc: Index of shared memory array

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def write_file(self, i_proc, file_name, ofile_name):
    """
    Create output file and write corrected data to it

    :param file_name: Name of input file including path
    :param ofile_name: Name of output file including path
    :param i_proc: Index of shared memory array
    """

    module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
    agipd_base = f'INSTRUMENT/{self.h5_data_path}/'.format(module_idx)
    idx_base = self.h5_index_path.format(module_idx)
    data_path = f'{agipd_base}/image'

    # Obtain a shallow copy of the pointer map to allow for local
    # changes in this method.
    data_dict = self.shared_dict[i_proc].copy()

    image_fields = [
        'trainId', 'pulseId', 'cellId', 'data', 'gain', 'mask', 'blShift',
    ]

    n_img = data_dict['nImg'][0]
    if n_img == 0:
        return
    trains = data_dict['trainId'][:n_img]

    # Re-cast fields in-place, i.e. using the same memory region.
    for field, dtype in self.recast_image_fields.items():
        data_dict[field] = cast_array_inplace(data_dict[field], dtype)

    with h5py.File(ofile_name, "w") as outfile:
        # Copy any other data from the input file.
        # This includes indexes, so it's important that the corrected data
        # we write is aligned with the raw data.
        with h5py.File(file_name, "r") as infile:
            self.copy_and_sanitize_non_cal_data(
                infile, outfile, agipd_base, idx_base, trains
            )

        # All corrected data goes in a /INSTRUMENT/.../image group
        image_grp = outfile[data_path]

        # Set up all the datasets before filling them. This puts the
        # metadata about the datasets together at the start of the file,
        # so it's efficient to examine the file structure.
        for field in image_fields:
            arr = data_dict[field][:n_img]
            if field in self.compress_fields:
                # gain/mask compressed with gzip level 1, but not
                # checksummed as we would have to implement this.
                kw = dict(
                    compression='gzip', compression_opts=1, shuffle=True
                )
            else:
                # Uncompressed data can easily be checksummed by HDF5's
                # filter pipeline. This should be cheap to compute.
                kw = {'fletcher32': True}
            if arr.ndim > 1:
                kw['chunks'] = (1,) + arr.shape[1:]  # 1 chunk = 1 image

            image_grp.create_dataset(
                field, shape=arr.shape, dtype=arr.dtype, **kw
            )

        # Write the corrected data
        for field in image_fields:
            if field in self.compress_fields:
                self._write_compressed_frames(
                    image_grp[field], data_dict[field][:n_img],
                )
            else:
                image_grp[field][:] = data_dict[field][:n_img]

AgipdCtrl dataclass

Access AGIPD control parameters from a single run.

Parameters:

Name Type Description Default
run_dc DataCollection

Run data collection with expected sources to read needed parameters.

required
image_src str

H5 source for image data.

required
ctrl_src str

H5 source for control (slow) data.

required
raise_error bool

Boolean to raise errors for missing sources and keys.

False
run

(int, optional): Run number.

required
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
@dataclass
class AgipdCtrl:
    """Access AGIPD control parameters from a single run.

    Args:
        run_dc (DataCollection): Run data collection with expected sources
            to read needed parameters.
        image_src (str): H5 source for image data.
        ctrl_src (str): H5 source for control (slow) data.
        raise_error (bool): Boolean to raise errors for missing
            sources and keys.
        run: (int, optional): Run number.
    """
    run_dc: DataCollection
    image_src: str
    ctrl_src: str
    raise_error: bool = False

    def _get_num_cells_ctrl(self) -> Optional[int]:
        """Get number of cells from CONTROL source."""
        # Attempt to look for number of cells in slow data
        ncell_src = (
            self.ctrl_src, "bunchStructure.nPulses.value")
        if (
            ncell_src[0] in self.run_dc.all_sources and
            ncell_src[1] in self.run_dc.keys_for_source(ncell_src[0])
        ):
            return int(self.run_dc[ncell_src].as_single_value(reduce_by='max'))
        else:
            return None

    def _get_num_cells_instr(self) -> int:
        """Get number of cells from INSTRUMENT source."""
        cells = np.squeeze(
            self.run_dc[
                self.image_src, "image.cellId"].drop_empty_trains().ndarray()
        )
        if cells.shape[0] != 0:
            maxcell = np.max(cells)
            options = [4, 32, 64, 76, 128, 176, 202, 250, 352]
            dists = [abs(o - maxcell) for o in options]
            ncell = options[np.argmin(dists)]
        else:
            raise ValueError(f"No raw images found for {self.image_src}")

        return ncell 

    def get_num_cells(self) -> int:
        """Read number of memory cells from fast data."""
        ncell = self._get_num_cells_ctrl()
        if ncell is None:  # ctrl_src unavailable
            # The method implemented in this function doesn't suit for
            # filtered data. If DAQ filters data and the last cell is removed,
            # the function returns wrong value.
            ncell = self._get_num_cells_instr()

        return ncell 

    def _get_acq_rate_ctrl(self) -> Optional[float]:
        """Get acquisition (repetition) rate from CONTROL source."""
        # Attempt to look for acquisition rate in slow data
        rep_rate_src = (
            self.ctrl_src, "bunchStructure.repetitionRate.value")
        if (
            rep_rate_src[0] in self.run_dc.all_sources and
            rep_rate_src[1] in self.run_dc.keys_for_source(rep_rate_src[0])
        ):
            # It is desired to loose precision here because the usage is
            # about bucketing the rate for managing meta-data.
            return round(float(self.run_dc[rep_rate_src].as_single_value()), 1)

    def _get_acq_rate_instr(self) -> Optional[float]:
        """Get acquisition (repetition rate) from INSTRUMENT source."""

        train_pulses = np.squeeze(
            self.run_dc[
                self.image_src,
                "image.pulseId"].drop_empty_trains().train_from_index(0)[1]
        )

        # Compute acquisition rate from fast data
        diff = train_pulses[1] - train_pulses[0]
        options = {8: 0.5, 4: 1.1, 2: 2.2, 1: 4.5}
        return options.get(diff, None)

    def get_acq_rate(self) -> Optional[float]:
        """Read the acquisition rate for the selected detector module.

        The value is read from CONTROL source e.g.`CONTROL/../MDL/FPGA_COMP`,
        If key is not available, the rate is calculated from
        two consecutive pulses of the same trainId.

        :return acq_rate: the acquisition rate.
                          return None, if not available.
        """
        acq_rate = self._get_acq_rate_ctrl()

        if acq_rate is not None:
            return acq_rate
        # For AGIPD500K this function would produce wrong value (always 4.5)
        # before W10/2022.
        # TODO: Confirm leaving this as it is.
        return self._get_acq_rate_instr()

    def _get_gain_setting_ctrl(self) -> Optional[int]:
        """Read gain_settings from CONTROL source and gain key."""
        return int(self.run_dc[self.ctrl_src, "gain"].as_single_value())

    def _get_gain_setting_ctrl_old(self) -> Optional[int]:
        """Read gain_settings from setupr and patterTypeIndex
        from old RAW data that is missing `gain.value`.

        If `gain.value` isn't available in MDL source,
        the setting is calculated from `setupr` and `patternTypeIndex`

        gain-setting 1: setupr@dark=8, setupr@slopespc=40
        gain-setting 0: setupr@dark=0, setupr@slopespc=32

        patternTypeIndex 1: High-gain
        patternTypeIndex 2: Medium-gain
        patternTypeIndex 3: Low-gain
        patternTypeIndex 4: SlopesPC
        Returns:
            int: gain_setting value.
        """
        setupr = self.run_dc[self.ctrl_src, "setupr"].as_single_value()
        pattern_type_idx = self.run_dc[
            self.ctrl_src, "patternTypeIndex"].as_single_value()

        if (setupr == 0 and pattern_type_idx < 4) or (
                setupr == 32 and pattern_type_idx == 4):
            return 0
        elif (setupr == 8 and pattern_type_idx < 4) or (
                setupr == 40 and pattern_type_idx == 4):
            return 1
        else:
            # TODO: Confirm that this can be removed.
            if self.raise_error:
                raise ValueError(
                    "Could not derive gain setting from"
                    " setupr and patternTypeIndex"
                )
            return

    def get_gain_setting(
        self,
        creation_time: Optional[datetime] = None,
    ) -> Optional[int]:
        """Read Gain setting from CONTROL sources.
        if key `gain.value` is not available, calculate gain_setting from
        setupr and patterTypeIndex. If it failed raise ValueError.

        :param creation_time: datetime object for the data creation time.
        :return: gain setting.
                 return 0, if not available.
        """
        # TODO: remove after fixing get_possible_conditions
        if (
            creation_time and
            creation_time.replace(tzinfo=None) < parser.parse('2020-01-31')
        ):
            print("Set gain-setting to None for runs taken before 2020-01-31")
            return

        if "gain.value" in self.run_dc.keys_for_source(self.ctrl_src):
            return self._get_gain_setting_ctrl()

        gain_setting = self._get_gain_setting_ctrl_old()

        if gain_setting is not None:
            return gain_setting
        else:
            # TODO: confirm that this can be removed.
            print(
                "ERROR: gain_setting is not available "
                f"at source {self.ctrl_src}.\nSet gain_setting to 0.")
            return 0

    def get_gain_mode(self) -> int:
        """Returns the gain mode (adaptive or fixed) from slow data."""

        if (
            self.ctrl_src in self.run_dc.all_sources and
            "gainModeIndex.value" in self.run_dc.keys_for_source(
                self.ctrl_src)
        ):
            return AgipdGainMode(int(self.run_dc[
                self.ctrl_src, "gainModeIndex"].as_single_value()))

        return AgipdGainMode.ADAPTIVE_GAIN

    def get_bias_voltage(
        self,
        karabo_id_control: str,
        module: Optional[int] = 0
    ) -> int:
        """Read the voltage information from the RUN source of module 0.

        Different modules may operate at different voltages.
        In practice, they all operate at the same voltage.
        As such, it is okay to read a single module's value.

        If the FPGA/PSC RUN source is not available, 300 will be returned.
        300 was the default bias_voltage value for
        MID_DET_AGIPD1M-1 and SPB_DET_AGIPD1M-1.

        Args:
            karabo_id_control: The karabo deviceId for the CONTROL device.
            module: defaults to module 0
        Returns:
            float: bias voltage value
        """
        # TODO: Add a breaking fix by passing the source and key through
        # get_bias_voltage arguments.
        if "AGIPD1M" in karabo_id_control:
            voltage_src = (
                f"{karabo_id_control[:-1]}/PSC/HV",
                f"channels.U{module}.measurementSenseVoltage.value")
            # TODO: Validate if removing this and depend on adding voltage value
            # from the Notebook's first cell.
            default_voltage = 300
        else:  # AGIPD500K
            voltage_src = (
                f"{karabo_id_control}/FPGA/M_{module}",
                "highVoltage.actual.value")
            default_voltage = None

        if (
            voltage_src[0] in self.run_dc.all_sources and
            voltage_src[1] in self.run_dc.keys_for_source(voltage_src[0])
        ):
            # Use RUN source for reading the bias voltage value.
            # As HED_DET_AGIPD500K2G has a hardware issue that leads
            # to storing arbitrary voltage values in the CONTROL source
            # array. e.g. /gpfs/exfel/exp/HED/202230/p900248/raw
            return int(self.run_dc.get_run_value(*voltage_src))
        else:
            # TODO: Validate if removing this and
            # and using NB value for old RAW data.
            error = ("ERROR: Unable to read bias_voltage from"
                     f" {voltage_src[0]}/{voltage_src[1].replace('.','/')}.")

            if default_voltage:
                print(f"{error} Returning {default_voltage} "
                      "as default bias voltage value.")
            else:
                raise ValueError(error)
            return default_voltage

    def get_integration_time(self) -> int:
        """Read integration time from the FPGA device.

        The integration time is specified as an integer number of clock
        cycles each spanning ~9ns. The default (and legacy) value is 12.

        :return: integration time
        """
        if (
            self.ctrl_src in self.run_dc.all_sources and
            'integrationTime.value' in self.run_dc.keys_for_source(
                self.ctrl_src)
        ):
            return int(self.run_dc[
                self.ctrl_src, 'integrationTime'].as_single_value())

        return 12

get_acq_rate()

Read the acquisition rate for the selected detector module.

The value is read from CONTROL source e.g.CONTROL/../MDL/FPGA_COMP, If key is not available, the rate is calculated from two consecutive pulses of the same trainId.

:return acq_rate: the acquisition rate. return None, if not available.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_acq_rate(self) -> Optional[float]:
    """Read the acquisition rate for the selected detector module.

    The value is read from CONTROL source e.g.`CONTROL/../MDL/FPGA_COMP`,
    If key is not available, the rate is calculated from
    two consecutive pulses of the same trainId.

    :return acq_rate: the acquisition rate.
                      return None, if not available.
    """
    acq_rate = self._get_acq_rate_ctrl()

    if acq_rate is not None:
        return acq_rate
    # For AGIPD500K this function would produce wrong value (always 4.5)
    # before W10/2022.
    # TODO: Confirm leaving this as it is.
    return self._get_acq_rate_instr()

get_bias_voltage(karabo_id_control, module=0)

Read the voltage information from the RUN source of module 0.

Different modules may operate at different voltages. In practice, they all operate at the same voltage. As such, it is okay to read a single module's value.

If the FPGA/PSC RUN source is not available, 300 will be returned. 300 was the default bias_voltage value for MID_DET_AGIPD1M-1 and SPB_DET_AGIPD1M-1.

Parameters:

Name Type Description Default
karabo_id_control str

The karabo deviceId for the CONTROL device.

required
module Optional[int]

defaults to module 0

0

Returns:

Name Type Description
float int

bias voltage value

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_bias_voltage(
    self,
    karabo_id_control: str,
    module: Optional[int] = 0
) -> int:
    """Read the voltage information from the RUN source of module 0.

    Different modules may operate at different voltages.
    In practice, they all operate at the same voltage.
    As such, it is okay to read a single module's value.

    If the FPGA/PSC RUN source is not available, 300 will be returned.
    300 was the default bias_voltage value for
    MID_DET_AGIPD1M-1 and SPB_DET_AGIPD1M-1.

    Args:
        karabo_id_control: The karabo deviceId for the CONTROL device.
        module: defaults to module 0
    Returns:
        float: bias voltage value
    """
    # TODO: Add a breaking fix by passing the source and key through
    # get_bias_voltage arguments.
    if "AGIPD1M" in karabo_id_control:
        voltage_src = (
            f"{karabo_id_control[:-1]}/PSC/HV",
            f"channels.U{module}.measurementSenseVoltage.value")
        # TODO: Validate if removing this and depend on adding voltage value
        # from the Notebook's first cell.
        default_voltage = 300
    else:  # AGIPD500K
        voltage_src = (
            f"{karabo_id_control}/FPGA/M_{module}",
            "highVoltage.actual.value")
        default_voltage = None

    if (
        voltage_src[0] in self.run_dc.all_sources and
        voltage_src[1] in self.run_dc.keys_for_source(voltage_src[0])
    ):
        # Use RUN source for reading the bias voltage value.
        # As HED_DET_AGIPD500K2G has a hardware issue that leads
        # to storing arbitrary voltage values in the CONTROL source
        # array. e.g. /gpfs/exfel/exp/HED/202230/p900248/raw
        return int(self.run_dc.get_run_value(*voltage_src))
    else:
        # TODO: Validate if removing this and
        # and using NB value for old RAW data.
        error = ("ERROR: Unable to read bias_voltage from"
                 f" {voltage_src[0]}/{voltage_src[1].replace('.','/')}.")

        if default_voltage:
            print(f"{error} Returning {default_voltage} "
                  "as default bias voltage value.")
        else:
            raise ValueError(error)
        return default_voltage

get_gain_mode()

Returns the gain mode (adaptive or fixed) from slow data.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_gain_mode(self) -> int:
    """Returns the gain mode (adaptive or fixed) from slow data."""

    if (
        self.ctrl_src in self.run_dc.all_sources and
        "gainModeIndex.value" in self.run_dc.keys_for_source(
            self.ctrl_src)
    ):
        return AgipdGainMode(int(self.run_dc[
            self.ctrl_src, "gainModeIndex"].as_single_value()))

    return AgipdGainMode.ADAPTIVE_GAIN

get_gain_setting(creation_time=None)

Read Gain setting from CONTROL sources. if key gain.value is not available, calculate gain_setting from setupr and patterTypeIndex. If it failed raise ValueError.

:param creation_time: datetime object for the data creation time. :return: gain setting. return 0, if not available.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_gain_setting(
    self,
    creation_time: Optional[datetime] = None,
) -> Optional[int]:
    """Read Gain setting from CONTROL sources.
    if key `gain.value` is not available, calculate gain_setting from
    setupr and patterTypeIndex. If it failed raise ValueError.

    :param creation_time: datetime object for the data creation time.
    :return: gain setting.
             return 0, if not available.
    """
    # TODO: remove after fixing get_possible_conditions
    if (
        creation_time and
        creation_time.replace(tzinfo=None) < parser.parse('2020-01-31')
    ):
        print("Set gain-setting to None for runs taken before 2020-01-31")
        return

    if "gain.value" in self.run_dc.keys_for_source(self.ctrl_src):
        return self._get_gain_setting_ctrl()

    gain_setting = self._get_gain_setting_ctrl_old()

    if gain_setting is not None:
        return gain_setting
    else:
        # TODO: confirm that this can be removed.
        print(
            "ERROR: gain_setting is not available "
            f"at source {self.ctrl_src}.\nSet gain_setting to 0.")
        return 0

get_integration_time()

Read integration time from the FPGA device.

The integration time is specified as an integer number of clock cycles each spanning ~9ns. The default (and legacy) value is 12.

:return: integration time

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_integration_time(self) -> int:
    """Read integration time from the FPGA device.

    The integration time is specified as an integer number of clock
    cycles each spanning ~9ns. The default (and legacy) value is 12.

    :return: integration time
    """
    if (
        self.ctrl_src in self.run_dc.all_sources and
        'integrationTime.value' in self.run_dc.keys_for_source(
            self.ctrl_src)
    ):
        return int(self.run_dc[
            self.ctrl_src, 'integrationTime'].as_single_value())

    return 12

get_num_cells()

Read number of memory cells from fast data.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_num_cells(self) -> int:
    """Read number of memory cells from fast data."""
    ncell = self._get_num_cells_ctrl()
    if ncell is None:  # ctrl_src unavailable
        # The method implemented in this function doesn't suit for
        # filtered data. If DAQ filters data and the last cell is removed,
        # the function returns wrong value.
        ncell = self._get_num_cells_instr()

    return ncell 

AgipdCtrlRuns dataclass

Get AGIPD control parameters across several runs, e.g. 3 runs for darks.

Parameters:

Name Type Description Default
raw_folder str

The RAW folder path.

required
runs list

The list of runs to read the operating conditions.

required
image_src str

H5 source for image data.

required
ctrl_src str

H5 source for control (slow) data.

required
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
@dataclass
class AgipdCtrlRuns:
    """Get AGIPD control parameters across several runs,
    e.g. 3 runs for darks.

    Args:
        raw_folder (str): The RAW folder path.
        runs (list): The list of runs to read the operating conditions.
        image_src (str): H5 source for image data.
        ctrl_src (str): H5 source for control (slow) data.
    """
    raw_folder: str
    runs: List[int]
    image_src: str
    ctrl_src: str
    sort_dark_runs_enabled: bool = False

    adaptive_gain_modes = [AgipdGainMode.ADAPTIVE_GAIN] * 3
    fixed_gain_modes = [
        AgipdGainMode.FIXED_HIGH_GAIN,
        AgipdGainMode.FIXED_MEDIUM_GAIN,
        AgipdGainMode.FIXED_LOW_GAIN,
    ]

    def __post_init__(self):
        # validate that all runs belong to the same
        self.run_ctrls = [
            AgipdCtrl(
                run_dc=RunDirectory(f"{self.raw_folder}/r{r:04d}"),
                image_src=self.image_src,
                ctrl_src=self.ctrl_src,
                ) for r in self.runs]
        self.gain_modes = self.get_gain_modes()
        if self.sort_dark_runs_enabled:
            self.sort_dark_runs()

    def _validate_same_value(self, name, values):
            if len(set(values)) != 1:
                # Should we raise an error and stop processing?
                warning(
                    f"{name} is not the same for all runs {self.runs}"
                    f" with values of {values}, respectively.")

    def sort_dark_runs(self):
        """Order dark runs based on run patterns for Adaptive mode
        or gain modes for Fixed mode.
        """
        assert len(self.runs) == 3, f"AGIPD dark runs are expected to be 3. {len(self.runs)} runs are given."  # noqa
        # Expected patterns:
        # XRay: 0, DarkHG: 1, DarkMG: 2, DarkLG: 3, PC: 4 and CS: 5.
        sort_by = None
        sort_values = []
        if self.gain_modes == self.adaptive_gain_modes:  # Adaptive gain # sort by patterns
            # Patterns -> DarkHG: 1, DarkMG: 2, DarkLG: 3
            if "patternTypeIndex" in self.run_ctrls[0].run_dc[self.ctrl_src]:
                sort_by = "patternTypeIndex"
            elif "expTypeIndex" in self.run_ctrls[0].run_dc[self.ctrl_src]:
                sort_by = "expTypeIndex"
            else:
                raise ValueError(
                    "Neither `patternTypeIndex` nor `expTypeIndex` "
                    "keys are available in CTRL data source.")

            for c in self.run_ctrls:
                sort_values.append(
                    c.run_dc[self.ctrl_src, sort_by].as_single_value())

        # Check if a mix of adaptive and fixed gain runs.
        elif any(gm == AgipdGainMode.ADAPTIVE_GAIN for gm in self.gain_modes):
            raise ValueError(
                f"Given runs {self.runs} have a mix of ADAPTIVE and "
                f"FIXED gain modes: {self.gain_modes}.")
        else:  # Fixed gain: Patterns is X-Ray: 0 for all runs.
            sort_by = "gainModeIndex"
            sort_values = [int(gm) for gm in self.gain_modes]

        zipped_lists = zip(sort_values, self.runs, self.run_ctrls)

        # Sort the lists based on the patterns
        sorted_zipped_lists = sorted(zipped_lists, key=lambda item: item[0])
        _, sorted_runs, sorted_run_ctrls = zip(*sorted_zipped_lists)
        if sorted_runs != self.runs:
            Warning("Given dark runs are unsorted. Runs will be sorted from"
                    f" {self.runs} with {sort_by}:"
                    f" {sort_values} to {sorted_runs}.")
            # Update run_ctrls and runs order
            self.runs = list(sorted_runs)
            self.run_ctrls = list(sorted_run_ctrls)
            self.gain_modes = self.get_gain_modes()

    def fixed_gain_mode(self):
        """Check if runs are in fixed gain mode.

        Raises:
            ValueError: Unexpected gain modes for the dark runs

        Returns:
            bool: runs are in fixed gain mode.
        """
        if self.gain_modes == self.adaptive_gain_modes:
            return False
        elif self.gain_modes == self.fixed_gain_modes:
            return True
        else:
            raise ValueError(f"Unexpected runs' gain modes: {self.gain_modes}")

    def get_gain_modes(self):
        """Get runs' gain modes.
        Returns:
            list: `AgipdGainMode`s
        """
        return [c.get_gain_mode() for c in self.run_ctrls]

    def get_integration_time(self):
        """
        Returns:
            float: Integration time
        """
        integration_times = [c.get_integration_time() for c in self.run_ctrls]
        self._validate_same_value("Integration Time", integration_times)
        return integration_times[0]

    def get_bias_voltage(
        self,
        karabo_id_control: str = None,
        module: Optional[int] = 0
    ):
        """
        Args:
            karabo_id_control (str):
                Karabo ID for control device.

        Returns:
            int: Bias voltage.
        """
        bias_voltages = [
            c.get_bias_voltage(karabo_id_control, module) for c in self.run_ctrls]
        self._validate_same_value("Bias Voltage", bias_voltages)
        return bias_voltages[0]

    def get_memory_cells(self):
        """
        Returns:
            int: number of memory cells.
        """
        memory_cells = [c.get_num_cells() for c in self.run_ctrls]
        self._validate_same_value("Memory cells", memory_cells)
        return memory_cells[0]

    def get_gain_setting(self, creation_time: Optional[datetime] = None):
        """
        Args:
            creation_time (Optional[datetime], optional):
                Creation time for the runs.

        Returns:
            float: Gain Setting
        """
        gain_settings = [
            c.get_gain_setting(creation_time) for c in self.run_ctrls]
        self._validate_same_value("Gain Setting", gain_settings)
        return gain_settings[0]

    def get_acq_rate(self):
        """
        Returns:
            float: Acquisition rate
        """
        acquisition_rates = [c.get_acq_rate() for c in self.run_ctrls]
        self._validate_same_value("acquisition_rate", acquisition_rates)
        return acquisition_rates[0]

fixed_gain_mode()

Check if runs are in fixed gain mode.

Raises:

Type Description
ValueError

Unexpected gain modes for the dark runs

Returns:

Name Type Description
bool

runs are in fixed gain mode.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def fixed_gain_mode(self):
    """Check if runs are in fixed gain mode.

    Raises:
        ValueError: Unexpected gain modes for the dark runs

    Returns:
        bool: runs are in fixed gain mode.
    """
    if self.gain_modes == self.adaptive_gain_modes:
        return False
    elif self.gain_modes == self.fixed_gain_modes:
        return True
    else:
        raise ValueError(f"Unexpected runs' gain modes: {self.gain_modes}")

get_acq_rate()

Returns:

Name Type Description
float

Acquisition rate

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_acq_rate(self):
    """
    Returns:
        float: Acquisition rate
    """
    acquisition_rates = [c.get_acq_rate() for c in self.run_ctrls]
    self._validate_same_value("acquisition_rate", acquisition_rates)
    return acquisition_rates[0]

get_bias_voltage(karabo_id_control=None, module=0)

Parameters:

Name Type Description Default
karabo_id_control str

Karabo ID for control device.

None

Returns:

Name Type Description
int

Bias voltage.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_bias_voltage(
    self,
    karabo_id_control: str = None,
    module: Optional[int] = 0
):
    """
    Args:
        karabo_id_control (str):
            Karabo ID for control device.

    Returns:
        int: Bias voltage.
    """
    bias_voltages = [
        c.get_bias_voltage(karabo_id_control, module) for c in self.run_ctrls]
    self._validate_same_value("Bias Voltage", bias_voltages)
    return bias_voltages[0]

get_gain_modes()

Get runs' gain modes.

Returns:

Name Type Description
list

AgipdGainModes

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_gain_modes(self):
    """Get runs' gain modes.
    Returns:
        list: `AgipdGainMode`s
    """
    return [c.get_gain_mode() for c in self.run_ctrls]

get_gain_setting(creation_time=None)

Parameters:

Name Type Description Default
creation_time Optional[datetime]

Creation time for the runs.

None

Returns:

Name Type Description
float

Gain Setting

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_gain_setting(self, creation_time: Optional[datetime] = None):
    """
    Args:
        creation_time (Optional[datetime], optional):
            Creation time for the runs.

    Returns:
        float: Gain Setting
    """
    gain_settings = [
        c.get_gain_setting(creation_time) for c in self.run_ctrls]
    self._validate_same_value("Gain Setting", gain_settings)
    return gain_settings[0]

get_integration_time()

Returns:

Name Type Description
float

Integration time

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_integration_time(self):
    """
    Returns:
        float: Integration time
    """
    integration_times = [c.get_integration_time() for c in self.run_ctrls]
    self._validate_same_value("Integration Time", integration_times)
    return integration_times[0]

get_memory_cells()

Returns:

Name Type Description
int

number of memory cells.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_memory_cells(self):
    """
    Returns:
        int: number of memory cells.
    """
    memory_cells = [c.get_num_cells() for c in self.run_ctrls]
    self._validate_same_value("Memory cells", memory_cells)
    return memory_cells[0]

sort_dark_runs()

Order dark runs based on run patterns for Adaptive mode or gain modes for Fixed mode.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def sort_dark_runs(self):
    """Order dark runs based on run patterns for Adaptive mode
    or gain modes for Fixed mode.
    """
    assert len(self.runs) == 3, f"AGIPD dark runs are expected to be 3. {len(self.runs)} runs are given."  # noqa
    # Expected patterns:
    # XRay: 0, DarkHG: 1, DarkMG: 2, DarkLG: 3, PC: 4 and CS: 5.
    sort_by = None
    sort_values = []
    if self.gain_modes == self.adaptive_gain_modes:  # Adaptive gain # sort by patterns
        # Patterns -> DarkHG: 1, DarkMG: 2, DarkLG: 3
        if "patternTypeIndex" in self.run_ctrls[0].run_dc[self.ctrl_src]:
            sort_by = "patternTypeIndex"
        elif "expTypeIndex" in self.run_ctrls[0].run_dc[self.ctrl_src]:
            sort_by = "expTypeIndex"
        else:
            raise ValueError(
                "Neither `patternTypeIndex` nor `expTypeIndex` "
                "keys are available in CTRL data source.")

        for c in self.run_ctrls:
            sort_values.append(
                c.run_dc[self.ctrl_src, sort_by].as_single_value())

    # Check if a mix of adaptive and fixed gain runs.
    elif any(gm == AgipdGainMode.ADAPTIVE_GAIN for gm in self.gain_modes):
        raise ValueError(
            f"Given runs {self.runs} have a mix of ADAPTIVE and "
            f"FIXED gain modes: {self.gain_modes}.")
    else:  # Fixed gain: Patterns is X-Ray: 0 for all runs.
        sort_by = "gainModeIndex"
        sort_values = [int(gm) for gm in self.gain_modes]

    zipped_lists = zip(sort_values, self.runs, self.run_ctrls)

    # Sort the lists based on the patterns
    sorted_zipped_lists = sorted(zipped_lists, key=lambda item: item[0])
    _, sorted_runs, sorted_run_ctrls = zip(*sorted_zipped_lists)
    if sorted_runs != self.runs:
        Warning("Given dark runs are unsorted. Runs will be sorted from"
                f" {self.runs} with {sort_by}:"
                f" {sort_values} to {sorted_runs}.")
        # Update run_ctrls and runs order
        self.runs = list(sorted_runs)
        self.run_ctrls = list(sorted_run_ctrls)
        self.gain_modes = self.get_gain_modes()

CellRange

Bases: CellSelection

Selection of detector memory cells given as a range

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
class CellRange(CellSelection):
    """Selection of detector memory cells given as a range"""

    def __init__(self, crange: List[int], max_cells: int):
        """Initialize range selection

        :param crange: range parameters of selected cells,
            list up to 3 elements
        :param max_cells: number of exposed cells
        """
        self.max_cells = max_cells
        self.crange = validate_selected_pulses(crange, max_cells)
        self.flag = np.zeros(self.max_cells, bool)
        self.flag_cm = np.zeros(self.ncell_max, bool)
        self.flag[slice(*crange)] = True
        self.flag_cm[:self.max_cells] = self.flag
        self.flag_cm = (self.flag_cm.reshape(-1, self.row_size).any(1)
                        .repeat(self.row_size)[:self.max_cells])
        self.sel_type = [self.flag, self.flag_cm, self.flag]

    def msg(self):
        return (
            f"Use range selection with crange={self.crange}, "
            f"max_cells={self.max_cells}\n"
            f"Frames per train: {self.flag.sum()}"
        )

    def get_cells_on_trains(
        self, train_sel: np.ndarray, nfrm: np.ndarray,
        cellid: np.ndarray, cm: int = 0
    ) -> np.array:
        if cm < 0 or cm > 2:
            raise ValueError("param 'cm' takes only 0,1,2")

        flag = self.sel_type[cm]
        sel = np.zeros(np.sum(nfrm), bool)
        counts = np.zeros(len(nfrm), int)
        i0 = 0
        for i, nfrm_i in enumerate(nfrm):
            iN = i0 + nfrm_i
            f = flag[cellid[i0:iN]]
            sel[i0:iN] = f
            counts[i] = np.sum(f)
            i0 = iN

        return sel, counts

    def filter_trains(self, train_sel: np.ndarray):
        return train_sel

__init__(crange, max_cells)

Initialize range selection

:param crange: range parameters of selected cells, list up to 3 elements :param max_cells: number of exposed cells

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def __init__(self, crange: List[int], max_cells: int):
    """Initialize range selection

    :param crange: range parameters of selected cells,
        list up to 3 elements
    :param max_cells: number of exposed cells
    """
    self.max_cells = max_cells
    self.crange = validate_selected_pulses(crange, max_cells)
    self.flag = np.zeros(self.max_cells, bool)
    self.flag_cm = np.zeros(self.ncell_max, bool)
    self.flag[slice(*crange)] = True
    self.flag_cm[:self.max_cells] = self.flag
    self.flag_cm = (self.flag_cm.reshape(-1, self.row_size).any(1)
                    .repeat(self.row_size)[:self.max_cells])
    self.sel_type = [self.flag, self.flag_cm, self.flag]

CellSelection

Selection of detector memory cells (abstract class)

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
class CellSelection:
    """Selection of detector memory cells (abstract class)"""
    row_size = 32
    ncell_max = 352
    CM_NONE = 0
    CM_PRESEL = 1
    CM_FINSEL = 2

    def filter_trains(self, train_sel: np.ndarray):
        """Filters out trains that will not be processed

        :param train_sel: list of a train ids selected for processing
        :return: array of filtered trains
        """
        raise NotImplementedError

    def get_cells_on_trains(
        self, train_sel: np.ndarray, nfrm: np.ndarray,
        cellid: np.ndarray, cm: int = 0
    ) -> np.array:
        """Returns mask of cells selected for processing

        :param train_sel: list of a train ids selected for processing
        :param nfrm: the number of frames expected for every train in
            the list `train_sel`
        :param cellid: array of cell IDs in the same sequence as images to
            filter
        :param cm: flag indicates the final selection or interim selection
            for common-mode correction
        :returns:
            - boolean array with flags indicating images for processing
            - integer array with number of selected frames in trains
        """
        raise NotImplementedError

    def msg(self):
        """Returns log message on initialization

        :return: message
        """
        raise NotImplementedError

filter_trains(train_sel)

Filters out trains that will not be processed

:param train_sel: list of a train ids selected for processing :return: array of filtered trains

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def filter_trains(self, train_sel: np.ndarray):
    """Filters out trains that will not be processed

    :param train_sel: list of a train ids selected for processing
    :return: array of filtered trains
    """
    raise NotImplementedError

get_cells_on_trains(train_sel, nfrm, cellid, cm=0)

Returns mask of cells selected for processing

:param train_sel: list of a train ids selected for processing :param nfrm: the number of frames expected for every train in the list train_sel :param cellid: array of cell IDs in the same sequence as images to filter :param cm: flag indicates the final selection or interim selection for common-mode correction :returns: - boolean array with flags indicating images for processing - integer array with number of selected frames in trains

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def get_cells_on_trains(
    self, train_sel: np.ndarray, nfrm: np.ndarray,
    cellid: np.ndarray, cm: int = 0
) -> np.array:
    """Returns mask of cells selected for processing

    :param train_sel: list of a train ids selected for processing
    :param nfrm: the number of frames expected for every train in
        the list `train_sel`
    :param cellid: array of cell IDs in the same sequence as images to
        filter
    :param cm: flag indicates the final selection or interim selection
        for common-mode correction
    :returns:
        - boolean array with flags indicating images for processing
        - integer array with number of selected frames in trains
    """
    raise NotImplementedError

msg()

Returns log message on initialization

:return: message

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def msg(self):
    """Returns log message on initialization

    :return: message
    """
    raise NotImplementedError

LitFrameSelection

Bases: CellSelection

Selection of detector memery cells indicated as lit frames by the AgipdLitFrameFinder

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
class LitFrameSelection(CellSelection):
    """Selection of detector memery cells indicated as lit frames
    by the AgipdLitFrameFinder
    """
    def __init__(self,
                 litfrmdata: 'AgipdLitFrameFinderOffline',
                 train_ids: List[int],
                 crange: Optional[List[int]] = None,
                 energy_threshold: float = -1000,
                 use_super_selection: str = 'off'):
        """Initialize lit frame selection

        :param litfrmdata: AgipdLitFrameFinder output data
        :param train_ids: the list of selected trains
        :param crange: range parameters of selected cells,
            list up to 3 elements
        :param energy_threshold: the minimum allowed value for
            pulse energy
        :param use_super_selection: the stage when super selection
            should be applied: `off`, `cm` or `final`
        """
        from extra_redu import FrameSelection, SelType
        self.dev = litfrmdata.meta.litFrmDev
        self.crange = validate_selected_pulses(crange, self.ncell_max)
        self.energy_threshold = energy_threshold
        self.use_super_selection = use_super_selection

        if use_super_selection == 'off':
            self.sel_type = [SelType.CELL, SelType.ROW, SelType.CELL]
        elif use_super_selection == 'cm':
            self.sel_type = [SelType.CELL, SelType.SUPER_ROW, SelType.CELL]
        elif use_super_selection == 'final':
            self.sel_type = [SelType.SUPER_CELL, SelType.SUPER_ROW, SelType.SUPER_CELL]
        else:
            raise ValueError("param 'use_super_selection' takes only "
                             "'off', 'cm' or 'final'")

        self._sel = FrameSelection(
            litfrmdata, guess_missed=True, crange=slice(*self.crange),
            energy_threshold=energy_threshold, select_litframes=True
        )

    def print_report(self, max_lines=25):
        rep = self._sel.report()
        nrec = len(rep)
        s = slice(max_lines - 1) if nrec > max_lines else slice(None)
        print(" # trains                   "
              "  Ntrn Nmis   Np  Nd  Nf lit frames")
        for rec in rep[s]:
            frmintf = ', '.join([':'.join([str(n) for n in slc])
                                 for slc in rec['litframe_slice']])
            t0, tN, st = (rec['train_range'] + (1,))[:3]
            ntrain = max((int(tN) - int(t0)) // int(st), 1)
            trsintf = ':'.join([str(n) for n in rec['train_range']])
            print(("{pattern_no:2d} {trsintf:25s} {ntrain:5d} "
                   "{nmissed_trains:4d} {npulse_exposed:4d} {ndataframe:3d} "
                   "{nframe_total:3d} [{frmintf}]"
                   ).format(frmintf=frmintf, ntrain=ntrain,
                            trsintf=trsintf, **rec))
        if nrec > max_lines:
            print(f"... {nrec - max_lines + 1} more lines skipped")

    def msg(self):
        return (
            f"Use lit frame selection from {self.dev}, crange={self.crange}\n"
        )

    def get_cells_on_trains(
        self, train_sel: np.ndarray, nfrm: np.ndarray,
        cellid: np.ndarray, cm: int = 0
    ) -> np.array:
        if cm < 0 or cm > 2:
            raise ValueError("param 'cm' takes only 0,1,2")

        (sel, counts), = self._sel.litframes_on_trains(
            train_sel, nfrm, cellid, [self.sel_type[cm]])

        return sel, counts

    def filter_trains(self, train_sel: np.ndarray):
        return self._sel.filter_trains(train_sel, drop_empty=True)

__init__(litfrmdata, train_ids, crange=None, energy_threshold=-1000, use_super_selection='off')

Initialize lit frame selection

:param litfrmdata: AgipdLitFrameFinder output data :param train_ids: the list of selected trains :param crange: range parameters of selected cells, list up to 3 elements :param energy_threshold: the minimum allowed value for pulse energy :param use_super_selection: the stage when super selection should be applied: off, cm or final

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def __init__(self,
             litfrmdata: 'AgipdLitFrameFinderOffline',
             train_ids: List[int],
             crange: Optional[List[int]] = None,
             energy_threshold: float = -1000,
             use_super_selection: str = 'off'):
    """Initialize lit frame selection

    :param litfrmdata: AgipdLitFrameFinder output data
    :param train_ids: the list of selected trains
    :param crange: range parameters of selected cells,
        list up to 3 elements
    :param energy_threshold: the minimum allowed value for
        pulse energy
    :param use_super_selection: the stage when super selection
        should be applied: `off`, `cm` or `final`
    """
    from extra_redu import FrameSelection, SelType
    self.dev = litfrmdata.meta.litFrmDev
    self.crange = validate_selected_pulses(crange, self.ncell_max)
    self.energy_threshold = energy_threshold
    self.use_super_selection = use_super_selection

    if use_super_selection == 'off':
        self.sel_type = [SelType.CELL, SelType.ROW, SelType.CELL]
    elif use_super_selection == 'cm':
        self.sel_type = [SelType.CELL, SelType.SUPER_ROW, SelType.CELL]
    elif use_super_selection == 'final':
        self.sel_type = [SelType.SUPER_CELL, SelType.SUPER_ROW, SelType.SUPER_CELL]
    else:
        raise ValueError("param 'use_super_selection' takes only "
                         "'off', 'cm' or 'final'")

    self._sel = FrameSelection(
        litfrmdata, guess_missed=True, crange=slice(*self.crange),
        energy_threshold=energy_threshold, select_litframes=True
    )

validate_selected_pulses(max_pulses, max_cells)

Validate the selected pulses given from the notebook

Validate the selected range of pulses to correct raw data of at least one image.

1) A pulse indices within one train can't be greater than the operating memory cells. 2) Validate the order of the given raneg of pulses. 3) Raise value error if generate list of pulses is empty.

:param max_pulses: a list of at most 3 elements defining the range of pulses to calibrate. :param max_cells: operating memory cells.

:return adjusted_range: An adjusted range of pulse indices to correct.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/agipdlib.py
def validate_selected_pulses(
    max_pulses: List[int], max_cells: int
) -> List[int]:
    """Validate the selected pulses given from the notebook

    Validate the selected range of pulses to correct raw data
    of at least one image.

    1) A pulse indices within one train can't be greater
    than the operating memory cells.
    2) Validate the order of the given raneg of pulses.
    3) Raise value error if generate list of pulses is empty.

    :param max_pulses: a list of at most 3 elements defining the
    range of pulses to calibrate.
    :param max_cells: operating memory cells.

    :return adjusted_range: An adjusted range of pulse indices to correct.
    """

    # Validate selected pulses range:
    # 1) A pulseId can't be greater than the operating memory cells.
    pulses_range = [max_cells if p > max_cells else p for p in max_pulses]

    if pulses_range != max_pulses:
        print(
            "WARNING: \"max_pulses\" list has been modified from "
            f"{max_pulses} to {pulses_range}. As the number of "
            "operating memory cells are less than the selected "
            "maximum pulse."
        )

    if len(pulses_range) == 1:
        adjusted_range = (0, pulses_range[0], 1)
    elif len(pulses_range) == 2:
        adjusted_range = (pulses_range[0], pulses_range[1], 1)
    elif len(pulses_range) == 3:
        adjusted_range = tuple(pulses_range)
    else:
        raise ValueError(
            "ERROR: Wrong length for the list of pulses indices range. "
            "Please check the given range for \"max_pulses\":"
            f"{max_pulses}. \"max_pulses\" needs to be a list of "
            "3 elements, [start, last, step]")

    if adjusted_range[0] > adjusted_range[1]:
        raise ValueError(
            "ERROR: Pulse range start is greater than range end. "
            "Please check the given range for \"max_pulses\":"
            f"{max_pulses}. \"max_pulses\" needs to be a list of "
            "3 elements, [start, last, step]")

    if not np.all([isinstance(p, int) for p in max_pulses]):
        raise TypeError(
            "ERROR: \"max_pulses\" elements needs to be integers:"
            f" {max_pulses}.")

    print(
        "A range of pulse indices is selected to correct:"
        f" {pulses_range}")

    return adjusted_range