Skip to content

jungfrau_ff

chunk_Multi(data, block_size)

chunks input array along the 'row' and 'col' dimensions

  • data (list): input data arrays, each at least 2-D
  • block_size (list, int): [chunk_row, chunk_col], dimension of the 2-D chunk

retruns: list fo chunked data

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def chunk_Multi(data, block_size):
    """
    chunks input array along the 'row' and 'col' dimensions

    arguments:
    * data (list): input data arrays, each at least 2-D
    * block_size (list, int): [chunk_row, chunk_col], dimension of the 2-D chunk

    retruns: list fo chunked data
    """
    intervals_r = range(0, data[0].shape[-2], block_size[0])
    intervals_c = range(0, data[0].shape[-1], block_size[1])

    chunks = []

    for irow in intervals_r:
        for icol in intervals_c:
            this_chunk = []
            this_chunk = [data[0][..., irow:irow + block_size[0], icol:icol + block_size[1]]]
            for j in range(1, len(data)):
                if data[j] is not None:
                    this_chunk.append(data[j][..., irow:irow + block_size[0], icol:icol + block_size[1]])
                else:
                    this_chunk.append(None) 
            chunks.append(this_chunk)

    return chunks

chunk_trains(data, train_chk)

chunks data along the train dimension

  • data (list): input data arrays, each at least 1-D
  • train_chk (int): number of trains per chunk

retruns: list fo chunked data

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def chunk_trains(data, train_chk):
    """
    chunks data along the train dimension

    arguments:
    * data (list): input data arrays, each at least 1-D
    * train_chk (int): number of trains per chunk

    retruns: list fo chunked data
    """
    n_trains = int(data[0].shape[0])
    chunks = []
    for i in range(0, n_trains, train_chk):
        this_chunk = []
        this_chunk = [data[0][i:min(i+train_chk, n_trains)]]
        for j in range(1, len(data)):
            if data[j] is not None:
                this_chunk.append(data[j][i:min(i+train_chk, n_trains)])
            else:
                this_chunk.append(None)          

        chunks.append(this_chunk)
    return chunks

fill_histogram(h_bins, _inp)

fills an histogram with shape (n_bins-1, memory_cells, n_rows, n_cols) from input images

  • h_bins (list, float): the binning of the x-axis
  • _inp (list): data to be histogrammed, as:
    • _inp[0]: images, with shape (trains, memory_cells, row, col)

retuns: histogram bin counts, bin edges

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def fill_histogram(h_bins, _inp):
    """
    fills an histogram with shape (n_bins-1, memory_cells, n_rows, n_cols) from input images

    arguments:
    * h_bins (list, float): the binning of the x-axis
    * _inp (list): data to be histogrammed, as:
        * _inp[0]: images, with shape (trains, memory_cells, row, col)

    retuns: histogram bin counts, bin edges   

    """
    imgs = _inp[0]  # [trains, cells, rows, cols]
    n_cells = imgs.shape[1]
    n_rows = imgs.shape[2]
    n_cols = imgs.shape[3]

    h_chk = np.zeros((len(h_bins)-1, n_cells, n_rows, n_cols), dtype=np.int32)

    e_chk = None

    for cell in range(n_cells):
        for row in range(n_rows):
            for col in range(n_cols):
                this_cell = imgs[:, cell, ...]
                this_pix = this_cell[:, row, col]

                _h, _e = np.histogram(this_pix, bins=h_bins)                
                h_chk[..., cell, row, col] += _h

                if e_chk is None:
                    e_chk = np.array(_e)    

    return h_chk, e_chk                

fit_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio)

