Skip to content

calcat_interface2

CalCatAPIClient

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
class CalCatAPIClient:
    """Client for the CalCat REST API, requesting version 2 of its JSON format.

    Wraps a ``requests`` session (optionally taken from an OAuth client) and
    adds default headers, URL resolution, JSON decoding and error handling.
    """

    def __init__(self, base_api_url, oauth_client=None, user_email=""):
        """
        Args:
            base_api_url (str): Root URL of the API. Every request URL is
                resolved relative to it, so it may contain a path prefix
                such as '/api/'.
            oauth_client: Optional object whose ``session`` attribute is used
                for requests. When None, a plain ``requests.Session`` is used.
            user_email (str): Value sent in the ``X-User-Email`` header.
        """
        if oauth_client is not None:
            self.oauth_client = oauth_client
            self.session = self.oauth_client.session
        else:
            # Oauth disabled - used with base_api_url pointing to an
            # xfel-oauth-proxy instance
            self.oauth_client = None
            self.session = requests.Session()

        self.user_email = user_email
        # Ensure the base URL has a trailing slash, so urljoin() appends to
        # the last path component instead of replacing it.
        self.base_api_url = base_api_url.rstrip("/") + "/"

    def default_headers(self):
        """Return a fresh dict of the headers sent with every request."""
        return {
            "content-type": "application/json",
            "Accept": "application/json; version=2",
            "X-User-Email": self.user_email,
        }

    @classmethod
    def format_time(cls, dt):
        """Parse different ways to specify time to CalCat.

        Args:
            dt: A ``datetime`` (naive values are interpreted as local time by
                ``astimezone``), a ``date`` (taken as midnight), or a string
                which is passed through unchanged.

        Returns:
            str: ISO 8601 timestamp in UTC, or the original string.

        Raises:
            TypeError: for any other type (including None).
        """

        if isinstance(dt, datetime):
            return dt.astimezone(timezone.utc).isoformat()
        elif isinstance(dt, date):
            return cls.format_time(datetime.combine(dt, time()))
        elif not isinstance(dt, str):
            raise TypeError(
                f"Timestamp parameter ({dt!r}) must be a string, datetime or "
                f"date object"
            )

        return dt

    def get_request(self, relative_url, params=None, headers=None, **kwargs):
        """Make a GET request, return the HTTP response object

        ``headers`` are merged over :meth:`default_headers`; other keyword
        arguments are passed through to ``session.get``.
        """
        # Base URL may include e.g. '/api/'. This is a prefix for all URLs;
        # even if they look like an absolute path.
        url = urljoin(self.base_api_url, relative_url.lstrip("/"))
        _headers = self.default_headers()
        if headers:
            _headers.update(headers)
        return self.session.get(url, params=params, headers=_headers, **kwargs)

    @staticmethod
    def _parse_response(resp: requests.Response):
        """Decode a JSON response body, raising for HTTP error statuses.

        Returns:
            None for an empty body, otherwise the decoded JSON value.

        Raises:
            CalCatAPIError: status >= 400 and the body is valid JSON. The
                JSON ``info`` field (when present) is included in the
                message, and the response is attached as ``.response``.
            requests.HTTPError: status >= 400 with a body that is not JSON
                (raised by ``resp.raise_for_status()``).
        """
        if resp.status_code >= 400:
            try:
                d = json.loads(resp.content.decode("utf-8"))
            except ValueError:  # UnicodeDecodeError & JSONDecodeError
                resp.raise_for_status()
            else:
                # An error body can be any JSON value; only an object can
                # carry the 'info' field.
                if isinstance(d, dict):
                    info = d.get("info", "missing details")
                else:
                    info = "missing details"
                raise CalCatAPIError(
                    f"Error {resp.status_code} from API: {info}",
                    response=resp,
                )

        if resp.content == b"":
            return None
        else:
            return json.loads(resp.content.decode("utf-8"))

    def get(self, relative_url, params=None, **kwargs):
        """Make a GET request, return response content from JSON"""
        resp = self.get_request(relative_url, params, **kwargs)
        return self._parse_response(resp)

    # Response headers describing pagination, reported by get_paged()
    _pagination_headers = (
        "X-Total-Pages",
        "X-Count-Per-Page",
        "X-Current-Page",
        "X-Total-Count",
    )

    def get_paged(self, relative_url, params=None, **kwargs):
        """Make a GET request, return response content & pagination info

        Pagination info is a dict such as ``{'total_pages': 3, ...}``: each
        header from ``_pagination_headers`` that is present, with the 'X-'
        prefix removed, lower-cased, '-' replaced by '_', and an int value.
        """
        resp = self.get_request(relative_url, params, **kwargs)
        content = self._parse_response(resp)
        pagination_info = {
            k[2:].lower().replace("-", "_"): int(resp.headers[k])
            for k in self._pagination_headers
            if k in resp.headers
        }
        return content, pagination_info

    # ------------------
    # Cached wrappers for simple ID lookups of fixed-ish info
    #
    # N.B. lru_cache behaves oddly with instance methods (it's a global cache,
    # with the instance as part of the key), but in this case it should be OK.
    @lru_cache()
    def calibration_by_id(self, cal_id):
        """Return the CalCat record for a calibration by its integer ID."""
        return self.get(f"calibrations/{cal_id}")

    @lru_cache()
    def detector_by_id(self, det_id):
        """Return the CalCat record for a detector by its integer ID."""
        return self.get(f"detectors/{det_id}")

    # --------------------
    # Shortcuts to find 1 of something by an ID-like field (e.g. name) other
    # than CalCat's own integer IDs. Error on no match or >1 matches.
    @lru_cache()
    def detector_by_identifier(self, identifier):
        """Return the single detector record with the given identifier.

        Raises:
            KeyError: no detector matches.
            ValueError: more than one detector matches.
        """
        # The "identifier", "name" & "karabo_name" fields seem to have the same names
        res = self.get("detectors", {"identifier": identifier})
        if not res:
            raise KeyError(f"No detector with identifier {identifier}")
        elif len(res) > 1:
            raise ValueError(f"Multiple detectors found with identifier {identifier}")
        return res[0]

    @lru_cache()
    def calibration_by_name(self, name):
        """Return the single calibration record with the given name.

        Raises:
            KeyError: no calibration matches.
            ValueError: more than one calibration matches.
        """
        res = self.get("calibrations", {"name": name})
        if not res:
            raise KeyError(f"No calibration with name {name}")
        elif len(res) > 1:
            raise ValueError(f"Multiple calibrations found with name {name}")
        return res[0]

