#28: Karabo Kai - The MDL Dojo¶
The Karabo middlelayer is a high level framework that does a lot of procedures implicitly. The following sections are provided to introduce a common code style and highlight a few pitfalls and shows best practices using the asyncio python library according to the motto of this dojo:
coroutine first
task hard
no threads
What does it mean? Firstly, use as much awaitables as possible, e.g. prefer the async implementation over a synchronous one. If code execution is taking long (io operations, slot calls) move procedures to a background task.
Code Style¶
In general PEP8, PEP20, and PEP257 are stylings to follow, there are a few Karabo-specific details to take care of to improve code quality.
An example of a major exception from PEP8 is that underscores should never be used for public device properties or slots.
Imports¶
Imports follow isort style: they are first in resolution order (built-ins, external libraries, Karabo, project), then in import style (from imports, imports), then in alphabetical order:
import sys
from asyncio import wait_for
import numpy as np
from karabo.middlelayer import (
connectDevice, Device, Slot, String)
from .scenes import control, default
As a rule of thumb, isort -m4 can solve your problems quickly.
Note
Don’t do star (*) imports! Your readers don’t know which parts you will be using.
Note
Don’t do submodule imports, e.g. (karabo.middlelayer.*). The underlying API might change and your device won’t work anymore.
Class Definitions¶
Classes should be CamelCased:
class MyDevice(Device):
pass
Abbreviations in class names should be capitalised:
class JJAttenutator(Device):
pass
class SA3MirrorsWitch(Device):
pass
Class Properties¶
Properties part of the device’s schema, which are exposed, should be camelCased, whereas non-exposed variables should have_underscores:
name = String(displayedName="Name")
someOtherString = String(
displayedName="Other string",
defaultValue="Hello",
accessMode=AccessMode.READONLY)
valid_ids = ["44eab", "ff64d"]
Slots and Methods¶
Slots are camelCased, methods have_underscores. Public slots don’t have arguments.
@Slot(displayedName="Execute")
async def execute(self):
"""This is a public slot is exposed to the system"""
self.state = State.ACTIVE
await self.execute_action()
@Slot(displayedName="Abort",
allowedStates={State.ACTIVE, State.ERROR})
async def abortNow(self):
self.state = state.STOPPING
await self.abort_action()
async def execute_action(self):
"""This is not exposed, and therefore PEP8"""
pass
Use Doubles, not Floats¶
Python floats have double precision. Hence, always use the Double declaration in a device instead of a Float whenever possible. Otherwise you might lose precision and have casting differences in top layer applications.
import numpy as np
Do = Double()
Dont = Float()
value = np.float32(0.1234)
>>> value
0.1234
>>> float(x)
0.1234000027179718
Karabo Values - Units and Timestamps¶
Setting a value on a device property automatically converts the value to a KaraboValue. A KaraboValue can have a unit and a timestamp. A timestamp is automatically assigned if the new value does not have one assigned.
from karabo.middlelayer import Unit, MetricPrefix
value = Double(
unitSymbol=Unit.SECOND,
metricPrefixSymbol=MetricPrefix.MILLI)
@Slot()
def some_function(self):
self.value = 5
print(self.value)
print(self.value.timestamp)
>>> some_function()
5.0 ms
2022-04-26T12:16:26.423016
Timestamps are preserved under the hood when calculating.
from karabo.middlelayer import unit
def calculate_new(self):
value = self.value
print(value)
print(self.value.timestamp)
# Calculate a new value
new_value = value + 20 * unit.ms
print(self.value)
print(self.value.timestamp)
>>> some_function()
5.0 ms
2022-04-26T12:16:26.423016
25.0 ms
2022-04-26T12:16:26.423016
With more KaraboValues, the newest timestamp is taken.
def calculate_new(self):
encoder_position = self.encoderPosition
print(encoder_position)
print(encoder_position.timestamp)
offset = self.offset
print(offset)
print(offset.timestamp)
# Calculate the total
total = encoder_position + offset
print(total)
print(total.timestamp)
>>> some_function()
5.0 mm
2022-04-26T12:16:26.423016
17.3 mm
2022-04-26T12:17:22.423016
22.3 mm
2022-04-26T12:17:22.423016
Warning
If the units of the values cannot be casted together, there will be an exception.
Karabo Values - Comparison¶
Dealing with KaraboValues is an important fact to always remember. A very often realized pitfall is the comparison by identity (is), that fails. KaraboValues cannot be compared so singleton values by identity (None, True, False).
from karabo.middlelayer import isSet
isLocked = Bool(
defaultValue=True)
position = Double()
def check(self):
print(self.isLocked is True)
print(self.position is None)
print(self.isLocked.value is True)
print(isSet(position))
>>> check()
False
False
True
True
Exception handling¶
Exception handling in Karabo devices is important. Of course, ideally only expected or no exceptions are occuring. Please don’t use bare ``try … except` coding pattern. If an exception is triggered in a slot or reconfiguration, an error message is propagated to the gui client and a pop up is shown. If an exception is happening in a background procedure, it is visible in the logging system. Only this way errors do not propagate without knowledge.
@Slot()
async def dontDo(self):
try:
# This is a bad example
except Exception:
self.state = State.ERROR
@Slot()
async def slotItLikeAnEngineer(self):
# Work with expected exceptions
try:
position = self.positions[self.index]
except IndexError:
# do something
@Slot()
async def workingWithWaitFor(self):
# Slots are not allowed to take time
await wait_for(setWait("DEVICENOTTHERE", position=5), timeout=2)
@Slot()
async def timeoutSlot(self):
try:
await wait_for(setWait("DEVICENOTTHERE", position=5), timeout=2)
except TimeoutError:
self.status = "Device not available"
Note
The TimeoutError
must be imported from asyncio.
Numpy and KaraboValues¶
Numpy is an extensively used package for data analysis routines. When using KaraboValues, not every function is supported.
The package that supports the units is pint. In order to make our newest timestamp and unit algorithm work, the KaraboValues rely on element wise math operations. With regard to numpy, we are talking about so-called ufuncs (universal functions).
As an example, numpy.mean and numpy.std are not universal functions and won’t work properly with KaraboValues. Also pint is not yet supporting every universal function and we have found:
numpy.positive
numpy.divmod
numpy.heaviside
numpy.gcd
numpy.lcm
numpy.bitwise_and
numpy.bitwise_xor
numpy.bitwise_or
numpy.invert
numpy.left_shift
numpy.right_shift
numpy.logical_and
numpy.logical_or
numpy.logical_xor
numpy.logical_not
numpy.spacing
Ideally mathmatical operations are always factored out in an own module and unit tested.
Working with KaraboValues and numpy is discouraged and it is the developers responsibility to backup his device code with unit-tests. Numpy, pint and python dependencies are continously upgraded! However, it is possible to use the magnitudes of KaraboValues with .value in numpy functions.
Slot Calls: Reply with correct State¶
Devices come with expectations. If we call a motor to move, we expect it after the slot (move) call to be in a MOVING state.
Similar scenarios are there for all devices. Hence, we must reply with the correct State, but also very quick!
@Slot()
async def onlyAllowedInMacro(self):
self.state = State.ACTIVE
await sleep(20)
self.state = State.PASSIVE
@Slot(allowedStates=[State.PASSIVE])
async def move(self):
self.state = State.ACTIVE
background(self._long_operation())
# The Slot method ends, the reply to the user happens without error
# and the device is in ACTIVE State.
async def _long_operation(self):
await sleep(20)
self.state = State.PASSIVE
Setter knowhow and instantiation workflow¶
Devices initialize firstly their Schema (Pipeline Channel and Device Nodes) and then enter the onInitialization function. The device state by default is State.UNKNOWN.
class MyDevice(Device):
is_connected = False
@Double()
def targetPosition(self, value):
if value is None:
return
if self.is_connected:
self.send_target_hardware(value)
self.targetPosition = value
async def onInitialization(self):
# We enter as State.UNKNOWN (default)
await self.connect_hardware()
self.is_connected = True
self.state = State.ON
State Monitor: StateSignifier and background¶
A typical task of a middlelayer device is to merge a lot of device states to a single one. For this task always use the StateSignifier. This example shows how to build a state monitor. Note in this case the extra attributes assignment and accessMode.
from karabo.middlelayer import (StateSignifier, Assignment, AccessMode,
background, connectDevice, ...)
class MyDevice(Device):
mirrorXId = String(
accessMode=AccessMode.INITONLY,
assignment=Assignment.MANDATORY)
mirrorYId = String(
accessMode=AccessMode.INITONLY,
assignment=Assignment.MANDATORY)
def __init__(self, configuration):
super().__init__(configuration)
self.signifier = StateSignifier()
async def onInitialization(self):
devs = [connectDevice(self.mirrorXId),
connectDevice(self.mirrorYId)]
self.devices = await gather(*devs)
background(self._monitor_state())
async def _monitor_state(self):
while True:
props = [dev.state for dev in self.devices]
# The state signifier also provides the newest timestamp
state = self.signifier.returnMostSignificant(props)
if state != self.state:
# Only set a different state if necessary
self.state = state
await waitUntilNew(*props)
Pipelining: InputChannel and proxies¶
A typical task of a middlelayer device can also be to combine input channel data (fast data, pipeline data) and proxy data (slow control data). Important to know is that InputChannel connections are created in the background to not block the device instantiation.
from karabo.middlelayer import connectDevice, InputChannel
class MyDevice(Device):
is_connected = False
mirrorXId = String(
accessMode=AccessMode.INITONLY,
assignment=Assignment.MANDATORY)
mirrorYId = String(
accessMode=AccessMode.INITONLY,
assignment=Assignment.MANDATORY)
def __init__(self, configuration):
super().__init__(configuration)
self.signifier = StateSignifier()
async def onInitialization(self):
# InputChannels might be already alive but we don't have the proxies
# yet
devs = [connectDevice(self.mirrorXId),
connectDevice(self.mirrorYId)]
self.devices = await gather(*devs)
self.is_connected = True
@InputChannel()
async def input(self, data, meta):
if not self.is_connected:
return
# Do something with proxy and input data
timestamp = meta.timestamp.timestamp
train_id = timestamp.tid
position = self.devices[0].actualPosition
...