fits histogram with single photon function with charge sharing tail

  • x (array, float): x values
  • y (array, float): y values
  • yerr (array, float): errors of the y values
  • sigma0 (float): rough estimate of peak variance
  • n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
  • ratio (float): ratio parameter of the peak finder
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def fit_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio):
    """
    fits histogram with single photon function with charge sharing tail

    arguments:
    * x (array, float): x values
    * y (array, float): y values
    * yerr (array, float): errors of the y values
    * sigma0 (float): rough estimate of peak variance
    * n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
    * ratio (float): ratio parameter of the peak finder

    returns: peak position, peak variance, chi2/ndf, charge sharing parameter 
    """

    norm = np.sum(y) * (x[1] - x[0])
    alpha0 = 0.3
    thr = n_sigma * sigma0
    _peaks, imin = _peak_position(x, y, thr, ratio=ratio)

    q = -1.
    sigma = -1.
    alpha = -1.
    chi2ndf = -1.
    q0 = -1.

    if len(_peaks) > 0:
        q0 = np.ma.min(x[_peaks])

    if q0 > -1.:
        limit_q = (q0 - sigma0, q0 + sigma0)
        limit_sigma = (0.1 * sigma0, 2. * sigma0)
        limit_alpha = (0.01 * alpha0, 1.)
        limit_norm = (np.sqrt(0.1 * norm), 3. *norm)

        res = chi_square_fit(
            charge_sharing,
            x,
            y,
            yerr,
            fit_range=(0.5 * q0, q0 + 2. * sigma0),
            ped=0.,
            q=q0,
            sigma=sigma0,
            alpha=alpha0,
            norm=norm,
            limit_q=limit_q,
            limit_sigma=limit_sigma,
            limit_alpha=limit_alpha,
            limit_norm=limit_norm,
            fix_ped=True,
            print_level=0,
            pedantic=False,
        )
        values = res[0]
        errors = res[1]
        chi2 = res[2]
        ndf = res[3]
        is_valid = res[4]

        if is_valid:
            q = values['q']
            sigma = values['sigma']
            alpha = values['alpha']
            chi2ndf = chi2/ndf
        else:
            q = -1000.
            sigma = -1000.
            alpha = -1000.
            chi2ndf = -1000.

    return q, sigma, chi2ndf, alpha

fit_double_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio)

fits histogram with sum of two single photon functions with charge sharing tail

  • x (array, float): x values
  • y (array, float): y values
  • yerr (array, float): errors of the y values
  • sigma0 (float): rough estimate of peak variance
  • n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
  • ratio (float): ratio parameter of the peak finder
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def fit_double_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio):
    """
    fits histogram with sum of two single photon functions with charge sharing tail

    arguments:
    * x (array, float): x values
    * y (array, float): y values
    * yerr (array, float): errors of the y values
    * sigma0 (float): rough estimate of peak variance
    * n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
    * ratio (float): ratio parameter of the peak finder

    returns: peak position, peak variance, chi2/ndf, charge sharing parameter (all of them of the first peak)

    """
    norm = np.sum(y) * (x[1] - x[0])
    alpha0 = 0.3
    thr = n_sigma * sigma0
    _peaks, imin = _peak_position(x, y, thr, ratio=ratio)

    qa = -1.
    if len(_peaks) > 0:
        qa = np.ma.min(x[_peaks])

    q = -1.
    sigma = -1.
    alpha = -1.
    chi2ndf = -1.

    if qa > -1.:

        qb = 1.1 * qa
        limit_q_a = (qa - sigma0, qa + sigma0)
        limit_sigma_a = (0.1 * sigma0, 2. * sigma0)
        limit_alpha = (0.01, 1.)
        limit_norm_a = (np.ma.sqrt(0.1 *norm), 2. * norm)
        limit_q_b = (qb - sigma0, qb + sigma0)
        limit_sigma_b = (0.1 * sigma0, 2. * sigma0)
        limit_norm_b = (np.ma.sqrt(0.1 * 0.2 * norm), 2 * 0.2 *norm)

        res = chi_square_fit(
            double_peak, x, y, yerr,
            fit_range=(0.5*qa, qb + sigma0),
            ped=0.,
            q_a=qa,
            sigma_a=sigma0,
            alpha=alpha0,
            norm_a=norm,
            q_b=qb,
            sigma_b=sigma0,
            norm_b=norm,
            limit_q_a=limit_q_a,
            limit_sigma_a=limit_sigma_a,
            limit_alpha=limit_alpha,
            limit_norm_a=limit_norm_a,
            limit_q_b=limit_q_b,
            limit_sigma_b=limit_sigma_b,
            limit_norm_b=limit_norm_b,
            fix_ped=True,
            print_level=0,
            pedantic=False,
        )
        values = res[0]
        errors = res[1]
        chi2 = res[2]
        ndf = res[3]
        is_valid = res[4]

        if is_valid:
            q = values['q_a']
            sigma = values['sigma_a']
            alpha = values['alpha']
            chi2ndf = chi2/ndf
        else:
            q = -1000.
            sigma = -1000.
            alpha = -1000.
            chi2ndf = -1000.

    return q, sigma, chi2ndf, alpha