format_time(dt) classmethod

Convert the supported ways of specifying a time (string, datetime or date) into the form sent to CalCat.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
@classmethod
def format_time(cls, dt):
    """Normalise a timestamp argument into the form CalCat expects.

    Strings are returned untouched. A ``datetime`` is converted to UTC and
    rendered in ISO 8601 format. A plain ``date`` is treated as midnight on
    that day. Anything else is rejected with ``TypeError``.
    """
    if isinstance(dt, str):
        return dt

    # datetime is a subclass of date, so it must be tested first
    if isinstance(dt, datetime):
        return dt.astimezone(timezone.utc).isoformat()

    if isinstance(dt, date):
        midnight = datetime.combine(dt, time())
        return cls.format_time(midnight)

    raise TypeError(
        f"Timestamp parameter ({dt!r}) must be a string, datetime or "
        f"date object"
    )

get(relative_url, params=None, **kwargs)

Make a GET request, return response content from JSON

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
def get(self, relative_url, params=None, **kwargs):
    """Send a GET request and hand back the JSON-decoded response body."""
    return self._parse_response(
        self.get_request(relative_url, params, **kwargs)
    )

get_paged(relative_url, params=None, **kwargs)

Make a GET request, return response content & pagination info

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
def get_paged(self, relative_url, params=None, **kwargs):
    """Send a GET request; return the decoded body plus pagination details.

    For each pagination header that is present, the second element of the
    returned pair maps a snake_case name (e.g. 'X-Total-Pages' becomes
    'total_pages') to the header's integer value.
    """
    response = self.get_request(relative_url, params, **kwargs)
    body = self._parse_response(response)

    info = {}
    for header in self._pagination_headers:
        if header not in response.headers:
            continue
        # Strip the leading 'X-' and convert to snake_case
        key = header[2:].lower().replace("-", "_")
        info[key] = int(response.headers[header])

    return body, info

