Skip to content

lpdlib

LpdCorrections

The LpdCorrections class perfroms LPD offline correction

The following example shows a typical use case::

infile = h5py.File(filename, "r", driver="core")
outfile = h5py.File(filename_out, "w")

lpd_corr = LpdCorrections(infile, outfile, max_cells, channel,
max_pulses,
                          bins_gain_vs_signal, bins_signal_low_range,
                          bins_signal_high_range)

try:
    lpd_corr.initialize(offset, rel_gain, rel_gain_offset, mask,
    noise, flatfield)
except IOError:
    return

for irange in lpd_corr.get_iteration_range():
    lpd_corr.correct_lpd(irange)

hists, edges = lpd_corr.get_histograms()
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
class LpdCorrections:
    """
    The LpdCorrections class perfroms LPD offline correction

    The following example shows a typical use case::


        infile = h5py.File(filename, "r", driver="core")
        outfile = h5py.File(filename_out, "w")

        lpd_corr = LpdCorrections(infile, outfile, max_cells, channel,
        max_pulses,
                                  bins_gain_vs_signal, bins_signal_low_range,
                                  bins_signal_high_range)

        try:
            lpd_corr.initialize(offset, rel_gain, rel_gain_offset, mask,
            noise, flatfield)
        except IOError:
            return

        for irange in lpd_corr.get_iteration_range():
            lpd_corr.correct_lpd(irange)

        hists, edges = lpd_corr.get_histograms()

    """

    def __init__(self, infile, outfile, max_cells, channel, max_pulses,
                 bins_gain_vs_signal, bins_signal_low_range,
                 bins_signal_high_range,
                 raw_fmt_version=2, chunk_size=512,
                 h5_data_path="INSTRUMENT/FXE_DET_LPD1M-1/DET/{}CH0:xtdf/",
                 h5_index_path="INDEX/FXE_DET_LPD1M-1/DET/{}CH0:xtdf/",
                 do_ff=True, correct_non_linear=True, karabo_data_mode=False,
                 linear_between=None, mark_non_lin_region=True, nlc_version=2):
        """
        Initialize an LpdCorrections Class

        :param infile: to be corrected h5py input file
        :param outfile: writeable h5py output file
        :param channel: module/channel to correct
        :param max_pulses: maximum pulse id to consider for preview histograms
        :param bins_gain_vs_signal: number of bins for gain vs signal histogram
        :param bins_signal_low_range: number of bins for the low signal
            range histogram
        :param bins_signal_high_range: number of bins for the high signal
            range histogram
        :param raw_fmt_version: raw file format version to use
        :param chunk_size: images per chunk to compute upon
        :param h5_data_path: path in HDF5 file which is prefixed to the
            image/data section
        :param h5_index_path: path in HDF5 file which is prefixed to the
            index section
        :param do_ff: perform flat field corrections
        :param correct_non_linear: perform non-linear transition region corr.
        :param karabo_data_mode: set to true to use data iterated with karabo
            data
        """
        self.lpd_base = h5_data_path.format(channel)
        self.idx_base = h5_index_path.format(channel)
        self.infile = infile
        self.outfile = outfile
        self.channel = channel
        self.index_v = raw_fmt_version
        self.chunksize = chunk_size
        self.initialized = False
        self.max_pulses = max_pulses
        self.max_cells = max_cells
        self.hists_signal_low = 0
        self.hists_signal_high = 0
        self.hists_gain_vs_signal = 0
        self.bins_gain_vs_signal = bins_gain_vs_signal
        self.bins_signal_low_range = bins_signal_low_range
        self.bins_signal_high_range = bins_signal_high_range
        self.cidx = 0
        self.do_ff = do_ff
        filter_modules = []
        self.filter_cells = [0, 1] if channel in filter_modules else []
        self.cnl = correct_non_linear
        self.karabo_data_mode = karabo_data_mode
        self.linear_between = linear_between
        self.mark_nonlin = mark_non_lin_region
        self.nlc_version = nlc_version
        # emprically determined from APD datasets p900038, r155,r156
        # emprically determined from APD datasets p900038, r155,r156
        self.cnl_const = {
            'high': {'A': -0.000815934, 'lam': 0.00811281, 'c': 1908.89,
                     'm': 0, 'b': 0},
            'med': {'A': 0.0999894, 'lam': -0.00137652, 'c': 3107.83,
                    'm': 3.89982e-06, 'b': -0.116811},
            'low': {'A': 0.0119132, 'lam': -0.0002, 'c': 36738.6,
                    'm': 2.00273e-08, 'b': 0.245537}}

    def get_iteration_range(self):
        """Returns a range expression over which to iterate in chunks
        """
        return np.array_split(self.firange,
                              self.firange.size // self.chunksize)

    def initialize(self, offset, rel_gain, rel_gain_offset, mask, noise,
                   flatfield, swap_axis=True):
        """ Initialize the calibration constants and the output data file.

        Any data that is not touched by the corrections is copied into the
        output file at this point. Also data validity tests are performed.
        This functions must be called before `correct_lpd` is executed.

        :param offset: offset map to use for corrections
        :param rel_gain: relative gain map to use for corrections
        :param rel_gain_offset: relative gain offset to use for corrections
        :param mask: bad pixel mask to use for corrections
        :param noise: noise map to use for corrections
        :param flatfield: flatfield map to use for corrections
        :param swap_axis: set to True if using data from the calibration
        database.
        """

        if swap_axis:
            if offset is not None:
                mvd = np.moveaxis(np.moveaxis(offset, 2, 0), 2, 1)
                offset = np.ascontiguousarray(mvd)
            if rel_gain is not None:
                mvd = np.moveaxis(np.moveaxis(rel_gain, 2, 0), 2, 1)
                rel_gain = np.ascontiguousarray(mvd)
            if mask is not None:
                mvd = np.moveaxis(np.moveaxis(mask, 2, 0), 2, 1)
                mask = np.ascontiguousarray(mvd)
            if rel_gain_offset is not None:
                mvd = np.moveaxis(np.moveaxis(rel_gain_offset, 2, 0), 2, 1)
                rel_gain_offset = np.ascontiguousarray(mvd)
            if noise is not None:
                mvd = np.moveaxis(np.moveaxis(noise, 2, 0), 2, 1)
                noise = np.ascontiguousarray(mvd)
            if flatfield is not None:
                flatfield = np.ascontiguousarray(flatfield)

        if offset is not None:
            self.offset = offset
        if rel_gain is not None:
            self.rel_gain = rel_gain
        if rel_gain_offset is not None:
            self.rel_gain_b = rel_gain_offset
        if mask is not None:
            if not hasattr(self, "mask"):
                self.mask = mask
            else:
                self.mask |= mask[:self.mask.shape[0], ...]
        if noise is not None:
            self.noise = noise
        if flatfield is not None:
            self.flatfield = flatfield

        if not self.initialized:
            self.median_noise = np.nanmedian(self.noise[0, ...])
            allcells = [self.offset.shape[0],
                        self.mask.shape[0],
                        self.max_cells]
            self.max_cells = np.min(allcells)
            if not self.karabo_data_mode:
                self.gen_valid_range()
                if self.firange.size < self.chunksize:
                    self.chunksize = self.firange.size
                self.copy_and_sanitize_non_cal_data()
                self.create_output_datasets()
            self.initialized = True

    @staticmethod
    def split_gain(d):
        """ Split gain information off 16-bit LPD data

        Gain information can be found in bits 12 and 13 (0-based)
        """
        msk = np.zeros(d.shape, np.uint16)
        msk[...] = 0b0000111111111111
        data = np.bitwise_and(d, msk)

        gain = np.right_shift(d, 12)
        msk[...] = 0b0000000000000011
        gain = np.bitwise_and(gain, msk)
        return data, gain

    def correct_lpd(self, irange):
        """ Correct Raw LPD data for offset and relative gain effects

        :param irange: list of image indices to work on, should be contiguous,
                       or karabo_data structure to work on

        Will raise an RuntimeError if `initialize()` has not been called first.
        """
        if not self.initialized:
            raise RuntimeError("Must call initialize() first!")
        if not self.karabo_data_mode:
            lpd_base = self.lpd_base
            cidx = self.cidx
            im = np.array(self.infile[lpd_base + "image/data"][irange, ...])
            trainId = self.infile[lpd_base + "/image/trainId"][irange, ...]
            trainId = np.squeeze(trainId)
            pulseId = self.infile[lpd_base + "image/pulseId"][irange, ...]
            pulseId = np.squeeze(pulseId)
            status = self.infile[lpd_base + "/image/status"][irange, ...]
            status = np.squeeze(status)
            cells = self.infile[lpd_base + "/image/cellId"][irange, ...]
            cells = np.squeeze(cells)
            length = self.infile[lpd_base + "/image/length"][irange, ...]
            length = np.squeeze(length)
        else:
            cidx = 1  # do not produce any histograms
            im = irange['image.data']
            trainId = np.squeeze(irange['image.trainId'])
            status = np.squeeze(irange['image.status'])
            pulseId = np.squeeze(irange['image.pulseId'])
            cells = np.squeeze(irange['image.cellId'])
            length = np.squeeze(irange['image.length'])

        # split gain and image info into separate arrays
        im, gain = self.split_gain(im[:, 0, ...])

        # we need data as float from here on
        im = im.astype(np.float32)

        # invalid gain values
        im[gain > 2] = np.nan

        # on first iteration create histograms for reports
        if cidx == 0:
            H, xe, ye = np.histogram2d(im.flatten(), gain.flatten(),
                                       bins=self.bins_gain_vs_signal,
                                       range=[[0, 4096], [0, 4]])
            self.hists_gain_vs_signal += H
            self.signal_edges = (xe, ye)

        # zero invalid gains
        gain[gain > 2] = 0

        #  select constants by memory cells
        om = self.offset[cells, ...]
        rc = self.rel_gain[cells, ...]
        rbc = self.rel_gain_b[cells, ...]

        # and then by gain setting
        og = np.choose(gain, (om[..., 0], om[..., 1], om[..., 2]))
        rg = np.choose(gain, (rc[..., 0], rc[..., 1], rc[..., 2]))
        rgb = np.choose(gain, (rbc[..., 0], rbc[..., 1], rbc[..., 2]))

        mskg = self.mask[cells, ...]
        msk = np.choose(gain, (mskg[..., 0], mskg[..., 1], mskg[..., 2]))

        # correct offset
        im -= og

        nlf = 0
        if self.mark_nonlin and self.linear_between is not None:
            for gl, lr in enumerate(self.linear_between):

                midx = (gain == gl) & ((im < lr[0]) | (im > lr[1]))
                msk[midx] = BadPixels.NON_LIN_RESPONSE_REGION
                numnonlin = np.count_nonzero(midx, axis=(1,2))
                nlf += numnonlin
            nlf = nlf/float(im.shape[0] * im.shape[1])

        # hacky way of smoothening transition region between med and low

        cfac = 1
        if self.nlc_version == 1 and self.cnl:
            cfac = 0.314 * np.exp(-im * 0.001)

        # perform relative gain correction with additional gain-deduced
        # offset
        im = (im - rgb) / rg
        if self.do_ff:
            im /= self.flatfield[None, :, :]

        # hacky way of smoothening transition region between med and low
        if self.nlc_version == 1 and self.cnl:
            im[gain == 2] -= im[gain == 2] * cfac[gain == 2]

        # perform non-linear corrections if requested
        if self.cnl and self.nlc_version == 2:
            def lin_exp_fun(x, m, b, A, lam, c):
                return m * x + b + A * np.exp(lam * (x - c))

            x = im[(gain == 0)]
            cnl = self.cnl_const['high']
            cf = lin_exp_fun(x, cnl['m'], cnl['b'], cnl['A'], cnl['lam'],
                             cnl['c'])
            im[(gain == 0)] -= np.maximum(cf, -0.2) * x

            x = im[(gain == 1)]
            cnl = self.cnl_const['med']
            cf = lin_exp_fun(x, cnl['m'], cnl['b'], cnl['A'], cnl['lam'],
                             cnl['c'])
            im[(gain == 1)] -= np.minimum(cf, 0.2) * x

            x = im[(gain == 2)]
            cnl = self.cnl_const['low']
            cf = lin_exp_fun(x, cnl['m'], cnl['b'], cnl['A'], cnl['lam'],
                             cnl['c'])
            im[(gain == 2)] -= np.minimum(cf, 0.45) * x

        # create bad pixels masks, here non-finite values
        bidx = ~np.isfinite(im)
        im[bidx] = 0
        msk[bidx] = BadPixels.VALUE_IS_NAN

        # values which are unphysically large or small
        bidx = (im < -1e7) | (im > 1e7)
        im[bidx] = 0
        msk[bidx] = BadPixels.VALUE_OUT_OF_RANGE

        # on first iteration we create histograms for the report
        if cidx == 0:
            copim = copy.copy(im)
            copim[copim < self.median_noise] = np.nan
            bins = (self.bins_signal_low_range, self.max_pulses)
            rnge = [[-50, 1000], [0, self.max_pulses + 1]]
            H, xe, ye = np.histogram2d(np.nanmean(copim, axis=(1, 2)),
                                       pulseId,
                                       bins=bins,
                                       range=rnge)
            self.hists_signal_low += H
            self.low_edges = (xe, ye)
            bins = (self.bins_signal_high_range, self.max_pulses)
            rnge = [[0, 200000], [0, self.max_pulses + 1]]
            H, xe, ye = np.histogram2d(np.nanmean(copim, axis=(1, 2)),
                                       pulseId,
                                       bins=bins,
                                       range=rnge)
            self.hists_signal_high += H
            self.high_edges = (xe, ye)

        if not self.karabo_data_mode:
            # write data out
            # upper end of indices we are processing
            nidx = int(cidx + irange.size)
            self.ddset[cidx:nidx, ...] = im
            self.gdset[cidx:nidx, ...] = gain
            self.mdset[cidx:nidx, ...] = msk

            self.outfile[lpd_base + "image/cellId"][cidx:nidx] = cells
            self.outfile[lpd_base + "image/trainId"][cidx:nidx] = trainId
            self.outfile[lpd_base + "image/pulseId"][cidx:nidx] = pulseId
            self.outfile[lpd_base + "image/status"][cidx:nidx] = status
            self.outfile[lpd_base + "image/length"][cidx:nidx] = length
            if self.mark_nonlin:
                self.outfile[lpd_base + "image/nonLinear"][cidx:nidx] = nlf
            self.cidx = nidx
        else:
            irange['image.data'] = im
            irange['image.gain'] = gain
            irange['image.mask'] = msk
            irange['image.cellId'] = cells
            irange['image.trainId'] = trainId
            irange['image.pulseId'] = pulseId
            irange['image.status'] = status
            irange['image.length'] = length
            return irange

    def get_valid_image_idx(self):
        """ Return the indices of valid data
        """
        lpd_base = self.idx_base
        if self.index_v == 2:
            count = np.squeeze(self.infile[lpd_base + "image/count"])
            first = np.squeeze(self.infile[lpd_base + "image/first"])
            if np.count_nonzero(count != 0) == 0:
                raise IOError("File has no valid counts")
            valid = count != 0
            idxtrains = np.squeeze(self.infile["/INDEX/trainId"])
            medianTrain = np.nanmedian(idxtrains)
            lowok = (idxtrains > medianTrain - 1e4)
            highok = (idxtrains < medianTrain + 1e4)
            valid &= lowok & highok

            last_index = int(first[valid][-1] + count[valid][-1])
            first_index = int(first[valid][0])

        elif self.index_v == 1:
            status = np.squeeze(self.infile[lpd_base + "image/status"])
            if np.count_nonzero(status != 0) == 0:
                raise IOError("File {} has no valid counts".format(
                    self.infile))
            last = np.squeeze(self.infile[lpd_base + "image/last"])
            valid = status != 0

            idxtrains = np.squeeze(self.infile["/INDEX/trainId"])
            medianTrain = np.nanmedian(idxtrains)
            lowok = (idxtrains > medianTrain - 1e4)
            highok = (idxtrains < medianTrain + 1e4)
            valid &= lowok & highok

            last_index = int(last[valid][-1])
            first_index = int(last[valid][0])
        else:
            raise AttributeError(
                "Not a known raw format version: {}".format(self.index_v))

        self.valid = valid
        self.first_index = first_index
        self.last_index = last_index
        self.idxtrains = idxtrains

    def gen_valid_range(self):
        """ Generate an index range to pass to `correctLPD`.
        """
        first_index = self.first_index
        last_index = self.last_index
        max_cells = self.max_cells
        lpd_base = self.lpd_base
        allcells = self.infile[lpd_base + "image/cellId"]
        allcells = np.squeeze(allcells[first_index:last_index, ...])
        single_image = self.infile[lpd_base + "image/data"]
        single_image = np.array(single_image[first_index, ...])
        can_calibrate = allcells < max_cells
        for c in self.filter_cells:
            can_calibrate &= allcells != c
        if np.count_nonzero(can_calibrate) == 0:
            return
        allcells = allcells[can_calibrate]
        firange = np.arange(first_index, last_index)
        firange = firange[can_calibrate]
        self.oshape = (firange.size,
                       single_image.shape[1],
                       single_image.shape[2])
        self.firange = firange
        self.single_image = single_image

    def copy_and_sanitize_non_cal_data(self):
        """ Copy and sanitize data in `infile` that is not touched by
        `correctLPD`
        """
        lpd_base = self.lpd_base
        idx_base = self.idx_base
        first_index = self.first_index
        last_index = self.last_index
        firange = self.firange
        alltrains = self.infile[lpd_base + "image/trainId"]
        alltrains = np.squeeze(alltrains[first_index:last_index, ...])
        dont_copy = ["data", "cellId", "trainId", "pulseId", "status",
                     "length"]
        dont_copy = [lpd_base + "image/{}".format(do)
                     for do in dont_copy]

        dont_copy += [idx_base + "{}/first".format(do)
                      for do in ["image"]]
        dont_copy += [idx_base + "{}/count".format(do)
                      for do in ["image"]]

        # a visitor to copy outstanding data
        def visitor(k, item):
            if k not in dont_copy:

                if isinstance(item, h5py.Group):
                    self.outfile.create_group(k)
                elif isinstance(item, h5py.Dataset):
                    group = str(k).split("/")
                    group = "/".join(group[:-1])
                    self.infile.copy(k, self.outfile[group])

        self.infile.visititems(visitor)
        # sanitize indices
        for do in ["image", ]:
            uq, fidxv, cntsv = np.unique(alltrains[firange - firange[0]],
                                         return_index=True,
                                         return_counts=True)

            duq = (uq[1:] - uq[:-1]).astype(np.int64)

            cfidxv = [fidxv[0], ]
            ccntsv = [cntsv[0], ]
            for i, du in enumerate(duq.tolist()):
                if du > 1000:
                    du = 1
                    cntsv[i] = 0
                cfidxv += [0] * (du - 1) + [fidxv[i + 1], ]
                ccntsv += [0] * (du - 1) + [cntsv[i + 1], ]

            mv = len(cfidxv)
            fidx = np.zeros(len(cfidxv), fidxv.dtype)
            fidx[self.valid[:mv]] = np.array(cfidxv)[self.valid[:mv]]

            for i in range(len(fidx) - 1, 2, -1):
                if fidx[i - 1] == 0 and fidx[i] != 0:
                    fidx[i - 1] = fidx[i]

            cnts = np.zeros(len(cfidxv), cntsv.dtype)
            cnts[self.valid[:mv]] = np.array(ccntsv)[self.valid[:mv]]

            self.outfile.create_dataset(idx_base + "{}/first".format(do),
                                        fidx.shape,
                                        dtype=fidx.dtype,
                                        data=fidx,
                                        fletcher32=True)
            self.outfile.create_dataset(idx_base + "{}/count".format(do),
                                        cnts.shape,
                                        dtype=cnts.dtype,
                                        data=cnts,
                                        fletcher32=True)

    def create_output_datasets(self):
        """ Initialize output data sets
        """
        lpdbase = self.lpd_base
        chunksize = self.chunksize
        firange = self.firange
        oshape = self.oshape
        chunks = (chunksize, oshape[1], oshape[2])
        self.ddset = self.outfile.create_dataset(lpdbase + "image/data",
                                                 oshape, chunks=chunks,
                                                 dtype=np.float32,
                                                 fletcher32=True)
        self.gdset = self.outfile.create_dataset(lpdbase + "image/gain",
                                                 oshape, chunks=chunks,
                                                 dtype=np.uint8,
                                                 compression="gzip",
                                                 compression_opts=1,
                                                 shuffle=True,
                                                 fletcher32=True)
        self.mdset = self.outfile.create_dataset(lpdbase + "image/mask",
                                                 oshape, chunks=chunks,
                                                 dtype=np.uint32,
                                                 compression="gzip",
                                                 compression_opts=1,
                                                 shuffle=True,
                                                 fletcher32=True)
        fsz = firange.shape
        self.outfile.create_dataset(lpdbase + "image/cellId", fsz,
                                    dtype=np.uint16, fletcher32=True)
        self.outfile.create_dataset(lpdbase + "image/trainId", fsz,
                                    dtype=np.uint64, fletcher32=True)
        self.outfile.create_dataset(lpdbase + "image/pulseId", fsz,
                                    dtype=np.uint64, fletcher32=True)
        self.outfile.create_dataset(lpdbase + "image/status", fsz,
                                    dtype=np.uint16, fletcher32=True)
        self.outfile.create_dataset(lpdbase + "image/length", fsz,
                                    dtype=np.uint32, fletcher32=True)

        if self.mark_nonlin:
            self.outfile.create_dataset(lpdbase + "image/nonLinear", fsz,
                                        dtype=np.float32, fletcher32=True)

    def get_histograms(self):
        """ Return preview histograms computed from the first chunk
        """
        return ((self.hists_signal_low, self.hists_signal_high,
                 self.hists_gain_vs_signal),
                (self.low_edges, self.high_edges, self.signal_edges))

    def initialize_from_db(self, dbparms: List[Tuple['DBParms', 'DBParms_timeout']],
                           karabo_id: str, karabo_da: str,
                           only_dark: Optional[bool] = False):
        """ Initialize calibration constants from the calibration database

        :param dbparms: a tuple containing relevant database parameters,
        can be either:
            * cal_db_interface, creation_time, max_cells_db, capacitor,
              bias_voltage, photon_energy
              in which case the db timeout is set to 300 seconds,
              the cells to query dark image derived constants from the
              database is set to the global value

            * cal_db_interface, creation_time, max_cells_db, capacitor,
              bias_voltage, photon_energy, timeout
              additionally a timeout is given

        :param karabo_id: karabo identifier
        :param karabo_da: karabo data aggregator
        :param only_dark: load only dark image derived constants. This
            implies that a `calfile` is used to load the remaining
            constants. Useful to reduce DB traffic and interactions
            for non-frequently changing constants, i.e. such which are
            not usually updated during a beamtime.

        The `cal_db_interface` parameter in the `dbparms` tuple may be in
        one of the following notations:
            * tcp://host:port to directly identify the host and port to
              connect to
            * tcp://host:port_low#port_high to specify a port range from
              which a random port will be picked. E.g. specifying

              tcp://max-exfl-cal001:8015#8025

              will randomly pick an address in the range max-exfl-cal001:8015 and
              max-exfl-cal001:8025.


        The latter notation allows for load-balancing.

        This routine loads the following constants as given in
        `iCalibrationDB`:

            Dark Image Derived
            ------------------

            * Constants.LPD.Offset
            * Constants.LPD.Noise
            * Constants.LPD.BadPixelsDark


            CI  Derived
            -----------

            * Constants.LPD.SlopesCI
            * Constants.LPD.BadPixelsCI

            Flat-Field Derived
            ------------------

            * Constants.LPD.SlopesFF
            * Constants.LPD.BadPixelsFF

        """
        if len(dbparms) == 6:
            (cal_db_interface, creation_time, max_cells_db, capacitor,
             bias_voltage, photon_energy) = dbparms
            timeout = 300000
        else:
            (cal_db_interface, creation_time, max_cells_db, capacitor,
             bias_voltage, photon_energy, timeout) = dbparms
        #TODO: remove sw duplication during LPD correction modifications
        offset, when = get_constant_from_db_and_time(
            karabo_id, karabo_da,
            Constants.LPD.Offset(),
            Conditions.Dark.LPD(
                memory_cells=max_cells_db,
                bias_voltage=bias_voltage,
                capacitor=capacitor),
            np.zeros((256, 256, max_cells_db, 3)),
            cal_db_interface,
            creation_time=creation_time,
            timeout=timeout)

        noise = get_constant_from_db(karabo_id, karabo_da,
                                     Constants.LPD.Noise(),
                                     Conditions.Dark.LPD(
                                         memory_cells=max_cells_db,
                                         bias_voltage=bias_voltage,
                                         capacitor=capacitor),
                                     np.zeros((256, 256, max_cells_db, 3)),
                                     cal_db_interface,
                                     creation_time=creation_time,
                                     timeout=timeout)

        bpixels = get_constant_from_db(karabo_id, karabo_da,
                                       Constants.LPD.BadPixelsDark(),
                                       Conditions.Dark.LPD(
                                           memory_cells=max_cells_db,
                                           bias_voltage=bias_voltage,
                                           capacitor=capacitor),
                                       np.zeros((256, 256, max_cells_db, 3)),
                                       cal_db_interface,
                                       creation_time=creation_time,
                                       timeout=timeout).astype(np.uint32)

        # done loading constants and returning
        if only_dark:
            self.initialize(offset, None, None, bpixels, noise, None)
            return when

        slopesCI = get_constant_from_db(karabo_id, karabo_da,
                                        Constants.LPD.SlopesCI(),
                                        Conditions.Dark.LPD(
                                            memory_cells=max_cells_db,
                                            bias_voltage=bias_voltage,
                                            capacitor=capacitor),
                                        np.ones((256, 256, max_cells_db, 2)),
                                        cal_db_interface,
                                        creation_time=creation_time,
                                        timeout=timeout)

        rel_gains = slopesCI[..., 0]
        rel_gain_offset = slopesCI[..., 1]

        flat_fields = np.squeeze(
            get_constant_from_db(karabo_id, karabo_da,
                                 Constants.LPD.SlopesFF(),
                                 Conditions.Illuminated.LPD(max_cells_db,
                                                            bias_voltage,
                                                            photon_energy,
                                                            pixels_x=256,
                                                            pixels_y=256,
                                                            beam_energy=None,
                                                            capacitor=capacitor),  # noqa
                                 np.ones((256, 256)),
                                 cal_db_interface,
                                 creation_time=creation_time,
                                 timeout=timeout))

        bpixels |= get_constant_from_db(karabo_id, karabo_da,
                                        Constants.LPD.BadPixelsCI(),
                                        Conditions.Dark.LPD(
                                            memory_cells=max_cells_db,
                                            bias_voltage=bias_voltage,
                                            capacitor=capacitor),
                                        np.zeros((256, 256, max_cells_db, 3)),
                                        cal_db_interface,
                                        creation_time=creation_time,
                                        timeout=timeout).astype(np.uint32)

        bpix = get_constant_from_db(karabo_id, karabo_da,
                                    Constants.LPD.BadPixelsFF(),
                                    Conditions.Illuminated.LPD(
                                        max_cells_db, bias_voltage,
                                        photon_energy,
                                        pixels_x=256, pixels_y=256,
                                        beam_energy=None,
                                        capacitor=capacitor),
                                    np.zeros((256, 256, max_cells_db)),
                                    cal_db_interface,
                                    creation_time=creation_time,
                                    timeout=timeout).astype(np.uint32)
        bpixels |= bpix[..., None]
        self.initialize(offset, rel_gains, rel_gain_offset, bpixels, noise,
                        flat_fields)
        return when

    def initialize_from_file(self, filename, qm, with_dark=True):
        """ Initialize calibration constants from a calibration file

        :param filename: path to a file containing the calibration
        constants. It is expected to have the following structure:

            /{qm}/BadPixelsFF/data
            /{qm}/BadPixelsCI/data
            /{qm}/Offset/data
            /{qm}/Noise/data
            /{qm}/BadPixelsDark/data
            /{qm}/SlopesFF/data
            /{qm}/SlopesCI/data

            where qm is the `qm` parameter.

        :param qm: quadrant and module of the constants to load in Q1M1
            notation
        :param with_dark: also load dark image derived constants from the
            file. This will overwrite any constants previously loaded from
            the calibration database.

        """
        offsets = None
        noises = None
        with h5py.File(filename, "r") as calfile:
            bpixels = calfile["{}/{}/data".format(qm, "BadPixelsCI")][()]
            bpix = calfile["{}/{}/data".format(qm, "BadPixelsFF")][()]
            bpixels |= bpix[..., None, None]
            if with_dark:
                offsets = calfile["{}/{}/data".format(qm, "Offset")][()]
                noises = calfile["{}/{}/data".format(qm, "Noise")][()]
                bpix = calfile["{}/{}/data".format(qm, "BadPixelsDark")]
                bpixels |= bpix[()]

            slopesCI = calfile["{}/{}/data".format(qm, "SlopesCI")][()]
            rel_gains = slopesCI[..., 0]
            rel_gains_b = slopesCI[..., 1]

            flat_fields = calfile["{}/{}/data".format(qm, "SlopesFF")][()][
                          ::-1, ::-1]

        self.initialize(offsets, rel_gains, rel_gains_b, bpixels, noises,
                        flat_fields)

__init__(infile, outfile, max_cells, channel, max_pulses, bins_gain_vs_signal, bins_signal_low_range, bins_signal_high_range, raw_fmt_version=2, chunk_size=512, h5_data_path='INSTRUMENT/FXE_DET_LPD1M-1/DET/{}CH0:xtdf/', h5_index_path='INDEX/FXE_DET_LPD1M-1/DET/{}CH0:xtdf/', do_ff=True, correct_non_linear=True, karabo_data_mode=False, linear_between=None, mark_non_lin_region=True, nlc_version=2)

Initialize an LpdCorrections Class

:param infile: to be corrected h5py input file :param outfile: writeable h5py output file :param channel: module/channel to correct :param max_pulses: maximum pulse id to consider for preview histograms :param bins_gain_vs_signal: number of bins for gain vs signal histogram :param bins_signal_low_range: number of bins for the low signal range histogram :param bins_signal_high_range: number of bins for the high signal range histogram :param raw_fmt_version: raw file format version to use :param chunk_size: images per chunk to compute upon :param h5_data_path: path in HDF5 file which is prefixed to the image/data section :param h5_index_path: path in HDF5 file which is prefixed to the index section :param do_ff: perform flat field corrections :param correct_non_linear: perform non-linear transition region corr. :param karabo_data_mode: set to true to use data iterated with karabo data

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def __init__(self, infile, outfile, max_cells, channel, max_pulses,
             bins_gain_vs_signal, bins_signal_low_range,
             bins_signal_high_range,
             raw_fmt_version=2, chunk_size=512,
             h5_data_path="INSTRUMENT/FXE_DET_LPD1M-1/DET/{}CH0:xtdf/",
             h5_index_path="INDEX/FXE_DET_LPD1M-1/DET/{}CH0:xtdf/",
             do_ff=True, correct_non_linear=True, karabo_data_mode=False,
             linear_between=None, mark_non_lin_region=True, nlc_version=2):
    """
    Initialize an LpdCorrections Class

    :param infile: to be corrected h5py input file
    :param outfile: writeable h5py output file
    :param channel: module/channel to correct
    :param max_pulses: maximum pulse id to consider for preview histograms
    :param bins_gain_vs_signal: number of bins for gain vs signal histogram
    :param bins_signal_low_range: number of bins for the low signal
        range histogram
    :param bins_signal_high_range: number of bins for the high signal
        range histogram
    :param raw_fmt_version: raw file format version to use
    :param chunk_size: images per chunk to compute upon
    :param h5_data_path: path in HDF5 file which is prefixed to the
        image/data section
    :param h5_index_path: path in HDF5 file which is prefixed to the
        index section
    :param do_ff: perform flat field corrections
    :param correct_non_linear: perform non-linear transition region corr.
    :param karabo_data_mode: set to true to use data iterated with karabo
        data
    """
    self.lpd_base = h5_data_path.format(channel)
    self.idx_base = h5_index_path.format(channel)
    self.infile = infile
    self.outfile = outfile
    self.channel = channel
    self.index_v = raw_fmt_version
    self.chunksize = chunk_size
    self.initialized = False
    self.max_pulses = max_pulses
    self.max_cells = max_cells
    self.hists_signal_low = 0
    self.hists_signal_high = 0
    self.hists_gain_vs_signal = 0
    self.bins_gain_vs_signal = bins_gain_vs_signal
    self.bins_signal_low_range = bins_signal_low_range
    self.bins_signal_high_range = bins_signal_high_range
    self.cidx = 0
    self.do_ff = do_ff
    filter_modules = []
    self.filter_cells = [0, 1] if channel in filter_modules else []
    self.cnl = correct_non_linear
    self.karabo_data_mode = karabo_data_mode
    self.linear_between = linear_between
    self.mark_nonlin = mark_non_lin_region
    self.nlc_version = nlc_version
    # emprically determined from APD datasets p900038, r155,r156
    # emprically determined from APD datasets p900038, r155,r156
    self.cnl_const = {
        'high': {'A': -0.000815934, 'lam': 0.00811281, 'c': 1908.89,
                 'm': 0, 'b': 0},
        'med': {'A': 0.0999894, 'lam': -0.00137652, 'c': 3107.83,
                'm': 3.89982e-06, 'b': -0.116811},
        'low': {'A': 0.0119132, 'lam': -0.0002, 'c': 36738.6,
                'm': 2.00273e-08, 'b': 0.245537}}

copy_and_sanitize_non_cal_data()

Copy and sanitize data in infile that is not touched by correctLPD

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def copy_and_sanitize_non_cal_data(self):
    """ Copy and sanitize data in `infile` that is not touched by
    `correctLPD`
    """
    lpd_base = self.lpd_base
    idx_base = self.idx_base
    first_index = self.first_index
    last_index = self.last_index
    firange = self.firange
    alltrains = self.infile[lpd_base + "image/trainId"]
    alltrains = np.squeeze(alltrains[first_index:last_index, ...])
    dont_copy = ["data", "cellId", "trainId", "pulseId", "status",
                 "length"]
    dont_copy = [lpd_base + "image/{}".format(do)
                 for do in dont_copy]

    dont_copy += [idx_base + "{}/first".format(do)
                  for do in ["image"]]
    dont_copy += [idx_base + "{}/count".format(do)
                  for do in ["image"]]

    # a visitor to copy outstanding data
    def visitor(k, item):
        if k not in dont_copy:

            if isinstance(item, h5py.Group):
                self.outfile.create_group(k)
            elif isinstance(item, h5py.Dataset):
                group = str(k).split("/")
                group = "/".join(group[:-1])
                self.infile.copy(k, self.outfile[group])

    self.infile.visititems(visitor)
    # sanitize indices
    for do in ["image", ]:
        uq, fidxv, cntsv = np.unique(alltrains[firange - firange[0]],
                                     return_index=True,
                                     return_counts=True)

        duq = (uq[1:] - uq[:-1]).astype(np.int64)

        cfidxv = [fidxv[0], ]
        ccntsv = [cntsv[0], ]
        for i, du in enumerate(duq.tolist()):
            if du > 1000:
                du = 1
                cntsv[i] = 0
            cfidxv += [0] * (du - 1) + [fidxv[i + 1], ]
            ccntsv += [0] * (du - 1) + [cntsv[i + 1], ]

        mv = len(cfidxv)
        fidx = np.zeros(len(cfidxv), fidxv.dtype)
        fidx[self.valid[:mv]] = np.array(cfidxv)[self.valid[:mv]]

        for i in range(len(fidx) - 1, 2, -1):
            if fidx[i - 1] == 0 and fidx[i] != 0:
                fidx[i - 1] = fidx[i]

        cnts = np.zeros(len(cfidxv), cntsv.dtype)
        cnts[self.valid[:mv]] = np.array(ccntsv)[self.valid[:mv]]

        self.outfile.create_dataset(idx_base + "{}/first".format(do),
                                    fidx.shape,
                                    dtype=fidx.dtype,
                                    data=fidx,
                                    fletcher32=True)
        self.outfile.create_dataset(idx_base + "{}/count".format(do),
                                    cnts.shape,
                                    dtype=cnts.dtype,
                                    data=cnts,
                                    fletcher32=True)

correct_lpd(irange)

Correct Raw LPD data for offset and relative gain effects

:param irange: list of image indices to work on, should be contiguous, or karabo_data structure to work on

Will raise an RuntimeError if initialize() has not been called first.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def correct_lpd(self, irange):
    """ Correct Raw LPD data for offset and relative gain effects

    :param irange: list of image indices to work on, should be contiguous,
                   or karabo_data structure to work on

    Will raise an RuntimeError if `initialize()` has not been called first.
    """
    if not self.initialized:
        raise RuntimeError("Must call initialize() first!")
    if not self.karabo_data_mode:
        lpd_base = self.lpd_base
        cidx = self.cidx
        im = np.array(self.infile[lpd_base + "image/data"][irange, ...])
        trainId = self.infile[lpd_base + "/image/trainId"][irange, ...]
        trainId = np.squeeze(trainId)
        pulseId = self.infile[lpd_base + "image/pulseId"][irange, ...]
        pulseId = np.squeeze(pulseId)
        status = self.infile[lpd_base + "/image/status"][irange, ...]
        status = np.squeeze(status)
        cells = self.infile[lpd_base + "/image/cellId"][irange, ...]
        cells = np.squeeze(cells)
        length = self.infile[lpd_base + "/image/length"][irange, ...]
        length = np.squeeze(length)
    else:
        cidx = 1  # do not produce any histograms
        im = irange['image.data']
        trainId = np.squeeze(irange['image.trainId'])
        status = np.squeeze(irange['image.status'])
        pulseId = np.squeeze(irange['image.pulseId'])
        cells = np.squeeze(irange['image.cellId'])
        length = np.squeeze(irange['image.length'])

    # split gain and image info into separate arrays
    im, gain = self.split_gain(im[:, 0, ...])

    # we need data as float from here on
    im = im.astype(np.float32)

    # invalid gain values
    im[gain > 2] = np.nan

    # on first iteration create histograms for reports
    if cidx == 0:
        H, xe, ye = np.histogram2d(im.flatten(), gain.flatten(),
                                   bins=self.bins_gain_vs_signal,
                                   range=[[0, 4096], [0, 4]])
        self.hists_gain_vs_signal += H
        self.signal_edges = (xe, ye)

    # zero invalid gains
    gain[gain > 2] = 0

    #  select constants by memory cells
    om = self.offset[cells, ...]
    rc = self.rel_gain[cells, ...]
    rbc = self.rel_gain_b[cells, ...]

    # and then by gain setting
    og = np.choose(gain, (om[..., 0], om[..., 1], om[..., 2]))
    rg = np.choose(gain, (rc[..., 0], rc[..., 1], rc[..., 2]))
    rgb = np.choose(gain, (rbc[..., 0], rbc[..., 1], rbc[..., 2]))

    mskg = self.mask[cells, ...]
    msk = np.choose(gain, (mskg[..., 0], mskg[..., 1], mskg[..., 2]))

    # correct offset
    im -= og

    nlf = 0
    if self.mark_nonlin and self.linear_between is not None:
        for gl, lr in enumerate(self.linear_between):

            midx = (gain == gl) & ((im < lr[0]) | (im > lr[1]))
            msk[midx] = BadPixels.NON_LIN_RESPONSE_REGION
            numnonlin = np.count_nonzero(midx, axis=(1,2))
            nlf += numnonlin
        nlf = nlf/float(im.shape[0] * im.shape[1])

    # hacky way of smoothening transition region between med and low

    cfac = 1
    if self.nlc_version == 1 and self.cnl:
        cfac = 0.314 * np.exp(-im * 0.001)

    # perform relative gain correction with additional gain-deduced
    # offset
    im = (im - rgb) / rg
    if self.do_ff:
        im /= self.flatfield[None, :, :]

    # hacky way of smoothening transition region between med and low
    if self.nlc_version == 1 and self.cnl:
        im[gain == 2] -= im[gain == 2] * cfac[gain == 2]

    # perform non-linear corrections if requested
    if self.cnl and self.nlc_version == 2:
        def lin_exp_fun(x, m, b, A, lam, c):
            return m * x + b + A * np.exp(lam * (x - c))

        x = im[(gain == 0)]
        cnl = self.cnl_const['high']
        cf = lin_exp_fun(x, cnl['m'], cnl['b'], cnl['A'], cnl['lam'],
                         cnl['c'])
        im[(gain == 0)] -= np.maximum(cf, -0.2) * x

        x = im[(gain == 1)]
        cnl = self.cnl_const['med']
        cf = lin_exp_fun(x, cnl['m'], cnl['b'], cnl['A'], cnl['lam'],
                         cnl['c'])
        im[(gain == 1)] -= np.minimum(cf, 0.2) * x

        x = im[(gain == 2)]
        cnl = self.cnl_const['low']
        cf = lin_exp_fun(x, cnl['m'], cnl['b'], cnl['A'], cnl['lam'],
                         cnl['c'])
        im[(gain == 2)] -= np.minimum(cf, 0.45) * x

    # create bad pixels masks, here non-finite values
    bidx = ~np.isfinite(im)
    im[bidx] = 0
    msk[bidx] = BadPixels.VALUE_IS_NAN

    # values which are unphysically large or small
    bidx = (im < -1e7) | (im > 1e7)
    im[bidx] = 0
    msk[bidx] = BadPixels.VALUE_OUT_OF_RANGE

    # on first iteration we create histograms for the report
    if cidx == 0:
        copim = copy.copy(im)
        copim[copim < self.median_noise] = np.nan
        bins = (self.bins_signal_low_range, self.max_pulses)
        rnge = [[-50, 1000], [0, self.max_pulses + 1]]
        H, xe, ye = np.histogram2d(np.nanmean(copim, axis=(1, 2)),
                                   pulseId,
                                   bins=bins,
                                   range=rnge)
        self.hists_signal_low += H
        self.low_edges = (xe, ye)
        bins = (self.bins_signal_high_range, self.max_pulses)
        rnge = [[0, 200000], [0, self.max_pulses + 1]]
        H, xe, ye = np.histogram2d(np.nanmean(copim, axis=(1, 2)),
                                   pulseId,
                                   bins=bins,
                                   range=rnge)
        self.hists_signal_high += H
        self.high_edges = (xe, ye)

    if not self.karabo_data_mode:
        # write data out
        # upper end of indices we are processing
        nidx = int(cidx + irange.size)
        self.ddset[cidx:nidx, ...] = im
        self.gdset[cidx:nidx, ...] = gain
        self.mdset[cidx:nidx, ...] = msk

        self.outfile[lpd_base + "image/cellId"][cidx:nidx] = cells
        self.outfile[lpd_base + "image/trainId"][cidx:nidx] = trainId
        self.outfile[lpd_base + "image/pulseId"][cidx:nidx] = pulseId
        self.outfile[lpd_base + "image/status"][cidx:nidx] = status
        self.outfile[lpd_base + "image/length"][cidx:nidx] = length
        if self.mark_nonlin:
            self.outfile[lpd_base + "image/nonLinear"][cidx:nidx] = nlf
        self.cidx = nidx
    else:
        irange['image.data'] = im
        irange['image.gain'] = gain
        irange['image.mask'] = msk
        irange['image.cellId'] = cells
        irange['image.trainId'] = trainId
        irange['image.pulseId'] = pulseId
        irange['image.status'] = status
        irange['image.length'] = length
        return irange

create_output_datasets()

Initialize output data sets

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def create_output_datasets(self):
    """ Initialize output data sets
    """
    lpdbase = self.lpd_base
    chunksize = self.chunksize
    firange = self.firange
    oshape = self.oshape
    chunks = (chunksize, oshape[1], oshape[2])
    self.ddset = self.outfile.create_dataset(lpdbase + "image/data",
                                             oshape, chunks=chunks,
                                             dtype=np.float32,
                                             fletcher32=True)
    self.gdset = self.outfile.create_dataset(lpdbase + "image/gain",
                                             oshape, chunks=chunks,
                                             dtype=np.uint8,
                                             compression="gzip",
                                             compression_opts=1,
                                             shuffle=True,
                                             fletcher32=True)
    self.mdset = self.outfile.create_dataset(lpdbase + "image/mask",
                                             oshape, chunks=chunks,
                                             dtype=np.uint32,
                                             compression="gzip",
                                             compression_opts=1,
                                             shuffle=True,
                                             fletcher32=True)
    fsz = firange.shape
    self.outfile.create_dataset(lpdbase + "image/cellId", fsz,
                                dtype=np.uint16, fletcher32=True)
    self.outfile.create_dataset(lpdbase + "image/trainId", fsz,
                                dtype=np.uint64, fletcher32=True)
    self.outfile.create_dataset(lpdbase + "image/pulseId", fsz,
                                dtype=np.uint64, fletcher32=True)
    self.outfile.create_dataset(lpdbase + "image/status", fsz,
                                dtype=np.uint16, fletcher32=True)
    self.outfile.create_dataset(lpdbase + "image/length", fsz,
                                dtype=np.uint32, fletcher32=True)

    if self.mark_nonlin:
        self.outfile.create_dataset(lpdbase + "image/nonLinear", fsz,
                                    dtype=np.float32, fletcher32=True)

gen_valid_range()

Generate an index range to pass to correctLPD.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def gen_valid_range(self):
    """ Generate an index range to pass to `correctLPD`.
    """
    first_index = self.first_index
    last_index = self.last_index
    max_cells = self.max_cells
    lpd_base = self.lpd_base
    allcells = self.infile[lpd_base + "image/cellId"]
    allcells = np.squeeze(allcells[first_index:last_index, ...])
    single_image = self.infile[lpd_base + "image/data"]
    single_image = np.array(single_image[first_index, ...])
    can_calibrate = allcells < max_cells
    for c in self.filter_cells:
        can_calibrate &= allcells != c
    if np.count_nonzero(can_calibrate) == 0:
        return
    allcells = allcells[can_calibrate]
    firange = np.arange(first_index, last_index)
    firange = firange[can_calibrate]
    self.oshape = (firange.size,
                   single_image.shape[1],
                   single_image.shape[2])
    self.firange = firange
    self.single_image = single_image

get_histograms()

Return preview histograms computed from the first chunk

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def get_histograms(self):
    """ Return preview histograms computed from the first chunk
    """
    return ((self.hists_signal_low, self.hists_signal_high,
             self.hists_gain_vs_signal),
            (self.low_edges, self.high_edges, self.signal_edges))

get_iteration_range()

Returns a range expression over which to iterate in chunks

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def get_iteration_range(self):
    """Returns a range expression over which to iterate in chunks
    """
    return np.array_split(self.firange,
                          self.firange.size // self.chunksize)

get_valid_image_idx()

Return the indices of valid data

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def get_valid_image_idx(self):
    """ Return the indices of valid data
    """
    lpd_base = self.idx_base
    if self.index_v == 2:
        count = np.squeeze(self.infile[lpd_base + "image/count"])
        first = np.squeeze(self.infile[lpd_base + "image/first"])
        if np.count_nonzero(count != 0) == 0:
            raise IOError("File has no valid counts")
        valid = count != 0
        idxtrains = np.squeeze(self.infile["/INDEX/trainId"])
        medianTrain = np.nanmedian(idxtrains)
        lowok = (idxtrains > medianTrain - 1e4)
        highok = (idxtrains < medianTrain + 1e4)
        valid &= lowok & highok

        last_index = int(first[valid][-1] + count[valid][-1])
        first_index = int(first[valid][0])

    elif self.index_v == 1:
        status = np.squeeze(self.infile[lpd_base + "image/status"])
        if np.count_nonzero(status != 0) == 0:
            raise IOError("File {} has no valid counts".format(
                self.infile))
        last = np.squeeze(self.infile[lpd_base + "image/last"])
        valid = status != 0

        idxtrains = np.squeeze(self.infile["/INDEX/trainId"])
        medianTrain = np.nanmedian(idxtrains)
        lowok = (idxtrains > medianTrain - 1e4)
        highok = (idxtrains < medianTrain + 1e4)
        valid &= lowok & highok

        last_index = int(last[valid][-1])
        first_index = int(last[valid][0])
    else:
        raise AttributeError(
            "Not a known raw format version: {}".format(self.index_v))

    self.valid = valid
    self.first_index = first_index
    self.last_index = last_index
    self.idxtrains = idxtrains

initialize(offset, rel_gain, rel_gain_offset, mask, noise, flatfield, swap_axis=True)

Initialize the calibration constants and the output data file.

Any data that is not touched by the corrections is copied into the output file at this point. Also data validity tests are performed. This functions must be called before correct_lpd is executed.

:param offset: offset map to use for corrections :param rel_gain: relative gain map to use for corrections :param rel_gain_offset: relative gain offset to use for corrections :param mask: bad pixel mask to use for corrections :param noise: noise map to use for corrections :param flatfield: flatfield map to use for corrections :param swap_axis: set to True if using data from the calibration database.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def initialize(self, offset, rel_gain, rel_gain_offset, mask, noise,
               flatfield, swap_axis=True):
    """ Initialize the calibration constants and the output data file.

    Any data that is not touched by the corrections is copied into the
    output file at this point. Also data validity tests are performed.
    This functions must be called before `correct_lpd` is executed.

    :param offset: offset map to use for corrections
    :param rel_gain: relative gain map to use for corrections
    :param rel_gain_offset: relative gain offset to use for corrections
    :param mask: bad pixel mask to use for corrections
    :param noise: noise map to use for corrections
    :param flatfield: flatfield map to use for corrections
    :param swap_axis: set to True if using data from the calibration
    database.
    """

    if swap_axis:
        if offset is not None:
            mvd = np.moveaxis(np.moveaxis(offset, 2, 0), 2, 1)
            offset = np.ascontiguousarray(mvd)
        if rel_gain is not None:
            mvd = np.moveaxis(np.moveaxis(rel_gain, 2, 0), 2, 1)
            rel_gain = np.ascontiguousarray(mvd)
        if mask is not None:
            mvd = np.moveaxis(np.moveaxis(mask, 2, 0), 2, 1)
            mask = np.ascontiguousarray(mvd)
        if rel_gain_offset is not None:
            mvd = np.moveaxis(np.moveaxis(rel_gain_offset, 2, 0), 2, 1)
            rel_gain_offset = np.ascontiguousarray(mvd)
        if noise is not None:
            mvd = np.moveaxis(np.moveaxis(noise, 2, 0), 2, 1)
            noise = np.ascontiguousarray(mvd)
        if flatfield is not None:
            flatfield = np.ascontiguousarray(flatfield)

    if offset is not None:
        self.offset = offset
    if rel_gain is not None:
        self.rel_gain = rel_gain
    if rel_gain_offset is not None:
        self.rel_gain_b = rel_gain_offset
    if mask is not None:
        if not hasattr(self, "mask"):
            self.mask = mask
        else:
            self.mask |= mask[:self.mask.shape[0], ...]
    if noise is not None:
        self.noise = noise
    if flatfield is not None:
        self.flatfield = flatfield

    if not self.initialized:
        self.median_noise = np.nanmedian(self.noise[0, ...])
        allcells = [self.offset.shape[0],
                    self.mask.shape[0],
                    self.max_cells]
        self.max_cells = np.min(allcells)
        if not self.karabo_data_mode:
            self.gen_valid_range()
            if self.firange.size < self.chunksize:
                self.chunksize = self.firange.size
            self.copy_and_sanitize_non_cal_data()
            self.create_output_datasets()
        self.initialized = True

initialize_from_db(dbparms, karabo_id, karabo_da, only_dark=False)

Initialize calibration constants from the calibration database

:param dbparms: a tuple containing relevant database parameters,

can be either
  • cal_db_interface, creation_time, max_cells_db, capacitor, bias_voltage, photon_energy in which case the db timeout is set to 300 seconds, the cells to query dark image derived constants from the database is set to the global value

  • cal_db_interface, creation_time, max_cells_db, capacitor, bias_voltage, photon_energy, timeout additionally a timeout is given

:param karabo_id: karabo identifier :param karabo_da: karabo data aggregator :param only_dark: load only dark image derived constants. This implies that a calfile is used to load the remaining constants. Useful to reduce DB traffic and interactions for non-frequently changing constants, i.e. such which are not usually updated during a beamtime.

The cal_db_interface parameter in the dbparms tuple may be in

one of the following notations
  • tcp://host:port to directly identify the host and port to connect to
  • tcp://host:port_low#port_high to specify a port range from which a random port will be picked. E.g. specifying

tcp://max-exfl-cal001:8015#8025

will randomly pick an address in the range max-exfl-cal001:8015 and max-exfl-cal001:8025.

The latter notation allows for load-balancing.

This routine loads the following constants as given in iCalibrationDB:

Dark Image Derived
------------------

* Constants.LPD.Offset
* Constants.LPD.Noise
* Constants.LPD.BadPixelsDark


CI  Derived
-----------

* Constants.LPD.SlopesCI
* Constants.LPD.BadPixelsCI

Flat-Field Derived
------------------

* Constants.LPD.SlopesFF
* Constants.LPD.BadPixelsFF
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def initialize_from_db(self, dbparms: List[Tuple['DBParms', 'DBParms_timeout']],
                       karabo_id: str, karabo_da: str,
                       only_dark: Optional[bool] = False):
    """ Initialize calibration constants from the calibration database

    :param dbparms: a tuple containing relevant database parameters,
    can be either:
        * cal_db_interface, creation_time, max_cells_db, capacitor,
          bias_voltage, photon_energy
          in which case the db timeout is set to 300 seconds,
          the cells to query dark image derived constants from the
          database is set to the global value

        * cal_db_interface, creation_time, max_cells_db, capacitor,
          bias_voltage, photon_energy, timeout
          additionally a timeout is given

    :param karabo_id: karabo identifier
    :param karabo_da: karabo data aggregator
    :param only_dark: load only dark image derived constants. This
        implies that a `calfile` is used to load the remaining
        constants. Useful to reduce DB traffic and interactions
        for non-frequently changing constants, i.e. such which are
        not usually updated during a beamtime.

    The `cal_db_interface` parameter in the `dbparms` tuple may be in
    one of the following notations:
        * tcp://host:port to directly identify the host and port to
          connect to
        * tcp://host:port_low#port_high to specify a port range from
          which a random port will be picked. E.g. specifying

          tcp://max-exfl-cal001:8015#8025

          will randomly pick an address in the range max-exfl-cal001:8015 and
          max-exfl-cal001:8025.


    The latter notation allows for load-balancing.

    This routine loads the following constants as given in
    `iCalibrationDB`:

        Dark Image Derived
        ------------------

        * Constants.LPD.Offset
        * Constants.LPD.Noise
        * Constants.LPD.BadPixelsDark


        CI  Derived
        -----------

        * Constants.LPD.SlopesCI
        * Constants.LPD.BadPixelsCI

        Flat-Field Derived
        ------------------

        * Constants.LPD.SlopesFF
        * Constants.LPD.BadPixelsFF

    """
    if len(dbparms) == 6:
        (cal_db_interface, creation_time, max_cells_db, capacitor,
         bias_voltage, photon_energy) = dbparms
        timeout = 300000
    else:
        (cal_db_interface, creation_time, max_cells_db, capacitor,
         bias_voltage, photon_energy, timeout) = dbparms
    #TODO: remove sw duplication during LPD correction modifications
    offset, when = get_constant_from_db_and_time(
        karabo_id, karabo_da,
        Constants.LPD.Offset(),
        Conditions.Dark.LPD(
            memory_cells=max_cells_db,
            bias_voltage=bias_voltage,
            capacitor=capacitor),
        np.zeros((256, 256, max_cells_db, 3)),
        cal_db_interface,
        creation_time=creation_time,
        timeout=timeout)

    noise = get_constant_from_db(karabo_id, karabo_da,
                                 Constants.LPD.Noise(),
                                 Conditions.Dark.LPD(
                                     memory_cells=max_cells_db,
                                     bias_voltage=bias_voltage,
                                     capacitor=capacitor),
                                 np.zeros((256, 256, max_cells_db, 3)),
                                 cal_db_interface,
                                 creation_time=creation_time,
                                 timeout=timeout)

    bpixels = get_constant_from_db(karabo_id, karabo_da,
                                   Constants.LPD.BadPixelsDark(),
                                   Conditions.Dark.LPD(
                                       memory_cells=max_cells_db,
                                       bias_voltage=bias_voltage,
                                       capacitor=capacitor),
                                   np.zeros((256, 256, max_cells_db, 3)),
                                   cal_db_interface,
                                   creation_time=creation_time,
                                   timeout=timeout).astype(np.uint32)

    # done loading constants and returning
    if only_dark:
        self.initialize(offset, None, None, bpixels, noise, None)
        return when

    slopesCI = get_constant_from_db(karabo_id, karabo_da,
                                    Constants.LPD.SlopesCI(),
                                    Conditions.Dark.LPD(
                                        memory_cells=max_cells_db,
                                        bias_voltage=bias_voltage,
                                        capacitor=capacitor),
                                    np.ones((256, 256, max_cells_db, 2)),
                                    cal_db_interface,
                                    creation_time=creation_time,
                                    timeout=timeout)

    rel_gains = slopesCI[..., 0]
    rel_gain_offset = slopesCI[..., 1]

    flat_fields = np.squeeze(
        get_constant_from_db(karabo_id, karabo_da,
                             Constants.LPD.SlopesFF(),
                             Conditions.Illuminated.LPD(max_cells_db,
                                                        bias_voltage,
                                                        photon_energy,
                                                        pixels_x=256,
                                                        pixels_y=256,
                                                        beam_energy=None,
                                                        capacitor=capacitor),  # noqa
                             np.ones((256, 256)),
                             cal_db_interface,
                             creation_time=creation_time,
                             timeout=timeout))

    bpixels |= get_constant_from_db(karabo_id, karabo_da,
                                    Constants.LPD.BadPixelsCI(),
                                    Conditions.Dark.LPD(
                                        memory_cells=max_cells_db,
                                        bias_voltage=bias_voltage,
                                        capacitor=capacitor),
                                    np.zeros((256, 256, max_cells_db, 3)),
                                    cal_db_interface,
                                    creation_time=creation_time,
                                    timeout=timeout).astype(np.uint32)

    bpix = get_constant_from_db(karabo_id, karabo_da,
                                Constants.LPD.BadPixelsFF(),
                                Conditions.Illuminated.LPD(
                                    max_cells_db, bias_voltage,
                                    photon_energy,
                                    pixels_x=256, pixels_y=256,
                                    beam_energy=None,
                                    capacitor=capacitor),
                                np.zeros((256, 256, max_cells_db)),
                                cal_db_interface,
                                creation_time=creation_time,
                                timeout=timeout).astype(np.uint32)
    bpixels |= bpix[..., None]
    self.initialize(offset, rel_gains, rel_gain_offset, bpixels, noise,
                    flat_fields)
    return when

initialize_from_file(filename, qm, with_dark=True)

Initialize calibration constants from a calibration file

:param filename: path to a file containing the calibration constants. It is expected to have the following structure:

/{qm}/BadPixelsFF/data
/{qm}/BadPixelsCI/data
/{qm}/Offset/data
/{qm}/Noise/data
/{qm}/BadPixelsDark/data
/{qm}/SlopesFF/data
/{qm}/SlopesCI/data

where qm is the `qm` parameter.

:param qm: quadrant and module of the constants to load in Q1M1 notation :param with_dark: also load dark image derived constants from the file. This will overwrite any constants previously loaded from the calibration database.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def initialize_from_file(self, filename, qm, with_dark=True):
    """ Initialize calibration constants from a calibration file

    :param filename: path to a file containing the calibration
    constants. It is expected to have the following structure:

        /{qm}/BadPixelsFF/data
        /{qm}/BadPixelsCI/data
        /{qm}/Offset/data
        /{qm}/Noise/data
        /{qm}/BadPixelsDark/data
        /{qm}/SlopesFF/data
        /{qm}/SlopesCI/data

        where qm is the `qm` parameter.

    :param qm: quadrant and module of the constants to load in Q1M1
        notation
    :param with_dark: also load dark image derived constants from the
        file. This will overwrite any constants previously loaded from
        the calibration database.

    """
    offsets = None
    noises = None
    with h5py.File(filename, "r") as calfile:
        bpixels = calfile["{}/{}/data".format(qm, "BadPixelsCI")][()]
        bpix = calfile["{}/{}/data".format(qm, "BadPixelsFF")][()]
        bpixels |= bpix[..., None, None]
        if with_dark:
            offsets = calfile["{}/{}/data".format(qm, "Offset")][()]
            noises = calfile["{}/{}/data".format(qm, "Noise")][()]
            bpix = calfile["{}/{}/data".format(qm, "BadPixelsDark")]
            bpixels |= bpix[()]

        slopesCI = calfile["{}/{}/data".format(qm, "SlopesCI")][()]
        rel_gains = slopesCI[..., 0]
        rel_gains_b = slopesCI[..., 1]

        flat_fields = calfile["{}/{}/data".format(qm, "SlopesFF")][()][
                      ::-1, ::-1]

    self.initialize(offsets, rel_gains, rel_gains_b, bpixels, noises,
                    flat_fields)

split_gain(d) staticmethod

Split gain information off 16-bit LPD data

Gain information can be found in bits 12 and 13 (0-based)

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
@staticmethod
def split_gain(d):
    """ Split gain information off 16-bit LPD data

    Gain information can be found in bits 12 and 13 (0-based)
    """
    msk = np.zeros(d.shape, np.uint16)
    msk[...] = 0b0000111111111111
    data = np.bitwise_and(d, msk)

    gain = np.right_shift(d, 12)
    msk[...] = 0b0000000000000011
    gain = np.bitwise_and(gain, msk)
    return data, gain

get_mem_cell_pattern(run, sources)

Load the memory cell order to use as a condition to find constants

This looks at the first train for each source, issuing a warning if the pattern differs between sources.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def get_mem_cell_pattern(run, sources) -> np.ndarray:
    """Load the memory cell order to use as a condition to find constants

    This looks at the first train for each source, issuing a warning if the
    pattern differs between sources.
    """
    patterns = []
    for source in sources:
        cell_id_data = run[source, 'image.cellId'].drop_empty_trains()
        if len(cell_id_data.train_ids) == 0:
            continue  # No data for this module
        cell_ids = cell_id_data[0].ndarray().flatten()
        if not any(np.array_equal(cell_ids, p) for p in patterns):
            patterns.append(cell_ids)

    if len(patterns) > 1:
        warn("Memory cell order varies between detector modules: "
             "; ".join([f"{s[:10]}...{s[-10:]}" for s in patterns]))
    elif not patterns:
        raise ValueError("Couldn't find memory cell order for any modules")

    return patterns[0]

make_cell_order_condition(use_param, cellid_pattern)

Convert the cell ID array to a condition string, or None if not used

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def make_cell_order_condition(use_param, cellid_pattern) -> Optional[str]:
    """Convert the cell ID array to a condition string, or None if not used"""
    if use_param == 'auto':
        # auto -> use cell order if it wraps around (cells not filled monotonically)
        use = len(cellid_pattern) > 2 and (
                np.diff(cellid_pattern.astype(np.int32)) < 0
        ).any()
    else:
        use = (use_param == 'always')

    return (",".join([str(c) for c in cellid_pattern]) + ",") if use else None

sort_dark_runs_by_gain(raw_folder, runs, ctrl_src)

Check gain factors [100, 10, 1] to decide, if provided dark runs are in the correct order.

Parameters:

Name Type Description Default
raw_folder Union[str, Path]

The raw data path.

required
runs list

A list of 3 runs.

required
ctrl_src str

The CTRL source to check RUN/.../femAsicGain.

required
Return

(list): Sorted dark runs.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/lpdlib.py
def sort_dark_runs_by_gain(raw_folder, runs, ctrl_src):
    """Check gain factors [100, 10, 1] to decide,
    if provided dark runs are in the correct order.

    Args:
        raw_folder (Union[str, Path]): The raw data path.
        runs (list): A list of 3 runs.
        ctrl_src (str): The CTRL source to check `RUN/.../femAsicGain`.
    Return:
        (list): Sorted dark runs.
    """
    run_to_gain = dict()
    expected_gain_factors = [100, 10, 1]
    assert len(set(runs)) == 3, f"A list of {len(runs)} runs is provided, three different dark runs are expected." # noqa

    for r in runs:
        run_to_gain[r] = RunDirectory(
            Path(raw_folder) / f"r{r:04d}").get_run_value(
                ctrl_src, "femAsicGain")

    if len(set(run_to_gain.values())) < 3:
        raise ValueError(
            f"Incorrect gain factors for the provided dark runs: {run_to_gain}."
            f" The expected gain factors for these runs are {expected_gain_factors}.")

    sorted_run_to_gain = dict(sorted(
        run_to_gain.items(),
        key=lambda x: expected_gain_factors.index(x[1])))

    sorted_runs = list(sorted_run_to_gain.keys())
    if list(run_to_gain.values()) != expected_gain_factors:
        warning(
            "Dark runs were incorrectly sorted and "
            f"have now been corrected to: {sorted_runs}.")

    return sorted_runs