Source code for iCalibrationDB.meta_data
import binascii
import dateutil.parser
from typing import List, Optional
import hashlib
import os
import tempfile
import h5py
import zmq
from .calibration_constant import CalibrationConstant
from .constant_version import ConstantVersion
from .detector_condition import DetectorCondition
from .operating_condition import OperatingCondition
from .util import DictConvertable
from .settings import Settings
[docs]class ConstantMetaData(DictConvertable):
mandatory = ["detector_condition", "calibration_constant_version",
"calibration_constant", "producing_device"]
def __init__(self):
super(ConstantMetaData, self).__init__()
self._datadict["calibration_hash_schema_version"] = "2.0"
self._datadict["producing_device"] = "interactive"
self.comm_db_success = False
@property
def calibration_hash_schema_version(self):
"""
Version of the injection Hash schema to use.
The default version is usually appropriate.
"""
return self._datadict.get("calibration_hash_schema_version")
@calibration_hash_schema_version.setter
def calibration_hash_schema_version(self, value):
self._datadict["calibration_hash_schema_version"] = str(value)
@calibration_hash_schema_version.deleter
def calibration_hash_schema_version(self):
del self._datadict["calibration_hash_schema_version"]
@property
def producing_device(self):
"""
(Karabo) device producing this constant. Defaults to "interactive".
"""
return self._datadict.get("producing_device")
@producing_device.setter
def producing_device(self, value):
self._datadict["producing_device"] = str(value)
@producing_device.deleter
def producing_device(self):
del self._datadict["producing_device"]
@property
def calibration_constant(self):
"""
The calibration constant this meta data refers to.
An object of type `CalibrationConstant` is expected. A selection of
predefined constants is available in the `known_constants` module.
"""
return self._datadict.get("calibration_constant")
@calibration_constant.setter
def calibration_constant(self, value):
if not isinstance(value, CalibrationConstant):
raise AttributeError("Expecting a CalibrationConstant object")
self._datadict["calibration_constant"] = value
@calibration_constant.deleter
def calibration_constant(self):
del self._datadict["calibration_constant"]
@property
def calibration_constant_version(self):
"""
The version of the constant this meta data refers to.
An object of type `ConstantVersion` is expected. A selection of
predefined versions is available in the `known_versions` module.
"""
return self._datadict.get("calibration_constant_version")
@calibration_constant_version.setter
def calibration_constant_version(self, value):
if not isinstance(value, ConstantVersion):
raise AttributeError("Expecting a ConstantVersion object")
self._datadict["calibration_constant_version"] = value
@calibration_constant_version.deleter
def calibration_constant_version(self):
del self._datadict["calibration_constant_version"]
@property
def detector_condition(self):
"""
The detector operating condition the constant is valid for/ is
requested for.
An object of type `DetectorCondition` is expected. A selection of
predefined operating conditions is available in the
"""
return self._datadict.get("detector_condition")
@detector_condition.setter
def detector_condition(self, value):
if not isinstance(value, DetectorCondition):
raise AttributeError("Expecting a DetectorCondition object")
self._datadict["detector_condition"] = value
@detector_condition.deleter
def detector_condition(self):
del self._datadict["detector_condition"]
[docs] def send(self, receiver: str, silent: Optional[bool] = True,
timeout: Optional[int] = 30000):
"""Send a constant and its meta data to the database at `receiver`.
Receiver should be ZMQ address of of the form `tcp://localhost:5050`.
:raises: A `RuntimeError` if database communication fails or no
matching conditions are found.
"""
cdata = self.calibration_constant.data
fname = self.calibration_constant_version.begin_at.isoformat()
self.comm_db_success = False
fbytes = None
tmp_dir = None
# ../scratch exists only for maxwell.
# i.e. current gitlab CI can't access /gpfs/exfel/data/scratch.
tmp_path = "/gpfs/exfel/data/scratch/.caltmp/"
if not os.path.exists(tmp_path):
tmp_path = None
# create a temporary file in tmp_path
# and delete it as soon as it is closed.
with tempfile.NamedTemporaryFile(
dir=tmp_path,
suffix=".tmp",
prefix="cal_",
delete=True,
) as fp:
hp = h5py.File(fp.name, 'w', driver='core', backing_store=True)
self.calibration_constant_version.file_name = fp.name
dname = self.calibration_constant_version.device_name
cname = self.calibration_constant.name
base_key = '/{}/{}/0'.format(dname, cname)
hp["{}/data".format(base_key)] = cdata
unique_name = self.detector_condition.name
md5 = hashlib.md5(dname.encode())
for condition in self.detector_condition.parameters:
safe_name = condition.name.replace(" ", "_")
attmap = {"value": "value",
"logarithmic": "flg_logarithmic",
"lower_deviation": "lower_deviation_value",
"upper_deviation": "upper_deviation_value"}
for att, mapped in attmap.items():
value = getattr(condition, att)
key = "{}/condition/{}/{}".format(base_key, safe_name,
mapped)
hp[key] = value
md5.update(str(value).encode('utf-8'))
unique_name += binascii.b2a_base64(md5.digest()).decode("utf-8")
self.detector_condition.name = unique_name[:60]
hp.close()
fp.seek(0)
fbytes = fp.read()
del cdata
data = dict()
parm_hash = self.to_dict(silent=silent)
del parm_hash["calibration_constant"]["data"]
parm_hash['h5path'] = base_key
parm_hash['file_indexes.start_idx'] = 0
parm_hash['file_indexes.end_idx'] = 0
data["parameterHashes"] = [parm_hash]
data["bStream"] = fbytes
data["filename"] = "{}.h5".format(fname)
data["subject"] = "send"
parm_hash["pdu_physical_name"] = self.calibration_constant_version.device_name
parm_hash["detector_identifier"] = self.calibration_constant_version.karabo_id
parm_hash["pdu_karabo_da"] = self.calibration_constant_version.karabo_da
if self.calibration_constant_version.report_path:
parm_hash["report"] = self.calibration_constant_version.report_path
resp = _zmq_request(receiver, data, timeout=timeout)
if resp["success"]:
self.comm_db_success = True
if not silent:
print("Successfully sent constant to database!")
else:
msg = "Error sending to database: {}".format(resp)
raise RuntimeError(msg)
[docs] def retrieve(self, receiver: str,
when: Optional[str] = None,
silent: Optional[bool] = True,
timeout: Optional[int] = 30000,
meta_only: Optional[bool] = False,
version_info: Optional[bool] = False,
strategy: Optional[str] = "pdu_closest_by_time"):
"""Retrieve a constant and its meta data from the database at `receiver`.
Receiver should be ZMQ address of of the form `tcp://localhost:5050`.
The `when` parameter defaults to `None`: in this case the most current
constant version will be requested. To request a version for a
specific time, set `when` an iso-formatted time string, e.g. using
`datetime.now().isoformat()`.
If a constant is successfully returned, the requesting
`ConstantMetaData` object updates itself to reflect the parameter
conditions of the constant.
The constant itself can then be found at
`self.calibration_constant.data`.
If meta_only is set, then no constant data is transferred. Instead
constants are read from a .h5 file by the client. The variable
meta_only is overwritten by the environmental variable
`CAL_DB_METAONLY`, if it is defined.
if version_info flag is True, then function returns a list of meta-data
for constant-versions. The meta_only flag in this case is not used.
:param receiver: ZMQ address, e. g. `tcp://localhost:5050`
:param timeout: Timeout for zmq request
:param strategy: Default pdu_closest_by_time, options:
a) pdu_closest_by_time: use physical detector unit to
retrieve CCV closest to measured-at
b) detector_closest_by_time: use karabo-id and karabo-da to
retrieve CCV closest
to measured-time
c) pdu_prior_in_time: use PDU to retrieve CCV prior on time
only to measured-at
:raises: A `RuntimeError` if database communication fails or no
matching constant is found.
"""
self.comm_db_success = False
# Overwrite meta_only with environment variable
meta_only_env = os.environ.get('CAL_DB_METAONLY', None)
if meta_only_env:
meta_only = bool(meta_only_env)
data = dict()
d = self.to_dict(mode="retrieve", silent=silent)
data["detector_condition"] = d["detector_condition"]
data["calibration_name"] = self.calibration_constant.name
# In calibrationDBRemote retrieve function awaits a mandatory
# parameter of karabo_id and either of device_name or karabo_da
data["pdu_physical_name"] = self.calibration_constant_version.device_name # noqa
data["detector_identifier"] = self.calibration_constant_version.karabo_id # noqa
data["pdu_karabo_da"] = self.calibration_constant_version.karabo_da
if when is None:
when = ""
data["measured_at"] = when
# This snapshot will be used only while retrieving
# the correct PDU and appending its UUID
data["snapshot_at"] = when
if version_info:
# TODO: test with new mapping
data["subject"] = "retrieve_version_info"
data["meta_only"] = False
else:
data["subject"] = "retrieve"
data["meta_only"] = meta_only
data["strategy"] = strategy
resp = _zmq_request(receiver, data, timeout=timeout)
if resp["success"]:
self.comm_db_success = True
payload = resp["payload"]
if version_info:
if not silent:
print("Successfully retrieved list of versions!")
return payload
meta = payload["meta"]
own_name = self.calibration_constant.name
assert meta["calibration_name"] == own_name
# Expose retrieve physical name to iCalibrationDB attributes
self.calibration_constant_version.device_name = \
meta["pdu_physical_name"]
if not silent:
print("Successfully retrieved constant from database!")
print("Updating self...")
if meta.get("meta_only", False):
self.calibration_constant.data = None
else:
self.calibration_constant.data = payload["data"]
conditions = []
for name, cdict in meta["detector_conditions"].items():
# Avoid appending detector_uuid.
# This condition should only be added within
# calibrationDBRemote
if name == "Detector_UUID":
continue
condition = OperatingCondition()
condition._datadict = cdict
condition.name = name.replace("_", " ")
condition.available = True
conditions.append(condition)
self.detector_condition.parameters = conditions
def add_data(cname, is_datetime=False):
value = meta.get(cname, None)
if value is not None and is_datetime:
value = dateutil.parser.parse(value)
setattr(self.calibration_constant_version, cname, value)
time_info = ['begin_at', 'begin_validity_at', 'end_validity_at']
stored_metadata = ['ccv_id', 'flg_good_quality',
'hdf5path', 'filename', 'h5path']
for dconst in time_info:
add_data(dconst, is_datetime=True)
for dconst in stored_metadata:
add_data(dconst)
else:
msg = "Error sending to database: {}".format(resp)
raise RuntimeError(msg.replace('\\n', '\n').replace('\\', ''))
[docs] def get_from_version_info(self, ccv):
"""
Return a constant from a dictionary descriptor
returned by `retrieve(..., version_info=True)
:param ccv: calibration constant version
"""
fpath = "{}/{}/{}".format(Settings.base_path,
ccv['path_to_file'],
ccv['file_name'])
with h5py.File(fpath, "r") as f:
arr = f["{}/data".format(ccv['data_set_name'])][()]
return arr
return None
[docs] def retrieve_pdus_for_detector(self, receiver: str, karabo_id: str,
snapshot_at: Optional[str] = "",
timeout: Optional[int] = 30000
) -> List[dict]:
"""Retrieve physical detector units corresponding
to a detector identifier
:param receiver: ZMQ address, e. g. `tcp://localhost:5050`
:param karabo_id: The karabo id which is used
as a detector identifier in CalCat
:param snapshot_at: CalCat database snapshot
:param timeout: Timeout for zmq request
:return:
"""
data = dict()
data["subject"] = "retrieve_detector_pdus"
data["detector_identifier"] = karabo_id
data["snapshot_at"] = snapshot_at
resp = _zmq_request(receiver, data, timeout=timeout)
if resp["success"]:
self.comm_db_success = True
new_payload = [
{
"pdu_physical_name": p["physical_name"],
"karabo_da": p["karabo_da"],
"virtual_device_name": p["virtual_device_name"]
}
for p in resp["payload"]
]
sorted_payload = sorted(new_payload, key=lambda k: k['karabo_da'])
return sorted_payload
else:
msg = "Error sending to the database: {}".format(resp)
raise RuntimeError(msg.replace('\\n', '\n').replace('\\', ''))
[docs] def retrieve_from_version_info(self, ccv):
"""
Retrieve a constant and meta data from a dictionary descriptor
returned by `retrieve(..., version_info=True)
:param ccv: calibration constant version
"""
fpath = "{}/{}/{}".format(Settings.base_path,
ccv['path_to_file'],
ccv['file_name'])
with h5py.File(fpath, "r") as f:
arr = f["{}/data".format(ccv['data_set_name'])][()]
self.calibration_constant = CalibrationConstant()
self.calibration_constant.data = arr
conditions = []
condition_path = "{}/condition".format(ccv['data_set_name'])
for name, group in f[condition_path].items():
# Avoid appending detector_uuid.
# This condition should only be added within
# calibrationDBRemote
if name == "Detector_UUID":
continue
cdict = {}
for key, val in group.items():
cdict[key] = val[()]
condition = OperatingCondition()
condition._datadict = cdict
condition.name = name.replace("_", " ")
condition.available = True
conditions.append(condition)
self.detector_condition = DetectorCondition()
self.detector_condition.parameters = conditions
self.calibration_constant_version = ConstantVersion()
def add_data(cname, is_datetime=False):
value = ccv.get(cname, None)
if value is not None and is_datetime:
value = dateutil.parser.parse(value)
setattr(self.calibration_constant_version, cname, value)
add_data('flg_good_quality')
for dconst in ['begin_at', 'begin_validity_at', 'end_validity_at']:
add_data(dconst, is_datetime=True)
[docs] def update_flg_good_quality(self, receiver, cvv_id, flg_good_quality,
silent=True, timeout=30000):
"""
Update `flg_good_quality` of the calibration constant version.
:param receiver: ZMQ address, e. g. `tcp://localhost:5050`
:param cvv_id: calibration constant version if
:param flg_good_quality: flg_good_quality to be set
:param silent: Set to False to print information
:param timeout: Timeout for zmq request
:return: response message
:raises: A `RuntimeError` if database communication fails or
`update_keys` are not set.
"""
self.calibration_constant_version = ConstantVersion()
self.calibration_constant_version.ccv_id = cvv_id
self.calibration_constant_version.good_quality = flg_good_quality
self._update(receiver, update_keys=(['ccv_id', 'flg_good_quality']),
silent=silent, timeout=timeout)
def _update(self, receiver, update_keys=None, silent=True, timeout=30000):
"""
Update calibration constant version in the database at `receiver`.
Receiver should be ZMQ address of the form `tcp://localhost:5050`.
The generic function is part of the private interface and should be
exposed via dedicated methods for specific parameters.
:param receiver: ZMQ address, e. g. `tcp://localhost:5050`
:param update_keys: List oof keys to be updated
:param silent: Set to False to print information
:param timeout: Timeout for zmq request
:return: response message
:raises: A `RuntimeError` if database communication fails or
`update_keys` are not set.
"""
cvv_dict = getattr(self.calibration_constant_version, "__dict__", {})
self.comm_db_success = False
data = dict()
if '_datadict' in cvv_dict:
if update_keys is not None:
for key in update_keys:
value = cvv_dict['_datadict'].get(key, None)
if value is None:
msg = 'Value {} is not set'.format(key)
raise RuntimeError(msg)
else:
data[key] = value
else:
data = cvv_dict['_datadict'].copy()
else:
msg = 'Calibration_constant_version is not set'
raise RuntimeError(msg)
data["subject"] = "update"
resp = _zmq_request(receiver, data, timeout=timeout)
if resp["success"]:
self.comm_db_success = True
if not silent:
print("Successfully updated constant version")
else:
msg = "Error sending to database: {}".format(resp)
raise RuntimeError(msg)
def _zmq_request(endpoint, data, timeout=30000):
"""Make a request to endpoint with a temporary ZMQ socket
This creates a REQ socket, sends data (pickled), receives and unpickles
a reply.
:param str endpoint: The address of a ZMQ REP socket to connect to.
:param data: A Python object to pickle and send as a request.
:param int timeout: Timeout in ms. Actual maximum time is double this
(send & receive).
"""
cont = zmq.Context.instance()
sock = cont.socket(zmq.REQ)
try:
sock.setsockopt(zmq.LINGER, 0)
sock.SNDTIMEO = timeout
sock.RCVTIMEO = timeout
sock.connect(endpoint)
sock.send_pyobj(data)
return sock.recv_pyobj()
finally:
sock.close()