get_request(relative_url, params=None, headers=None, **kwargs)

Make a GET request, return the HTTP response object

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
def get_request(self, relative_url, params=None, headers=None, **kwargs):
    """Send a GET request and return the raw HTTP response.

    The URL is always resolved beneath ``base_api_url``. Extra ``headers``
    are layered over the defaults, and remaining keyword arguments go
    straight to ``session.get``.
    """
    # Strip any leading '/' so a path such as '/detectors' stays under the
    # base URL's own prefix (e.g. '/api/') instead of replacing it.
    full_url = urljoin(self.base_api_url, relative_url.lstrip("/"))
    merged_headers = self.default_headers()
    merged_headers.update(headers or {})
    return self.session.get(
        full_url, params=params, headers=merged_headers, **kwargs
    )

CalCatAPIError

Bases: requests.HTTPError

Raised when an error response from the API includes error details as JSON.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
class CalCatAPIError(requests.HTTPError):
    """Error response from the CalCat API that carried a JSON body.

    Being an ``HTTPError`` subclass, it can be caught together with the
    errors ``requests`` raises for error responses without a JSON body.
    """

CalibrationData

Bases: Mapping

Collected constants for a given detector

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
class CalibrationData(Mapping):
    """Collected constants for a given detector

    Acts as a read-only mapping from calibration name to a
    MultiModuleConstant holding that constant for several modules.
    ``data[cal_name, module]`` is a shortcut for ``data[cal_name][module]``.
    """

    def __init__(self, constant_groups, module_details, detector_name):
        """
        Args:
            constant_groups (dict): ``{calibration: {karabo_da: SingleConstant}}``
            module_details (list of dict): One dict per module. This class
                reads the keys 'karabo_da', 'module_number',
                'virtual_device_name' and 'physical_name'.
            detector_name (str): Detector identifier in CalCat.
        """
        # {calibration: {karabo_da: SingleConstant}}
        self.constant_groups = constant_groups
        self.module_details = module_details
        self.detector_name = detector_name

    @staticmethod
    def _format_cond(condition):
        """Encode operating condition to CalCat API format.

        Args:
            condition (dict): Mapping of parameter DB name to value

        Returns:
            (dict) Operating condition for use in CalCat API.
        """

        return {
            "parameters_conditions_attributes": [
                {"parameter_name": k, "value": str(v)} for k, v in condition.items()
            ]
        }

    @classmethod
    def from_condition(
        cls,
        condition: "ConditionsBase",
        detector_name,
        calibrations=None,
        client=None,
        event_at=None,
        pdu_snapshot_at=None,
    ):
        """Look up constants matching the given operating conditions.

        Args:
            condition: Conditions object. Its ``calibration_types`` maps each
                calibration name to the parameters it depends on, and
                ``make_dict(params)`` supplies those parameters' values.
            detector_name (str): Detector identifier in CalCat.
            calibrations: Calibration names to look up. Defaults to every
                name in ``condition.calibration_types``.
            client: API client. Defaults to ``get_client()``.
            event_at: Time to find constants for (str, datetime or date).
                Defaults to the current time.
            pdu_snapshot_at: Time at which to resolve which modules make up
                the detector. Defaults to ``event_at``.

        Returns:
            CalibrationData: ``module_details`` lists every module of the
            detector at ``pdu_snapshot_at``, even modules without constants.
        """
        if calibrations is None:
            calibrations = set(condition.calibration_types)
        if event_at is None:
            # format_time() rejects None, so make the "now" default explicit
            # (this also gives pdu_snapshot_at a concrete default).
            event_at = datetime.now(timezone.utc)
        if pdu_snapshot_at is None:
            pdu_snapshot_at = event_at

        # Calibrations sharing the same condition parameters can be fetched
        # with a single query.
        cal_types_by_params_used = {}
        for cal_type, params in condition.calibration_types.items():
            if cal_type in calibrations:
                cal_types_by_params_used.setdefault(tuple(params), []).append(cal_type)

        client = client or get_client()

        detector_id = client.detector_by_identifier(detector_name)["id"]
        pdus = client.get(
            "physical_detector_units/get_all_by_detector",
            {
                "detector_id": detector_id,
                "pdu_snapshot_at": client.format_time(pdu_snapshot_at),
            },
        )
        module_details = sorted(pdus or (), key=lambda d: d["karabo_da"])
        for mod in module_details:
            if mod.get("module_number") is None:
                # Fall back to the last number in the aggregator name
                mod["module_number"] = int(re.findall(r"\d+", mod["karabo_da"])[-1])

        constant_groups = {}

        for params, cal_types in cal_types_by_params_used.items():
            condition_dict = condition.make_dict(params)

            cal_id_map = {
                client.calibration_by_name(name)["id"]: name for name in cal_types
            }
            calibration_ids = list(cal_id_map.keys())

            query_res = client.get(
                "calibration_constant_versions/get_by_detector_conditions",
                {
                    "detector_identifier": detector_name,
                    "calibration_id": str(calibration_ids),
                    "karabo_da": "",
                    "event_at": client.format_time(event_at),
                    "pdu_snapshot_at": client.format_time(pdu_snapshot_at),
                },
                data=json.dumps(cls._format_cond(condition_dict)),
            )

            # An empty response body is decoded as None
            for ccv in query_res or ():
                aggr = ccv["physical_detector_unit"]["karabo_da"]
                cal_type = cal_id_map[ccv["calibration_constant"]["calibration_id"]]

                const_group = constant_groups.setdefault(cal_type, {})
                const_group[aggr] = SingleConstant.from_response(ccv)

        return cls(constant_groups, module_details, detector_name)

    @classmethod
    def from_report(
        cls,
        report_id_or_path: Union[int, str],
        client=None,
    ):
        """Load the constants recorded in a calibration report.

        Args:
            report_id_or_path: An int is used as the report's numeric ID;
                anything else is converted to str and used as its file path.
            client: API client. Defaults to ``get_client()``.

        Raises:
            KeyError: the report has no calibration constants.
            Exception: the report maps one aggregator to different modules,
                or contains constants for more than one detector.
        """
        client = client or get_client()

        # Use max page size, hopefully always enough for CCVs from 1 report
        page_size = 500
        params = {"page_size": page_size}
        if isinstance(report_id_or_path, int):
            params["report_id"] = report_id_or_path  # Numeric ID
        else:
            params["report.file_path"] = str(report_id_or_path)

        res, pagination = client.get_paged("calibration_constant_versions", params)
        if pagination.get("total_pages", 1) > 1:
            # Only the first page is requested; make truncation visible
            warn(
                f"Calibration report {report_id_or_path!r} has more than "
                f"{page_size} constants; only the first {page_size} were loaded",
                stacklevel=2,
            )

        constant_groups = {}
        pdus = {}  # keyed by karabo_da (e.g. 'AGIPD00')
        det_ids = set()  # Should only have one detector

        for ccv in res or ():
            pdu = ccv["physical_detector_unit"]
            # We're only interested in the PDU mapping from the CCV start time
            kda = pdu["karabo_da"] = pdu.pop("karabo_da_at_ccv_begin_at")
            det_id = pdu["detector_id"] = pdu.pop("detector_id_at_ccv_begin_at")
            pdu["virtual_device_name"] = pdu.pop("virtual_device_name_at_ccv_begin_at")
            if pdu.get("module_number_at_ccv_begin_at") is not None:
                pdu["module_number"] = pdu.pop("module_number_at_ccv_begin_at")
            else:
                pdu["module_number"] = int(re.findall(r"\d+", kda)[-1])

            det_ids.add(det_id)
            if kda in pdus:
                if pdu["physical_name"] != pdus[kda]["physical_name"]:
                    raise Exception(
                        f"Mismatched PDU mapping from calibration report: {kda} is both"
                        f" {pdu['physical_name']} and {pdus[kda]['physical_name']}"
                    )
            else:
                pdus[kda] = pdu

            cal_type = client.calibration_by_id(
                ccv["calibration_constant"]["calibration_id"]
            )["name"]
            const_group = constant_groups.setdefault(cal_type, {})
            const_group[kda] = SingleConstant.from_response(ccv)

        if not det_ids:
            raise KeyError(
                f"No calibration constants found for report {report_id_or_path!r}"
            )
        if len(det_ids) > 1:
            raise Exception(f"Found multiple detector IDs in report: {det_ids}")
        # The "identifier", "name" & "karabo_name" fields seem to have the same names
        det_name = client.detector_by_id(det_ids.pop())["identifier"]

        module_details = sorted(pdus.values(), key=lambda d: d["karabo_da"])
        return cls(constant_groups, module_details, det_name)

    def __getitem__(self, key) -> MultiModuleConstant:
        """``data[cal_type]`` gives a MultiModuleConstant.

        ``data[cal_type, module]`` gives that module's single constant.
        """
        if isinstance(key, str):
            return MultiModuleConstant(
                self.constant_groups[key], self.module_details, self.detector_name, key
            )
        elif isinstance(key, tuple) and len(key) == 2:
            cal_type, module = key
            return self[cal_type][module]
        else:
            raise TypeError(f"Key should be string or 2-tuple (got {key!r})")

    def __iter__(self):
        return iter(self.constant_groups)

    def __len__(self):
        return len(self.constant_groups)

    def __contains__(self, item):
        # Avoid Mapping's default, which would build a MultiModuleConstant
        return item in self.constant_groups

    def __repr__(self):
        return (
            f"<CalibrationData: {', '.join(sorted(self.constant_groups))} "
            f"constants for {len(self.module_details)} modules of {self.detector_name}>"
        )

    # These properties may include modules for which we have no constants -
    # when created with .from_condition(), they represent all modules present in
    # the detector (at the specified time).
    @property
    def module_nums(self):
        return [m["module_number"] for m in self.module_details]

    @property
    def aggregator_names(self):
        return [m["karabo_da"] for m in self.module_details]

    @property
    def qm_names(self):
        return [m["virtual_device_name"] for m in self.module_details]

    @property
    def pdu_names(self):
        return [m["physical_name"] for m in self.module_details]

    def require_calibrations(self, calibrations):
        """Drop any modules missing the specified constant types"""
        mods = set(self.aggregator_names)
        for cal_type in calibrations:
            mods.intersection_update(self[cal_type].constants)
        return self.select_modules(aggregator_names=mods)

    def select_modules(
        self, module_nums=None, *, aggregator_names=None, qm_names=None
    ) -> "CalibrationData":
        """Return a new CalibrationData restricted to the selected modules.

        The result's ``module_details`` only keeps selected modules that have
        at least one constant.
        """
        # Validate the specified modules against those we know about.
        # Each specific constant type may have only a subset of these modules.
        aggs = prepare_selection(
            self.module_details, module_nums, aggregator_names, qm_names
        )
        constant_groups = {}
        matched_aggregators = set()
        for cal_type, const_group in self.constant_groups.items():
            constant_groups[cal_type] = d = {
                aggr: const for (aggr, const) in const_group.items() if aggr in aggs
            }
            matched_aggregators.update(d.keys())
        module_details = [
            m for m in self.module_details if m["karabo_da"] in matched_aggregators
        ]
        return type(self)(constant_groups, module_details, self.detector_name)

    def select_calibrations(self, calibrations) -> "CalibrationData":
        """Return a new CalibrationData with only the named calibrations.

        Raises KeyError if a name is not present.
        """
        const_groups = {c: self.constant_groups[c] for c in calibrations}
        return type(self)(const_groups, self.module_details, self.detector_name)

    def merge(self, *others: "CalibrationData") -> "CalibrationData":
        """Combine this with other CalibrationData for the same detector.

        Where several inputs have a constant for the same calibration and
        module, the one from the later argument wins. If inputs disagree on
        which physical module an aggregator refers to, a warning is issued
        and the first mapping seen is kept.

        Raises:
            Exception: the inputs are for different detectors.
        """
        det_names = set(cd.detector_name for cd in (self,) + others)
        if len(det_names) > 1:
            raise Exception(
                "Cannot merge calibration data for different "
                "detectors: " + ", ".join(sorted(det_names))
            )
        det_name = det_names.pop()

        cal_types = set(self.constant_groups)
        pdus_d = {m["karabo_da"]: m for m in self.module_details}
        for other in others:
            cal_types.update(other.constant_groups)
            for md in other.module_details:
                # Warn if constants don't refer to same modules
                md_da = md["karabo_da"]
                if md_da in pdus_d:
                    pdu_a = pdus_d[md_da]["physical_name"]
                    pdu_b = md["physical_name"]
                    if pdu_a != pdu_b:
                        warn(
                            f"Merging constants with different modules for "
                            f"{md_da}: {pdu_a!r} != {pdu_b!r}",
                            stacklevel=2,
                        )
                else:
                    pdus_d[md_da] = md

        module_details = sorted(pdus_d.values(), key=lambda d: d["karabo_da"])

        constant_groups = {}
        for cal_type in cal_types:
            d = constant_groups[cal_type] = {}
            for caldata in (self,) + others:
                if cal_type in caldata:
                    d.update(caldata.constant_groups[cal_type])

        return type(self)(constant_groups, module_details, det_name)

