tools
CalibrationMetadata
¶
Bases: dict
Convenience class: dictionary stored in metadata YAML file
If metadata file already exists, it will be loaded (this may override additional constructor parameters given to this class). Use new=True to skip loading it.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
class CalibrationMetadata(dict):
    """Convenience class: dictionary stored in metadata YAML file

    If metadata file already exists, it will be loaded (this may override
    additional constructor parameters given to this class). Use new=True to
    skip loading it.

    :param output_dir: Directory containing (or to contain)
        `calibration_metadata.yml`.
    :param args: Optional initial entries; each positional argument must be
        a (key, value) pair.
    :param new: If True and the metadata file already exists, do not load
        it; the file is overwritten with the initial entries instead.
    """

    def __init__(self, output_dir: Union[Path, str], *args, new=False):
        # `args` is handed over as ONE sequence, i.e. every extra positional
        # argument is interpreted as a (key, value) pair.
        dict.__init__(self, args)
        self._yaml_fn = Path(output_dir) / "calibration_metadata.yml"
        if self._yaml_fn.exists():
            if new:
                # TODO: update after resolving this discussion
                # https://git.xfel.eu/detectors/pycalibration/-/merge_requests/624 # noqa
                self.save()
            else:
                with self._yaml_fn.open("r") as fd:
                    data = yaml.safe_load(fd)
                if isinstance(data, dict):
                    self.update(data)
                else:
                    print(f"Warning: existing {self._yaml_fn} is malformed, "
                          "will be overwritten")

    @property
    def filename(self):
        """Path of the YAML file backing this dictionary."""
        return self._yaml_fn

    def _dump_to(self, yaml_path: Path):
        """Write the current content as YAML to `yaml_path`."""
        with yaml_path.open("w") as fd:
            yaml.safe_dump(dict(self), fd)

    def save(self):
        """Write the current content to the metadata file."""
        self._dump_to(self._yaml_fn)

    def save_copy(self, copy_dir: Path):
        """Write the current content to a same-named file in `copy_dir`.

        :param copy_dir: Target directory (a `Path` or a plain string).
        """
        # Path() keeps Path inputs unchanged and additionally allows strings.
        self._dump_to(Path(copy_dir) / self._yaml_fn.name)

    def add_fragment(self, data: dict):
        """Save metadata to a separate 'fragment' file to be merged later

        Avoids a risk of corrupting the main file by writing in parallel.

        :param data: Metadata to store in the fragment.
        """
        prefix = f"metadata_frag_j{os.environ.get('SLURM_JOB_ID', '')}_"
        # delete=False: the fragment has to outlive this call so that
        # gather_fragments() can pick it up later.
        with NamedTemporaryFile("w", dir=self._yaml_fn.parent,
                                prefix=prefix, suffix='.yml',
                                delete=False) as fd:
            yaml.safe_dump(data, fd)

    def gather_fragments(self):
        """Merge in fragments saved by add_fragment(), then delete them

        Fragments with conflicting entries, and fragments which are empty
        or do not contain a mapping, are reported and left on disk.
        """
        # Sorted so that the merge order (and therefore which fragment is
        # reported as conflicting) is reproducible.
        frag_files = sorted(self._yaml_fn.parent.glob('metadata_frag_*.yml'))
        to_delete = []
        for fn in frag_files:
            with fn.open("r") as fd:
                data = yaml.safe_load(fd)

            if not isinstance(data, dict):
                # e.g. an empty file, for which safe_load() returns None.
                print(f"{fn} is empty or malformed. "
                      f"This file will be left for debugging")
                continue

            if recursive_update(self, data):
                print(f"{fn} contained conflicting metadata. "
                      f"This file will be left for debugging")
            else:
                to_delete.append(fn)

        # Persist the merged result before removing any fragment.
        self.save()

        for fn in to_delete:
            fn.unlink()
add_fragment(data)
¶
Save metadata to a separate 'fragment' file to be merged later
Avoids a risk of corrupting the main file by writing in parallel.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def add_fragment(self, data: dict):
    """Save metadata to a separate 'fragment' file to be merged later

    Avoids a risk of corrupting the main file by writing in parallel.

    :param data: Metadata to store in the fragment.
    """
    job_id = os.environ.get('SLURM_JOB_ID', '')
    # delete=False: the fragment must stay on disk after the file is closed.
    with NamedTemporaryFile(
        "w",
        dir=self._yaml_fn.parent,
        prefix=f"metadata_frag_j{job_id}_",
        suffix='.yml',
        delete=False,
    ) as frag_file:
        yaml.safe_dump(data, frag_file)
gather_fragments()
¶
Merge in fragments saved by add_fragment(), then delete them
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def gather_fragments(self):
    """Merge in fragments saved by add_fragment(), then delete them

    Fragments with conflicting entries, and fragments which are empty or do
    not contain a mapping, are reported and left on disk.
    """
    # Sorted so that the merge order (and therefore which fragment is
    # reported as conflicting) is reproducible.
    frag_files = sorted(self._yaml_fn.parent.glob('metadata_frag_*.yml'))
    to_delete = []
    for fn in frag_files:
        with fn.open("r") as fd:
            data = yaml.safe_load(fd)

        if not isinstance(data, dict):
            # e.g. an empty file, for which safe_load() returns None.
            print(f"{fn} is empty or malformed. "
                  f"This file will be left for debugging")
            continue

        if recursive_update(self, data):
            print(f"{fn} contained conflicting metadata. "
                  f"This file will be left for debugging")
        else:
            to_delete.append(fn)

    # Persist the merged result before removing any fragment.
    self.save()

    for fn in to_delete:
        fn.unlink()
calcat_creation_time(in_folder, run, creation_time='')
¶
Return the creation time to use with CALCAT.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def calcat_creation_time(
    in_folder: Path,
    run: str,
    creation_time: Optional[str] = "",
) -> datetime.datetime:
    """Return the creation time to use with CALCAT.

    :param in_folder: Folder containing the run directories; only used
        when no `creation_time` is given.
    :param run: Run number; only used when no `creation_time` is given.
    :param creation_time: Optional timestamp formatted as
        `%Y-%m-%d %H:%M:%S`. If empty, the run's creation date is looked up
        with get_dir_creation_date().
    :return: The creation time as a datetime object.
    """
    if not creation_time:
        # Fall back to the run's own creation time.
        return get_dir_creation_date(in_folder, run)

    parsed = datetime.datetime.strptime(creation_time, '%Y-%m-%d %H:%M:%S')
    # The parsed value is naive; astimezone() treats it as local time and
    # converts it to UTC.
    return parsed.astimezone(tz=datetime.timezone.utc)
