
agipdutils

baseline_correct_via_noise(d, noise, g, threshold)

Correct baseline shifts by evaluating position of the noise peak

:param d: the data to correct, should be a single image
:param noise: the mean noise for the cell id of d
:param g: gain array matching d array
:param threshold: threshold below which correction is not performed

Correction is done by identifying the left-most (significant) peak in the histogram of d and shifting it to be centered on 0. This is done via a continuous wavelet transform (CWT), using a Ricker wavelet.

Only high gain data is evaluated, and data larger than 50 ADU, equivalent to roughly a 9 keV photon, is ignored.

Two passes are executed: the first shift is accurate to 10 ADU and will catch baseline shifts down to -2000 ADU. The CWT is evaluated for peaks of widths one, three and five times the noise. The baseline is then shifted by the position of the summed CWT maximum.

In a second pass the histogram is evaluated within a range of +- 5*noise of the initial shift value, for a peak of width noise.

Baseline shifts larger than the maximum threshold or positive shifts larger than 10 are discarded, and the original data is returned; otherwise the shift-corrected data is returned.
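
A minimal usage sketch (not part of the original documentation), assuming the function is importable from cal_tools.agipdutils and using synthetic data with an artificial baseline shift; real pipelines pass actual detector images and per-cell noise values:

import numpy as np
from cal_tools.agipdutils import baseline_correct_via_noise  # assumed import path

# Synthetic single image (512 x 128) whose high-gain baseline sits at -300 ADU
rng = np.random.default_rng(0)
d = rng.normal(-300, 8, size=(512, 128)).astype(np.float32)
g = np.zeros((512, 128), dtype=np.uint8)  # gain map: all pixels in high gain
noise = 8.0                               # mean noise for this memory cell

corrected, shift = baseline_correct_via_noise(d, noise, g, threshold=-1000)
print(shift)  # roughly -300 if the noise peak was located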

Source code in cal_tools/agipdutils.py
def baseline_correct_via_noise(d, noise, g, threshold):
    """ Correct baseline shifts by evaluating position of the noise peak

    :param d: the data to correct, should be a single image
    :param noise: the mean noise for the cell id of `d`
    :param g: gain array matching `d` array
    :param threshold: threshold below which correction is not performed

    Correction is done by identifying the left-most (significant) peak
    in the histogram of `d` and shifting it to be centered on 0.
    This is done via a continuous wavelet transform (CWT), using a Ricker
    wavelet.

    Only high gain data is evaluated, and data larger than 50 ADU,
    equivalent to roughly a 9 keV photon, is ignored.

    Two passes are executed: the first shift is accurate to 10 ADU and
    will catch baseline shifts down to -2000 ADU. CWT is evaluated
    for peaks of widths one, three and five times the noise.
    The baseline is then shifted by the position of the summed CWT maximum.

    In a second pass the histogram is evaluated within a range of
    +- 5*noise of the initial shift value, for a peak of width noise.

    Baseline shifts larger than the maximum threshold or positive shifts
    larger than 10 are discarded, and the original data is returned, otherwise
    the shift corrected data is returned.

    """

    seln = (g == 0) & (d <= 50)
    h, e = np.histogram(d[seln], bins=210, range=(-2000, 100))
    c = (e[1:] + e[:-1]) / 2

    try:
        cwtmatr = cwt(h, ricker, [noise, 3. * noise, 5. * noise])
    except:
        return d, 0
    cwtall = np.sum(cwtmatr, axis=0)
    marg = np.argmax(cwtall)
    pc = c[marg]

    high_res_range = (d > pc - 5 * noise) & (d < pc + 5 * noise)
    h, e = np.histogram(d[seln & high_res_range], bins=200,
                        range=(pc - 5 * noise, pc + 5 * noise))
    c = (e[1:] + e[:-1]) / 2
    try:
        cwtmatr = cwt(h, ricker, [noise, ])
    except:
        return d, 0
    marg = np.argmax(cwtmatr)
    pc = c[marg]

    # too large a shift to be easily discernible via noise
    if pc > 10 or pc < threshold:
        return d, 0
    return d - pc, pc

baseline_correct_via_stripe(d, g, m, frac_high_med)

Correct baseline shifts using shadowed stripes

:param d: the data to correct, should be an offset corrected single image
:param frac_high_med: ratio of high to medium PC slopes
:param g: gain array matching d array
:param m: bad pixel mask
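
A minimal usage sketch (assumptions: the cal_tools.agipdutils import path and a synthetic image in which rows 200-239 are shadowed); the shift is taken from the median of the shadowed rows and removed from high and medium gain pixels:

import numpy as np
from cal_tools.agipdutils import baseline_correct_via_stripe  # assumed import path

d = np.full((512, 128), 500.0)             # offset-corrected single image with signal
d[200:240, :] = -40.0                      # shadowed stripe left at the shifted baseline
g = np.zeros((512, 128), dtype=np.uint8)   # all pixels in high gain
m = np.zeros((512, 128), dtype=np.uint32)  # bad pixel mask: all good
frac_high_med = 40.0                       # ratio of high to medium PC slopes

corrected, shift = baseline_correct_via_stripe(d, g, m, frac_high_med)
print(shift)  # about -40; the shadowed rows now sit near 0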

Source code in cal_tools/agipdutils.py
def baseline_correct_via_stripe(d, g, m, frac_high_med):
    """ Correct baseline shifts using shadowed stripes

    :param d: the data to correct, should be offset corrected single image
    :param frac_high_med: ratio of high to medium PC slopes
    :param g: gain array matching `d` array
    :param m: bad pixel mask
    """

    # Restrict the mask to those bad pixels obtained from darks.
    # This has been introduced since the BadPixelsFF constants can add
    # additional masking that causes the baseline correction to fail due
    # to the abort conditions len(idx) < 3 below.
    # (see calibration/planning#96)
    m = m & (BadPixels.OFFSET_OUT_OF_THRESHOLD |
             BadPixels.NOISE_OUT_OF_THRESHOLD |
             BadPixels.OFFSET_NOISE_EVAL_ERROR |
             BadPixels.NO_DARK_DATA)

    dd = copy.copy(d)
    dd[g != 0] = np.nan  # only high gain data
    dd[m != 0] = np.nan  # only good pixels

    sh_rows = get_shadowed_stripe(dd, 30, 0.95)

    if sh_rows.sum() < 3:
        return d, 0

    shift = np.nanmedian(dd[sh_rows, :])
    d[g == 0] -= shift
    d[g == 1] -= shift / frac_high_med
    return d, shift

cast_array_inplace(inp, dtype)

Cast an ndarray to a different dtype in place.

The resulting array will occupy the same memory as the input array, and the cast will most likely make interpreting the buffer content through the input array nonsensical.

Parameters:

inp (ndarray, required): Input array to cast, must be contiguous and castable to the target dtype without copy.

dtype (DTypeLike, required): Data type to cast to.
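
A small usage sketch (assuming the cal_tools.agipdutils import path); the returned array reuses the input buffer, so the input must not be interpreted through its old dtype afterwards:

import numpy as np
from cal_tools.agipdutils import cast_array_inplace  # assumed import path

data = np.arange(12, dtype=np.float64).reshape(3, 4)
view = cast_array_inplace(data, np.float32)

print(view.dtype, view.shape)        # float32 (3, 4)
print(np.shares_memory(view, data))  # True: the cast reused data's buffer
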
Source code in cal_tools/agipdutils.py
def cast_array_inplace(inp, dtype):
    """Cast an ndarray to a different dtype in place.

    The resulting array will occupy the same memory as the input array,
    and the cast will most likely make interpreting the buffer content
    through the input array nonsensical.

    Args:
        inp (ndarray): Input array to cast, must be contiguous and
            castable to the target dtype without copy.
        dtype (DTypeLike): Data type to cast to.
    """

    # Save shape to recast later
    orig_shape = inp.shape

    # Create a new view of the input and flatten it in-place. Unlike
    # inp.reshape(-1) this operation fails if a copy is required.
    inp = inp.view()
    inp.shape = inp.size

    # Create a new view with the target dtype and slice it to the number
    # of elements the input array has. This accounts for smaller dtypes
    # using less space for the same size.
    # The output array will be contiguous but not own its data.
    outp = inp.view(dtype)[:len(inp)]

    # "Copy" over the data, performing the cast.
    outp[:] = inp

    # Reshape back to the original.
    outp.shape = orig_shape

    return outp

contiguous_regions(condition)

Finds contiguous True regions of the boolean array "condition". Returns a 2D array where the first column is the start index of the region and the second column is the end index.
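
For illustration (a sketch assuming the cal_tools.agipdutils import path); note that the end index in the second column is exclusive:

import numpy as np
from cal_tools.agipdutils import contiguous_regions  # assumed import path

condition = np.array([True, True, False, False, True, True, True, False])
print(contiguous_regions(condition))
# [[0 2]
#  [4 7]]  -> the True regions are condition[0:2] and condition[4:7]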

Source code in cal_tools/agipdutils.py
def contiguous_regions(condition):
    """Finds contiguous True regions of the boolean array "condition". Returns
    a 2D array where the first column is the start index of the region and the
    second column is the end index."""

    # Find the indices of changes in "condition"
    d = np.diff(condition)
    idx, = d.nonzero()

    # We need to start things after the change in "condition". Therefore,
    # we'll shift the index by 1 to the right.
    idx += 1

    if condition[0]:
        # If the start of condition is True prepend a 0
        idx = np.r_[0, idx]

    if condition[-1]:
        # If the end of condition is True, append the length of the array
        idx = np.r_[idx, condition.size]

    # Reshape the result into two columns
    idx.shape = (-1,2)
    return idx

correct_baseline_via_hist(d, pcm, g)

Correct baseline shifts by matching edges of high and medium gain histogram

:param d: single image to correct
:param pcm: relative gain slope for medium gain of each pixel in d
:param g: gain values for d

As a preparation the median value of medium gain pixels is shifted in increments of 50 ADU as long as it is negative.

Correction is then performed by evaluating histograms for high gain and medium gain pixels in d. The right-most significant bin of the high gain histogram is then shifted such that it coincides with the left-most significant bin of the medium gain histogram. Significant here means that bin counts are at least 10% of the average bin count of the histogram for all bins with more than 10 counts. This initial evaluation uses histograms in the range between -10000 and +10000 ADU with 100 bins of 200 ADU each. The shift required to match both histogram borders is an initial estimate for the baseline shift.

Finally, the mean bin count of the five outermost bins of the high and medium gain histograms is compared. The baseline is shifted by increments of 1 ADU until they are within 20% of each other.

From this point on additional shifts are performed while the deviation stays within 30% of each other. The final shift is then evaluated as the point of minimal deviation of these values.

A maximum iteration count of 200 is imposed on the procedure. If no convergence is reached within this limit, the original data is returned, otherwise the baseline corrected data is returned.
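
The "significant bin" edge detection described above can be sketched in isolation (synthetic histogram counts only; significant_edges is a hypothetical helper, not part of agipdutils). The actual function additionally scales the edge difference by the median medium-gain slope:

import numpy as np

rng = np.random.default_rng(1)
high = rng.normal(-3000, 400, 20000)    # high-gain values (ADU)
medium = rng.normal(4000, 600, 20000)   # medium-gain values, already slope-scaled

hh, edges = np.histogram(high, bins=100, range=(-10000, 10000))
hm, _ = np.histogram(medium, bins=100, range=(-10000, 10000))

def significant_edges(counts, minbin=10):
    # A bin is "significant" if it holds at least 10% of the mean count
    # of all bins with more than `minbin` entries.
    mean = counts[counts > minbin].mean()
    sig = counts > 0.1 * mean
    left = np.argmax(sig)                       # left-most significant bin
    right = counts.size - np.argmax(sig[::-1])  # one past the right-most
    return left, right

_, rmh = significant_edges(hh)   # right edge of the high-gain histogram
lmm, _ = significant_edges(hm)   # left edge of the medium-gain histogram
print(edges[lmm] - edges[rmh])   # feeds the initial baseline shift estimate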

Source code in cal_tools/agipdutils.py
def correct_baseline_via_hist(d, pcm, g):
    """ Correct baseline shifts by matching edges of high and medium
    gain histogram

    :param d: single image to correct
    :param pcm: relative gain slope for medium gain of each pixel in `d`
    :param g: gain values for `d`

    As a preparation the median value of medium gain pixels is shifted in
    increments of 50 ADU as long as it is negative.

    Correction is then performed by evaluating histograms for high gain
    and medium gain pixels in `d`. The right-most significant bin of the
    high gain histogram is then shifted such that it coincides with the
    left-most significant bin of the medium gain histogram. Significant
    here means that bin counts are at least 10% of the average bin count
    of the histogram for all bins with more than 10 counts. This initial
    evaluation uses histograms in the range between -10000 and +10000 ADU
    with 100 bins of 200 ADU each. The shift required to match both
    histogram borders is an initial estimate for the baseline shift.

    Finally, the mean bin count of the five outermost bins of the high and
    medium gain histograms is compared. The baseline is shifted by
    increments of 1 ADU until they are within 20% of each other.

    From this point on additional shifts are performed while the
    deviation stays within 30% of each other. The final shift is then
    evaluated as the point of minimal deviation of these values.

    A maximum iteration count of 200 is imposed on the procedure. If no
    convergence is reached within this limit, the original data is
    returned, otherwise the baseline corrected data is returned.
    """
    dd = copy.copy(d)
    # shift while the medium gain produces negative values on average
    pc = 0
    it = 0
    max_it = 100
    while np.nanmedian(dd[g == 1] - pc) < 0:
        pc -= 50
        if it > max_it:
            return d, 0
        it += 1

    def min_hist_distance(pc: int,
                          bins: int = 100,
                          ran: Tuple[int, int] = (-10000, 10000),
                          minbin: int = 10) -> float:
        hh, e = np.histogram(dd[g == 0] - pc, bins=bins, range=ran)
        hm, e = np.histogram((dd[g == 1] - pc) * pcm[g == 1], bins=bins,
                             range=ran)

        # right most significant value of high gain histogram
        hhm = np.nanmean(hh[hh > minbin])
        rmh = hh.size - np.argmax((hh > 0.1 * hhm)[::-1])

        # left most significant value of medium gain histogram
        hmm = np.nanmean(hm[hm > minbin])
        lmm = np.argmax((hm > 0.1 * hmm))
        med_pcm = np.nanmedian(pcm[g == 1])
        eh = e[rmh]
        el = e[lmm]
        pc += (el - eh) / (med_pcm - 1)
        return pc

    # set a pc one halfstep higher than initial guesstimate
    pc += 50
    pc = min_hist_distance(pc)

    # now start rolling back until we have similar signal levels
    def comp_signal_level(pc, minbin=1):
        hh, e = np.histogram(dd[g == 0] - pc, bins=100, range=(5000, 7000))
        hm, e = np.histogram((dd[g == 1] - pc) * pcm[g == 1], bins=100,
                             range=(5000, 7000))

        # right most significant value of high gain histogram
        hhm = np.nanmean(hh[hh > minbin])
        rmh = hh.size - np.argmax((hh > 0.1 * hhm)[::-1])

        # left most significant value of medium gain histogram
        hmm = np.nanmean(hm[hm > minbin])
        lmm = np.argmax((hm > 0.1 * hmm))

        reg = 5
        ret = np.abs(np.sum(hh[rmh - reg:rmh]) - np.sum(hm[lmm:lmm + reg]))
        ret /= np.sum(hh[rmh - reg:rmh])
        return ret

    it = 0
    max_it = 200
    opc = pc
    m_it_break = False
    while comp_signal_level(pc) > 0.2:
        pc += 1
        if it > max_it:
            pc = opc
            m_it_break = True
            break
        it += 1

    it = 0
    if not m_it_break:
        pcs = [pc]
        slevs = []
        slev = comp_signal_level(pc)
        slevs.append(slev)
        while slev < 0.3:
            pc += 1
            slev = comp_signal_level(pc)
            slevs.append(slev)
            pcs.append(pc)
            it += 1

        pc = pcs[np.argmin(slevs)]

    return d - pc, pc

correct_baseline_via_hist_asic(d, g)

Correct diagonal falloff on ASICs by matching corner histograms

:param d: single image data
:param g: gain values for d

Corrections are performed for the top and bottom row of ASICs separately.

In each row, starting from the ASIC closest to the beam hole, the histogram of a square region of size 8x8 pixels is evaluated for its maximum bin count location (only medium gain values), and compared to a histogram produced from a neighbouring region on the next ASIC. Double-sized pixels are avoided.

The reasoning is that the histograms should not dramatically differ for continuously distributed photon events. Each ASIC is then shifted such that histograms match and the corrected data is returned.

Warning: this feature is very experimental!
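
A minimal call sketch (assuming the cal_tools.agipdutils import path). With toy data that has no medium-gain pixels near the ASIC corners the per-ASIC shifts stay at zero and the image is returned unchanged; the reported shift is always np.nan:

import numpy as np
from cal_tools.agipdutils import correct_baseline_via_hist_asic  # assumed import path

rng = np.random.default_rng(2)
d = rng.normal(100.0, 20.0, (512, 128))
g = np.zeros((512, 128), dtype=np.uint8)  # all high gain: nothing to match here

corrected, shift = correct_baseline_via_hist_asic(d, g)
print(np.isnan(shift))  # True: this function does not report a single shift value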

Source code in cal_tools/agipdutils.py
def correct_baseline_via_hist_asic(d, g):
    """ Correct diagonal falloff on ASICs by matching corner histograms

    :param d: single image data
    :param g: gain values for `d`

    Corrections are performed for the top and bottom row of ASICs
    separately.

    In each row, starting from the ASIC closest to the beam hole, the histogram
    of a square region of size 8x8 pixels is evaluated for its maximum
    bin count location (only medium gain values), and compared to a
    histogram produced from a neighbouring region on the next ASIC.
    Double sized pixels are avoided.

    The reasoning is that the histograms should not dramatically differ
    for continuously distributed photon events. Each ASIC is then shifted
    such that histograms match and the corrected data is returned.

    Warning: this feature is very experimental!
    """

    rsize = 8

    def hist_ends(dm, bins=5000, ran=(-5000, 5000),
                  minbin=4 / (16 / rsize)):

        hm, e = np.histogram(dm, bins=bins, range=ran)

        # left most significant value of medium gain histogram
        # hmm = np.nanmean(hm[hm > minbin])
        lmm = np.argmax((hm > minbin))

        return e[lmm], np.sum(hm)

    for i in range(1, 8):

        # upper asics
        casic = np.concatenate(
            (d[i * 64 - rsize:i * 64 - 1, 64:64 + rsize].flatten(),
             d[i * 64 + 1:i * 64 + rsize, 64 - rsize:64].flatten()))

        casic_g = np.concatenate(
            (g[i * 64 - rsize:i * 64 - 1, 64:64 + rsize].flatten(),
             g[i * 64 + 1:i * 64 + rsize, 64 - rsize:64].flatten()))

        idxm = casic_g == 1
        cme, cms = hist_ends(casic[idxm])

        asic = d[i * 64 + 1:i * 64 + rsize, 64:64 + rsize].flatten()
        asic_g = g[i * 64 + 1:i * 64 + rsize, 64:64 + rsize].flatten()

        idxm = asic_g == 1
        me, ms = hist_ends(asic[idxm])

        if cms > rsize * rsize / 10 and ms > rsize * rsize / 10:
            pc = cme - me
        else:
            pc = 0

        if np.abs(pc) > 500:
            # something went wrong
            pc = 0
        d[i * 64:(i + 1) * 64, 64:] += pc

    for i in range(0, 7):
        # lower asics
        casic = np.concatenate((d[(i + 1) * 64 - rsize:(i + 1) * 64 - 1,
                                64:64 + rsize].flatten(),
                                d[(i + 1) * 64 + 1:(i + 1) * 64 + rsize,
                                64 - rsize:64].flatten()))

        casic_g = np.concatenate((g[(i + 1) * 64 - rsize:(i + 1) * 64 - 1,
                                  64:64 + rsize].flatten(),
                                  g[(i + 1) * 64 + 1:(i + 1) * 64 + rsize,
                                  64 - rsize:64].flatten()))

        idxm = casic_g == 1
        cme, cms = hist_ends(casic[idxm])

        asic = d[(i + 1) * 64 - rsize:(i + 1) * 64 - 1,
               64 - rsize:64].flatten()
        asic_g = g[(i + 1) * 64 - rsize:(i + 1) * 64 - 1,
                 64 - rsize:64].flatten()

        idxm = asic_g == 1
        me, ms = hist_ends(asic[idxm])

        if cms > rsize * rsize / 10 and ms > rsize * rsize / 10:
            pc = cme - me
        else:
            pc = 0

        if np.abs(pc) > 500:
            # something went wrong
            pc = 0
        d[i * 64:(i + 1) * 64, :64] += pc

    return d, np.nan

get_gain_pc_slopes(slopes_pc, mem_cells, variant=0)

Calculate gain PC slopes and the high and medium gain medians.

Parameters:

slopes_pc (np.ndarray, required): SlopesPC gain constant array.

mem_cells (int, required): Number of operating memory cells.

variant (int, default 0): Calibration Constant Version variant.

Returns:

Six np.ndarray values: the per-pixel high and medium gain slopes (pc_high_m, pc_med_m), the corresponding intercepts (pc_high_l, pc_med_l), and the per-cell medians of the slopes (pc_high_med, pc_med_med).
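
A usage sketch with toy dimensions (the shapes and the variant mapping are assumptions; note that despite the variant: int = 0 annotation, the shown source accesses it as a mapping via variant.get("SlopesPC", 0)):

import numpy as np
from cal_tools.agipdutils import get_gain_pc_slopes  # assumed import path

# Toy SlopesPC constant: (slow scan, fast scan, memory cells, PC fit components)
rng = np.random.default_rng(3)
slopes_pc = rng.normal(1.0, 0.05, (128, 512, 4, 10)).astype(np.float32)
mem_cells = 4

(pc_high_m, pc_med_m, pc_high_l, pc_med_l,
 pc_high_med, pc_med_med) = get_gain_pc_slopes(
    slopes_pc, mem_cells, variant={"SlopesPC": 1})  # variant 1: already sanitized

print(pc_high_m.shape, pc_high_med.shape)  # (128, 512, 4) (4,)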

Source code in cal_tools/agipdutils.py
def get_gain_pc_slopes(
    slopes_pc: np.ndarray,
    mem_cells: int,
    variant: int = 0,
    ) -> (
        np.ndarray, np.ndarray, np.ndarray,
        np.ndarray, np.ndarray, np.ndarray
    ):
    """Calculate gain PC slopes and the high and medium gain medians.

    Args:
        slopes_pc (np.ndarray): SlopesPC gain constant array.
        mem_cells (int): Number of operating memory cells.
        variant (int): Calibration Constant Version variant.

    Returns:
        np.ndarray: _description_
        np.ndarray: _description_
        np.ndarray: _description_
        np.ndarray: _description_
        np.ndarray: _description_
        np.ndarray: _description_
    """
    # high gain slope from pulse capacitor data
    pc_high_m = slopes_pc[..., :mem_cells, 0]
    pc_high_l = slopes_pc[..., :mem_cells, 1]
    # medium gain slope from pulse capacitor data
    pc_med_m = slopes_pc[..., :mem_cells, 3]
    pc_med_l = slopes_pc[..., :mem_cells, 4]

    # calculate median for slopes
    pc_high_med = np.nanmedian(pc_high_m, axis=(0, 1))
    pc_med_med = np.nanmedian(pc_med_m, axis=(0, 1))

    if variant.get("SlopesPC", 0) == 0:
        # calculate median for intercepts:
        pc_high_l_med = np.nanmedian(pc_high_l, axis=(0, 1))
        pc_med_l_med = np.nanmedian(pc_med_l, axis=(0, 1))

        # sanitize PC data with CCV variant = 0.
        # Sanitization is already done for constants
        # with CCV variant = 1
        # In the following loop,
        # replace `nan`s across memory cells with
        # the median value calculated previously.
        # Then, values outside of the valid range (0.8 and 1.2)
        # are fixed to the median value.
        # This is applied for high and medium gain stages
        for i in range(mem_cells):

            pc_high_m[
                np.isnan(pc_high_m[..., i]), i] = pc_high_med[i]
            pc_med_m[
                np.isnan(pc_med_m[..., i]), i] = pc_med_med[i]

            pc_high_l[
                np.isnan(pc_high_l[..., i]), i] = pc_high_l_med[i]
            pc_med_l[
                np.isnan(pc_med_l[..., i]), i] = pc_med_l_med[i]

            pc_high_m[
                (pc_high_m[..., i] < 0.8 * pc_high_med[i]) |
                (pc_high_m[..., i] > 1.2 * pc_high_med[i]), i] = pc_high_med[i]  # noqa
            pc_med_m[
                (pc_med_m[..., i] < 0.8 * pc_med_med[i]) |
                (pc_med_m[..., i] > 1.2 * pc_med_med[i]), i] = pc_med_med[i]  # noqa

            pc_high_l[
                (pc_high_l[..., i] < 0.8 * pc_high_l_med[i]) |
                (pc_high_l[..., i] > 1.2 * pc_high_l_med[i]), i] = pc_high_l_med[i]  # noqa
            pc_med_l[
                (pc_med_l[..., i] < 0.8 * pc_med_l_med[i]) |
                (pc_med_l[..., i] > 1.2 * pc_med_l_med[i]), i] = pc_med_l_med[i]  # noqa

    return pc_high_m, pc_med_m, pc_high_l, pc_med_l, pc_high_med, pc_med_med

get_shadowed_stripe(data, threshold, fraction)

Return a 1D bool array of which pixel rows are shadowed. The shadowed region is represented by the list of pixel rows along numpy axis 1. Regions are defined with respect to threshold and fraction. A margin of one pixel is used. Double-width pixels are excluded, so only regular square pixels are used.

:param data: (numpy.ndarray, rank=2) offset corrected single image
:param threshold: (float) a threshold below which a pixel is considered as dark
:param fraction: (float) a fraction of pixels in a line along axis 1 below which a full line is considered as dark
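
A usage sketch (assuming the cal_tools.agipdutils import path) with a synthetic image containing one shadowed stripe; the first and last row of the stripe and the double-width pixel rows are excluded by the function:

import numpy as np
from cal_tools.agipdutils import get_shadowed_stripe  # assumed import path

img = np.full((512, 128), 500.0)  # offset-corrected single image (ADU)
img[200:240, :] = 5.0             # 40 rows shadowed, i.e. below the threshold

rows = get_shadowed_stripe(img, 30, 0.95)
print(np.nonzero(rows)[0])        # rows 201..238 (the stripe edges are dropped)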

Source code in cal_tools/agipdutils.py
def get_shadowed_stripe(data, threshold, fraction):
    """
    Return 1D bool array of which pixel rows are shadowed.
    The shadowed region is represented by the list of pixel rows along
    numpy axis 1. Regions are defined with respect to threshold
    and fraction. A margin of one pixel is used.
    Double-width pixels are excluded, so only regular square pixels are used.

    :param data: (numpy.ndarray, rank=2) offset corrected single image
    :param threshold: (float) a threshold, below which
    a pixel is considered as dark
    :param fraction: (float) a fraction of pixels in a line along axis 1
    below which a full line is considered as dark
    """
    # Identify rows with > fraction of their pixels < threshold
    npx_all = np.count_nonzero(~np.isnan(data), axis=1)
    npx_sh = np.count_nonzero(data < threshold, axis=1)
    rows_to_use = npx_sh / npx_all > fraction

    # TODO: is this necessary?
    # The previous code excludes the first and last rows of each shadowed
    # stripe, before accounting for the double-width pixels.
    # For now, I'll preserve the behaviour. -TK
    shadow_bounds = contiguous_regions(rows_to_use)
    rows_to_use[shadow_bounds[:, 0]] = False
    rows_to_use[shadow_bounds[:, 1] - 1] = False

    # Ignore rows with double-width pixels
    rows_to_use[64:512:64] = False
    rows_to_use[63:511:64] = False

    # TODO: is this necessary?
    # The previous code here and in baseline_correct_via_stripe ignored any
    # stripes <= 2 pixels width. Is this by design, or just an accident of
    # ignoring the double-width pixels (which are in stripes of 2 together)?
    # For now, I'll preserve the behaviour. -TK
    shadow_bounds = contiguous_regions(rows_to_use)
    stripe_widths = shadow_bounds[:, 1] - shadow_bounds[:, 0]
    stripes_too_narrow = shadow_bounds[stripe_widths <= 2]
    for start, end in stripes_too_narrow:
        rows_to_use[start:end] = False

    return rows_to_use

make_noisy_adc_mask(bmask, noise_threshold)

Mask entire ADCs if they are noisy above a relative threshold
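
A usage sketch (the import paths for the function and the BadPixels flags are assumptions, and the shown source uses the removed np.int alias, so an older NumPy is assumed); one 64x16 ADC block is marked noisy in every memory cell and therefore masked in full:

import numpy as np
from cal_tools.agipdutils import make_noisy_adc_mask  # assumed import path
from cal_tools.enums import BadPixels                 # assumed import path

bmask = np.zeros((64, 512, 128), dtype=np.uint32)      # (memory cells, rows, columns)
bmask[:, :64, :16] = BadPixels.NOISE_OUT_OF_THRESHOLD  # one ADC block fully noisy

fmask = make_noisy_adc_mask(bmask, noise_threshold=0.25)
print(np.count_nonzero(fmask == BadPixels.NOISY_ADC))  # 1024 = 64 * 16 pixels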

Source code in cal_tools/agipdutils.py
def make_noisy_adc_mask(bmask, noise_threshold):
    """ Mask entire ADC if they are noise above a relative threshold
    """
    msk = np.count_nonzero(
        ((bmask & BadPixels.NOISE_OUT_OF_THRESHOLD != 0)
         | (bmask & BadPixels.OFFSET_NOISE_EVAL_ERROR != 0)).astype(
            np.int), axis=0)

    fmask = np.zeros_like(msk).astype(np.uint32)
    adc_i = bmask.shape[1] // 8
    adc_j = bmask.shape[2] // 8
    for i in range(adc_i):
        for j in range(adc_j):
            adc_cnt = np.count_nonzero(
                msk[i * adc_i:(i + 1) * adc_i, j * adc_j:(j + 1) * adc_j] >
                bmask.shape[0] // 32)
            if adc_cnt / (adc_i * adc_j) >= noise_threshold:
                fmask[i * adc_i:(i + 1) * adc_i,
                j * adc_j:(j + 1) * adc_j] = BadPixels.NOISY_ADC
    return fmask

match_asic_borders(d, chunk=8, channel=0)

Match border pixels of the two ASIC rows to the same median value

:param d: single image data
:param chunk: number of pixels on each ASIC boundary to generate mean values for
:param channel: module index of given data

Each ASIC has n=64/chunk mean values calculated along its two border pixel rows. The deviation of chunk pairs is then evaluated and the upper/lower ASICs are shifted by the mean deviation for the right/left hemisphere of the detector.

The corrected data is returned.
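
A minimal sketch (assuming the cal_tools.agipdutils import path) with a stack of flat images that have a 10 ADU step across the ASIC boundary; the step is split evenly between the two halves:

import numpy as np
from cal_tools.agipdutils import match_asic_borders  # assumed import path

d = np.zeros((4, 512, 128), dtype=np.float64)  # (images, rows, columns)
d[..., 64:] += 10.0                            # step between the two ASIC columns

matched = match_asic_borders(d, chunk=8, channel=0)
print(matched[0, 0, 0], matched[0, 0, 127])    # both halves meet at about 5.0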

Source code in cal_tools/agipdutils.py
def match_asic_borders(d, chunk=8, channel=0):
    """ Match border pixels of the two ASIC rows to the same median value

    :param d: single image data
    :param chunk: number of pixels on each ASIC boundary to generate
    mean values for
    :param channel: Module index of given data

    Each ASIC has `n=64/chunk` mean values calculated along its two border
    pixel rows. The deviation of chunk pairs is then evaluated and the
    upper/lower ASICs are shifted by the mean deviation for the
    right/left hemisphere of the detector.

    The corrected data is returned.
    """
    for i in range(8):

        these_asics = d[:, i * 64:(i + 1) * 64, :]
        diffs = np.zeros((d.shape[0], 8))
        for k in range(64 // chunk):
            ldat = these_asics[:, k * chunk:(k + 1) * chunk, 62:64]
            lowerasic = np.nanmedian(ldat, axis=(1, 2))
            udat = these_asics[:, k * chunk:(k + 1) * chunk, 64:66]
            upperasic = np.nanmedian(udat, axis=(1, 2))
            low_up = lowerasic - upperasic
            up_low = upperasic - lowerasic
            diff = low_up if channel < 8 else up_low
            diffs[:, k] = diff

        mn = np.nanmean(diffs, axis=1)[:, None, None] / 2
        if channel < 8:
            d[:, i * 64:(i + 1) * 64, 64:] += mn
            d[:, i * 64:(i + 1) * 64, :64] -= mn
        else:
            d[:, i * 64:(i + 1) * 64, :64] += mn
            d[:, i * 64:(i + 1) * 64, 64:] -= mn
    return d

melt_snowy_pixels(raw, im, gain, rgain, resolution=None)

Identify (and optionally interpolate) 'snowy' pixels

:param raw: raw data
:param im: offset and relative gain corrected data
:param gain: gain values for raw
:param rgain: raw gain data, scaled by the threshold for high-to-medium gain switching
:param resolution: resolution strategy, should be of enum-type SnowResolution

Snowy pixels are pixels which are already identified as being in the medium gain stage by their gain values, but which have transitional image values between the largest high gain value and the smallest medium gain value. As these values may be encountered again in the context of medium gain, they are ambiguous and cannot readily be identified by simple thresholding alone.

An attempt is made to identify these snowy pixels using a Gaussian mixture clustering algorithm acting on multispectral input. Positions in the cluster are identified as follows:

x: abs(p_r - p_bg) * p_rg**2
y: p_r

where p_r is a given pixel raw value, p_bg is the mean value of the 8 direct neighbor pixels, and p_rg is the raw gain value of the pixel.

Note that these cluster params are purely empirically derived, and do not have any connection to ASIC design etc.

Only pixels in medium gain are evaluated, and evaluation is image-wise, but subdivided further into ASICs.

The so-created graph [[x_0, x_1, ..., x_n], [y_0, y_1, ..., y_n]] is scaled using a sklearn.StandardScaler and input into a two-component GaussianMixture clustering algorithm. This results in a set of labels, identifying pixels as likely snowy, or not. However, for a given image we do not know which label is which from the output. Hence, labels are differentiated under the assumption that snowy pixels occur out of context, i.e. their surrounding pixels are more likely at lower signal values than if the pixel were in a region where gain switching led to a large value.

Depending on the resolution strategy, the pixels so identified are either set to np.nan or to the interpolated value of the direct (high-gain) neighbors.

The corrected data is returned alongside an updated gain mask and bad pixel mask.
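
A call sketch with toy arrays (the import paths and array shapes are assumptions; SnowResolution is assumed to live in cal_tools.enums). Real use passes raw, corrected and gain data from the same images:

import numpy as np
from cal_tools.agipdutils import melt_snowy_pixels  # assumed import path
from cal_tools.enums import SnowResolution          # assumed import path

rng = np.random.default_rng(4)
shape = (1, 512, 128)                   # (images, rows, columns)
raw = rng.normal(6000.0, 50.0, shape)   # raw ADU values
im = rng.normal(60.0, 5.0, shape)       # offset / relative gain corrected data
gain = np.zeros(shape, dtype=np.uint8)
gain[0, :32, :32] = 1                   # a patch of medium-gain pixels
rgain = rng.normal(1.05, 0.02, shape)   # raw gain scaled by the switching threshold

im, gain, snow_mask = melt_snowy_pixels(
    raw, im, gain, rgain, resolution=SnowResolution.INTERPOLATE)
print(np.count_nonzero(snow_mask))      # pixels flagged as interpolated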

Source code in cal_tools/agipdutils.py
def melt_snowy_pixels(raw, im, gain, rgain, resolution=None):
    """ Identify (and optionally interpolate) 'snowy' pixels

    :param raw: raw data
    :param im: offset and relative gain corrected data:
    :param gain: gain values for `raw`
    :param rgain: raw gain data, scaled by the threshold for
        high-to-medium gain switching
    :param resolution: resolution strategy, should be of enum-type
        `SnowResolution`

    Snowy pixels are pixels which are already identified as pixels in
    the medium gain stage by their gain values, but which have
    transitional image values between the largest high gain value and
    the smallest medium gain value. As these values may be encountered again
    in the context of medium gain, they are ambiguous and cannot
    readily be identified by simple thresholding alone.

    It is attempted to identify these snowy pixels by using a Gaussian
    mixture clustering algorithm acting on multispectral input.
    Positions in the cluster are identified as follows:

    x: abs(p_r-p_bg)*p_rg**2
    y: p_r

    where p_r is a given pixel raw value, p_bg is the mean value of the
    8 direct  neighbor pixels, and p_rg is the raw gain value of the pixel.

    Note that these cluster params are purely empirically derived,
    and do not have any connection to ASIC design etc.

    Only pixels in medium gain are evaluated, and evaluation is
    image-wise, but subdivided further into ASICs.

    The so-created graph [[x_0, x_1, ..., x_n], [y_0, y_1, ..., y_n]] is
    scaled using a `sklearn.StandardScaler` and input into a two-component
    `GaussianMixture` clustering algorithm. This results in a set of
    labels, identifying pixels as likely snowy, or not. However,
    for a given image we do not know which label is which from the
    output. Hence, labels are differentiated under the assumption that
    snowy pixels occur out of context, i.e. their surrounding pixels
    are more likely at lower signal values than if the pixel were in a region
    where gain switching led to a large value.

    Depending on the resolution strategy so-identified pixels are either
    set to `np.nan` or to the interpolated value of the direct (high-gain)
    neighbors.

    The corrected data is returned alongside an updated gain mask and bad
    pixel mask.
    """
    snow_mask = np.zeros(im.shape, np.uint32)
    for k in range(im.shape[0]):
        for i in range(8):
            for j in range(2):

                fidx = gain[k, i * 64:(i + 1) * 64,
                       j * 64:(j + 1) * 64] == 1
                fidx_h = gain[k, i * 64:(i + 1) * 64,
                         j * 64:(j + 1) * 64] == 0

                # need at least two pixels in medium gain to be able to
                # cluster in two groups
                if np.count_nonzero(fidx) < 2:
                    continue

                # data for a given asic
                asic = raw[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64]
                asic_g = gain[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64]
                asic_r = rgain[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64]

                asic_h = copy.copy(asic)
                asic_h[~fidx_h] = np.nan

                # calculate the background by averaging the 8 direct
                # neighbours of each pixel
                rl = np.roll(asic, 1, axis=0)
                rr = np.roll(asic, -1, axis=0)
                ru = np.roll(asic, 1, axis=1)
                rd = np.roll(asic, -1, axis=1)
                rlu = np.roll(rl, 1, axis=1)
                rld = np.roll(rl, -1, axis=1)
                rru = np.roll(rr, 1, axis=1)
                rrd = np.roll(rr, -1, axis=1)
                bg = (rl + rr + ru + rd + rlu + rld + rru + rrd) / 8

                # calculate the background by averaging the 8 direct
                # neighbours of each pixel
                # here only for high gain pixels
                rl = np.roll(asic_h, 1, axis=0)
                rr = np.roll(asic_h, -1, axis=0)
                ru = np.roll(asic_h, 1, axis=1)
                rd = np.roll(asic_h, -1, axis=1)
                rlu = np.roll(asic_h, 1, axis=1)
                rld = np.roll(asic_h, -1, axis=1)
                rru = np.roll(asic_h, 1, axis=1)
                rrd = np.roll(asic_h, -1, axis=1)
                bg_h = np.nanmean(
                    np.array([rl, rr, ru, rd, rlu, rld, rru, rrd]), axis=0)

                # construct a graph
                graph = np.array(
                    [np.abs(asic[fidx] - bg[fidx]) * asic_r[fidx] ** 2,
                     asic[fidx]]).T
                # scale it
                graph = StandardScaler().fit_transform(graph)
                # perform clustering
                spc = GaussianMixture(n_components=2, random_state=0)
                spc.fit(graph)
                # and get labels
                labels = spc.predict(graph)

                asic = im[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64]
                asic_msk = snow_mask[k, i * 64:(i + 1) * 64,
                           j * 64:(j + 1) * 64]
                mim = asic[fidx]
                asicb = bg_h
                mimb = asicb[fidx]
                mimg = asic_g[fidx]
                mmsk = asic_msk[fidx]

                # identify which labels are which
                mn1 = np.nanmean(bg[fidx][labels == 0])
                mn2 = np.nanmean(bg[fidx][labels == 1])
                implabel = 1
                if mn1 > mn2:
                    implabel = 0

                # if we've misidentified, then we will have too many
                # snowy pixels
                # happens if the ASIC is generally on a high signal level
                if np.count_nonzero([labels == implabel]) > 64 * 64 / 3:
                    continue

                # or a large portion of misidentified label covers the ASIC
                if np.count_nonzero([labels != implabel]) > 0.8 * 64 * 64:
                    continue

                # set to NaN if requested
                if resolution is SnowResolution.NONE:
                    mim[labels == implabel] = np.nan
                # or use the interpolated value
                elif resolution is SnowResolution.INTERPOLATE:
                    mim[labels == implabel] = mimb[labels == implabel]
                    mimg[labels == implabel] = 0
                # identify these pixels in a bad pixel mask
                mmsk[labels == implabel] = BadPixels.INTERPOLATED

                # now set back to data
                asic[fidx] = mim
                asic_g[fidx] = mimg
                asic_msk[fidx] = mmsk
                im[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64] = asic
                gain[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64] = asic_g
                snow_mask[k, i * 64:(i + 1) * 64,
                j * 64:(j + 1) * 64] = asic_msk
    return im, gain, snow_mask