fit_gauss(x, y, yerr, sigma0, n_sigma, ratio)

fits histogram with a gaussian function

  • x (array, float): x values
  • y (array, float): y values
  • yerr (array, float): errors of the y values
  • sigma0 (float): rough estimate of peak variance
  • n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
  • ratio (float): ratio parameter of the peak finder

peak position, peak variance, chi2/ndf, charge sharing parameter (last one alway == 0)

Type Description

all of them are kept for compatibility with the wrap around function

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def fit_gauss(x, y, yerr, sigma0, n_sigma, ratio):
    """
    fits histogram with a gaussian function

    arguments:
    * x (array, float): x values
    * y (array, float): y values
    * yerr (array, float): errors of the y values
    * sigma0 (float): rough estimate of peak variance
    * n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
    * ratio (float): ratio parameter of the peak finder

    returns: peak position, peak variance, chi2/ndf, charge sharing parameter (last one alway == 0)
            all of them are kept for compatibility with the wrap around function

    """
    norm = np.sum(y) * (x[1] - x[0])/np.sqrt(2.*np.pi*sigma0**2)
    alpha0 = 0.3
    thr = n_sigma * sigma0
    _peaks, imin = _peak_position(x, y, thr, ratio=ratio)

    q0 = -1000.
    if len(_peaks) > 0:
        q0 = np.ma.min(x[_peaks])

    q = -1.
    sigma = -1.
    alpha = -1.
    chi2ndf = -1.

    if q0 > -1000.:
        limit_amp = (0.01*norm, 2.*norm)
        limit_mean = (q0-sigma0, q0+sigma0)
        limit_sigma = (0.1*sigma0, 3.*sigma0)

        res = chi_square_fit(
            gauss, x, y, yerr,
            fit_range=[q0 - sigma0, q0 + 2.*sigma0],
            amp=norm,
            mean=q0,
            sigma=sigma0,
            limit_amp=limit_amp,
            limit_mean=limit_mean,
            limit_sigma=limit_sigma,
            print_level=0,
            pedantic=False,
        )
        values = res[0]
        errors = res[1]
        chi2 = res[2]
        ndf = res[3]
        is_valid = res[4]

        if is_valid:
            q = values['mean']
            sigma = values['sigma']
            alpha = 0.
            chi2ndf = chi2/ndf
        else:
            q = -1000.
            sigma = -1000.
            alpha = -1000.
            chi2ndf = -1000.

    return q, sigma, chi2ndf, alpha        

fit_histogram(x, _fit_func, n_sigma, rebin, ratio, noise, _inp)