creation_date_file_metadata(run_folder)
¶
Get run directory creation date from METADATA/CreationDate of the oldest file using EXtra-data.
TODO: update after DAQ store the same date as myMDC.¶
:param run_folder: Path to the run directory. :return Optional[datetime.datetime]: Run creation date.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def creation_date_file_metadata(
    run_folder: Path,
) -> Optional[datetime.datetime]:
    """Get run directory creation date from
    METADATA/CreationDate of the oldest file using EXtra-data.
    # TODO: update after DAQ store the same date as myMDC.

    :param run_folder: Path to the run directory.
    :return Optional[datetime.datetime]: Run creation date, or None if the
        run metadata reports data format version "0.5".
    """
    md_dict = RunDirectory(run_folder).run_metadata()

    if md_dict["dataFormatVersion"] == "0.5":
        print("WARNING: input files contains old datasets. "
              "No `METADATA/creationDate` to read.")
        return None

    # Path() keeps Path inputs unchanged and additionally allows strings.
    creation_dates = [
        H5File(f).run_metadata()["creationDate"]
        for f in Path(run_folder).glob("*.h5")
    ]
    # The date strings are compared as text; the earliest one is parsed.
    return datetime.datetime.strptime(
        min(creation_dates), "%Y%m%dT%H%M%S%z")
creation_date_train_timestamp(dc)
¶
Get creation date from the timestamp of the first train.
:param dc: EXtra-data DataCollection for the run directory. :return Optional[datetime.datetime]: Run creation date.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def creation_date_train_timestamp(
        dc: RunDirectory
) -> Optional[datetime.datetime]:
    """Get creation date from the timestamp of the first train.

    :param dc: EXtra-data DataCollection for the run directory.
    :return Optional[datetime.datetime]: Run creation date (UTC), or None
        if the first train's timestamp does not convert to a datetime.
    """
    first_train = dc.select_trains(np.s_[0])
    first_timestamp = first_train.train_timestamps()[0]

    # Truncate to microseconds so .item() can produce a datetime object.
    creation_date = np.datetime64(first_timestamp, 'us').item()

    if creation_date is not None:
        return creation_date.replace(tzinfo=datetime.timezone.utc)

    print("WARNING: input files contains old datasets without"
          " trains timestamps.")
    return None
get_constant_from_db(karabo_id, karabo_da, constant, condition, empty_constant, cal_db_interface, creation_time=None, print_once=True, timeout=30000, ntries=120, meta_only=True)
¶
Return calibration constants requested from CalDB
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def get_constant_from_db(karabo_id: str, karabo_da: str,
                         constant, condition, empty_constant,
                         cal_db_interface: str, creation_time=None,
                         print_once=True, timeout=30000, ntries=120,
                         meta_only=True):
    """Return calibration constants requested from CalDB

    Thin wrapper around get_from_db() which drops the returned metadata.
    `print_once` is converted to an int and forwarded as the verbosity
    level.
    """
    constant_data, _metadata = get_from_db(
        karabo_id, karabo_da, constant, condition, empty_constant,
        cal_db_interface, creation_time,
        int(print_once), timeout, ntries, meta_only,
    )
    return constant_data
get_constant_from_db_and_time(karabo_id, karabo_da, constant, condition, empty_constant, cal_db_interface, creation_time=None, print_once=True, timeout=30000, ntries=120)
¶
Return calibration constants requested from CalDB, alongside injection time
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def get_constant_from_db_and_time(karabo_id: str, karabo_da: str,
                                  constant, condition, empty_constant,
                                  cal_db_interface: str, creation_time=None,
                                  print_once=True, timeout=30000, ntries=120):
    """Return calibration constants requested from CalDB,
    alongside injection time

    `print_once` is converted to an int and forwarded to get_from_db() as
    the verbosity level.

    :return: Tuple of the constant data and the `begin_at` value of the
        retrieved constant version, or None as the second element if the
        communication with the database was not successful.
    """
    constant_data, metadata = get_from_db(
        karabo_id, karabo_da, constant, condition, empty_constant,
        cal_db_interface, creation_time,
        int(print_once), timeout, ntries,
    )

    if not (metadata and metadata.comm_db_success):
        # return None for injection time if communication with db failed.
        # reasons (no constant or condition found,
        # or network problem)
        return constant_data, None

    return constant_data, metadata.calibration_constant_version.begin_at
get_dir_creation_date(directory, run, verbosity=0)
¶
Get the directory creation date based on 3 different methods.
1) Return run start time from myMDC. (get_runtime_metadata_client) 2) If myMDC connection is not set, get the date from the files metadata. (get_runtime_metadata_file) 3) If data files are older than 2020 (dataformatversion == "0.5"), get the date from the oldest file's modified time.
If the data is not available from either source, this function will raise a FileNotFoundError.
:param directory: path to a directory which contains runs (e.g. /gpfs/exfel/data/exp/callab/202031/p900113/raw/). :param run: run number. :param verbosity: Level of verbosity (0 - silent) :return: creation datetime for the directory.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def get_dir_creation_date(directory: Union[str, Path], run: int,
                          verbosity: int = 0) -> datetime.datetime:
    """Get the creation date of a run directory.

    Two sources are tried in order:

    1) The timestamp of the first train in the run
       (creation_date_train_timestamp).
    2) For older datasets without such a timestamp, the modification time
       of the oldest `*.h5` file in the run directory.

    :param directory: path to a directory which contains runs
      (e.g. /gpfs/exfel/data/exp/callab/202031/p900113/raw/).
    :param run: run number, as an int or a numeric string.
    :param verbosity: Level of verbosity (0 - silent).
      NOTE: currently not evaluated; the messages are always printed.
    :return: creation datetime for the directory.
    :raises FileNotFoundError: if the run directory cannot be opened, or if
      the fallback finds no `*.h5` file in it.
    """
    # int() also accepts run numbers passed as strings.
    directory = Path(directory, f'r{int(run):04d}')

    # Validate the availability of the input folder.
    # And show a clear error message, if it was not found.
    try:
        dc = RunDirectory(directory)
    except FileNotFoundError as e:
        raise FileNotFoundError(
            "- Failed to read creation time, wrong input folder",
            directory) from e

    cdate = creation_date_train_timestamp(dc)

    if cdate is not None:
        # Exposing the method used for reading the creation_date.
        print("Reading creation_date from input files metadata"
              " `INDEX/timestamp`")
        return cdate

    # It's an older dataset.
    print("Reading creation_date from last modification data "
          "for the oldest input file.")
    h5_files = list(directory.glob("*.h5"))
    if not h5_files:
        raise FileNotFoundError(
            "- Failed to read creation time, no .h5 files found",
            directory)

    oldest_file = min(h5_files, key=path.getmtime)
    return datetime.datetime.fromtimestamp(
        oldest_file.stat().st_mtime,
        tz=datetime.timezone.utc,
    )
