import os
import posixpath
import zlib
from datetime import datetime
from multiprocessing import Manager
from multiprocessing.pool import ThreadPool
from typing import Any, Dict, List, Optional, Tuple
import h5py
import numpy as np
import sharedmem
from dateutil import parser
from extra_data import DataCollection, H5File, by_id, components
from iCalibrationDB import Conditions, Constants
from cal_tools import agipdalgs as calgs
from cal_tools.agipdutils import (
assemble_constant_dict,
baseline_correct_via_noise,
baseline_correct_via_stripe,
cast_array_inplace,
correct_baseline_via_hist,
correct_baseline_via_hist_asic,
make_noisy_adc_mask,
match_asic_borders,
melt_snowy_pixels,
)
from cal_tools.enums import AgipdGainMode, BadPixels, SnowResolution
from cal_tools.h5_copy_except import h5_copy_except_paths
from cal_tools.tools import get_from_db
class AgipdCtrl:
def __init__(
self,
run_dc: DataCollection,
image_src: str,
ctrl_src: str,
raise_error: bool = True,
):
""" Initialize AgipdCondition class to read
all required AGIPD parameters.
:param run_dc: Run data collection with expected sources
to read needed parameters.
:param image_src: H5 source for image data.
:param ctrl_src: H5 source for control (slow) data.
:param raise_error: Boolean to raise errors for missing
sources and keys.
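A minimal usage sketch (the run path and source names below are
illustrative assumptions, not fixed values):
.. code-block:: python
    from extra_data import RunDirectory
    run_dc = RunDirectory("/gpfs/exfel/exp/SPB/202230/p900321/raw/r0100")
    agipd_ctrl = AgipdCtrl(
        run_dc=run_dc,
        image_src="SPB_DET_AGIPD1M-1/DET/0CH0:xtdf",
        ctrl_src="SPB_IRU_AGIPD1M1/MDL/FPGA_COMP",
    )
    mem_cells = agipd_ctrl.get_num_cells()
    acq_rate = agipd_ctrl.get_acq_rate()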
"""
self.run_dc = run_dc
self.image_src = image_src
self.ctrl_src = ctrl_src
self.raise_error = raise_error
def get_num_cells(self) -> Optional[int]:
"""Read number of memory cells from fast data.
:return mem_cells: Number of memory cells,
or None if no data is available
(e.g. a maximum cellId of 351 maps to 352 cells).
"""
cells = np.squeeze(
self.run_dc[
self.image_src, "image.cellId"].drop_empty_trains().ndarray()
)
if cells.shape[0] == 0:
return None
maxcell = np.max(cells)
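# Snap the observed maximum cellId to the nearest known
# AGIPD memory-cell configuration.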
options = [4, 32, 64, 76, 128, 176, 202, 250, 352]
dists = [abs(o - maxcell) for o in options]
return options[np.argmin(dists)]
def _get_acq_rate_ctrl(self) -> Optional[float]:
"""Get acquisition (repetition) rate from CONTROL source."""
# Attempt to look for acquisition rate in slow data
rep_rate_src = (
self.ctrl_src, "bunchStructure.repetitionRate.value")
if (
rep_rate_src[0] in self.run_dc.all_sources and
rep_rate_src[1] in self.run_dc.keys_for_source(rep_rate_src[0])
):
# Precision is deliberately lost here, as the value is only
# used to bucket the rate for managing meta-data.
return round(float(self.run_dc[rep_rate_src].as_single_value()), 1)
def _get_acq_rate_instr(self) -> Optional[float]:
"""Get acquisition (repetition rate) from INSTRUMENT source."""
train_pulses = np.squeeze(
self.run_dc[
self.image_src,
"image.pulseId"].drop_empty_trains().train_from_index(0)[1]
)
# Compute acquisition rate from fast data
diff = train_pulses[1] - train_pulses[0]
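# A pulseId step of 1 corresponds to the full 4.5 MHz bunch rate;
# larger steps correspond to proportionally lower rates
# (values in MHz).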
options = {8: 0.5, 4: 1.1, 2: 2.2, 1: 4.5}
return options.get(diff, None)
def get_acq_rate(self) -> Optional[float]:
"""Read the acquisition rate for the selected detector module.
The value is read from the CONTROL source,
e.g. `CONTROL/../MDL/FPGA_COMP`. If the key is not available,
the rate is calculated from two consecutive pulses
of the same trainId.
:return acq_rate: the acquisition rate,
or None if not available.
"""
acq_rate = self._get_acq_rate_ctrl()
if acq_rate is not None:
return acq_rate
# For AGIPD500K this function would produce wrong value (always 4.5)
# before W10/2022.
# TODO: Confirm leaving this as it is.
return self._get_acq_rate_instr()
def _get_gain_setting_ctrl(self) -> Optional[int]:
"""Read gain_settings from CONTROL source and gain key."""
return int(self.run_dc[self.ctrl_src, "gain"].as_single_value())
def _get_gain_setting_ctrl_old(self) -> Optional[int]:
"""Read gain_settings from setupr and patterTypeIndex
from old RAW data that is missing `gain.value`.
If `gain.value` isn't available in MDL source,
the setting is calculated from `setupr` and `patternTypeIndex`
gain-setting 1: setupr@dark=8, setupr@slopespc=40
gain-setting 0: setupr@dark=0, setupr@slopespc=32
patternTypeIndex 1: High-gain
patternTypeIndex 2: Medium-gain
patternTypeIndex 3: Low-gain
patternTypeIndex 4: SlopesPC
Returns:
int: gain_setting value.
"""
setupr = self.run_dc[self.ctrl_src, "setupr"].as_single_value()
pattern_type_idx = self.run_dc[
self.ctrl_src, "patternTypeIndex"].as_single_value()
if (setupr == 0 and pattern_type_idx < 4) or (
setupr == 32 and pattern_type_idx == 4):
return 0
elif (setupr == 8 and pattern_type_idx < 4) or (
setupr == 40 and pattern_type_idx == 4):
return 1
else:
# TODO: Confirm that this can be removed.
if self.raise_error:
raise ValueError(
"Could not derive gain setting from"
" setupr and patternTypeIndex"
)
return
def get_gain_setting(
self,
creation_time: Optional[datetime] = None,
) -> Optional[int]:
"""Read Gain setting from CONTROL sources.
If the key `gain.value` is not available, the gain setting
is derived from setupr and patternTypeIndex. If that fails,
a ValueError is raised.
:param creation_time: datetime object for the data creation time.
:return: gain setting,
or 0 if not available.
"""
# TODO: remove after fixing get_possible_conditions
if (
creation_time and
creation_time.replace(tzinfo=None) < parser.parse('2020-01-31')
):
print("Set gain-setting to None for runs taken before 2020-01-31")
return
if "gain.value" in self.run_dc.keys_for_source(self.ctrl_src):
return self._get_gain_setting_ctrl()
gain_setting = self._get_gain_setting_ctrl_old()
if gain_setting is not None:
return gain_setting
else:
# TODO: confirm that this can be removed.
print(
"ERROR: gain_setting is not available "
f"at source {self.ctrl_src}.\nSet gain_setting to 0.")
return 0
def get_gain_mode(self) -> AgipdGainMode:
"""Returns the gain mode (adaptive or fixed) from slow data."""
if (
self.ctrl_src in self.run_dc.all_sources and
"gainModeIndex.value" in self.run_dc.keys_for_source(
self.ctrl_src)
):
return AgipdGainMode(int(self.run_dc[
self.ctrl_src, "gainModeIndex"].as_single_value()))
return AgipdGainMode.ADAPTIVE_GAIN
def get_bias_voltage(
self,
karabo_id_control: str,
module: int = 0
) -> int:
"""Read the voltage information from the RUN source of module 0.
Different modules may operate at different voltages.
In practice, they all operate at the same voltage.
As such, it is okay to read a single module's value.
If the FPGA/PSC RUN source is not available, 300 will be
returned for AGIPD1M, as that was the default bias_voltage value
for MID_DET_AGIPD1M-1 and SPB_DET_AGIPD1M-1; for AGIPD500K a
ValueError is raised instead.
:param karabo_id_control: The karabo deviceId for the CONTROL device.
:param module: defaults to module 0
:return: bias voltage
"""
# TODO: Add a breaking fix by passing the source and key through
# get_bias_voltage arguments.
if "AGIPD1M" in karabo_id_control:
voltage_src = (
f"{karabo_id_control[:-1]}/PSC/HV",
f"channels.U{module}.measurementSenseVoltage.value")
# TODO: Validate whether this can be removed in favour of the
# voltage value from the Notebook's first cell.
default_voltage = 300
else: # AGIPD500K
voltage_src = (
f"{karabo_id_control}/FPGA/M_{module}",
"highVoltage.actual.value")
default_voltage = None
if (
voltage_src[0] in self.run_dc.all_sources and
voltage_src[1] in self.run_dc.keys_for_source(voltage_src[0])
):
# Use the RUN source for reading the bias voltage value,
# as HED_DET_AGIPD500K2G has a hardware issue that leads
# to storing arbitrary voltage values in the CONTROL source
# array. e.g. /gpfs/exfel/exp/HED/202230/p900248/raw
return int(self.run_dc.get_run_value(*voltage_src))
else:
# TODO: Validate removing this and
# using the NB value for old RAW data.
error = ("ERROR: Unable to read bias_voltage from"
f" {voltage_src[0]}/{voltage_src[1].replace('.','/')}.")
if default_voltage:
print(f"{error} Returning {default_voltage} "
"as default bias voltage value.")
else:
raise ValueError(error)
return default_voltage
def get_integration_time(self) -> int:
"""Read integration time from the FPGA device.
The integration time is specified as an integer number of clock
cycles each spanning ~9ns. The default (and legacy) value is 12.
:return: integration time
"""
if (
self.ctrl_src in self.run_dc.all_sources and
'integrationTime.value' in self.run_dc.keys_for_source(
self.ctrl_src)
):
return int(self.run_dc[
self.ctrl_src, 'integrationTime'].as_single_value())
return 12
class CellSelection:
"""Selection of detector memory cells (abstract class)"""
row_size = 32
ncell_max = 352
CM_NONE = 0
CM_PRESEL = 1
CM_FINSEL = 2
def get_cells_on_trains(
self, trains_sel: List[int], cm: int = 0
) -> np.ndarray:
"""Returns mask of cells selected for processing
:param trains_sel: list of train ids selected for processing
:param cm: flag indicating whether to return the final selection
or the interim selection used for common-mode correction
"""
raise NotImplementedError
def msg(self):
"""Return log message on initialization"""
raise NotImplementedError
@staticmethod
def _sel_for_cm(flag, flag_cm, cm):
if cm == CellSelection.CM_NONE:
return flag
elif cm == CellSelection.CM_PRESEL:
return flag_cm
elif cm == CellSelection.CM_FINSEL:
return flag[flag_cm]
else:
raise ValueError("param 'cm' takes only 0,1,2")
class AgipdCorrections:
def __init__(
self,
max_cells: int,
cell_sel: CellSelection,
h5_data_path: str = "SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/",
h5_index_path: str = "SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/",
corr_bools: Optional[dict] = None,
gain_mode: AgipdGainMode = AgipdGainMode.ADAPTIVE_GAIN,
comp_threads: int = 1,
train_ids: Optional[np.ndarray] = None
):
"""
Initialize an AgipdCorrections Class
:param max_cells: maximum number of memory cells to handle, e.g. if
calibration constants only exist for a subset of cells
:param cell_sel: the CellSelection indicates cells selected for
calibration
:param h5_data_path: path in HDF5 file which is prefixed to the
image/data section
:param h5_index_path: path in HDF5 file which is prefixed to the
index section
:param corr_bools: A dict with all of the correction booleans requested
or available
:param comp_threads: Number of threads to use for compressing gain/mask
:param train_ids: train IDs to process, all if omitted.
The following example shows a typical use case:
.. code-block:: python
cell_sel = CellRange(max_pulses, max_cells)
agipd_corr = AgipdCorrections(max_cells, cell_sel,
h5_data_path=h5path,
h5_index_path=h5path_idx,
corr_bools=corr_bools)
agipd_corr.allocate_constants(modules, (3, mem_cells_db, 512, 128))
metadata = cal_tools.tools.CalibrationMetadata(out_folder)
const_yaml = metadata["retrieved-constants"]
for mod in modules:
agipd_corr.initialize_from_yaml(karabo_da, const_yaml, mod)
data_shape = (n_images_max, 512, 128)
agipd_corr.allocate_images(data_shape, n_cores_files)
i_proc=0
agipd_corr.read_file(i_proc, file_name, apply_sel_pulses)
# Correct first 1000 images
agipd_corr.correct_agipd(i_proc, first=0, last=1000)
agipd_corr.write_file(i_proc, file_name, ofile_name)
"""
if corr_bools is None:
corr_bools = {}
# Data description
self.h5_data_path = h5_data_path
self.h5_index_path = h5_index_path
self.max_cells = max_cells
self.gain_mode = gain_mode
self.comp_threads = comp_threads
self.train_ids = np.array(train_ids) if train_ids is not None else None
self.cell_sel = cell_sel
# Correction parameters
self.baseline_corr_noise_threshold = -1000
self.snow_resolution = SnowResolution.INTERPOLATE
self.cm_dark_fraction = 0.66
self.cm_dark_min = -25.
self.cm_dark_max = 25.
self.cm_n_itr = 4
self.mg_hard_threshold = 100
self.hg_hard_threshold = 100
self.noisy_adc_threshold = 0.25
self.ff_gain = 1
self.photon_energy = 9.2
# Output parameters
self.compress_fields = ['gain', 'mask']
self.recast_image_fields = {}
# Shared variables for data and constants
self.shared_dict = []
self.offset = {}
self.noise = {}
self.thresholds = {}
self.frac_high_med = {}
self.md_additional_offset = {}
self.rel_gain = {}
self.mask = {}
self.xray_cor = {}
if corr_bools.get('round_photons'):
# Variables related to histogramming.
self.shared_hist_preround = None
self.shared_hist_postround = None
self.hist_bins_preround = np.linspace(-5.0, 60.0, 200)
self.hist_bins_postround = np.arange(-5.5, 60.1)
self.hist_lock = Manager().Lock()
# check if given corr_bools are correct
tot_corr_bools = ['only_offset', 'adjust_mg_baseline', 'rel_gain',
'xray_corr', 'blc_noise', 'blc_hmatch',
'blc_stripes', 'blc_set_min', 'match_asics',
'corr_asic_diag', 'zero_nans',
'zero_orange', 'force_hg_if_below',
'force_mg_if_below', 'mask_noisy_adc',
'melt_snow', 'common_mode', 'mask_zero_std',
'low_medium_gap', 'round_photons']
if set(corr_bools).issubset(tot_corr_bools):
self.corr_bools = corr_bools
else:
bools = list(set(corr_bools) - set(tot_corr_bools))
raise Exception('Correction Booleans: '
f'{bools} are not available!')
# Flags allowing for pulse capacitor constant retrieval.
self.pc_bools = [self.corr_bools.get("rel_gain"),
self.corr_bools.get("adjust_mg_baseline"),
self.corr_bools.get('blc_noise'),
self.corr_bools.get('blc_hmatch'),
self.corr_bools.get('blc_stripes'),
self.corr_bools.get('melt_snow')]
self.blc_bools = [self.corr_bools.get('blc_noise'),
self.corr_bools.get('blc_hmatch'),
self.corr_bools.get('blc_stripes')]
def read_file(self, i_proc: int, file_name: str,
apply_sel_pulses: bool = True
) -> int:
"""Read file with raw data to shared memory
:param file_name: Name of input file including path.
:param i_proc: Index of shared memory array.
:param apply_sel_pulses: apply selected pulses before
all corrections.
:return:
- n_img: The number of images to correct.
"""
module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
agipd_base = self.h5_data_path.format(module_idx)
data_dict = self.shared_dict[i_proc]
data_dict['moduleIdx'][0] = module_idx
h5_dc = H5File(file_name)
# Exclude trains without data.
im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
valid_train_ids = self.get_valid_image_idx(
im_dc[agipd_base, "image.trainId"])
if not valid_train_ids:
# If there's not a single valid train, exit early.
print(f"WARNING: No valid trains for {im_dc.files} to process.")
data_dict['nImg'][0] = 0
return 0
# store valid trains in shared memory
n_valid_trains = len(valid_train_ids)
data_dict["n_valid_trains"][0] = n_valid_trains
data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
# get cell selection for the images in this file
cm = (self.cell_sel.CM_NONE if apply_sel_pulses
else self.cell_sel.CM_PRESEL)
img_selected = self.cell_sel.get_cells_on_trains(
valid_train_ids, cm=cm)
data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
# Exclude non_valid trains from the selected data collection.
im_dc = im_dc.select_trains(by_id(valid_train_ids))
if "AGIPD500K" in agipd_base:
agipd_comp = components.AGIPD500K(im_dc)
else:
agipd_comp = components.AGIPD1M(im_dc)
kw = {
"unstack_pulses": False,
}
# [n_modules, n_imgs, 2, x, y]
raw_data = agipd_comp.get_array("image.data", **kw)[0]
frm_ix = np.flatnonzero(img_selected)
n_img = frm_ix.size
data_dict['nImg'][0] = n_img
data_dict['data'][:n_img] = raw_data[frm_ix, 0]
data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
data_dict['cellId'][:n_img] = agipd_comp.get_array(
"image.cellId", **kw)[0, frm_ix]
data_dict['pulseId'][:n_img] = agipd_comp.get_array(
"image.pulseId", **kw)[0, frm_ix]
data_dict['trainId'][:n_img] = agipd_comp.get_array(
"image.trainId", **kw)[0, frm_ix]
return n_img
def write_file(self, i_proc, file_name, ofile_name):
"""
Create output file and write corrected data to it
:param file_name: Name of input file including path
:param ofile_name: Name of output file including path
:param i_proc: Index of shared memory array
"""
module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
agipd_base = 'INSTRUMENT/' + self.h5_data_path.format(module_idx) + '/'
idx_base = self.h5_index_path.format(module_idx)
data_path = f'{agipd_base}/image'
# Obtain a shallow copy of the pointer map to allow for local
# changes in this method.
data_dict = self.shared_dict[i_proc].copy()
image_fields = [
'trainId', 'pulseId', 'cellId', 'data', 'gain', 'mask', 'blShift',
]
n_img = data_dict['nImg'][0]
if n_img == 0:
return
trains = data_dict['trainId'][:n_img]
# Re-cast fields in-place, i.e. using the same memory region.
for field, dtype in self.recast_image_fields.items():
data_dict[field] = cast_array_inplace(data_dict[field], dtype)
with h5py.File(ofile_name, "w") as outfile:
# Copy any other data from the input file.
# This includes indexes, so it's important that the corrected data
# we write is aligned with the raw data.
with h5py.File(file_name, "r") as infile:
self.copy_and_sanitize_non_cal_data(
infile, outfile, agipd_base, idx_base, trains
)
# All corrected data goes in a /INSTRUMENT/.../image group
image_grp = outfile[data_path]
# Set up all the datasets before filling them. This puts the
# metadata about the datasets together at the start of the file,
# so it's efficient to examine the file structure.
for field in image_fields:
arr = data_dict[field][:n_img]
if field in self.compress_fields:
# gain/mask compressed with gzip level 1, but not
# checksummed as we would have to implement this.
kw = dict(
compression='gzip', compression_opts=1, shuffle=True
)
else:
# Uncompressed data can easily be checksummed by HDF5's
# filter pipeline. This should be cheap to compute.
kw = {'fletcher32': True}
if arr.ndim > 1:
kw['chunks'] = (1,) + arr.shape[1:] # 1 chunk = 1 image
image_grp.create_dataset(
field, shape=arr.shape, dtype=arr.dtype, **kw
)
# Write the corrected data
for field in image_fields:
if field in self.compress_fields:
self._write_compressed_frames(
image_grp[field], data_dict[field][:n_img],
)
else:
image_grp[field][:] = data_dict[field][:n_img]
def _write_compressed_frames(self, dataset, arr):
"""Compress gain/mask frames in multiple threads, and save their data
This is significantly faster than letting HDF5 do the compression
in a single thread.
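Because write_file creates these datasets with the gzip and
shuffle filters, the chunks compressed here by hand read back
transparently as ordinary filtered data.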
"""
def _compress_frame(i):
# Equivalent to the HDF5 'shuffle' filter: transpose bytes for
# better compression.
shuffled = np.ascontiguousarray(
arr[i].view(np.uint8).reshape((-1, arr.itemsize)).transpose()
)
return i, zlib.compress(shuffled, level=1)
with ThreadPool(self.comp_threads) as pool:
for i, compressed in pool.imap(_compress_frame, range(len(arr))):
# Each frame is 1 complete chunk
dataset.id.write_direct_chunk((i, 0, 0), compressed)
def cm_correction(self, i_proc, asic):
"""
Perform common-mode correction of data in shared memory
A complete file is loaded to memory.
The ASIC common mode is calculated per image.
The cell common mode is calculated across trains and
groups of 32 cells.
Both corrections are iterative and require 4 iterations.
Correction is performed in chunks (e.g. 512 images).
A complete array of data from one file
(256 trains, 352 cells) takes
256 * 352 * 128 * 512 * 4 // 1024**3 = 22 GB in memory
:param i_proc: Index of shared memory array to process
:param asic: Asic number to process
"""
if not self.corr_bools.get("common_mode"):
return
dark_min = self.cm_dark_min
dark_max = self.cm_dark_max
fraction = self.cm_dark_fraction
n_itr = self.cm_n_itr
n_img = self.shared_dict[i_proc]['nImg'][0]
if n_img == 0:
return
cell_id = self.shared_dict[i_proc]['cellId'][:n_img]
train_id = self.shared_dict[i_proc]['trainId'][:n_img]
cell_ids = cell_id[train_id == train_id[0]]
n_cells = cell_ids.size
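# Reshape to (trains, cells, 8, 64, 2, 64): each AGIPD module is
# 512 x 128 pixels, i.e. an 8 x 2 grid of 64 x 64 pixel ASICs.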
data = self.shared_dict[i_proc]['data'][:n_img].reshape(-1, n_cells,
8, 64, 2, 64)
# Loop over iterations
for _ in range(n_itr):
# Loop over rows of cells
# TODO: check what occurs in case of 64 memory cells
# as it will have less than 11 iterations
first = 0
for cell_row in range(11):
last = first + cell_ids[(cell_ids // 32) == cell_row].size
if first == last:
continue
asic_data = data[:, first:last, asic % 8, :, asic // 8, :]
# Cell common mode
cell_cm_sum, cell_cm_count = \
calgs.sum_and_count_in_range_cell(asic_data, dark_min,
dark_max)
cell_cm = cell_cm_sum / cell_cm_count
# TODO: check files with less than 256 trains
cell_cm[cell_cm_count < fraction * 32 * 256] = 0
asic_data[...] -= cell_cm[None, None, :, :]
# Asics common mode
asic_cm_sum, asic_cm_count = \
calgs.sum_and_count_in_range_asic(asic_data, dark_min,
dark_max)
asic_cm = asic_cm_sum / asic_cm_count
asic_cm[asic_cm_count < fraction * 64 * 64] = 0
asic_data[...] -= asic_cm[:, :, None, None]
first = last
def mask_zero_std(self, i_proc, cells):
"""
Add bad pixel bit: DATA_STD_IS_ZERO to the mask of bad pixels
Pixel is bad if standard deviation for a given pixel and
given memory cell is zero
:param i_proc: Index of shared memory array to process
:param cells: List of cells to be considered
"""
if not self.corr_bools.get("mask_zero_std"):
return
module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
n_img = self.shared_dict[i_proc]['nImg'][0]
data = self.shared_dict[i_proc]['data'][:n_img]
cellid = self.shared_dict[i_proc]['cellId'][:n_img]
mask_std = self.mask[module_idx] # shape of n_cells, x, y
for c in cells:
std = np.nanstd(data[cellid == c, ...], axis=0)
mask_std[:, c, std == 0] |= BadPixels.DATA_STD_IS_ZERO
def offset_correction(self, i_proc: int, first: int, last: int):
"""
Perform image-wise offset correction for data in shared memory
:param first: Index of the first image to be corrected
:param last: Index of the last image to be corrected
:param i_proc: Index of shared memory array to process
"""
module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
data = self.shared_dict[i_proc]['data'][first:last]
rawgain = self.shared_dict[i_proc]['rawgain'][first:last]
gain = self.shared_dict[i_proc]['gain'][first:last]
cellid = self.shared_dict[i_proc]['cellId'][first:last]
if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
# first evaluate the gain into 0, 1, 2 --> high, medium, low
t0 = self.thresholds[module_idx][0]
t1 = self.thresholds[module_idx][1]
if self.corr_bools.get("melt_snow"):
# load raw_data and rgain to be used during gain_correction
self.shared_dict[i_proc]["t0_rgain"][first:last] = rawgain / t0[cellid, ...] # noqa
self.shared_dict[i_proc]["raw_data"][first:last] = np.copy(data) # noqa
# Often most pixels are in high-gain, so it's more efficient to
# set the whole output block to zero than select the right pixels.
gain[:] = 0
# exceeding first threshold means data is medium or low gain
gain[rawgain > t0[cellid, ...]] = 1
# exceeding also second threshold means data is low gain
gain[rawgain > t1[cellid, ...]] = 2
else:
# the enum values map 1, 2, 3 to (fixed) gain modes
gain[:] = self.gain_mode - 1
offsetb = self.offset[module_idx][:, cellid]
# force into high or medium gain if requested
if self.corr_bools.get("force_mg_if_below"):
gain[(gain == 2) & ((data - offsetb[1]) < self.mg_hard_threshold)] = 1 # noqa
if self.corr_bools.get("force_hg_if_below"):
gain[(gain > 0) & ((data - offsetb[0]) < self.hg_hard_threshold)] = 0 # noqa
# choose constants according to gain setting
off = calgs.gain_choose(gain, offsetb)
del offsetb
# subtract offset
data -= off
del off
def baseline_correction(self, i_proc: int, first: int, last: int):
"""
Perform image-wise base-line shift correction for
data in shared memory via histogram or stripe
:param first: Index of the first image to be corrected
:param last: Index of the last image to be corrected
:param i_proc: Index of shared memory array to process
"""
# Before doing relative gain correction, we need to evaluate any
# baseline shifts, as they are effectively an additional offset
# in the data.
if not any(self.blc_bools):
return
module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
data = self.shared_dict[i_proc]['data'][first:last]
bl_shift = self.shared_dict[i_proc]['blShift'][first:last]
gain = self.shared_dict[i_proc]['gain'][first:last]
cellid = self.shared_dict[i_proc]['cellId'][first:last]
# The output is saved in sharedmem to pass to correct_agipd(),
# as this computation takes about 3 seconds.
self.shared_dict[i_proc]["msk"][first:last] = calgs.gain_choose(
gain, self.mask[module_idx][:, cellid]
)
if hasattr(self, "rel_gain"):
# Get the correct rel_gain depending on cell-id
self.shared_dict[i_proc]['rel_corr'][first:last] = \
calgs.gain_choose(gain, self.rel_gain[module_idx][:, cellid])
# do this image wise, as the shift is per image
for i in range(data.shape[0]):
# first correction requested may be to evaluate shift via
# noise peak
if self.corr_bools.get('blc_noise'):
mn_noise = np.nanmean(self.noise[module_idx][0, cellid[i]])
dd, sh = baseline_correct_via_noise(data[i], mn_noise,
gain[i],
self.baseline_corr_noise_threshold) # noqa
# if not we continue with initial data
else:
dd = data[i]
sh = 0
# If we have enough pixels in medium or low gain and
# correction via hist matching is requested, do this now.
gcrit = np.count_nonzero(gain[i] > 0) > 1000
if (gcrit and self.corr_bools.get('blc_hmatch') and
hasattr(self, "rel_gain")):
dd2, sh2 = correct_baseline_via_hist(data[i],
self.shared_dict[i_proc]['rel_corr'][first:last][i], # noqa
gain[i])
data[i] = np.maximum(dd, dd2)
sh = np.minimum(sh, sh2)
# finally correct diagonal effects if requested
if self.corr_bools.get('corr_asic_diag'):
ii = data[i, ...]
gg = gain[i, ...]
adim = correct_baseline_via_hist_asic(ii, gg)
data[i, ...] = adim
# if there is not enough medium or low gain data to do an
# evaluation, do nothing
else:
data[i, ...] = dd
bl_shift[i] = sh
if self.corr_bools.get('blc_stripes'):
fmh = self.frac_high_med[module_idx][cellid[i]]
dd, sh = baseline_correct_via_stripe(data[i, ...],
gain[i, ...],
self.shared_dict[i_proc]['msk'][first:last][i, ...], # noqa
fmh)
data[i, ...] = dd
bl_shift[i] = sh
def gain_correction(self, i_proc: int, first: int, last: int):
"""
Perform several image-wise corrections for data in shared memory,
e.g. relative gain, flat-field X-ray correction, ...
:param first: Index of the first image to be corrected
:param last: Index of the last image to be corrected
:param i_proc: Index of shared memory array to process
"""
module_idx = self.shared_dict[i_proc]['moduleIdx'][0]
data = self.shared_dict[i_proc]['data'][first:last]
gain = self.shared_dict[i_proc]['gain'][first:last]
cellid = self.shared_dict[i_proc]['cellId'][first:last]
mask = self.shared_dict[i_proc]['mask'][first:last]
rel_corr = self.shared_dict[i_proc]['rel_corr'][first:last]
msk = self.shared_dict[i_proc]['msk'][first:last]
# if baseline correction was not requested
# msk and rel_corr will still be empty shared_mem arrays
if not any(self.blc_bools):
msk = calgs.gain_choose(gain, self.mask[module_idx][:, cellid])
# same for relative gain and then bad pixel mask
if hasattr(self, "rel_gain"):
# Get the correct rel_gain depending on cell-id
rel_corr = calgs.gain_choose(gain, self.rel_gain[module_idx][:, cellid]) # noqa
# Correct for relative gain
if self.corr_bools.get("rel_gain") and hasattr(self, "rel_gain"):
data *= rel_corr
del rel_corr
# Adjust medium gain baseline to match highest high gain value
if self.corr_bools.get("adjust_mg_baseline"):
mgbc = self.md_additional_offset[module_idx][cellid, ...]
data[gain == 1] += mgbc[gain == 1]
del mgbc
# Set negative values for medium gain to 0
# TODO: Probably it would be better to add it to badpixel maps,
# not just set to 0
if self.corr_bools.get('blc_set_min'):
data[(data < 0) & (gain == 1)] = 0
# Do xray correction if requested
# The slopes we have in our constants are already relative:
# slopeFF = slopeFFpix / average(slopeFFpix)
# To apply them we have to divide, not multiply.
if self.corr_bools.get("xray_corr"):
data /= self.xray_cor[module_idx][cellid, ...]
# use sharedmem raw_data and t0_rgain
# after calculating it while offset correcting.
if self.corr_bools.get('melt_snow'):
_ = melt_snowy_pixels(self.shared_dict[i_proc]['raw_data'][first:last], # noqa
data, gain,
self.shared_dict[i_proc]['t0_rgain'][first:last], # noqa
self.snow_resolution)
# Inner ASIC borders are matched to the same signal level
if self.corr_bools.get("match_asics"):
data = match_asic_borders(data, 8, module_idx)
# Add any non-finite values to the mask, zero them
if self.corr_bools.get("zero_nans"):
bidx = ~np.isfinite(data)
data[bidx] = 0
msk[bidx] |= BadPixels.VALUE_IS_NAN
del bidx
# Add pixels with unrealistically high and low values to the mask.
# Zero them.
if self.corr_bools.get("zero_orange"):
bidx = (data < -1e7) | (data > 1e7)
data[bidx] = 0
msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
del bidx
# Round keV-normalized intensity to photons.
if self.corr_bools.get("round_photons"):
data_hist_preround, _ = np.histogram(data, bins=self.hist_bins_preround)
data /= self.photon_energy
np.round(data, out=data)
# This could also be done before and its mask inverted for
# rounding, but the performance difference is negligible.
bidx = data < 0
data[bidx] = 0
msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
del bidx
data_hist_postround, _ = np.histogram(data * self.photon_energy,
bins=self.hist_bins_postround)
with self.hist_lock:
self.shared_hist_preround += data_hist_preround
self.shared_hist_postround += data_hist_postround
if np.issubdtype(self.recast_image_fields.get('image'), np.integer):
# If the image data is meant to be recast to an integer
# type, make sure its values are within its bounds.
type_info = np.iinfo(self.recast_image_fields['image'])
bidx = data < type_info.min
data[bidx] = 0
msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
del bidx
bidx = data > type_info.max
data[bidx] = type_info.max
msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE
del bidx
# Mask an entire ADC if its noise is above a threshold
# TODO: Needs clarification if needed,
# the returned arg is not used.
if self.corr_bools.get("mask_noisy_adc"):
_ = make_noisy_adc_mask(msk,
self.noisy_adc_threshold)
# Copy the data across into the existing shared-memory array
mask[...] = msk[...]
def get_valid_image_idx(self, im_dc: DataCollection) -> list:  # noqa
"""Return a list of valid train ids.
Exclude non-valid train ids from past or future.
"""
dc_trains = im_dc.train_ids
if len(dc_trains) == 0:
return [] # No trains to validate.
# Check against train ID filter list, if any
if self.train_ids is not None:
valid = np.isin(dc_trains, self.train_ids)
else:
valid = np.ones_like(dc_trains, dtype=bool)
# Validate that train IDs fall within a reasonable range
# around the median train ID (median +- 1e4).
medianTrain = np.nanmedian(dc_trains)
lowok = (dc_trains > medianTrain - 1e4)
highok = (dc_trains < medianTrain + 1e4)
valid &= lowok & highok
# exclude non valid trains
valid_trains = valid * dc_trains
return list(valid_trains[valid_trains != 0])
def apply_selected_pulses(self, i_proc: int) -> int:
"""Select sharedmem data indices to correct based on selected
pulses indices.
:param i_proc: the index of sharedmem for a given file/module
:return n_img: number of images to correct
"""
data_dict = self.shared_dict[i_proc]
n_img = data_dict['nImg'][0]
if not data_dict["cm_presel"][0]:
return n_img
ntrains = data_dict["n_valid_trains"][0]
train_ids = data_dict["valid_trains"][:ntrains]
# Initializing can_calibrate array
can_calibrate = self.cell_sel.get_cells_on_trains(
train_ids, cm=self.cell_sel.CM_FINSEL
)
if np.all(can_calibrate):
return n_img
# Get selected number of images based on
# selected pulses to correct
n_img_sel = np.count_nonzero(can_calibrate)
# Only select data corresponding to selected pulses
# and overwrite data in shared-memory leaving
# the required indices to correct
array_names = ["data", "rawgain", "cellId", "pulseId", "trainId", "gain"]
# if AGIPD in fixed gain mode or melting snow was not requested
# `t0_rgain` and `raw_data` will be empty shared_mem arrays
is_adaptive = self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN
melt_snow = self.corr_bools.get("melt_snow")
if (is_adaptive and melt_snow):
array_names += ["t0_rgain", "raw_data"]
# if baseline correction was not requested
# `msk` and `rel_corr` will still be empty shared_mem arrays
if any(self.blc_bools):
array_names += ["blShift", "msk"]
if hasattr(self, "rel_gain"):
array_names.append("rel_corr")
for name in array_names:
arr = data_dict[name][:n_img][can_calibrate]
data_dict[name][:n_img_sel] = arr
# Overwrite number of images
data_dict['nImg'][0] = n_img_sel
return n_img_sel
def copy_and_sanitize_non_cal_data(self, infile, outfile, agipd_base,
idx_base, trains):
""" Copy and sanitize data in `infile` that is not touched by
`correctAGIPD`
"""
# these are touched in the correct function, do not copy them here
dont_copy = ["data", "cellId", "trainId", "pulseId", "status",
"length"]
dont_copy = [posixpath.join(agipd_base, "image", ds)
for ds in dont_copy]
# don't copy index as we may need to adjust if we filter trains
dont_copy.append(posixpath.join(idx_base, "image"))
h5_copy_except_paths(infile, outfile, dont_copy)
# sanitize indices
for do in ["image", ]:
# uq: INDEX/trainID
# fidxv: INDEX/.../image/first idx values
# cntsv: INDEX/.../image/counts values
# Extract parameters through identifying
# unique trains, index and numbers.
uq, fidxv, cntsv = np.unique(trains, return_index=True, return_counts=True) # noqa
# Validate the calculated CORR INDEX contents by checking the
# difference between the train IDs stored in the RAW data INDEX
# and the unique trains of the corrected data.
train_diff = np.isin(np.array(infile["/INDEX/trainId"]), uq, invert=True)  # noqa
# Insert zeros for missing trains.
# fidxv and cntsv should have same length as
# raw INDEX/.../image/first and INDEX/.../image/count,
# respectively
# first_inc = first incrementation
first_inc = True
for i, diff in enumerate(train_diff):
if diff:
if i < len(cntsv):
cntsv = np.insert(cntsv, i, 0)
fidxv = np.insert(fidxv, i, 0) if i == 0 else np.insert(fidxv, i, fidxv[i])
else:
# append if at the end of the array
cntsv = np.append(cntsv, 0)
# increment fidxv once with the
# no. of processed mem-cells.
if first_inc:
fidxv = np.append(fidxv,
(2 * fidxv[i-1]) - fidxv[i-2])
first_inc = False
else:
fidxv = np.append(fidxv, fidxv[i-1])
# save INDEX contents (first, count) in CORR files
outfile.create_dataset(idx_base + "{}/first".format(do),
fidxv.shape,
dtype=fidxv.dtype,
data=fidxv,
fletcher32=True)
outfile.create_dataset(idx_base + "{}/count".format(do),
cntsv.shape,
dtype=cntsv.dtype,
data=cntsv,
fletcher32=True)
def init_constants(self, cons_data, when, module_idx, variant):
"""
For CI derived gain, a mean multiplication factor of 4.48 compared
to medium gain is used, as no reliable CI data for all memory cells
exists for the current AGIPD instances.
Relative gain is derived both from pulse capacitor (PC) and low
intensity flat field (FF) data. Information from the flat field
data is needed to 'calibrate' the pulse capacitor data. If no FF
data is available, the relative gain for the High gain stage is
set to 1:
* Relative gain for the High gain stage - from the FF data we get
the relative slopes of a given pixel and memory cell with
respect to all memory cells and all pixels in the module.
Please note: the slopesFF currently available in calibration
constants are created per pixel only, not per memory cell:
rel_high_gain = 1 if only PC data is available
rel_high_gain = rel_slopesFF if FF data is also available
* Relative gain for the Medium gain stage: we derive the factor
between high and medium gain using slope information from
fits to the linear part of high and medium gain:
rfpc_high_medium = m_h / m_m
where m_m is the medium gain slope of a given memory cell
and pixel and m_h is the corresponding high gain slope
rel_gain_medium = rel_high_gain * rfpc_high_medium
With this data the relative gain for the three gain stages
evaluates to:
rel_high_gain = 1 or rel_slopesFF
rel_medium_gain = rel_high_gain * rfpc_high_medium
rel_low_gain = rel_medium_gain * 4.48
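In code form (a condensed sketch of what the body below computes,
using the same names):
.. code-block:: python
    frac_high_med_pix = pc_high_m / pc_med_m
    rel_gain[..., 0] = 1.0  # or rel_slopesFF when FF data exists
    rel_gain[..., 1] = rel_gain[..., 0] * frac_high_med_pix
    rel_gain[..., 2] = rel_gain[..., 1] * 4.48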
:param cons_data: A dictionary for each retrieved constant value.
:param when: A dictionary for the creation time
of each retrieved constant.
:param module_idx: Module index
:param variant: A dictionary for the variant of each retrieved CCV.
:return:
"""
# Distribute threads for transposition evenly across all modules
# assuming this method runs in parallel.
calgs_opts = dict(num_threads=os.cpu_count() // len(self.offset))
calgs.transpose_constant(self.offset[module_idx], cons_data['Offset'], **calgs_opts)
calgs.transpose_constant(self.noise[module_idx], cons_data['Noise'], **calgs_opts)
if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
calgs.transpose_constant(self.thresholds[module_idx],
cons_data['ThresholdsDark'][..., :3],
**calgs_opts)
if self.corr_bools.get("low_medium_gap"):
t0 = self.thresholds[module_idx][0]
t1 = self.thresholds[module_idx][1]
t1[t1 <= 1.05 * t0] = np.iinfo(np.uint16).max
bpixels = cons_data["BadPixelsDark"].astype(np.uint32)
if self.corr_bools.get("xray_corr"):
if when["BadPixelsFF"]:
bpixels |= cons_data["BadPixelsFF"].astype(np.uint32)[...,
:bpixels.shape[2], # noqa
None]
if when["SlopesFF"]: # Checking if constant was retrieved
slopesFF = cons_data["SlopesFF"]
# This could be used for backward compatibility
# for very old SlopesFF constants
if len(slopesFF.shape) == 4:
slopesFF = slopesFF[..., 0]
# This is for backward compatibility for old FF constants
# (128, 512, mem_cells)
if slopesFF.shape[-1] == 2:
xray_cor = np.squeeze(slopesFF[..., 0])
xray_cor_med = np.nanmedian(xray_cor)
xray_cor[np.isnan(xray_cor)] = xray_cor_med
xray_cor[(xray_cor < 0.8) | (
xray_cor > 1.2)] = xray_cor_med
xray_cor = np.dstack([xray_cor]*self.max_cells)
else:
# Memory cell resolved xray_cor correction
xray_cor = slopesFF # (128, 512, mem_cells)
if xray_cor.shape[-1] < self.max_cells:
# When working with a new constant with fewer memory
# cells, e.g. lacking enough FF data or during
# development, xray_cor must be expanded by repeating
# its last memory cell to maintain a consistent shape.
xray_cor = np.dstack(
    [xray_cor,
     np.dstack([xray_cor[..., -1]]
               * (self.max_cells - xray_cor.shape[-1]))])  # noqa
# This is already done for old constants,
# but new constant is absolute and we need to have
# global ADU output for the moment
xray_cor /= self.ff_gain
else:
xray_cor = np.ones((128, 512, self.max_cells), np.float32)
self.xray_cor[module_idx][...] = xray_cor.transpose()[...]
# add additional bad pixel information
if any(self.pc_bools):
if when["BadPixelsPC"]:
bppc = np.moveaxis(cons_data["BadPixelsPC"].astype(np.uint32),
0, 2)
bpixels |= bppc[..., :bpixels.shape[2], None]
# calculate relative gain from the constants
rel_gain = np.ones((128, 512, self.max_cells, 3), np.float32)
if when["SlopesPC"]:
slopesPC = cons_data["SlopesPC"].astype(np.float32, copy=False)
# This will handle some historical data in a different format
# constant dimension injected first
if slopesPC.shape[0] in [10, 11]:
slopesPC = np.moveaxis(slopesPC, 0, 3)
slopesPC = np.moveaxis(slopesPC, 0, 2)
# high gain slope from pulse capacitor data
pc_high_m = slopesPC[..., :self.max_cells, 0]
pc_high_l = slopesPC[..., :self.max_cells, 1]
# medium gain slope from pulse capacitor data
pc_med_m = slopesPC[..., :self.max_cells, 3]
pc_med_l = slopesPC[..., :self.max_cells, 4]
# calculate median for slopes
pc_high_med = np.nanmedian(pc_high_m, axis=(0, 1))
pc_med_med = np.nanmedian(pc_med_m, axis=(0, 1))
if variant == 0:
# calculate median for intercepts:
pc_high_l_med = np.nanmedian(pc_high_l, axis=(0, 1))
pc_med_l_med = np.nanmedian(pc_med_l, axis=(0, 1))
# sanitize PC data with CCV variant = 0.
# Sanitization is already done for constants
# with CCV variant = 1
# In the following loop,
# replace `nan`s across memory cells with
# the median value calculated previously.
# Then, values outside the valid range (0.8 to 1.2
# times the median) are set to the median value.
# This is applied for high and medium gain stages
for i in range(self.max_cells):
pc_high_m[
np.isnan(pc_high_m[..., i]), i] = pc_high_med[i]
pc_med_m[
np.isnan(pc_med_m[..., i]), i] = pc_med_med[i]
pc_high_l[
np.isnan(pc_high_l[..., i]), i] = pc_high_l_med[i]
pc_med_l[
np.isnan(pc_med_l[..., i]), i] = pc_med_l_med[i]
pc_high_m[
(pc_high_m[..., i] < 0.8 * pc_high_med[i]) |
(pc_high_m[..., i] > 1.2 * pc_high_med[i]), i] = pc_high_med[i] # noqa
pc_med_m[
(pc_med_m[..., i] < 0.8 * pc_med_med[i]) |
(pc_med_m[..., i] > 1.2 * pc_med_med[i]), i] = pc_med_med[i] # noqa
pc_high_l[
(pc_high_l[..., i] < 0.8 * pc_high_l_med[i]) |
(pc_high_l[..., i] > 1.2 * pc_high_l_med[i]), i] = pc_high_l_med[i] # noqa
pc_med_l[
(pc_med_l[..., i] < 0.8 * pc_med_l_med[i]) |
(pc_med_l[..., i] > 1.2 * pc_med_l_med[i]), i] = pc_med_l_med[i] # noqa
# Ratio between HG and MG per pixel per mem cell,
# used for rel gain calculation
frac_high_med_pix = pc_high_m / pc_med_m
# Average ratio between HG and MG as a function of
# mem cell (needed for blc_stripes)
# TODO: Per pixel would be more optimal correction
frac_high_med = pc_high_med / pc_med_med
# calculate additional medium-gain offset
md_additional_offset = pc_high_l - pc_med_l * pc_high_m / pc_med_m # noqa
# Calculate relative gain. If FF constants are available,
# use them for high gain;
# if not, rel_gain is calculated using PC data only.
# if self.corr_bools.get("xray_corr"):
#     rel_gain[..., :self.max_cells, 0] /= xray_corr
# PC data should be 'calibrated' with X-ray data;
# if that is not done, it is better to use 1 instead of
# biasing the results with PC artefacts.
# rel_gain[..., 0] = 1./(pc_high_m / pc_high_ave)
rel_gain[..., 1] = rel_gain[..., 0] * frac_high_med_pix
rel_gain[..., 2] = rel_gain[..., 1] * 4.48
else:
# Initialize with placeholder values (ones and zeros)
md_additional_offset = np.zeros((128, 512, self.max_cells), np.float32)
frac_high_med = np.ones((self.max_cells,), np.float32)
self.md_additional_offset[module_idx][...] = md_additional_offset.transpose()[...] # noqa
calgs.transpose_constant(self.rel_gain[module_idx], rel_gain, **calgs_opts)
self.frac_high_med[module_idx][...] = frac_high_med
calgs.transpose_constant(self.mask[module_idx], bpixels, **calgs_opts)
return
def initialize_from_yaml(
self, karabo_da: str, const_yaml: Dict[str, Any], module_idx: int
) -> Dict[str, Any]:
"""Initialize calibration constants from a yaml file
:param karabo_da: a karabo data aggregator
:param const_yaml: from the "retrieved-constants" part of a yaml file
from pre-notebook, which consists of metadata of either the constant
file path or the empty constant shape, and the creation-time of the
retrieved constants
:param module_idx: Index of module
:return when: dict of retrieved constants with their creation-time
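A sketch of the expected const_yaml structure (the detector-unit
name and file paths are illustrative; a constant that was not
retrieved carries a [dtype_name, shape] list in place of a file
path):
.. code-block:: python
    const_yaml = {
        "AGIPD00": {
            "physical-detector-unit": "AGIPD_SIV1_AGIPDV11_M001",
            "constants": {
                "Offset": {
                    "creation-time": "2022-01-01T00:00:00+00:00",
                    "file-path": "/path/to/constant.h5",
                },
                "SlopesFF": {  # not retrieved
                    "creation-time": None,
                    "file-path": ["zeros", (128, 512, 352)],
                },
            },
        },
    }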
"""
# string of the device name.
cons_data = dict()
when = dict()
variant = dict()
db_module = const_yaml[karabo_da]["physical-detector-unit"]
for cname, mdata in const_yaml[karabo_da]["constants"].items():
base_key = f"{db_module}/{cname}/0"
when[cname] = mdata["creation-time"]
if when[cname]:
with h5py.File(mdata["file-path"], "r") as cf:
cons_data[cname] = np.copy(cf[f"{base_key}/data"])
# Set variant to 0 if the attribute is missing
# as for old constants.
if "variant" in cf[base_key].attrs.keys():
variant[cname] = cf[base_key].attrs["variant"]
else:
variant[cname] = 0
else:
# Create empty constant using the list elements
cons_data[cname] = getattr(np, mdata["file-path"][0])(mdata["file-path"][1]) # noqa
self.init_constants(cons_data, when, module_idx, variant)
return when
def initialize_from_db(self, karabo_id: str, karabo_da: str,
cal_db_interface: str,
creation_time: datetime,
memory_cells: float, bias_voltage: int,
photon_energy: float, gain_setting: float,
acquisition_rate: float, integration_time: int,
module_idx: int, only_dark: bool = False):
""" Initialize calibration constants from the calibration database
:param karabo_id: karabo identifier
:param karabo_da: karabo data aggregator
:param cal_db_interface: database interface port
:param creation_time: time for desired calibration constant version
:param memory_cells: number of memory cells used for CCV conditions
:param bias_voltage: bias voltage used for CCV conditions
:param photon_energy: photon energy used for CCV conditions
:param gain_setting: gain setting used for CCV conditions
:param acquisition_rate: acquisition rate used for CCV conditions
:param integration_time: integration time used for CCV conditions
:param module_idx: module index to save retrieved CCV in sharedmem
:param only_dark: load only dark image derived constants. This
implies that a `calfile` is used to load the remaining
constants. Useful to reduce DB traffic and interactions
for non-frequently changing constants, i.e. such which are
not usually updated during a beamtime.
The `cal_db_interface` parameter in the `dbparms` tuple may be in
one of the following notations:
* tcp://host:port to directly identify the host and port to
connect to
* tcp://host:port_low#port_high to specify a port range from
which a random port will be picked. E.g. specifying
tcp://max-exfl016:8015#8025
will randomly pick an address in the range max-exfl016:8015 and
max-exfl016:8025.
The latter notation allows for load-balancing.
This routine loads the following constants as given in
`iCalibrationDB`:
Dark Image Derived
------------------
* Constants.AGIPD.Offset
* Constants.AGIPD.Noise
* Constants.AGIPD.BadPixelsDark
* Constants.AGIPD.ThresholdsDark
Pulse Capacitor Derived
-----------------------
* Constants.AGIPD.SlopesPC
Flat-Field Derived
------------------
* Constants.AGIPD.SlopesFF
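A typical call might look like this (all values illustrative):
.. code-block:: python
    when = agipd_corr.initialize_from_db(
        karabo_id="SPB_DET_AGIPD1M-1",
        karabo_da="AGIPD00",
        cal_db_interface="tcp://max-exfl016:8015#8045",
        creation_time=creation_time,
        memory_cells=352,
        bias_voltage=300,
        photon_energy=9.2,
        gain_setting=0,
        acquisition_rate=4.5,
        integration_time=12,
        module_idx=0,
    )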
"""
const_dict = assemble_constant_dict(
self.corr_bools,
self.pc_bools,
memory_cells,
bias_voltage,
gain_setting,
acquisition_rate,
photon_energy,
beam_energy=None,
only_dark=only_dark,
integration_time=integration_time
)
when = {}
cons_data = {}
variant = {}
for cname, cval in const_dict.items():
condition = getattr(
Conditions, cval[2][0]).AGIPD(**cval[2][1])
cdata, md = get_from_db(
karabo_id=karabo_id,
karabo_da=karabo_da,
constant=getattr(Constants.AGIPD, cname)(),
condition=condition,
empty_constant=getattr(np, cval[0])(cval[1]),
cal_db_interface=cal_db_interface,
creation_time=creation_time,
verbosity=0,
)
cons_data[cname] = cdata
variant[cname] = md.calibration_constant_version.variant
when[cname] = None
# Read the CCV begin_at timestamp if the constant
# was retrieved successfully.
if md and md.comm_db_success:
when[cname] = md.calibration_constant_version.begin_at
self.init_constants(cons_data, when, module_idx, variant)
return when
def allocate_constants(self, modules, constant_shape):
"""
Allocate memory for correction constants
:param modules: Module indices
:param constant_shape: Shape of expected constants (gain, cells, x, y)
"""
for module_idx in modules:
self.offset[module_idx] = sharedmem.empty(constant_shape, dtype="f4") # noqa
if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
self.thresholds[module_idx] = sharedmem.empty(constant_shape, dtype="f4") # noqa
self.noise[module_idx] = sharedmem.empty(constant_shape, dtype="f4") # noqa
self.md_additional_offset[module_idx] = sharedmem.empty(constant_shape[1:], dtype="f4") # noqa
self.rel_gain[module_idx] = sharedmem.empty(constant_shape, dtype="f4") # noqa
self.frac_high_med[module_idx] = sharedmem.empty(constant_shape[1], dtype="f4") # noqa
self.mask[module_idx] = sharedmem.empty(constant_shape, dtype="i4")
self.xray_cor[module_idx] = sharedmem.empty(constant_shape[1:], dtype="f4") # noqa
def allocate_images(self, shape, n_cores_files):
"""
Allocate memory for image data and variable shared
between correction functions
:param shape: Shape of expected data (nImg, x, y)
:param n_cores_files: Number of files, handled in parallel
"""
self.shared_dict = []
for i in range(n_cores_files):
self.shared_dict.append({})
self.shared_dict[i]["cellId"] = sharedmem.empty(shape[0], dtype="u2") # noqa
self.shared_dict[i]["pulseId"] = sharedmem.empty(shape[0], dtype="u8") # noqa
self.shared_dict[i]["trainId"] = sharedmem.empty(shape[0], dtype="u8") # noqa
self.shared_dict[i]["moduleIdx"] = sharedmem.empty(1, dtype="i4")
self.shared_dict[i]["nImg"] = sharedmem.empty(1, dtype="i4")
self.shared_dict[i]["mask"] = sharedmem.empty(shape, dtype="u4")
self.shared_dict[i]["data"] = sharedmem.empty(shape, dtype="f4")
self.shared_dict[i]["rawgain"] = sharedmem.empty(shape, dtype="u2")
self.shared_dict[i]["gain"] = sharedmem.empty(shape, dtype="u1")
self.shared_dict[i]["blShift"] = sharedmem.empty(shape[0], dtype="f4") # noqa
# Parameters shared between image-wise correction functions
self.shared_dict[i]["msk"] = sharedmem.empty(shape, dtype="i4")
self.shared_dict[i]["raw_data"] = sharedmem.empty(shape, dtype="f4") # noqa
self.shared_dict[i]["rel_corr"] = sharedmem.empty(shape, dtype="f4") # noqa
self.shared_dict[i]["t0_rgain"] = sharedmem.empty(shape, dtype="u2") # noqa
# Valid trains
self.shared_dict[i]["cm_presel"] = sharedmem.empty(1, dtype="b")
self.shared_dict[i]["n_valid_trains"] = sharedmem.empty(1, dtype="i4") # noqa
self.shared_dict[i]["valid_trains"] = sharedmem.empty(1024, dtype="u8") # noqa
if self.corr_bools.get("round_photons"):
self.shared_hist_preround = sharedmem.empty(len(self.hist_bins_preround) - 1, dtype="i8")
self.shared_hist_postround = sharedmem.empty(len(self.hist_bins_postround) - 1, dtype="i8")
def validate_selected_pulses(
max_pulses: List[int], max_cells: int
) -> List[int]:
"""Validate the selected pulses given from the notebook
Validate the selected range of pulses to correct raw data
of at least one image.
1) Pulse indices within one train can't be greater
than the number of operating memory cells.
2) Validate the order of the given range of pulses.
3) Raise a ValueError if the generated list of pulses is empty.
:param max_pulses: a list of at most 3 elements defining the
range of pulses to calibrate.
:param max_cells: operating memory cells.
:return adjusted_range: An adjusted range of pulse indices to correct.
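Examples (illustrative):
.. code-block:: python
    validate_selected_pulses([12], 352)         # -> (0, 12, 1)
    validate_selected_pulses([0, 500, 1], 352)  # -> (0, 352, 1),
                                                # with a warning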
"""
# Validate selected pulses range:
# 1) A pulseId can't be greater than the operating memory cells.
pulses_range = [max_cells if p > max_cells else p for p in max_pulses]
if pulses_range != max_pulses:
print(
"WARNING: \"max_pulses\" list has been modified from "
f"{max_pulses} to {pulses_range}. As the number of "
"operating memory cells are less than the selected "
"maximum pulse."
)
if len(pulses_range) == 1:
adjusted_range = (0, pulses_range[0], 1)
elif len(pulses_range) == 2:
adjusted_range = (pulses_range[0], pulses_range[1], 1)
elif len(pulses_range) == 3:
adjusted_range = tuple(pulses_range)
else:
raise ValueError(
"ERROR: Wrong length for the list of pulses indices range. "
"Please check the given range for \"max_pulses\":"
f"{max_pulses}. \"max_pulses\" needs to be a list of "
"3 elements, [start, last, step]")
if adjusted_range[0] > adjusted_range[1]:
raise ValueError(
"ERROR: Pulse range start is greater than range end. "
"Please check the given range for \"max_pulses\":"
f"{max_pulses}. \"max_pulses\" needs to be a list of "
"3 elements, [start, last, step]")
if not np.all([isinstance(p, int) for p in max_pulses]):
raise TypeError(
"ERROR: \"max_pulses\" elements needs to be integers:"
f" {max_pulses}.")
print(
"A range of pulse indices is selected to correct:"
f" {pulses_range}")
return adjusted_range
class CellRange(CellSelection):
"""Selection of detector memory cells given as a range"""
def __init__(self, crange: List[int], max_cells: int):
"""Initialize range selection
:param crange: range parameters of selected cells,
list up to 3 elements
:param max_cells: number of exposed cells
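For example, to select every other cell among the first 64
(an illustrative sketch):
.. code-block:: python
    cell_sel = CellRange([0, 64, 2], max_cells=352)
    print(cell_sel.msg())  # reports 32 frames per train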
"""
self.max_cells = max_cells
self.crange = validate_selected_pulses(crange, max_cells)
self.flag = np.zeros(self.max_cells, bool)
self.flag_cm = np.zeros(self.ncell_max, bool)
self.flag[slice(*crange)] = True
self.flag_cm[:self.max_cells] = self.flag
self.flag_cm = (self.flag_cm.reshape(-1, self.row_size).any(1)
.repeat(self.row_size)[:self.max_cells])
def msg(self):
return (
f"Use range selection with crange={self.crange}, "
f"max_cells={self.max_cells}\n"
f"Frames per train: {self.flag.sum()}"
)
def get_cells_on_trains(
self, train_sel: List[int], cm: int = 0
) -> np.ndarray:
return np.tile(self._sel_for_cm(self.flag, self.flag_cm, cm),
len(train_sel))
class LitFrameSelection(CellSelection):
"""Selection of detector memery cells indicated as lit frames
by the AgipdLitFrameFinder
"""
def __init__(self,
litfrmdata: 'AgipdLitFrameFinderOffline',
train_ids: List[int],
crange: Optional[List[int]] = None,
energy_threshold: float = -1000):
"""Initialize lit frame selection
:param litfrmdata: AgipdLitFrameFinder output data
:param train_ids: the list of selected trains
:param crange: range parameters of selected cells,
list up to 3 elements
:param energy_threshold: pulse energy threshold above which
a frame counts as lit; -1000 (default) disables the check
"""
# read AgipdLitFrameFinder data
self.dev = litfrmdata.meta.litFrmDev
self.crange = validate_selected_pulses(crange, self.ncell_max)
self.energy_threshold = energy_threshold
nfrm = litfrmdata.output.nFrame
litfrm_train_ids = litfrmdata.meta.trainId
litfrm = litfrmdata.output.nPulsePerFrame > 0
if energy_threshold != -1000:
litfrm &= litfrmdata.output.energyPerFrame > energy_threshold
# apply range selection
if crange is None:
cellsel = np.ones(self.ncell_max, bool)
else:
cellsel = np.zeros(self.ncell_max, bool)
cellsel[slice(*crange)] = True
# update train selection, removing trains without litframe data
if train_ids is None:
train_ids = np.unique(litfrm_train_ids)
ntrain = len(train_ids)
else:
ntrain_orig = len(train_ids)
train_ids, _, litfrm_train_idx = np.intersect1d(
train_ids, litfrm_train_ids, return_indices=True
)
litfrm = litfrm[litfrm_train_idx]
nfrm = nfrm[litfrm_train_idx]
ntrain = len(train_ids)
if ntrain != ntrain_orig:
print(f"Lit frame data missed for {ntrain_orig - ntrain}. "
"Skip them.")
# initiate instance attributes
nimg = np.sum(nfrm)
self.train_ids = train_ids
self.count = nfrm
self.first = np.zeros(ntrain, int)
self.flag = np.zeros(nimg, bool)
self.flag_cm = np.zeros(nimg, bool)
self.min_sel = nfrm.max()
self.max_sel = 0
# compress frame pattern
i0 = 0
for i in range(ntrain):
n = nfrm[i]
iN = i0 + n
trnflg = litfrm[i] & cellsel
trnflg[n:] = False
self.first[i] = i0
self.flag[i0:iN] = trnflg[:n]
self.flag_cm[i0:iN] = (trnflg.reshape(-1, self.row_size).any(1)
.repeat(self.row_size)[:n])
nlit = trnflg[:n].sum()
self.min_sel = min(self.min_sel, nlit)
self.max_sel = max(self.max_sel, nlit)
i0 = iN
def msg(self):
srng = (f"{self.min_sel}" if self.min_sel == self.max_sel
else f"{self.min_sel}~{self.max_sel}")
return (
f"Use lit frame selection from {self.dev}, crange={self.crange}\n"
f"Frames per train: {srng}"
)
def get_cells_on_trains(
self, train_sel: List[int], cm: int = 0
) -> np.ndarray:
train_idx = np.flatnonzero(np.isin(self.train_ids, train_sel))
i0 = self.first[train_idx]
iN = i0 + self.count[train_idx]
idx = np.concatenate(
[np.arange(i0[i], iN[i], dtype=int) for i in range(train_idx.size)]
)
return self._sel_for_cm(self.flag[idx], self.flag_cm[idx], cm)