require_calibrations(calibrations)

Drop any modules missing the specified constant types

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
def require_calibrations(self, calibrations):
    """Keep only modules that have every one of the given constant types.

    Returns the result of ``select_modules`` for the surviving aggregators.
    """
    keep = set(self.aggregator_names)
    for cal_name in calibrations:
        # A module survives only if it has a constant of this type too
        keep &= set(self[cal_name].constants)
    return self.select_modules(aggregator_names=keep)

MultiModuleConstant dataclass

Bases: Mapping

A group of similar constants for several modules of one detector

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
@dataclass
class MultiModuleConstant(Mapping):
    """A group of similar constants for several modules of one detector

    Maps module identifiers to SingleConstant objects. Iteration and len()
    cover the Karabo DA names in ``constants``; lookups also accept module
    numbers, virtual device names and physical (PDU) names.
    """

    constants: Dict[str, SingleConstant]  # Keys e.g. 'LPD00'
    module_details: List[Dict]
    detector_name: str  # e.g. 'HED_DET_AGIPD500K2G'
    calibration_name: str

    def __repr__(self):
        return (
            f"<MultiModuleConstant: {self.calibration_name} for "
            f"{len(self.constants)} modules of {self.detector_name}>"
        )

    def __iter__(self):
        return iter(self.constants)

    def __len__(self):
        return len(self.constants)

    def __getitem__(self, key):
        """Find the constant for one module.

        ``key`` may be a Karabo DA name, module number, virtual device name
        or physical name.

        Raises:
            KeyError: nothing matches, or the key matches several modules.
        """
        if key in (None, ""):
            raise KeyError(key)

        candidate_kdas = set()
        if key in self.constants:  # Karabo DA name, e.g. 'LPD00'
            candidate_kdas.add(key)

        for m in self.module_details:
            names = (m["module_number"], m["virtual_device_name"], m["physical_name"])
            if key in names and m["karabo_da"] in self.constants:
                candidate_kdas.add(m["karabo_da"])

        if not candidate_kdas:
            raise KeyError(key)
        elif len(candidate_kdas) > 1:
            raise KeyError(f"Ambiguous key: {key} matched {candidate_kdas}")

        return self.constants[candidate_kdas.pop()]

    def select_modules(
        self, module_nums=None, *, aggregator_names=None, qm_names=None
    ) -> "MultiModuleConstant":
        """Return a copy restricted to the selected modules."""
        aggs = prepare_selection(
            self.module_details, module_nums, aggregator_names, qm_names
        )
        d = {aggr: scv for (aggr, scv) in self.constants.items() if aggr in aggs}
        mods = [m for m in self.module_details if m["karabo_da"] in d]
        return replace(self, constants=d, module_details=mods)

    # These properties label only the modules we have constants for, which may
    # be a subset of what's in module_details
    @property
    def aggregator_names(self):
        return sorted(self.constants)

    @property
    def module_nums(self):
        return [
            m["module_number"]
            for m in self.module_details
            if m["karabo_da"] in self.constants
        ]

    @property
    def qm_names(self):
        return [
            m["virtual_device_name"]
            for m in self.module_details
            if m["karabo_da"] in self.constants
        ]

    @property
    def pdu_names(self):
        return [
            m["physical_name"]
            for m in self.module_details
            if m["karabo_da"] in self.constants
        ]

    def ndarray(self, caldb_root=None, *, parallel=0):
        """Load all constants, stacked along a new first (module) axis.

        Rows follow ``aggregator_names`` order (sorted Karabo DA names).

        Args:
            caldb_root: Root directory of the constant files, passed on to
                ``SingleConstant.dataset_obj``.
            parallel (int): Number of worker processes; 0 loads serially.
        """
        agg_names = self.aggregator_names
        eg_dset = self.constants[agg_names[0]].dataset_obj(caldb_root)
        try:
            shape = (len(self.constants),) + eg_dset.shape
            dtype = eg_dset.dtype
        finally:
            # Close before any worker processes are started
            eg_dset.file.close()

        if parallel > 0:
            load_ctx = psh.ProcessContext(num_workers=parallel)
        else:
            load_ctx = psh.SerialContext()

        # Allocate via the context doing the loading, not pasha's default
        # context, so the buffer suits that context's workers.
        arr = load_ctx.alloc(shape, dtype, fill=0)

        def _load_constant_dataset(wid, index, mod):
            dset = self.constants[mod].dataset_obj(caldb_root)
            try:
                dset.read_direct(arr[index])
            finally:
                dset.file.close()

        load_ctx.map(_load_constant_dataset, agg_names)
        return arr

    def xarray(self, module_naming="modnum", caldb_root=None, *, parallel=0):
        """Load all constants as a labelled ``xarray.DataArray``.

        Args:
            module_naming: Labels for the 'module' dimension: 'aggregator'
                (Karabo DA names), 'modnum' (module numbers) or 'qm'
                (virtual device names).
            caldb_root, parallel: As for :meth:`ndarray`.

        Raises:
            ValueError: unknown ``module_naming``.
        """
        import xarray

        agg_names = self.aggregator_names
        detail_keys = {"modnum": "module_number", "qm": "virtual_device_name"}
        if module_naming == "aggregator":
            modules = agg_names
        elif module_naming in detail_keys:
            # Derive labels in the same order as ndarray() rows
            details = {m["karabo_da"]: m for m in self.module_details}
            modules = [details[a][detail_keys[module_naming]] for a in agg_names]
        else:
            raise ValueError(
                f"{module_naming=} (must be 'aggregator', 'modnum' or 'qm')"
            )

        ndarr = self.ndarray(caldb_root, parallel=parallel)

        # Dimension labels
        dims = ["module"] + ["dim_%d" % i for i in range(ndarr.ndim - 1)]
        coords = {"module": modules}
        name = self.calibration_name

        return xarray.DataArray(ndarr, dims=dims, coords=coords, name=name)