get_from_db(karabo_id, karabo_da, constant, condition, empty_constant, cal_db_interface, creation_time=None, verbosity=1, timeout=30000, ntries=7, meta_only=True, load_data=True, version_info=False, doraise=False, strategy='pdu_closest_by_time')
¶
Return calibration constants and metadata requested from CalDB
This feature uses the karabo-id and karabo-da to retrieve the desired CCV
:param karabo_id: karabo identifier (detector identifier). :param karabo_da: karabo data aggregator. :param constant: Calibration constant known for given detector. :param condition: Calibration condition. :param empty_constant: Constant to be returned in case of failure. :param cal_db_interface: Interface string, e.g. "tcp://max-exfl-cal001:8015" :param creation_time: Latest time for constant to be created. :param verbosity: Level of verbosity (0 - silent) :param timeout: Timeout for zmq request ntries is set to 7 so that if the timeout started at 30s last timeout will be ~ 1h. :param ntries: number of tries to contact the database. :param meta_only: Retrieve only metadata via ZMQ. Constants are taken directly from the h5 file on maxwell. :param version_info: Flag to show the info for the retrieved Constant. :param doraise: if True raise errors during communication with DB. :param strategy: Retrieving strategy for calibrationDBRemote. :return: Calibration constant, metadata.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def get_from_db(karabo_id: str, karabo_da: str,
                constant: 'iCalibrationDB.calibration_constant',
                condition: 'iCalibrationDB.detector_conditions',
                empty_constant: np.array,
                cal_db_interface: str,
                creation_time: Optional[datetime.datetime] = None,
                verbosity: int = 1,
                timeout: int = 30000,
                ntries: int = 7,
                meta_only: bool = True,
                load_data: bool = True,
                version_info: bool = False,
                doraise: bool = False,
                strategy: str = "pdu_closest_by_time"
                ) -> Tuple[np.array, 'ConstantMetaData']:
    """Return calibration constants and metadata requested from CalDB

    This feature uses the karabo-id and karabo-da to retrieve the
    desired CCV

    :param karabo_id: karabo identifier (detector identifier).
    :param karabo_da: karabo data aggregator.
    :param constant: Calibration constant known for given detector.
    :param condition: Calibration condition.
    :param empty_constant: Constant to be returned in case of failure.
    :param cal_db_interface: Interface string, e.g. "tcp://max-exfl-cal001:8015"
    :param creation_time: Latest time for constant to be created.
    :param verbosity: Level of verbosity (0 - silent)
    :param timeout: Timeout for zmq request. It is doubled after every
        request which timed out.
    :param ntries: number of tries to contact the database.
        ntries is set to 7 so that if the timeout started at 30s last
        timeout will be ~ 1h.
    :param meta_only: Retrieve only metadata via ZMQ. Constants are
        taken directly from the h5 file on maxwell.
    :param load_data: If True (and `meta_only` is set), read the constant
        data from the h5 file named in the retrieved metadata.
    :param version_info: Flag to show the info for the retrieved Constant.
        If set, `meta_only` is disabled and the result of the retrieval
        call is returned directly.
    :param doraise: if True raise errors during communication with DB.
    :param strategy: Retrieving strategy for calibrationDBRemote.
    :return: Calibration constant, metadata. If the retrieval failed,
        `empty_constant` is returned instead of the constant; if
        `karabo_id` or `karabo_da` is empty, the metadata is None as well.
    :raises ValueError: if the retrieved metadata lacks the file location
        needed to load the constant data.
    :raises RuntimeError: if `doraise` is set and the retrieval failed with
        an error other than a zmq timeout.
    """
    if version_info:
        meta_only = False

    metadata = _init_metadata(constant, condition, creation_time)

    if not (karabo_id and karabo_da):
        return empty_constant, None

    when = None
    if creation_time is not None and hasattr(creation_time, 'isoformat'):
        when = creation_time.isoformat()

    metadata.calibration_constant_version.karabo_id = karabo_id
    metadata.calibration_constant_version.karabo_da = karabo_da

    # make sure to remove device name from metadata dict before
    # retrieving to keep using karabo_id and karabo_da only
    # during retrieval. As device_name could have been set after
    # retrieval from iCalibrationDB
    metadata.calibration_constant_version.device_name = None

    # `ntries` doubles as the success flag: it is still > 0 after the loop
    # only if a retrieval call went through.
    while ntries > 0:
        this_interface = get_random_db_interface(cal_db_interface)
        try:
            r = metadata.retrieve(this_interface, timeout=timeout,
                                  when=when, meta_only=meta_only,
                                  version_info=version_info,
                                  strategy=strategy)
            if version_info:
                return r
            break
        except zmq.error.Again:
            ntries -= 1
            timeout *= 2
            # TODO: reevaluate the need for doraise
            # and remove if not needed.
            if ntries == 0:
                if doraise:
                    raise
            else:
                # Random back-off; pointless after the last attempt.
                sleep(np.random.randint(30))
        except Exception as e:
            if verbosity > 0:
                print(e)
            # Only a missing token is worth retrying.
            if 'missing_token' in str(e):
                ntries -= 1
            else:
                ntries = 0
            if ntries == 0 and doraise:
                raise RuntimeError(f'{e}') from e

    if ntries <= 0:
        return empty_constant, metadata

    mdata_const = metadata.calibration_constant_version

    if load_data and meta_only:
        hdf5path = getattr(mdata_const, 'hdf5path', None)
        filename = getattr(mdata_const, 'filename', None)
        h5path = getattr(mdata_const, 'h5path', None)
        if not (hdf5path and filename and h5path):
            raise ValueError(
                "Wrong metadata received to access the constant data."
                f" Retrieved constant filepath is {hdf5path}/{filename}"  # noqa
                f" and data_set_name is {h5path}."
            )
        with h5py.File(Path(hdf5path, filename), "r") as f:
            metadata.calibration_constant.data = f[f"{h5path}/data"][()]  # noqa
            # The variant attribute is missing for old constants.
            if "variant" in f[h5path].attrs:
                metadata.calibration_constant_version.variant = f[h5path].attrs["variant"]  # noqa

    if verbosity > 0:
        # `already_printed` is defined elsewhere in the module; it is used
        # here to report each constant name only once unless verbosity > 1.
        if constant.name not in already_printed or verbosity > 1:
            already_printed[constant.name] = True
            # TODO: Reset mdata_const.begin_at
            # if comm_db_success is False.
            begin_at = mdata_const.begin_at
            print(f"Retrieved {constant.name} "
                  f"with creation time: {begin_at}")

    return constant.data, metadata
