jungfrau_ff
chunk_Multi(data, block_size)
¶
chunks input array along the 'row' and 'col' dimensions
- data (list): input data arrays, each at least 2-D
- block_size (list, int): [chunk_row, chunk_col], dimension of the 2-D chunk
retruns: list fo chunked data
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def chunk_Multi(data, block_size):
"""
chunks input array along the 'row' and 'col' dimensions
arguments:
* data (list): input data arrays, each at least 2-D
* block_size (list, int): [chunk_row, chunk_col], dimension of the 2-D chunk
retruns: list fo chunked data
"""
intervals_r = range(0, data[0].shape[-2], block_size[0])
intervals_c = range(0, data[0].shape[-1], block_size[1])
chunks = []
for irow in intervals_r:
for icol in intervals_c:
this_chunk = []
this_chunk = [data[0][..., irow:irow + block_size[0], icol:icol + block_size[1]]]
for j in range(1, len(data)):
if data[j] is not None:
this_chunk.append(data[j][..., irow:irow + block_size[0], icol:icol + block_size[1]])
else:
this_chunk.append(None)
chunks.append(this_chunk)
return chunks
chunk_trains(data, train_chk)
¶
chunks data along the train dimension
- data (list): input data arrays, each at least 1-D
- train_chk (int): number of trains per chunk
retruns: list fo chunked data
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def chunk_trains(data, train_chk):
"""
chunks data along the train dimension
arguments:
* data (list): input data arrays, each at least 1-D
* train_chk (int): number of trains per chunk
retruns: list fo chunked data
"""
n_trains = int(data[0].shape[0])
chunks = []
for i in range(0, n_trains, train_chk):
this_chunk = []
this_chunk = [data[0][i:min(i+train_chk, n_trains)]]
for j in range(1, len(data)):
if data[j] is not None:
this_chunk.append(data[j][i:min(i+train_chk, n_trains)])
else:
this_chunk.append(None)
chunks.append(this_chunk)
return chunks
fill_histogram(h_bins, _inp)
¶
fills an histogram with shape (n_bins-1, memory_cells, n_rows, n_cols) from input images
- h_bins (list, float): the binning of the x-axis
- _inp (list): data to be histogrammed, as:
- _inp[0]: images, with shape (trains, memory_cells, row, col)
retuns: histogram bin counts, bin edges
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def fill_histogram(h_bins, _inp):
"""
fills an histogram with shape (n_bins-1, memory_cells, n_rows, n_cols) from input images
arguments:
* h_bins (list, float): the binning of the x-axis
* _inp (list): data to be histogrammed, as:
* _inp[0]: images, with shape (trains, memory_cells, row, col)
retuns: histogram bin counts, bin edges
"""
imgs = _inp[0] # [trains, cells, rows, cols]
n_cells = imgs.shape[1]
n_rows = imgs.shape[2]
n_cols = imgs.shape[3]
h_chk = np.zeros((len(h_bins)-1, n_cells, n_rows, n_cols), dtype=np.int32)
e_chk = None
for cell in range(n_cells):
for row in range(n_rows):
for col in range(n_cols):
this_cell = imgs[:, cell, ...]
this_pix = this_cell[:, row, col]
_h, _e = np.histogram(this_pix, bins=h_bins)
h_chk[..., cell, row, col] += _h
if e_chk is None:
e_chk = np.array(_e)
return h_chk, e_chk
fit_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio)
¶
fits histogram with single photon function with charge sharing tail
- x (array, float): x values
- y (array, float): y values
- yerr (array, float): errors of the y values
- sigma0 (float): rough estimate of peak variance
- n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
- ratio (float): ratio parameter of the peak finder
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def fit_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio):
"""
fits histogram with single photon function with charge sharing tail
arguments:
* x (array, float): x values
* y (array, float): y values
* yerr (array, float): errors of the y values
* sigma0 (float): rough estimate of peak variance
* n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
* ratio (float): ratio parameter of the peak finder
returns: peak position, peak variance, chi2/ndf, charge sharing parameter
"""
norm = np.sum(y) * (x[1] - x[0])
alpha0 = 0.3
thr = n_sigma * sigma0
_peaks, imin = _peak_position(x, y, thr, ratio=ratio)
q = -1.
sigma = -1.
alpha = -1.
chi2ndf = -1.
q0 = -1.
if len(_peaks) > 0:
q0 = np.ma.min(x[_peaks])
if q0 > -1.:
limit_q = (q0 - sigma0, q0 + sigma0)
limit_sigma = (0.1 * sigma0, 2. * sigma0)
limit_alpha = (0.01 * alpha0, 1.)
limit_norm = (np.sqrt(0.1 * norm), 3. *norm)
res = chi_square_fit(
charge_sharing,
x,
y,
yerr,
fit_range=(0.5 * q0, q0 + 2. * sigma0),
ped=0.,
q=q0,
sigma=sigma0,
alpha=alpha0,
norm=norm,
limit_q=limit_q,
limit_sigma=limit_sigma,
limit_alpha=limit_alpha,
limit_norm=limit_norm,
fix_ped=True,
print_level=0,
pedantic=False,
)
values = res[0]
errors = res[1]
chi2 = res[2]
ndf = res[3]
is_valid = res[4]
if is_valid:
q = values['q']
sigma = values['sigma']
alpha = values['alpha']
chi2ndf = chi2/ndf
else:
q = -1000.
sigma = -1000.
alpha = -1000.
chi2ndf = -1000.
return q, sigma, chi2ndf, alpha
fit_double_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio)
¶
fits histogram with sum of two single photon functions with charge sharing tail
- x (array, float): x values
- y (array, float): y values
- yerr (array, float): errors of the y values
- sigma0 (float): rough estimate of peak variance
- n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
- ratio (float): ratio parameter of the peak finder
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def fit_double_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio):
"""
fits histogram with sum of two single photon functions with charge sharing tail
arguments:
* x (array, float): x values
* y (array, float): y values
* yerr (array, float): errors of the y values
* sigma0 (float): rough estimate of peak variance
* n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
* ratio (float): ratio parameter of the peak finder
returns: peak position, peak variance, chi2/ndf, charge sharing parameter (all of them of the first peak)
"""
norm = np.sum(y) * (x[1] - x[0])
alpha0 = 0.3
thr = n_sigma * sigma0
_peaks, imin = _peak_position(x, y, thr, ratio=ratio)
qa = -1.
if len(_peaks) > 0:
qa = np.ma.min(x[_peaks])
q = -1.
sigma = -1.
alpha = -1.
chi2ndf = -1.
if qa > -1.:
qb = 1.1 * qa
limit_q_a = (qa - sigma0, qa + sigma0)
limit_sigma_a = (0.1 * sigma0, 2. * sigma0)
limit_alpha = (0.01, 1.)
limit_norm_a = (np.ma.sqrt(0.1 *norm), 2. * norm)
limit_q_b = (qb - sigma0, qb + sigma0)
limit_sigma_b = (0.1 * sigma0, 2. * sigma0)
limit_norm_b = (np.ma.sqrt(0.1 * 0.2 * norm), 2 * 0.2 *norm)
res = chi_square_fit(
double_peak, x, y, yerr,
fit_range=(0.5*qa, qb + sigma0),
ped=0.,
q_a=qa,
sigma_a=sigma0,
alpha=alpha0,
norm_a=norm,
q_b=qb,
sigma_b=sigma0,
norm_b=norm,
limit_q_a=limit_q_a,
limit_sigma_a=limit_sigma_a,
limit_alpha=limit_alpha,
limit_norm_a=limit_norm_a,
limit_q_b=limit_q_b,
limit_sigma_b=limit_sigma_b,
limit_norm_b=limit_norm_b,
fix_ped=True,
print_level=0,
pedantic=False,
)
values = res[0]
errors = res[1]
chi2 = res[2]
ndf = res[3]
is_valid = res[4]
if is_valid:
q = values['q_a']
sigma = values['sigma_a']
alpha = values['alpha']
chi2ndf = chi2/ndf
else:
q = -1000.
sigma = -1000.
alpha = -1000.
chi2ndf = -1000.
return q, sigma, chi2ndf, alpha
fit_gauss(x, y, yerr, sigma0, n_sigma, ratio)
¶
fits histogram with a gaussian function
- x (array, float): x values
- y (array, float): y values
- yerr (array, float): errors of the y values
- sigma0 (float): rough estimate of peak variance
- n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
- ratio (float): ratio parameter of the peak finder
peak position, peak variance, chi2/ndf, charge sharing parameter (last one alway == 0)
Type | Description |
---|---|
all of them are kept for compatibility with the wrap around function |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def fit_gauss(x, y, yerr, sigma0, n_sigma, ratio):
"""
fits histogram with a gaussian function
arguments:
* x (array, float): x values
* y (array, float): y values
* yerr (array, float): errors of the y values
* sigma0 (float): rough estimate of peak variance
* n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
* ratio (float): ratio parameter of the peak finder
returns: peak position, peak variance, chi2/ndf, charge sharing parameter (last one alway == 0)
all of them are kept for compatibility with the wrap around function
"""
norm = np.sum(y) * (x[1] - x[0])/np.sqrt(2.*np.pi*sigma0**2)
alpha0 = 0.3
thr = n_sigma * sigma0
_peaks, imin = _peak_position(x, y, thr, ratio=ratio)
q0 = -1000.
if len(_peaks) > 0:
q0 = np.ma.min(x[_peaks])
q = -1.
sigma = -1.
alpha = -1.
chi2ndf = -1.
if q0 > -1000.:
limit_amp = (0.01*norm, 2.*norm)
limit_mean = (q0-sigma0, q0+sigma0)
limit_sigma = (0.1*sigma0, 3.*sigma0)
res = chi_square_fit(
gauss, x, y, yerr,
fit_range=[q0 - sigma0, q0 + 2.*sigma0],
amp=norm,
mean=q0,
sigma=sigma0,
limit_amp=limit_amp,
limit_mean=limit_mean,
limit_sigma=limit_sigma,
print_level=0,
pedantic=False,
)
values = res[0]
errors = res[1]
chi2 = res[2]
ndf = res[3]
is_valid = res[4]
if is_valid:
q = values['mean']
sigma = values['sigma']
alpha = 0.
chi2ndf = chi2/ndf
else:
q = -1000.
sigma = -1000.
alpha = -1000.
chi2ndf = -1000.
return q, sigma, chi2ndf, alpha
fit_histogram(x, _fit_func, n_sigma, rebin, ratio, noise, _inp)
¶
wrap around function for fitting of histogram
- x (list, float): - bin centers along x
- _fit_func (string): which function to use for fit
- CHARGE_SHARING: single peak with charge sharing tail
- CHARGE_SHARING_2: sum of two CHARGE_SHARING
- GAUSS: gaussian function
- n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
- ratio (float): ratio parameter of the peak finder
- _input (list): contains the data to preform the fit
- _input[0]: histogram bin counts with shape (n_bins, memory_cells, row, col)
- _input[1]: noise map with shape (3, memory_cells, row, col)
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def fit_histogram(x, _fit_func, n_sigma, rebin, ratio, noise, _inp):
"""
wrap around function for fitting of histogram
arguments:
* x (list, float): - bin centers along x
* _fit_func (string): which function to use for fit
* CHARGE_SHARING: single peak with charge sharing tail
* CHARGE_SHARING_2: sum of two CHARGE_SHARING
* GAUSS: gaussian function
* n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0
* ratio (float): ratio parameter of the peak finder
* _input (list): contains the data to preform the fit
* _input[0]: histogram bin counts with shape (n_bins, memory_cells, row, col)
* _input[1]: noise map with shape (3, memory_cells, row, col)
returns: map of peak values, map of peak variances, map of chi2/ndf, map of charge sharing parameter values
"""
_funcs = dict(
CHARGE_SHARING=fit_charge_sharing,
GAUSS=fit_gauss,
CHARGE_SHARING_2=fit_double_charge_sharing,
)
fit_func = _funcs[_fit_func]
histo = _inp[0]
n_cells, n_rows, n_cols = histo.shape[1:]
sigma0 = 15.
_init_array = np.zeros((n_cells, n_rows, n_cols), dtype=np.float32)
g0_chk = _init_array.copy()
sigma_chk = _init_array.copy()
chi2ndf_chk = _init_array.copy()
alpha_chk = _init_array.copy()
for cell in range(n_cells):
for row in range(n_rows):
for col in range(n_cols):
y = histo[:, cell, row, col]
y, _x = rebin_histo(y, x, rebin)
yerr = histogram_errors(y)
if noise[0, cell, row, col] > 0.:
sigma0 = noise[0, cell, row, col]
q, sigma, chi2ndf, alpha = fit_func(_x, y, yerr, sigma0, n_sigma, ratio)
g0_chk[cell, row, col] = q
sigma_chk[cell, row, col] = sigma
chi2ndf_chk[cell, row, col] = chi2ndf
alpha_chk[cell, row, col] = alpha
return g0_chk, sigma_chk, chi2ndf_chk, alpha_chk
set_histo_range(_x, _h, _h_range)
¶
reduces the histogram bin axis to the range specified
- _x (array, float): the bin centers array
- _h (array, integers): the histogram with shape (bins, cells, columns, row)
- _h_range (tuple, float): the (liminf, limsup) of the desired range
returns the new bin centers array and the new histogram
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def set_histo_range(_x, _h, _h_range):
"""
reduces the histogram bin axis to the range specified
args:
* _x (array, float): the bin centers array
* _h (array, integers): the histogram with shape (bins, cells, columns, row)
* _h_range (tuple, float): the (liminf, limsup) of the desired range
returns the new bin centers array and the new histogram
"""
i_min = (np.abs(_x - _h_range[0])).argmin()
i_max = (np.abs(_x - _h_range[1])).argmin()
return _x[i_min:i_max], _h[i_min:i_max]
subtract_offset(offset_map, _inp)
¶
perform offset subtraction on raw data
- offset_map (xarray, float): map with offset constants, with shape (3, memory_cells, row, col)
- _inp (list): input data as:
- _inp[0]: raw images, with shape (trains, memory_cells, row, col)
- _inp[1]: gain bit map, with shape (trains, memory_cells, row, col)
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/jungfrau/jungfrau_ff.py
def subtract_offset(offset_map, _inp):
"""
perform offset subtraction on raw data
Args:
* offset_map (xarray, float): map with offset constants,
with shape (3, memory_cells, row, col)
* _inp (list): input data as:
* _inp[0]: raw images, with shape (trains, memory_cells, row, col)
* _inp[1]: gain bit map, with shape (trains, memory_cells, row, col)
Returns: offset subtracted images
"""
imgs = _inp[0]
gbit = _inp[1]
n_cells = imgs.shape[1]
for cell in range(n_cells):
this_cell_gbit = gbit.sel(cell=cell)
this_cell_off = offset_map[:, cell]
_o = np.choose(
this_cell_gbit, (
np.expand_dims(this_cell_off[0], axis=0),
np.expand_dims(this_cell_off[1], axis=0),
np.expand_dims(this_cell_off[2], axis=0)
)
)
imgs.loc[dict(cell=cell)] -= _o
return imgs