# Source code for iCalibrationDB.meta_data

import binascii
import dateutil.parser
from typing import List, Optional

import hashlib
import os
import tempfile

import h5py
import zmq

from .calibration_constant import CalibrationConstant
from .constant_version import ConstantVersion
from .detector_condition import DetectorCondition
from .operating_condition import OperatingCondition
from .util import DictConvertable
from .settings import Settings


class ConstantMetaData(DictConvertable):
    """Meta data of a calibration constant plus its database transport.

    Bundles a `CalibrationConstant`, a `ConstantVersion` and a
    `DetectorCondition`, and offers methods to send, retrieve and update
    them through a ZMQ endpoint (see `_zmq_request`).

    After every database call `comm_db_success` tells whether the last
    request was answered with a success flag.
    """

    mandatory = ["detector_condition", "calibration_constant_version",
                 "calibration_constant", "producing_device"]

    def __init__(self):
        super().__init__()
        self._datadict["calibration_hash_schema_version"] = "2.0"
        self._datadict["producing_device"] = "interactive"
        self.comm_db_success = False

    @property
    def calibration_hash_schema_version(self):
        """ Version of the injection Hash schema to use.

        The default version is usually appropriate.
        """
        return self._datadict.get("calibration_hash_schema_version")

    @calibration_hash_schema_version.setter
    def calibration_hash_schema_version(self, value):
        self._datadict["calibration_hash_schema_version"] = str(value)

    @calibration_hash_schema_version.deleter
    def calibration_hash_schema_version(self):
        del self._datadict["calibration_hash_schema_version"]

    @property
    def producing_device(self):
        """ (Karabo) device producing this constant.

        Defaults to "interactive".
        """
        return self._datadict.get("producing_device")

    @producing_device.setter
    def producing_device(self, value):
        self._datadict["producing_device"] = str(value)

    @producing_device.deleter
    def producing_device(self):
        del self._datadict["producing_device"]

    @property
    def calibration_constant(self):
        """ The calibration constant this meta data refers to.

        An object of type `CalibrationConstant` is expected.
        A selection of predefined constants is available in the
        `known_constants` module.
        """
        return self._datadict.get("calibration_constant")

    @calibration_constant.setter
    def calibration_constant(self, value):
        if not isinstance(value, CalibrationConstant):
            raise AttributeError("Expecting a CalibrationConstant object")
        self._datadict["calibration_constant"] = value

    @calibration_constant.deleter
    def calibration_constant(self):
        del self._datadict["calibration_constant"]

    @property
    def calibration_constant_version(self):
        """ The version of the constant this meta data refers to.

        An object of type `ConstantVersion` is expected.
        A selection of predefined versions is available in the
        `known_versions` module.
        """
        return self._datadict.get("calibration_constant_version")

    @calibration_constant_version.setter
    def calibration_constant_version(self, value):
        if not isinstance(value, ConstantVersion):
            raise AttributeError("Expecting a ConstantVersion object")
        self._datadict["calibration_constant_version"] = value

    @calibration_constant_version.deleter
    def calibration_constant_version(self):
        del self._datadict["calibration_constant_version"]

    @property
    def detector_condition(self):
        """ The detector operating condition the constant is valid for /
        is requested for.

        An object of type `DetectorCondition` is expected.
        """
        return self._datadict.get("detector_condition")

    @detector_condition.setter
    def detector_condition(self, value):
        if not isinstance(value, DetectorCondition):
            raise AttributeError("Expecting a DetectorCondition object")
        self._datadict["detector_condition"] = value

    @detector_condition.deleter
    def detector_condition(self):
        del self._datadict["detector_condition"]

    def send(self, receiver: str, silent: Optional[bool] = True,
             timeout: Optional[int] = 30000):
        """Send a constant and its meta data to the database at `receiver`.

        The constant data and the detector-condition parameters are
        written into a temporary HDF5 file whose bytes are shipped
        together with the meta data dictionary.

        As a side effect `detector_condition.name` is replaced by the
        previous name plus a base64-encoded MD5 digest (of the device name
        and all condition values), truncated to 60 characters, and
        `calibration_constant_version.file_name` is set to the temporary
        file's path.

        :param receiver: ZMQ address, e. g. `tcp://localhost:5050`
        :param silent: Set to False to print information
        :param timeout: Timeout for zmq request
        :raises: A `RuntimeError` if database communication fails or no
                 matching conditions are found.
        """
        cdata = self.calibration_constant.data
        fname = self.calibration_constant_version.begin_at.isoformat()
        self.comm_db_success = False

        # ../scratch exists only for maxwell.
        # i.e. current gitlab CI can't access /gpfs/exfel/data/scratch.
        tmp_path = "/gpfs/exfel/data/scratch/.caltmp/"
        if not os.path.exists(tmp_path):
            # Fall back to the platform default temporary directory.
            tmp_path = None

        # Mapping: OperatingCondition attribute -> dataset name in the file.
        attmap = {"value": "value",
                  "logarithmic": "flg_logarithmic",
                  "lower_deviation": "lower_deviation_value",
                  "upper_deviation": "upper_deviation_value"}

        # create a temporary file in tmp_path
        # and delete it as soon as it is closed.
        with tempfile.NamedTemporaryFile(
                dir=tmp_path, suffix=".tmp", prefix="cal_",
                delete=True) as fp:
            # The context manager guarantees the HDF5 handle is closed
            # (and thereby flushed to fp.name) even if a write fails.
            with h5py.File(fp.name, 'w', driver='core',
                           backing_store=True) as hp:
                self.calibration_constant_version.file_name = fp.name
                dname = self.calibration_constant_version.device_name
                cname = self.calibration_constant.name
                base_key = '/{}/{}/0'.format(dname, cname)
                hp["{}/data".format(base_key)] = cdata

                unique_name = self.detector_condition.name
                md5 = hashlib.md5(dname.encode())
                for condition in self.detector_condition.parameters:
                    safe_name = condition.name.replace(" ", "_")
                    for att, mapped in attmap.items():
                        value = getattr(condition, att)
                        key = "{}/condition/{}/{}".format(
                            base_key, safe_name, mapped)
                        hp[key] = value
                        md5.update(str(value).encode('utf-8'))

                unique_name += binascii.b2a_base64(
                    md5.digest()).decode("utf-8")
                self.detector_condition.name = unique_name[:60]

            # The HDF5 file is closed now; read back what it wrote.
            fp.seek(0)
            fbytes = fp.read()

        ccv = self.calibration_constant_version
        parm_hash = self.to_dict(silent=silent)
        # The raw data travels in the byte stream, not in the meta data.
        del parm_hash["calibration_constant"]["data"]
        parm_hash['h5path'] = base_key
        parm_hash['file_indexes.start_idx'] = 0
        parm_hash['file_indexes.end_idx'] = 0
        parm_hash["pdu_physical_name"] = ccv.device_name
        parm_hash["detector_identifier"] = ccv.karabo_id
        parm_hash["pdu_karabo_da"] = ccv.karabo_da
        if ccv.report_path:
            parm_hash["report"] = ccv.report_path

        data = {
            "parameterHashes": [parm_hash],
            "bStream": fbytes,
            "filename": "{}.h5".format(fname),
            "subject": "send",
        }

        resp = _zmq_request(receiver, data, timeout=timeout)
        if not resp["success"]:
            msg = "Error sending to database: {}".format(resp)
            raise RuntimeError(msg)

        self.comm_db_success = True
        if not silent:
            print("Successfully sent constant to database!")

    def retrieve(self, receiver: str, when: Optional[str] = None,
                 silent: Optional[bool] = True,
                 timeout: Optional[int] = 30000,
                 meta_only: Optional[bool] = False,
                 version_info: Optional[bool] = False,
                 strategy: Optional[str] = "pdu_closest_by_time"):
        """Retrieve a constant and its meta data from the database at
        `receiver`.

        Receiver should be a ZMQ address of the form
        `tcp://localhost:5050`.

        The `when` parameter defaults to `None`: in this case the most
        current constant version will be requested. To request a version
        for a specific time, set `when` to an iso-formatted time string,
        e.g. using `datetime.now().isoformat()`.

        If a constant is successfully returned, the requesting
        `ConstantMetaData` object updates itself to reflect the parameter
        conditions of the constant. The constant itself can then be found
        at `self.calibration_constant.data`.

        If meta_only is set, then no constant data is transferred.
        Instead constants are read from a .h5 file by the client.
        The variable meta_only is overwritten by the environmental
        variable `CAL_DB_METAONLY`, if it is defined.

        If the version_info flag is True, the function returns a list of
        meta-data for constant-versions. The meta_only flag in this case
        is not used.

        :param receiver: ZMQ address, e. g. `tcp://localhost:5050`
        :param when: iso-formatted time string, or None for most current
        :param silent: Set to False to print information
        :param timeout: Timeout for zmq request
        :param meta_only: Request meta data only, no constant data
        :param version_info: Return the list of version descriptors
        :param strategy: Default pdu_closest_by_time, options:
            a) pdu_closest_by_time: use physical detector unit to
               retrieve CCV closest to measured-at
            b) detector_closest_by_time: use karabo-id and karabo-da to
               retrieve CCV closest to measured-time
            c) pdu_prior_in_time: use PDU to retrieve CCV prior on time
               only to measured-at
        :return: the response payload if `version_info` is set,
                 otherwise None
        :raises: A `RuntimeError` if database communication fails or no
                 matching constant is found.
        """
        self.comm_db_success = False

        # Overwrite meta_only with environment variable.
        # NOTE(review): any non-empty value enables meta-only mode,
        # including strings such as "0" or "false".
        meta_only_env = os.environ.get('CAL_DB_METAONLY', None)
        if meta_only_env:
            meta_only = True

        data = dict()
        d = self.to_dict(mode="retrieve", silent=silent)
        data["detector_condition"] = d["detector_condition"]
        data["calibration_name"] = self.calibration_constant.name

        # In calibrationDBRemote retrieve function awaits a mandatory
        # parameter of karabo_id and either of device_name or karabo_da
        ccv = self.calibration_constant_version
        data["pdu_physical_name"] = ccv.device_name
        data["detector_identifier"] = ccv.karabo_id
        data["pdu_karabo_da"] = ccv.karabo_da

        if when is None:
            when = ""
        data["measured_at"] = when
        # This snapshot will be used only while retrieving
        # the correct PDU and appending its UUID
        data["snapshot_at"] = when

        if version_info:
            # TODO: test with new mapping
            data["subject"] = "retrieve_version_info"
            data["meta_only"] = False
        else:
            data["subject"] = "retrieve"
            data["meta_only"] = meta_only
        data["strategy"] = strategy

        resp = _zmq_request(receiver, data, timeout=timeout)
        if not resp["success"]:
            msg = "Error sending to database: {}".format(resp)
            # Un-escape the server-side message for readability.
            raise RuntimeError(msg.replace('\\n', '\n').replace('\\', ''))

        self.comm_db_success = True
        payload = resp["payload"]

        if version_info:
            if not silent:
                print("Successfully retrieved list of versions!")
            return payload

        meta = payload["meta"]
        own_name = self.calibration_constant.name
        assert meta["calibration_name"] == own_name

        # Expose retrieve physical name to iCalibrationDB attributes
        self.calibration_constant_version.device_name = \
            meta["pdu_physical_name"]

        if not silent:
            print("Successfully retrieved constant from database!")
            print("Updating self...")

        if meta.get("meta_only", False):
            self.calibration_constant.data = None
        else:
            self.calibration_constant.data = payload["data"]

        conditions = []
        for name, cdict in meta["detector_conditions"].items():
            # Avoid appending detector_uuid.
            # This condition should only be added within
            # calibrationDBRemote
            if name == "Detector_UUID":
                continue
            condition = OperatingCondition()
            condition._datadict = cdict
            condition.name = name.replace("_", " ")
            condition.available = True
            conditions.append(condition)
        self.detector_condition.parameters = conditions

        def add_data(cname, is_datetime=False):
            # Copy one field of the returned meta data onto the version.
            value = meta.get(cname, None)
            if value is not None and is_datetime:
                value = dateutil.parser.parse(value)
            setattr(self.calibration_constant_version, cname, value)

        time_info = ['begin_at', 'begin_validity_at', 'end_validity_at']
        stored_metadata = ['ccv_id', 'flg_good_quality', 'hdf5path',
                           'filename', 'h5path']
        for dconst in time_info:
            add_data(dconst, is_datetime=True)
        for dconst in stored_metadata:
            add_data(dconst)

    def get_from_version_info(self, ccv):
        """ Return a constant from a dictionary descriptor returned
        by `retrieve(..., version_info=True)`

        The data set is read from the file
        `Settings.base_path/<path_to_file>/<file_name>`.

        :param ccv: calibration constant version descriptor; the keys
                    `path_to_file`, `file_name` and `data_set_name`
                    are read
        :return: content of the `<data_set_name>/data` data set
        """
        fpath = "{}/{}/{}".format(Settings.base_path,
                                  ccv['path_to_file'],
                                  ccv['file_name'])
        with h5py.File(fpath, "r") as f:
            return f["{}/data".format(ccv['data_set_name'])][()]

    def retrieve_pdus_for_detector(self, receiver: str, karabo_id: str,
                                   snapshot_at: Optional[str] = "",
                                   timeout: Optional[int] = 30000
                                   ) -> List[dict]:
        """Retrieve physical detector units corresponding
        to a detector identifier

        :param receiver: ZMQ address, e. g. `tcp://localhost:5050`
        :param karabo_id: The karabo id which is used
                          as a detector identifier in CalCat
        :param snapshot_at: CalCat database snapshot
        :param timeout: Timeout for zmq request
        :return: list of dictionaries with the keys `pdu_physical_name`,
                 `karabo_da` and `virtual_device_name`, sorted by
                 `karabo_da`
        :raises: A `RuntimeError` if database communication fails.
        """
        # Reset first, like the other request methods, so a failed call
        # never leaves a stale success flag behind.
        self.comm_db_success = False

        data = {
            "subject": "retrieve_detector_pdus",
            "detector_identifier": karabo_id,
            "snapshot_at": snapshot_at,
        }

        resp = _zmq_request(receiver, data, timeout=timeout)
        if not resp["success"]:
            msg = "Error sending to the database: {}".format(resp)
            raise RuntimeError(msg.replace('\\n', '\n').replace('\\', ''))

        self.comm_db_success = True
        new_payload = [
            {
                "pdu_physical_name": p["physical_name"],
                "karabo_da": p["karabo_da"],
                "virtual_device_name": p["virtual_device_name"],
            }
            for p in resp["payload"]
        ]
        return sorted(new_payload, key=lambda k: k['karabo_da'])

    def retrieve_from_version_info(self, ccv):
        """ Retrieve a constant and meta data from a dictionary
        descriptor returned by `retrieve(..., version_info=True)`

        Replaces `calibration_constant`, `detector_condition` and
        `calibration_constant_version` of this object with fresh
        instances filled from the HDF5 file and from `ccv`.

        :param ccv: calibration constant version descriptor
        """
        fpath = "{}/{}/{}".format(Settings.base_path,
                                  ccv['path_to_file'],
                                  ccv['file_name'])
        with h5py.File(fpath, "r") as f:
            arr = f["{}/data".format(ccv['data_set_name'])][()]
            self.calibration_constant = CalibrationConstant()
            self.calibration_constant.data = arr

            conditions = []
            condition_path = "{}/condition".format(ccv['data_set_name'])
            for name, group in f[condition_path].items():
                # Avoid appending detector_uuid.
                # This condition should only be added within
                # calibrationDBRemote
                if name == "Detector_UUID":
                    continue
                cdict = {}
                for key, val in group.items():
                    cdict[key] = val[()]
                condition = OperatingCondition()
                condition._datadict = cdict
                condition.name = name.replace("_", " ")
                condition.available = True
                conditions.append(condition)

        self.detector_condition = DetectorCondition()
        self.detector_condition.parameters = conditions
        self.calibration_constant_version = ConstantVersion()

        def add_data(cname, is_datetime=False):
            # Copy one field of the descriptor onto the version object.
            value = ccv.get(cname, None)
            if value is not None and is_datetime:
                value = dateutil.parser.parse(value)
            setattr(self.calibration_constant_version, cname, value)

        add_data('flg_good_quality')
        for dconst in ['begin_at', 'begin_validity_at', 'end_validity_at']:
            add_data(dconst, is_datetime=True)

    def update_flg_good_quality(self, receiver, cvv_id, flg_good_quality,
                                silent=True, timeout=30000):
        """ Update `flg_good_quality` of the calibration constant version.

        Note that `calibration_constant_version` of this object is
        replaced by a fresh `ConstantVersion` carrying only the id and
        the quality flag.

        :param receiver: ZMQ address, e. g. `tcp://localhost:5050`
        :param cvv_id: calibration constant version id
        :param flg_good_quality: flg_good_quality to be set
        :param silent: Set to False to print information
        :param timeout: Timeout for zmq request
        :return: response message
        :raises: A `RuntimeError` if database communication fails
                 or `update_keys` are not set.
        """
        self.calibration_constant_version = ConstantVersion()
        self.calibration_constant_version.ccv_id = cvv_id
        # NOTE(review): `_update` reads the key 'flg_good_quality';
        # presumably ConstantVersion.good_quality stores under that key.
        # Confirm against ConstantVersion.
        self.calibration_constant_version.good_quality = flg_good_quality
        return self._update(receiver,
                            update_keys=['ccv_id', 'flg_good_quality'],
                            silent=silent, timeout=timeout)

    def _update(self, receiver, update_keys=None, silent=True,
                timeout=30000):
        """ Update calibration constant version in the database
        at `receiver`.

        Receiver should be a ZMQ address of the form
        `tcp://localhost:5050`.

        The generic function is part of the private interface
        and should be exposed via dedicated methods for
        specific parameters.

        :param receiver: ZMQ address, e. g. `tcp://localhost:5050`
        :param update_keys: List of keys to be updated; if None, all
                            entries of the version are sent
        :param silent: Set to False to print information
        :param timeout: Timeout for zmq request
        :return: response message
        :raises: A `RuntimeError` if database communication fails
                 or `update_keys` are not set.
        """
        # An unset version is None and has no __dict__, hence the default.
        cvv_dict = getattr(self.calibration_constant_version,
                           "__dict__", {})
        self.comm_db_success = False

        if '_datadict' not in cvv_dict:
            msg = 'Calibration_constant_version is not set'
            raise RuntimeError(msg)

        if update_keys is None:
            data = cvv_dict['_datadict'].copy()
        else:
            data = dict()
            for key in update_keys:
                value = cvv_dict['_datadict'].get(key, None)
                if value is None:
                    msg = 'Value {} is not set'.format(key)
                    raise RuntimeError(msg)
                data[key] = value

        data["subject"] = "update"

        resp = _zmq_request(receiver, data, timeout=timeout)
        if not resp["success"]:
            msg = "Error sending to database: {}".format(resp)
            raise RuntimeError(msg)

        self.comm_db_success = True
        if not silent:
            print("Successfully updated constant version")
        return resp
def _zmq_request(endpoint, data, timeout=30000):
    """Perform one request/reply round trip over a throw-away ZMQ socket.

    A REQ socket is created on the shared context, `data` is sent
    pickled, and the unpickled reply is returned. The socket is closed
    in every case.

    :param str endpoint: The address of a ZMQ REP socket to connect to.
    :param data: A Python object to pickle and send as a request.
    :param int timeout: Timeout in ms, applied to send and to receive
        separately, so the actual maximum time is double this.
    :return: The unpickled reply object.
    """
    context = zmq.Context.instance()
    request_socket = context.socket(zmq.REQ)
    try:
        # Do not block on close if a message could not be delivered.
        request_socket.setsockopt(zmq.LINGER, 0)
        request_socket.SNDTIMEO = timeout
        request_socket.RCVTIMEO = timeout
        request_socket.connect(endpoint)
        request_socket.send_pyobj(data)
        # NOTE: the reply is unpickled; only use trusted endpoints.
        reply = request_socket.recv_pyobj()
    finally:
        request_socket.close()
    return reply