get_notebook_name()
¶
Return the full path of the jupyter notebook.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def get_notebook_name():
    """
    Return the full path of the jupyter notebook.

    The running notebook servers are queried for the session which belongs
    to the current kernel. If the notebook cannot be determined this way
    (any error, or no matching session), the value of the environment
    variable `CAL_NOTEBOOK_NAME` is returned, or "Unknown Notebook" if it
    is not set.
    """
    try:
        kernel_id = re.search(r'kernel-(.*)\.json',
                              ipykernel.connect.get_connection_file()).group(1)
        for server in list_running_servers():
            response = requests.get(urljoin(server['url'], 'api/sessions'),
                                    params={'token': server.get('token', '')})
            for session in json.loads(response.text):
                if session['kernel']['id'] == kernel_id:
                    return session['notebook']['path']
    except Exception:
        # Deliberately best-effort: any failure falls back to the
        # environment variable below. KeyboardInterrupt and SystemExit are
        # no longer swallowed.
        pass

    return environ.get("CAL_NOTEBOOK_NAME", "Unknown Notebook")
get_pdu_from_db(karabo_id, karabo_da, constant, condition, cal_db_interface, snapshot_at=None, timeout=30000)
¶
Return all physical detector units for a karabo_id and list of karabo_da
:param karabo_id: Karabo identifier. :param karabo_da: Karabo data aggregator. :param constant: Calibration constant object to initialize CalibrationConstantMetadata class. :param condition: Detector condition object to initialize CalibrationConstantMetadata class. :param cal_db_interface: Interface string, e.g. "tcp://max-exfl-cal001:8015". :param snapshot_at: Database snapshot. :param timeout: Calibration Database timeout. :return: List of physical detector units (db_modules)
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def get_pdu_from_db(karabo_id: str, karabo_da: Union[str, list],
                    constant: 'iCalibrationDB.calibration_constant',
                    condition: 'iCalibrationDB.detector_conditions',
                    cal_db_interface: str,
                    snapshot_at: Optional[datetime.datetime] = None,
                    timeout: int = 30000) -> List[str]:
    """Return all physical detector units for a
    karabo_id and list of karabo_da

    :param karabo_id: Karabo identifier.
    :param karabo_da: Karabo data aggregator: a single name, a list of
        names, or 'all' for every unit of the detector.
    :param constant: Calibration constant object to
        initialize CalibrationConstantMetadata class.
    :param condition: Detector condition object to
        initialize CalibrationConstantMetadata class.
    :param cal_db_interface: Interface string, e.g. "tcp://max-exfl-cal001:8015".
    :param snapshot_at: Database snapshot.
    :param timeout: Calibration Database timeout.
    :return: List of physical detector units (db_modules). For a requested
        karabo_da without a matching unit, the list holds None.
    :raises TypeError: if `karabo_da` is neither a string nor a list.
    """
    if not isinstance(karabo_da, (str, list)):
        raise TypeError("karabo_da should either be a list of multiple "
                        "karabo_da or a string of one karabo_da or 'all'")

    metadata = _init_metadata(constant, condition, None)

    # CalibrationDBRemote expects a string.
    if snapshot_at is not None and hasattr(snapshot_at, 'isoformat'):
        snapshot_at = snapshot_at.isoformat()

    # A random interface is chosen if there is # for address range.
    db_interface = get_random_db_interface(cal_db_interface)

    pdu_dicts = metadata.retrieve_pdus_for_detector(receiver=db_interface,
                                                    karabo_id=karabo_id,
                                                    snapshot_at=snapshot_at,
                                                    timeout=timeout)

    if karabo_da == 'all':
        return [pdu["pdu_physical_name"] for pdu in pdu_dicts]

    requested = [karabo_da] if isinstance(karabo_da, str) else karabo_da

    db_modules = []
    for da in requested:
        # First unit with the requested karabo_da, else None.
        matching = next(
            (pdu for pdu in pdu_dicts if pdu["karabo_da"] == da), None)
        db_modules.append(
            None if matching is None else matching["pdu_physical_name"])
    return db_modules
get_random_db_interface(cal_db_interface)
¶
Return interface to calibration DB with random (with given range) port.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def get_random_db_interface(cal_db_interface):
    """Return interface to calibration DB with
    random (with given range) port.

    :param cal_db_interface: Interface string, either with a fixed port
        ("tcp://host:8015") or with a port range ("tcp://host:8015#8025").
    :return: The interface string unchanged if it contains no "#",
        otherwise the string with one port drawn from the range (the upper
        bound is exclusive).
    """
    # Initialize the random generator with a random seed value,
    # in case the function was executed within a multiprocessing pool.
    np.random.seed()

    if "#" not in cal_db_interface:
        return cal_db_interface

    protocol, server, port_range = cal_db_interface.split(":")
    first_port, last_port = port_range.split("#")
    port = np.random.randint(int(first_port), int(last_port))
    return f"{protocol}:{server}:{port}"
