#28: Karabo Kai - The MDL Dojo

The Karabo middlelayer is a high-level framework that performs many procedures implicitly. The following sections introduce a common code style, highlight a few pitfalls, and show best practices for using the asyncio Python library, following the motto of this dojo:

  • coroutine first

  • task hard

  • no threads

What does this mean? First, use as many awaitables as possible, i.e. prefer an async implementation over a synchronous one. Second, if code execution takes long (I/O operations, slot calls), move the procedure to a background task.
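
The benefit of preferring awaitables can be illustrated with plain asyncio, independent of Karabo. The following is a minimal sketch: read_sensor is a hypothetical coroutine, and asyncio.sleep merely stands in for a non-blocking I/O call.

```python
import asyncio
import time


async def read_sensor(name, delay):
    # asyncio.sleep stands in for a non-blocking I/O call (hypothetical)
    await asyncio.sleep(delay)
    return f"{name}: done"


async def main():
    start = time.monotonic()
    # Both coroutines wait concurrently on a single event loop, no threads
    results = await asyncio.gather(
        read_sensor("encoder", 0.2),
        read_sensor("gauge", 0.2))
    elapsed = time.monotonic() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results, round(elapsed, 1))
```

Because both waits overlap, the total time is close to the longest single wait rather than the sum of both. A blocking call such as time.sleep in place of asyncio.sleep would stall the whole event loop instead.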

Code Style

In general, PEP8, PEP20, and PEP257 are the style guides to follow. In addition, there are a few Karabo-specific details to take care of to improve code quality.

An example of a major exception from PEP8 is that underscores should never be used for public device properties or slots.

Imports

Imports follow isort style: they are sorted first in resolution order (built-ins, external libraries, Karabo, project), then by import style (plain imports, then from imports), then in alphabetical order:

import sys
from asyncio import wait_for

import numpy as np

from karabo.middlelayer import (
    connectDevice, Device, Slot, String)

from .scenes import control, default

As a rule of thumb, isort -m4 can solve your problems quickly.

Note

Don’t do star (*) imports! Your readers don’t know which parts you will be using.

Note

Don’t import from submodules (e.g. karabo.middlelayer.*). The underlying API might change and your device won’t work anymore.

Class Definitions

Classes should be CamelCased:

class MyDevice(Device):
    pass

Abbreviations in class names should be capitalised:

class JJAttenuator(Device):
    pass

class SA3MirrorSwitch(Device):
    pass

Class Properties

Properties that are part of the device’s schema, and therefore exposed, should be camelCased, whereas non-exposed variables should have_underscores:

name = String(displayedName="Name")

someOtherString = String(
    displayedName="Other string",
    defaultValue="Hello",
    accessMode=AccessMode.READONLY)

valid_ids = ["44eab", "ff64d"]

Slots and Methods

Slots are camelCased, methods have_underscores. Public slots don’t have arguments.

@Slot(displayedName="Execute")
async def execute(self):
    """This is a public slot that is exposed to the system"""
    self.state = State.ACTIVE
    await self.execute_action()

@Slot(displayedName="Abort",
      allowedStates={State.ACTIVE, State.ERROR})
async def abortNow(self):
    self.state = State.STOPPING
    await self.abort_action()

async def execute_action(self):
    """This is not exposed, and therefore PEP8"""
    pass

Use Doubles, not Floats

Python floats have double precision. Hence, always use the Double declaration in a device instead of a Float whenever possible. Otherwise you might lose precision and have casting differences in top layer applications.

import numpy as np

Do = Double()
Dont = Float()


value = np.float32(0.1234)

>>> value
0.1234
>>> float(value)
0.1234000027179718
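
The same effect can be reproduced with the standard library only. This is a minimal sketch: as_float32 is a hypothetical helper that round-trips a Python float through a 32-bit IEEE 754 value, which is what storing it in a single-precision property amounts to.

```python
import struct


def as_float32(value):
    """Round-trip a Python float through a 32-bit IEEE 754 float."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


original = 0.1234
stored = as_float32(original)

# The comparison fails because the 32-bit round trip changed the value
print(stored == original)
# The error is small, but it is not zero
print(abs(stored - original))
```

Values that are exactly representable in 32 bits, such as 0.5, survive the round trip unchanged, which is why the problem can go unnoticed during testing.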

Karabo Values - Units and Timestamps

Setting a value on a device property automatically converts the value to a KaraboValue. A KaraboValue can have a unit and a timestamp. A timestamp is automatically assigned if the new value does not have one assigned.

from karabo.middlelayer import Unit, MetricPrefix

value = Double(
    unitSymbol=Unit.SECOND,
    metricPrefixSymbol=MetricPrefix.MILLI)

@Slot()
def some_function(self):
    self.value = 5
    print(self.value)
    print(self.value.timestamp)

>>> some_function()
5.0 ms
2022-04-26T12:16:26.423016

Timestamps are preserved under the hood when calculating.

from karabo.middlelayer import unit

def calculate_new(self):
    value = self.value
    print(value)
    print(value.timestamp)

    # Calculate a new value; the timestamp of the input is carried over
    new_value = value + 20 * unit.ms
    print(new_value)
    print(new_value.timestamp)

>>> calculate_new()
5.0 ms
2022-04-26T12:16:26.423016
25.0 ms
2022-04-26T12:16:26.423016

When several KaraboValues are combined, the newest timestamp is taken.

def calculate_new(self):
    encoder_position = self.encoderPosition
    print(encoder_position)
    print(encoder_position.timestamp)

    offset = self.offset
    print(offset)
    print(offset.timestamp)

    # Calculate the total
    total = encoder_position + offset
    print(total)
    print(total.timestamp)

>>> calculate_new()
5.0 mm
2022-04-26T12:16:26.423016
17.3 mm
2022-04-26T12:17:22.423016
22.3 mm
2022-04-26T12:17:22.423016

Warning

If the units of the values cannot be converted into each other, an exception is raised.
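
The "newest timestamp wins" rule can be sketched in plain Python. This is not how KaraboValue is implemented: Stamped is a hypothetical stand-in that ignores units entirely and only shows how the timestamp is propagated.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Stamped:
    """Hypothetical stand-in for a value that carries a timestamp."""
    magnitude: float
    timestamp: datetime

    def __add__(self, other):
        # The result inherits the newest timestamp of both operands
        return Stamped(self.magnitude + other.magnitude,
                       max(self.timestamp, other.timestamp))


encoder_position = Stamped(5.0, datetime(2022, 4, 26, 12, 16, 26))
offset = Stamped(17.3, datetime(2022, 4, 26, 12, 17, 22))

total = encoder_position + offset
print(total.magnitude)
print(total.timestamp.isoformat())
```

The result carries the timestamp of offset, because it is the more recent of the two inputs.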

Karabo Values - Comparison

Always keep in mind that you are dealing with KaraboValues. A very common pitfall is comparison by identity (is), which fails: KaraboValues cannot be compared to singleton values (None, True, False) by identity.

from karabo.middlelayer import isSet

isLocked = Bool(
    defaultValue=True)

position = Double()

def check(self):
    print(self.isLocked is True)
    print(self.position is None)
    print(self.isLocked.value is True)
    print(isSet(self.position))

>>> check()
False
False
True
True
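
Why identity comparison fails is a general Python matter: any object that wraps a value is a different object than the singleton it wraps. The sketch below uses a hypothetical Wrapped class to show the mechanism; it is not the KaraboValue implementation.

```python
class Wrapped:
    """Hypothetical wrapper, standing in for a value with metadata."""

    def __init__(self, value):
        self.value = value

    def __eq__(self, other):
        return self.value == other

    def __bool__(self):
        return bool(self.value)


is_locked = Wrapped(True)

print(is_locked is True)        # False: the wrapper is a different object
print(is_locked == True)        # True: equality looks at the wrapped value
print(is_locked.value is True)  # True: the plain bool is the singleton
print(bool(is_locked))          # True: truthiness also uses the wrapped value
```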

Exception handling

Exception handling in Karabo devices is important. Ideally, only expected exceptions occur, or none at all. Please don’t use a bare or catch-all try … except pattern. If an exception is raised in a slot or during reconfiguration, an error message is propagated to the GUI client and a pop-up is shown. If an exception is raised in a background procedure, it is visible in the logging system. Only this way do errors not go unnoticed.

@Slot()
async def dontDo(self):
    try:
        # This is a bad example: every exception is swallowed
        ...
    except Exception:
        self.state = State.ERROR

@Slot()
async def slotItLikeAnEngineer(self):
    # Work with expected exceptions
    try:
        position = self.positions[self.index]
    except IndexError:
        # do something
        ...

@Slot()
async def workingWithWaitFor(self):
    # Slots are not allowed to take time
    await wait_for(setWait("DEVICENOTTHERE", position=5), timeout=2)

@Slot()
async def timeoutSlot(self):
    try:
        await wait_for(setWait("DEVICENOTTHERE", position=5), timeout=2)
    except TimeoutError:
        self.status = "Device not available"

Note

The TimeoutError must be imported from asyncio.
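
The timeout pattern can be tried out without a Karabo installation. In this minimal sketch, slow_remote_call is a hypothetical stand-in for a call to a device that does not answer in time.

```python
import asyncio


async def slow_remote_call():
    # Stand-in for a call to a device that never answers in time
    await asyncio.sleep(10)
    return "reply"


async def guarded_call(timeout):
    try:
        return await asyncio.wait_for(slow_remote_call(), timeout=timeout)
    except asyncio.TimeoutError:
        # Only the expected exception is handled; anything else propagates
        return "Device not available"


status = asyncio.run(guarded_call(timeout=0.1))
print(status)
```

wait_for cancels the pending call when the timeout expires, so the caller returns after roughly the timeout instead of waiting the full ten seconds.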

Numpy and KaraboValues

Numpy is an extensively used package for data analysis routines. When using KaraboValues, not every function is supported.

The package that supports the units is pint. In order to make our newest timestamp and unit algorithm work, the KaraboValues rely on element wise math operations. With regard to numpy, we are talking about so-called ufuncs (universal functions).

As an example, numpy.mean and numpy.std are not universal functions and won’t work properly with KaraboValues. In addition, pint does not yet support every universal function. We have found the following to be unsupported:

  • numpy.positive

  • numpy.divmod

  • numpy.heaviside

  • numpy.gcd

  • numpy.lcm

  • numpy.bitwise_and

  • numpy.bitwise_xor

  • numpy.bitwise_or

  • numpy.invert

  • numpy.left_shift

  • numpy.right_shift

  • numpy.logical_and

  • numpy.logical_or

  • numpy.logical_xor

  • numpy.logical_not

  • numpy.spacing

Ideally, mathematical operations are always factored out into their own module and unit tested.

Working with KaraboValues and numpy is discouraged, and it is the developer’s responsibility to back up the device code with unit tests. Numpy, pint and Python dependencies are continuously upgraded! However, it is possible to use the magnitudes of KaraboValues (accessed with .value) in numpy functions.
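
A factored-out calculation with its own test could look like the following sketch. The function rms and its test are hypothetical examples; the assumption is that the caller passes plain magnitudes (for instance obtained with .value), so the function itself never sees units or timestamps.

```python
import math


def rms(magnitudes):
    """Root mean square of plain numbers (no units, no timestamps)."""
    values = [float(v) for v in magnitudes]
    if not values:
        raise ValueError("rms requires at least one value")
    return math.sqrt(sum(v * v for v in values) / len(values))


def test_rms():
    # Known results keep the function safe across dependency upgrades
    assert math.isclose(rms([3.0, 4.0]), math.sqrt(12.5))
    assert rms([2.0, -2.0]) == 2.0


test_rms()
```

Keeping such functions free of device code means they can be tested without starting a device or a broker.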

Slot Calls: Reply with correct State

Devices come with expectations. If we call the move slot of a motor, we expect the motor to be in a MOVING state once the slot call has returned.

Similar scenarios exist for all devices. Hence, we must reply with the correct State, and we must do so very quickly!

@Slot()
async def onlyAllowedInMacro(self):
    self.state = State.ACTIVE
    await sleep(20)
    self.state = State.PASSIVE

@Slot(allowedStates=[State.PASSIVE])
async def move(self):
    self.state = State.ACTIVE
    background(self._long_operation())
    # The Slot method ends, the reply to the user happens without error
    # and the device is in ACTIVE State.

async def _long_operation(self):
    await sleep(20)
    self.state = State.PASSIVE
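
The order of events can be followed in a plain asyncio sketch. FakeMotor is a hypothetical stand-in for a device, its states are plain strings, and asyncio.create_task takes the role that background plays in the snippet above.

```python
import asyncio


class FakeMotor:
    """Hypothetical stand-in for a device; states are plain strings."""

    def __init__(self):
        self.state = "PASSIVE"
        self.task = None

    async def move(self):
        # Set the state first, then hand the long work to a task
        self.state = "ACTIVE"
        self.task = asyncio.create_task(self._long_operation())
        # Returning here corresponds to the quick reply to the caller

    async def _long_operation(self):
        await asyncio.sleep(0.2)
        self.state = "PASSIVE"


async def main():
    motor = FakeMotor()
    await motor.move()
    state_after_reply = motor.state
    await motor.task  # awaited only so the demo can show the final state
    return state_after_reply, motor.state


state_after_reply, final_state = asyncio.run(main())
print(state_after_reply, final_state)
```

The caller sees the ACTIVE state as soon as move returns, while the long operation finishes later and resets the state on its own.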

Setter knowhow and instantiation workflow

Devices first initialize their Schema (Pipeline Channels and Device Nodes) and then enter the onInitialization function. By default, the device state is State.UNKNOWN.

class MyDevice(Device):
    is_connected = False

    @Double()
    def targetPosition(self, value):
        if value is None:
            return
        if self.is_connected:
            self.send_target_hardware(value)
        self.targetPosition = value

    async def onInitialization(self):
        # We enter as State.UNKNOWN (default)
        await self.connect_hardware()
        self.is_connected = True
        self.state = State.ON

State Monitor: StateSignifier and background

A typical task of a middlelayer device is to merge many device states into a single one. Always use the StateSignifier for this task. This example shows how to build a state monitor. Note the extra attributes assignment and accessMode in this case.

from karabo.middlelayer import (StateSignifier, Assignment, AccessMode,
                                background, connectDevice, ...)

class MyDevice(Device):

    mirrorXId = String(
        accessMode=AccessMode.INITONLY,
        assignment=Assignment.MANDATORY)

    mirrorYId = String(
        accessMode=AccessMode.INITONLY,
        assignment=Assignment.MANDATORY)

    def __init__(self, configuration):
        super().__init__(configuration)
        self.signifier = StateSignifier()

    async def onInitialization(self):

        devs = [connectDevice(self.mirrorXId),
                connectDevice(self.mirrorYId)]
        self.devices = await gather(*devs)
        background(self._monitor_state())

    async def _monitor_state(self):
        while True:
            props = [dev.state for dev in self.devices]
            # The state signifier also provides the newest timestamp
            state = self.signifier.returnMostSignificant(props)
            if state != self.state:
                # Only set a different state if necessary
                self.state = state
            await waitUntilNew(*props)
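
The idea of merging states by significance, and assigning only on change, can be sketched without Karabo. ToyState and its ranking are purely hypothetical and do not reflect the actual significance order used by the StateSignifier.

```python
from enum import IntEnum


class ToyState(IntEnum):
    """Hypothetical states; a higher number is more significant."""
    ON = 1
    MOVING = 2
    ERROR = 3


def most_significant(states):
    # An empty list has no meaningful merged state
    if not states:
        raise ValueError("need at least one state")
    return max(states)


current = ToyState.ON
for reported in ([ToyState.ON, ToyState.ON],
                 [ToyState.ON, ToyState.MOVING],
                 [ToyState.ERROR, ToyState.MOVING]):
    merged = most_significant(reported)
    if merged != current:
        # Only assign when the merged state actually changed
        current = merged
    print(current.name)
```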

Pipelining: InputChannel and proxies

A typical task of a middlelayer device can also be to combine input channel data (fast data, pipeline data) with proxy data (slow control data). It is important to know that InputChannel connections are created in the background so that they do not block the device instantiation.

from karabo.middlelayer import connectDevice, InputChannel

class MyDevice(Device):
    is_connected = False

    mirrorXId = String(
        accessMode=AccessMode.INITONLY,
        assignment=Assignment.MANDATORY)

    mirrorYId = String(
        accessMode=AccessMode.INITONLY,
        assignment=Assignment.MANDATORY)

    def __init__(self, configuration):
        super().__init__(configuration)
        self.signifier = StateSignifier()

    async def onInitialization(self):
        # InputChannels might be already alive but we don't have the proxies
        # yet
        devs = [connectDevice(self.mirrorXId),
                connectDevice(self.mirrorYId)]
        self.devices = await gather(*devs)
        self.is_connected = True

    @InputChannel()
    async def input(self, data, meta):
        if not self.is_connected:
            return
        # Do something with proxy and input data
        timestamp = meta.timestamp.timestamp
        train_id = timestamp.tid
        position = self.devices[0].actualPosition
        ...
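
The race between incoming data and proxy connection can be simulated with plain asyncio. FakeCombiner, on_initialization and on_input are hypothetical names, and the sleep stands in for connecting the proxies.

```python
import asyncio


class FakeCombiner:
    """Hypothetical device: data may arrive before proxies exist."""

    def __init__(self):
        self.is_connected = False
        self.proxy_position = None
        self.combined = []

    async def on_initialization(self):
        await asyncio.sleep(0.05)  # stand-in for connecting proxies
        self.proxy_position = 1.5
        self.is_connected = True

    async def on_input(self, data):
        if not self.is_connected:
            return  # drop data until slow control values are available
        self.combined.append((data, self.proxy_position))


async def main():
    device = FakeCombiner()
    init = asyncio.create_task(device.on_initialization())
    await device.on_input("train-1")  # arrives too early, ignored
    await init
    await device.on_input("train-2")  # processed together with proxy data
    return device.combined


combined = asyncio.run(main())
print(combined)
```

Without the is_connected guard, the first call would combine the data with a proxy value that does not exist yet.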