wrap around function for fitting of histogram

  • x (list, float): - bin centers along x
  • _fit_func (string): which function to use for fit
    • CHARGE_SHARING: single peak with charge sharing tail
    • CHARGE_SHARING_2: sum of two CHARGE_SHARING
    • GAUSS: gaussian function
  • n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
  • ratio (float): ratio parameter of the peak finder
  • _input (list): contains the data to preform the fit
    • _input[0]: histogram bin counts with shape (n_bins, memory_cells, row, col)
    • _input[1]: noise map with shape (3, memory_cells, row, col)
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def fit_histogram(x, _fit_func, n_sigma, rebin, ratio, noise, _inp):
    """
    wrap around function for fitting of histogram

    arguments:
    * x (list, float): - bin centers along x
    * _fit_func (string): which function to use for fit
         * CHARGE_SHARING: single peak with charge sharing tail
         * CHARGE_SHARING_2: sum of two CHARGE_SHARING
         * GAUSS: gaussian function
    * n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
    * ratio (float): ratio parameter of the peak finder
    * _input (list): contains the data to preform the fit
         * _input[0]: histogram bin counts with shape (n_bins, memory_cells, row, col)
         * _input[1]: noise map with shape (3, memory_cells, row, col)

    returns: map of peak values, map of peak variances, map of chi2/ndf, map of charge sharing parameter values
    """
    _funcs = dict(
        CHARGE_SHARING=fit_charge_sharing, 
        GAUSS=fit_gauss, 
        CHARGE_SHARING_2=fit_double_charge_sharing,
    )
    fit_func = _funcs[_fit_func]

    histo = _inp[0]
    n_cells, n_rows, n_cols = histo.shape[1:]

    sigma0 = 15.

    _init_array = np.zeros((n_cells, n_rows, n_cols), dtype=np.float32)
    g0_chk = _init_array.copy()
    sigma_chk = _init_array.copy()
    chi2ndf_chk = _init_array.copy()
    alpha_chk = _init_array.copy()

    for cell in range(n_cells):
        for row in range(n_rows):
            for col in range(n_cols):

                y = histo[:, cell, row, col] 
                y, _x = rebin_histo(y, x, rebin)
                yerr = histogram_errors(y)

                if noise[0, cell, row, col] > 0.:
                    sigma0 = noise[0, cell, row, col]

                q, sigma, chi2ndf, alpha = fit_func(_x, y, yerr, sigma0, n_sigma, ratio)

                g0_chk[cell, row, col] = q
                sigma_chk[cell, row, col] = sigma
                chi2ndf_chk[cell, row, col] = chi2ndf
                alpha_chk[cell, row, col] = alpha

    return g0_chk, sigma_chk, chi2ndf_chk, alpha_chk

set_histo_range(_x, _h, _h_range)

reduces the histogram bin axis to the range specified

  • _x (array, float): the bin centers array
  • _h (array, integers): the histogram with shape (bins, cells, columns, row)
  • _h_range (tuple, float): the (liminf, limsup) of the desired range

returns the new bin centers array and the new histogram

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def set_histo_range(_x, _h, _h_range):
    """
    reduces the histogram bin axis to the range specified

    args:
    * _x (array, float): the bin centers array
    * _h (array, integers): the histogram with shape (bins, cells, columns, row)
    * _h_range (tuple, float): the (liminf, limsup) of the desired range

    returns the new bin centers array and the new histogram

    """

    i_min = (np.abs(_x - _h_range[0])).argmin()
    i_max = (np.abs(_x - _h_range[1])).argmin()

    return _x[i_min:i_max], _h[i_min:i_max]

subtract_offset(offset_map, _inp)

perform offset subtraction on raw data

  • offset_map (xarray, float): map with offset constants, with shape (3, memory_cells, row, col)
  • _inp (list): input data as:
    • _inp[0]: raw images, with shape (trains, memory_cells, row, col)
    • _inp[1]: gain bit map, with shape (trains, memory_cells, row, col)
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def subtract_offset(offset_map, _inp):
    """
    perform offset subtraction on raw data

    Args:
    * offset_map (xarray, float): map with offset constants,
        with shape (3, memory_cells, row, col)
    * _inp (list): input data as:
        * _inp[0]: raw images, with shape (trains, memory_cells, row, col)
        * _inp[1]: gain bit map, with shape (trains, memory_cells, row, col)

    Returns: offset subtracted images
    """
    imgs = _inp[0]
    gbit = _inp[1]

    n_cells = imgs.shape[1]

    for cell in range(n_cells):
        this_cell_gbit = gbit.sel(cell=cell)

        this_cell_off = offset_map[:, cell]

        _o = np.choose(
            this_cell_gbit, (
                np.expand_dims(this_cell_off[0], axis=0),
                np.expand_dims(this_cell_off[1], axis=0),
                np.expand_dims(this_cell_off[2], axis=0)
            )
        )

        imgs.loc[dict(cell=cell)] -= _o

    return imgs