get_report(out_folder, default_path='')
¶
Get the report path from calibration_metadata.yml stored in the out_folder.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def get_report(out_folder: str, default_path: str = ""):
    """Get the report path from calibration_metadata.yml
    stored in the out_folder.

    :param out_folder: Folder containing calibration_metadata.yml.
    :param default_path: Value to use if the metadata holds no
        "report-path" entry.
    :return: The report path; a warning is printed if it is empty.
    """
    report_path = CalibrationMetadata(out_folder).get(
        "report-path", default_path)

    if report_path:
        return report_path

    print("WARNING: No report path will be injected "
          "with the constants.\n")
    return report_path
load_specified_constants(retrieved_constants, empty_constants=None)
¶
Load constant data from metadata in the retrieved_constants dictionary.
:param retrieved_constants: A dict. with the constant filepaths and dataset-name to read the constant data arrays. { 'Constant Name': { 'file-path': '/gpfs/.../*.h5', 'dataset-name': '/module_name/...', 'creation-time': str(datetime),}, } :param empty_constants: A dict of constant names keys and the empty constant array to use for constants that were not retrieved. :return constant_data: A dict of constant names keys and their data.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def load_specified_constants(
    retrieved_constants: dict,
    empty_constants: Optional[dict] = None,
) -> Tuple[dict, dict]:
    """Load constant data from metadata in the
    retrieved_constants dictionary.

    :param retrieved_constants: A dict. with the constant filepaths and
      dataset-name to read the constant data arrays.
      {
        'Constant Name': {
            'file-path': '/gpfs/.../*.h5',
            'dataset-name': '/module_name/...',
            'creation-time': str(datetime),},
      }
    :param empty_constants: A dict of constant names keys and
      the empty constant array to use for constants that were not
      retrieved (i.e. whose 'creation-time' is empty).
    :return: Tuple of two dicts keyed by constant name: the constant data
      (None for a non-retrieved constant if no `empty_constants` is given)
      and the 'creation-time' values.
    """
    const_data = {}
    when = {}

    for cname, mdata in retrieved_constants.items():
        created_at = mdata["creation-time"]
        when[cname] = created_at

        if not created_at:
            # Nothing was retrieved for this constant.
            const_data[cname] = (
                empty_constants[cname] if empty_constants else None)
            continue

        with h5py.File(mdata["file-path"], "r") as cf:
            dataset = cf[f"{mdata['dataset-name']}/data"]
            const_data[cname] = np.copy(dataset)

    return const_data, when
map_gain_stages(in_folder, runs, path_template, karabo_da, sequences=None)
¶
Prepare queues of files to process.
Queues are stored in dictionary with module name Q{}M{}
and gain name as a keys
:param in_folder: Input folder with raw data
:param runs: Dictionary of runs with key naming the gain stages
:param path_template: Template for file name
e.g. RAW-R{:04d}-{}-S{:05d}.h5
:param karabo_da: List of data aggregators e.g. [AGIPD00, AGIPD01]
:param sequences: List of sequences to be considered
:return: Dictionary of queues of files,
total number of sequences
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def map_gain_stages(in_folder, runs, path_template, karabo_da, sequences=None):
    """
    Prepare queues of files to process.
    Queues are stored in dictionary with module name Q{}M{}
    and gain name as a keys

    :param in_folder: Input folder with raw data
    :param runs: Dictionary of runs with key naming the gain stages
    :param path_template: Template for file name
                          e.g. `RAW-R{:04d}-{}-S{:05d}.h5`
    :param karabo_da: List of data aggregators e.g. [AGIPD00, AGIPD01]
    :param sequences: List of sequences to be considered
    :return: Dictionary of queues of files (per gain stage),
             total number of sequences,
             total file size divided by 1e9
    """
    gain_mapped_files = OrderedDict()
    n_sequences = 0
    n_bytes = 0

    for gain, run in runs.items():
        (files_per_module, _mod_ids, run_sequences,
         _sequences_qm, run_bytes) = map_modules_from_folder(
            in_folder, run, path_template, karabo_da, sequences)

        gain_mapped_files[gain] = files_per_module
        n_sequences += run_sequences
        n_bytes += run_bytes

    return gain_mapped_files, n_sequences, n_bytes / 1e9
map_modules_from_folder(in_folder, run, path_template, karabo_da, sequences=None)
¶
Prepare queues of files to process. Queues are stored in dictionary with module name Q{}M{} as a key
:param in_folder: Input folder with raw data
:param run: Run number
:param path_template: Template for file name
e.g. RAW-R{:04d}-{}-S{:05d}.h5
:param karabo_da: List of data aggregators e.g. [AGIPD00, AGIPD01]
:param sequences: List of sequences to be considered
:return: Dictionary of queues of files, dictionary of module indexes,
total number of sequences, dictionary of number of sequences per module
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def map_modules_from_folder(in_folder, run, path_template, karabo_da,
                            sequences=None):
    """
    Prepare queues of files to process.
    Queues are stored in dictionary with module name Q{}M{} as a key

    :param in_folder: Input folder with raw data
    :param run: Run number
    :param path_template: Template for file name
                          e.g. `RAW-R{:04d}-{}-S{:05d}.h5`
    :param karabo_da: List of data aggregators e.g. [AGIPD00, AGIPD01]
    :param sequences: List of sequences to be considered; if None, all
                      sequence files found for a data aggregator are used
    :return: Dictionary of queues of files, dictionary of module indexes,
             total number of sequences, dictionary of number of sequences
             per module, total size of the mapped files in bytes
    """
    module_files = OrderedDict()
    mod_ids = OrderedDict()
    sequences_qm = {}
    total_sequences = 0
    total_file_size = 0

    for inset in karabo_da:
        # The module index is taken from the last two characters of the
        # data aggregator name.
        module_idx = int(inset[-2:])
        name = module_index_to_qm(module_idx)
        module_files[name] = Queue()
        sequences_qm[name] = 0
        mod_ids[name] = module_idx

        if sequences is None:
            # Turn the name of sequence 0 into a wildcard for all sequences.
            fname = path_template.format(run, inset, 0).replace("S00000", "S*")
            found = glob(f"{in_folder}/r{run:04d}/{fname}")
        else:
            found = []
            for sequence in sequences:
                fname = path_template.format(run, inset, sequence)
                candidate = f"{in_folder}/r{run:04d}/{fname}"
                if isfile(candidate):
                    found.append(candidate)

        for filename in found:
            module_files[name].put(filename)
            total_sequences += 1
            sequences_qm[name] += 1
            total_file_size += path.getsize(filename)

    return (module_files, mod_ids, total_sequences,
            sequences_qm, total_file_size)