SingleConstant dataclass

A calibration constant for one detector module

CalCat calls this a calibration constant version (CCV).

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
@dataclass
class SingleConstant:
    """A calibration constant for one detector module

    CalCat calls this a calibration constant version (CCV).
    """

    path: Path  # File path, relative to the calibration DB root
    dataset: str  # HDF5 group whose 'data' dataset holds the constant
    ccv_id: Optional[int]  # CalCat CCV ID, used to fetch metadata
    pdu_name: Optional[str]  # Physical detector unit name
    _metadata: dict = field(default_factory=dict)
    # True once _metadata includes the full CalCat record
    _have_calcat_metadata: bool = False

    @classmethod
    def from_response(cls, ccv: dict) -> "SingleConstant":
        """Build from a CCV record returned by the CalCat API."""
        return cls(
            path=Path(ccv["path_to_file"]) / ccv["file_name"],
            dataset=ccv["data_set_name"],
            ccv_id=ccv["id"],
            pdu_name=ccv["physical_detector_unit"]["physical_name"],
            _metadata=ccv,
            _have_calcat_metadata=True,
        )

    def dataset_obj(self, caldb_root=None) -> h5py.Dataset:
        """Open the file and return the constant's h5py dataset.

        The file stays open; callers that finish with the dataset can close
        it with ``dset.file.close()``.
        """
        if caldb_root is not None:
            caldb_root = Path(caldb_root)
        else:
            caldb_root = _get_default_caldb_root()

        f = h5py.File(caldb_root / self.path, "r")
        return f[self.dataset]["data"]

    def ndarray(self, caldb_root=None):
        """Read the constant into memory, closing its file afterwards."""
        dset = self.dataset_obj(caldb_root)
        try:
            return dset[:]
        finally:
            dset.file.close()

    def _load_calcat_metadata(self, client=None):
        client = client or get_client()
        calcat_meta = client.get(f"calibration_constant_versions/{self.ccv_id}")
        # Any metadata we already have takes precedence over CalCat, so
        # this can't change a value that was previously returned.
        # Dict unpacking rather than `|`, which needs Python >= 3.9.
        self._metadata = {**calcat_meta, **self._metadata}
        self._have_calcat_metadata = True

    def metadata(self, key, client=None):
        """Get a specific metadata field, e.g. 'begin_validity_at'

        This may make a request to CalCat if the value is not already known.

        Raises:
            KeyError: the field is unavailable, including when it is not
                known locally and there is no CCV ID to query CalCat with.
        """
        if key not in self._metadata and not self._have_calcat_metadata:
            if self.ccv_id is None:
                raise KeyError(f"{key!r} (no CCV ID to request data from CalCat)")
            self._load_calcat_metadata(client)

        return self._metadata[key]

    def metadata_dict(self, client=None):
        """Get a dict of available metadata

        If this constant didn't come from CalCat but we have a CalCat CCV ID,
        this will fetch metadata from CalCat.
        """
        if (not self._have_calcat_metadata) and (self.ccv_id is not None):
            self._load_calcat_metadata(client)
        return self._metadata.copy()

metadata(key, client=None)

Get a specific metadata field, e.g. 'begin_validity_at'

This may make a request to CalCat if the value is not already known.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
def metadata(self, key, client=None):
    """Get a specific metadata field, e.g. 'begin_validity_at'

    This may make a request to CalCat if the value is not already known.

    Raises:
        KeyError: the field is unavailable, including when it is not known
            locally and there is no CCV ID to query CalCat with.
    """
    if key not in self._metadata and not self._have_calcat_metadata:
        if self.ccv_id is None:
            raise KeyError(f"{key!r} (no CCV ID to request data from CalCat)")
        self._load_calcat_metadata(client)

    return self._metadata[key]

metadata_dict(client=None)

Get a dict of available metadata

If this constant didn't come from CalCat but we have a CalCat CCV ID, this will fetch metadata from CalCat.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/calcat_interface2.py
def metadata_dict(self, client=None):
    """Return a copy of all metadata known for this constant.

    When the full CalCat record has not been loaded yet and a CCV ID is
    available, the record is fetched from CalCat first.
    """
    if not (self._have_calcat_metadata or self.ccv_id is None):
        self._load_calcat_metadata(client)
    return dict(self._metadata)