map_seq_files(run_folder, karabo_das, sequences=None)
¶
Glob run_folder and match the files based on the selected detectors and sequence numbers.
Returns:
Name | Type | Description |
---|---|---|
Dict |
dict
|
with karabo_das keys and the corresponding sequence files. |
Int |
int
|
for number of all sequence files for all karabo_das to process. |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def map_seq_files(
    run_folder: Path,
    karabo_das: List[str],
    sequences: Optional[List[int]] = None,
) -> Tuple[dict, int]:
    """Glob run_folder and match the files based on the selected
    detectors and sequence numbers.

    Args:
        run_folder (Path): Run directory to search for `*.h5` files.
        karabo_das (list): Data aggregator names to collect files for.
        sequences (list): Sequence numbers to keep. None or [-1] selects
            all sequences.

    Returns:
        Dict: with karabo_das keys and the corresponding sequence files.
        Int: for number of all sequence files for all karabo_das to process.
    """
    if sequences == [-1]:
        sequences = None
    wanted = None if sequences is None else {int(seq) for seq in sequences}

    seq_fn_pat = re.compile(r".*-(?P<da>.*?)-S(?P<seq>.*?)\.h5")

    mapped_files = {kda: [] for kda in karabo_das}
    total_files = 0

    for fn in run_folder.glob("*.h5"):
        match = seq_fn_pat.match(fn.name)
        if match is None:
            continue

        da = match.group("da")
        if da not in mapped_files:
            continue
        if wanted is not None and int(match.group("seq")) not in wanted:
            continue

        mapped_files[da].append(fn)
        total_files += 1

    # Return dict with sorted list of sequence files.
    for file_list in mapped_files.values():
        file_list.sort()

    return mapped_files, total_files
module_index_to_qm(index, total_modules=16)
¶
Maps module index (0-indexed) to quadrant + module string (1-indexed)
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def module_index_to_qm(index: int, total_modules: int = 16):
    """Maps module index (0-indexed) to quadrant + module string (1-indexed)

    :param index: 0-based module index; must be less than `total_modules`.
    :param total_modules: Number of modules of the detector, distributed
        evenly over 4 quadrants.
    :return: String of the form "Q<quadrant>M<module>", e.g. "Q1M1" for
        index 0.
    """
    # The check also fails for index == total_modules, so the message must
    # not claim "greater than".
    assert index < total_modules, (
        f'{index} must be less than {total_modules}')
    modules_per_quad = total_modules // 4
    quad, mod = divmod(index, modules_per_quad)
    return f"Q{quad+1}M{mod+1}"
raw_data_location_string(proposal, runs)
¶
Create RAW data location string to inject as a metadata along with the calibration parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
proposal |
string
|
The proposal number including the preceding `p`, e.g. p900203 |
required |
runs |
list
|
A list of the run numbers |
required |
Returns:
Name | Type | Description |
---|---|---|
str | The string for raw data_location.
e.g. `proposal:p900203 runs:9008 9009 90010` |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def raw_data_location_string(proposal: str, runs: List[int]):
    """Create RAW data location string to inject as a
    metadata along with the calibration parameters.

    Args:
        proposal (string): The proposal number including the preceding `p`.
            e.g. p900203
        runs (list): A list of the run numbers

    Returns:
        str: The string for raw data_location.
            e.g. `proposal:p900203 runs:9008 9009 90010`

    Raises:
        ValueError: If `proposal` is not a string starting with `p`
            (this includes the empty string).
    """
    # startswith() also rejects an empty string, which indexing with [0]
    # would turn into an IndexError.
    if not isinstance(proposal, str) or not proposal.startswith("p"):
        raise ValueError(
            "Invalid proposal format. The proposal should be a string with"
            " a preceding 'p'. Example: 'p900203'")

    return f"proposal:{proposal} runs:{' '.join(map(str, runs))}"
recursive_update(target, source)
¶
Recursively merge source into target, checking for conflicts
Conflicting entries will not be copied to target. Returns True if any conflicts were found.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def recursive_update(target: dict, source: dict):
    """Recursively merge source into target, checking for conflicts

    Conflicting entries will not be copied to target. Returns True if any
    conflicts were found.

    An entry conflicts if target already holds a different value for the
    key which is not None; nested dicts are merged key by key.
    """
    found_conflict = False

    for key, new_value in source.items():
        current = target.get(key, None)

        if isinstance(current, dict) and isinstance(new_value, dict):
            # Descend instead of replacing the nested dict as a whole.
            if recursive_update(current, new_value):
                found_conflict = True
            continue

        if current is not None and current != new_value:
            found_conflict = True
            continue

        target[key] = new_value

    return found_conflict
reorder_axes(a, from_order, to_order)
¶
Rearrange axes of array a from from_order to to_order
This does the same as np.transpose(), but making the before & after axes more explicit. from_order is a sequence of strings labelling the axes of a, and to_order is a similar sequence for the axes of the result.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def reorder_axes(a, from_order, to_order):
    """Rearrange axes of array a from from_order to to_order

    This does the same as np.transpose(), but making the before & after axes
    more explicit. from_order is a sequence of strings labelling the axes of a,
    and to_order is a similar sequence for the axes of the result.
    """
    # One label per axis, and both orders must use the same labels.
    assert len(from_order) == a.ndim
    assert sorted(from_order) == sorted(to_order)

    source_labels = list(from_order)
    # For every axis of the result, the position of that axis in `a`.
    permutation = tuple(source_labels.index(label) for label in to_order)
    return a.transpose(permutation)
save_const_to_h5(db_module, karabo_id, constant, condition, data, file_loc, report, creation_time, out_folder)
¶
Save constant in h5 file with its metadata (e.g. db_module, condition, creation_time)
:param db_module: database module (PDU/Physical Detector Unit). :param karabo_id: karabo identifier. :param constant: Calibration constant known for given detector. :param condition: Calibration condition. :param data: Constant data to save. :param file_loc: Location of raw data "proposal:{} runs:{} {} {}". :param creation_time: creation_time for the saved constant. :param out_folder: path to output folder. :return: metadata of the saved constant.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def save_const_to_h5(db_module: str, karabo_id: str,
                     constant: 'iCalibrationDB.calibration_constant',
                     condition: 'iCalibrationDB.detector_conditions',
                     data: np.array, file_loc: str,
                     report: str,
                     creation_time: datetime.datetime,
                     out_folder: str) -> 'ConstantMetaData':
    """Write a constant and its metadata into an h5 file in out_folder

    The file is named ``const_<constant class name>_<db_module>.h5``; an
    existing file of that name is reported on stdout and then written
    again.

    :param db_module: database module (PDU/Physical Detector Unit).
    :param karabo_id: karabo identifier.
    :param constant: Calibration constant known for given detector.
    :param condition: Calibration condition.
    :param data: Constant data to save.
    :param file_loc: Location of raw data "proposal:{} runs:{} {} {}".
    :param report: Report string, stored under the 'report' key.
    :param creation_time: creation_time for the saved constant.
    :param out_folder: path to output folder.
    :return: metadata of the saved constant.
    """
    metadata = _init_metadata(constant, condition, creation_time)
    ccv = metadata.calibration_constant_version
    ccv.raw_data_location = file_loc

    # Turn each condition parameter into a plain dictionary entry.
    condition_params = {}
    for param in metadata.detector_condition.parameters:
        condition_params[param.name] = {
            'lower_deviation_value': param.lower_deviation,
            'upper_deviation_value': param.upper_deviation,
            'value': param.value,
            'flg_logarithmic': param.logarithmic,
        }

    # Read these back from the metadata object rather than the arguments.
    begin_at = ccv.begin_at
    raw_location = ccv.raw_data_location
    constant_name = metadata.calibration_constant.__class__.__name__

    contents = {
        'condition': condition_params,
        'db_module': db_module,
        'karabo_id': karabo_id,
        'constant': constant_name,
        'data': data,
        'creation_time': begin_at,
        'file_loc': raw_location,
        'report': report,
    }

    ofile = f"{out_folder}/const_{constant_name}_{db_module}.h5"
    if isfile(ofile):
        print(f'File {ofile} already exists and will be overwritten')
    save_dict_to_hdf5(contents, ofile)

    return metadata
save_constant_metadata(retrieved_constants, mdata, constant_name)
¶
Save constant metadata to the input meta data dictionary. The constant's metadata stored are file path, dataset name, creation time, and physical detector unit name.
:param retrieved_constants: A dictionary to store the metadata for the retrieved constant. :param mdata: A ConstantMetaData object obtained after trying to retrieve a constant with get_from_db(). :param constant_name: String for constant name to be used as a key. :param constants_key: The key name when all constants metadata will be stored.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def save_constant_metadata(
    retrieved_constants: dict,
    mdata: ConstantMetaData,
    constant_name: str,
):
    """Record one constant's metadata in the given dictionary.

    A fresh entry is stored under `constant_name`, replacing any previous
    one. When the constant was retrieved successfully the entry holds
    "file-path", "dataset-name" and "creation-time"; otherwise it holds
    only "file-path" and "creation-time", both set to None.

    :param retrieved_constants: A dictionary to store the metadata for
                                the retrieved constant.
    :param mdata: A ConstantMetaData object obtained after trying
                  to retrieve a constant with get_from_db().
    :param constant_name: String for constant name to be used as a key.
    """
    ccv = mdata.calibration_constant_version
    entry = {}
    retrieved_constants[constant_name] = entry

    # Retrieval failed: leave placeholders and stop here.
    if not mdata.comm_db_success:
        entry["file-path"] = None
        entry["creation-time"] = None
        return

    entry["file-path"] = f"{ccv.hdf5path}{ccv.filename}"
    entry["dataset-name"] = ccv.h5path
    entry["creation-time"] = ccv.begin_at
send_to_db(db_module, karabo_id, constant, condition, file_loc, report_path, cal_db_interface, creation_time=None, timeout=30000, ntries=7, doraise=False, variant=0)
¶
Send new calibration constants and metadata requested to CalDB
:param db_module: database module (PDU/Physical Detector Unit) :param karabo_id: karabo identifier :param constant: Calibration constant known for given detector :param condition: Calibration condition :param file_loc: Location of raw data. :param report_path: xfel-calibrate report path to inject along with the calibration constant versions to the database. :param cal_db_interface: Interface string, e.g. "tcp://max-exfl-cal001:8015" :param creation_time: Latest time for constant to be created :param timeout: Timeout for zmq request :param ntries: number of tries to contact the database, ntries is set to 7 so that if the timeout started at 30s last timeout will be ~ 1h. :param doraise: if True raise errors during communication with DB :param variant: A calibration constant version variant attribute for the constant file.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def send_to_db(db_module: str, karabo_id: str, constant, condition,
file_loc: str, report_path: str, cal_db_interface: str,
creation_time: Optional[datetime.datetime] = None,
timeout: int = 30000,
ntries: int = 7,
doraise: bool = False,
variant: int = 0):
"""Send new calibration constants and metadata requested to CalDB
The send is retried in a loop: a zmq timeout uses up one try and doubles
the timeout for the next attempt; an error mentioning 'missing_token'
uses up one try; any other error ends the retries.
:param db_module: database module (PDU/Physical Detector Unit)
:param karabo_id: karabo identifier
:param constant: Calibration constant known for given detector
:param condition: Calibration condition
:param file_loc: Location of raw data.
:param report_path: xfel-calibrate report path to inject along with
the calibration constant versions to the database.
:param cal_db_interface: Interface string, e.g. "tcp://max-exfl-cal001:8015"
:param creation_time: Latest time for constant to be created
:param timeout: Timeout for zmq request
:param ntries: number of tries to contact the database,
ntries is set to 7 so that if the timeout started
at 30s last timeout will be ~ 1h.
:param doraise: if True raise errors during communication with DB
:param variant: A calibration constant version variant attribute
for the constant file.
:return: the metadata object created by _init_metadata().
:raises TypeError: for a report_path that is not a string of at least
2 characters.
:raises ValueError: if constant.data is None.
:raises RuntimeError: with doraise, when a non-timeout error ends the
retries (a final zmq timeout is re-raised as is).
"""
success = False
snapshot_at = None
metadata = _init_metadata(constant, condition, creation_time)
if db_module:
# Add injected constant's file source info as a file location
metadata.calibration_constant_version.raw_data_location = file_loc
if report_path:
# calibration_client expects a dict of injected report path
# of at least 2 characters for each key.
if not isinstance(report_path, str) or len(report_path) < 2:
raise TypeError(
"\"report_path\" needs to be a string "
"of at least 2 characters."
)
report = {"name": path.basename(report_path),
"file_path": report_path}
metadata.calibration_constant_version.report_path = report
metadata.calibration_constant_version.karabo_id = karabo_id
metadata.calibration_constant_version.device_name = db_module
metadata.calibration_constant_version.karabo_da = None
# NOTE(review): raw_data_location was already assigned above; this second
# assignment looks redundant - confirm before removing.
metadata.calibration_constant_version.raw_data_location = file_loc
metadata.calibration_constant_version.variant = variant
if constant.data is None:
raise ValueError(
"There is no data available to "
"inject to the database."
)
# Keep trying until a send succeeds or the tries are used up.
while ntries > 0:
this_interface = get_random_db_interface(cal_db_interface)
if (
creation_time is not None and
hasattr(creation_time, 'isoformat')
):
# This snapshot will be used only while retrieving
# the correct PDU and appending its UUID.
snapshot_at = creation_time.isoformat()
try:
metadata.send(
this_interface,
snapshot_at=snapshot_at,
timeout=timeout,
)
success = True # TODO: use comm_db_success
break
except zmq.error.Again:
# Timed out: use up one try, double the timeout for the next attempt
# and pause for a random 0-29 (presumably seconds) before retrying.
ntries -= 1
timeout *= 2
sleep(np.random.randint(30))
# Out of tries: re-raise the timeout error itself.
if ntries == 0 and doraise:
raise
except Exception as e:
# TODO: refactor to use custom exception class
# Refactor error message for re-injecting an
# identical CCV to the database.
if all(s in str(e) for s in [
"Error creating calibration_constant_version",
"has already been taken",
]):
print(
f"WARNING: {constant.name} for {db_module}"
" has already been injected with the same "
"parameter conditions."
)
else:
print(f"{e}\n")
# A 'missing_token' error uses up one try; any other error stops
# the retries at once.
if 'missing_token' in str(e):
ntries -= 1
else:
ntries = 0
# With doraise, surface the failure as RuntimeError carrying the
# original message.
if ntries == 0 and doraise:
raise RuntimeError(f'{e}')
if success:
print(
f"{constant.name} for {db_module} "
"is injected with creation-time: "
f"{metadata.calibration_constant_version.begin_at}."
)
return metadata
write_compressed_frames(arr, ofile, dataset_path, comp_threads=1)
¶
Compress gain/mask frames in multiple threads, and save their data
This is significantly faster than letting HDF5 do the compression in a single thread.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def write_compressed_frames(
        arr: np.ndarray,
        ofile: h5py.File,
        dataset_path: str,
        comp_threads: int = 1):
    """Compress gain/mask frames in multiple threads, and save their data

    Each frame along the first axis of arr is compressed in a thread pool
    and written as one pre-compressed chunk of a new dataset, which is
    significantly faster than letting HDF5 do the compression in a
    single thread.

    Returns the newly created dataset.
    """
    bytes_per_item = arr.itemsize

    # The dataset is declared with gzip level 1 and the shuffle filter to
    # match the chunks built by hand below. No checksum filter is
    # requested, as we would have to implement it ourselves.
    dataset = ofile.create_dataset(
        dataset_path,
        shape=arr.shape,
        chunks=((1,) + arr.shape[1:]),
        compression="gzip",
        compression_opts=1,
        shuffle=True,
        dtype=arr.dtype,
    )

    def _shuffle_and_deflate(frame_no):
        # Equivalent to the HDF5 'shuffle' filter: regroup the bytes of
        # the frame by their position within each item for better
        # compression.
        raw_bytes = arr[frame_no].view(np.uint8)
        byte_planes = raw_bytes.reshape((-1, bytes_per_item)).transpose()
        payload = zlib.compress(np.ascontiguousarray(byte_planes), level=1)
        return frame_no, payload

    with ThreadPool(comp_threads) as pool:
        results = pool.imap(_shuffle_and_deflate, range(len(arr)))
        for frame_no, payload in results:
            # Each frame is 1 complete chunk, starting at offset 0 on
            # every axis but the first.
            chunk_start = (frame_no,) + (0,) * (dataset.ndim - 1)
            dataset.id.write_direct_chunk(chunk_start, payload)

    return dataset
write_constants_fragment(out_folder, det_metadata, caldb_root)
¶
Record calibration constants metadata to a fragment file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
out_folder |
Path
|
The output folder to store the fragment file. |
required |
det_metadata |
dict
|
A dictionary with the desired detector metadata. {karabo_da: {constant_name: metadata}} |
required |
caldb_root |
Path
|
The calibration database root path for constant files. |
required |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/tools.py
def write_constants_fragment(
    out_folder: Path,
    det_metadata: dict,
    caldb_root: Path,
):
    """Write the metadata of retrieved constants to a fragment file.

    For each karabo_da the fragment lists its constants (file path below
    caldb_root, dataset, creation time and CCV id) together with the
    physical name taken from the first constant of that module.

    Args:
        out_folder (Path): The output folder to store the fragment file.
        det_metadata (dict): A dictionary with the desired detector metadata.
            {karabo_da: {constant_name: metadata}}
        caldb_root (Path): The calibration database root path for constant files.
    """
    per_module = {}
    for karabo_da, const_metadata in det_metadata.items():
        constants = {}
        for cname, ccv_metadata in const_metadata.items():
            constants[cname] = {
                "path": str(caldb_root / ccv_metadata["path"]),
                "dataset": ccv_metadata["dataset"],
                "creation-time": ccv_metadata["begin_validity_at"],
                "ccv_id": ccv_metadata["ccv_id"],
            }

        # The physical name is read from the module's first constant.
        first_ccv = list(const_metadata.values())[0]
        per_module[karabo_da] = {
            "constants": constants,
            "physical-name": first_ccv["physical_name"],
        }

    fragment = {"retrieved-constants": per_module}
    CalibrationMetadata(out_folder).add_fragment(fragment)