Advanced Features¶
This section describes advanced features of the middlelayer API.
Complete Futures¶
Synchronizing to the completion of a single karabo-future within a set of karabo-futures, or to the completion of all karabo-futures of the set, is supported by firstCompleted, firstException, and allCompleted. As the set may contain only a single karabo-future, it should not be surprising that all three functions have the same argument and return signatures. The required arguments are the karabo-futures themselves, passed as positional or keyword arguments; two optional keyword arguments (timeout and cancel_pending) modify the function behaviour. The functions return three dictionaries (done, pending, and error) describing the status of the karabo-futures when the function returns.
The first example shows a timeout wait for connections to be established to devices specified in a clients list of arbitrary length. Note that all exceptions are already caught in the function allCompleted.
async def setup(self):
    self.state = State.STARTING
    self.devices = []

    done, pending, error = await allCompleted(
        *[connectDevice(deviceId) for deviceId in self.clients],
        cancel_pending=False,
        timeout=10)

    if pending:
        self.logger.warning('Missing clients {}'.format(
            ', '.join([self.clients[k] for k in pending])))
        for k, v in pending.items():
            v.cancel()
    elif error:
        self.logger.warning('Error creating client proxy {}'.format(
            ', '.join([self.clients[k] for k in error])))
        except_entry = [(k, v.__class__.__name__)
                        for k, v in error.items() if v is not None]
        pending_entry = [k for k, v in error.items() if v is None]
    else:
        for k, v in done.items():
            self.devices.append(v)
        return

    self.state = State.UNKNOWN
A more compact variant passes the karabo-futures as keyword arguments and chains the failure dictionaries:

from collections import ChainMap

async def setup(self):
    self.state = State.STARTING
    fut = {deviceId: connectDevice(deviceId) for deviceId in self.clients}
    fut["timeout"] = 5
    devices, *fails = await allCompleted(**fut, cancel_pending=True)
    chain = ChainMap(*fails)
    if chain:
        self.status = f"Not all proxies could be connected {chain.keys()}."
        self.state = State.UNKNOWN
    else:
        self.devices = list(devices.values())
The second example waits indefinitely for left, right and back motor device nodes to reach new target positions.
@Slot(displayedName="Move",
description="Move to absolute position specified by targetPosition.",
allowedStates={State.STOPPED})
async def move(self):
left, right, back = virtualToActual(
self.Y.targetPosition,
self.Pitch.targetPosition,
self.Roll.targetPosition,
self.Length,
self.Width)
self.left.targetPosition = left
self.right.targetPosition = right
self.back.targetPosition = back
await allCompleted(moveL=self.left.move(),
moveR=self.right.move(),
moveB=self.back.move())
The following points should be remembered when using the support functions:
Functions wait indefinitely for their completion criteria unless the timeout keyword argument is set (in seconds), in which case they return when the timeout expires; a TimeoutError is not thrown.
The returned done, pending and error dictionaries contain k,v pairs, where the key is the enumeration number (e.g. 0, 1, 2 and 3 in example 1) when the karabo-futures are specified as positional arguments, or the user-specified keyword (e.g. "moveL", "moveR" and "moveB" in example 2) when they are specified as keyword arguments.
The order of karabo-futures in the returned done, pending and error dictionaries maintains the order of the karabo-futures in the calling arguments.
By default (cancel_pending=True) the functions cancel any pending karabo-futures and append the corresponding k,v entry (with v = None) to error before returning.
Karabo-futures which raise an exception have their k,v (v = Exception) entries returned in error. Example 1 shows how to build lists of raised and cancelled karabo-futures from error.
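The examples above only exercise allCompleted; firstCompleted shares the same signature. Below is a minimal sketch, assuming a left motor proxy as in example 2 and that sleep and waitUntil are available from karabo.middlelayer; the 5 second budget is illustrative:

from karabo.middlelayer import State, firstCompleted, sleep, waitUntil

async def await_motion_start(self):
    # Return as soon as either future completes; by default the
    # remaining futures are cancelled (cancel_pending=True).
    done, pending, error = await firstCompleted(
        moving=waitUntil(lambda: self.left.state == State.MOVING),
        timer=sleep(5))
    if "timer" in done:
        self.logger.warning("Motor did not start moving within 5 s")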
Async Timer¶
There are different ways in the middlelayer to implement continuous or repeated checks and procedures. Since Karabo 2.16.X the AsyncTimer can be used for repeating tasks.
A typical use case is bunching device status updates. Not only does the middlelayer bunch updates, the Karabo GUI Server device also throttles device updates. In the code snippet below, two examples of status throttling are shown utilizing the AsyncTimer.
from karabo.middlelayer import AsyncTimer, Device

STATUS_THROTTLE = 0.5
STATUS_THROTTLE_MAX = 2


class StackedStatus(Device):
    """This is a device that has a lot of status updates.

    The `status` property is in the base device and commonly used to provide
    information to the operator about what is happening.

    In this example the status updates are concatenated and sent out after
    a maximum snooze time of the timer `STATUS_THROTTLE_MAX`.
    The timer is a single shot timer.
    """

    async def onInitialization(self):
        self.stacked_status = []
        # Single shot timer example
        self.status_timer = AsyncTimer(
            self._timer_callback, timeout=STATUS_THROTTLE,
            flush_interval=STATUS_THROTTLE_MAX, single_shot=True)

    def post_status_update(self, status):
        """Cache a status and start the async timer"""
        self.stacked_status.append(status)
        # Start the timer; it will postpone by another `STATUS_THROTTLE`
        # if already started.
        self.status_timer.start()

    async def _timer_callback(self):
        self.status = "\n".join(self.stacked_status)
        self.stacked_status = []
        self.update()

    async def onDestruction(self):
        self.status_timer.stop()


class QueueStatus(Device):
    """This is a device that has a lot of status updates.

    In this example, a status list is continuously emptied with a status
    throttle time of `STATUS_THROTTLE`.
    """

    async def onInitialization(self):
        self.status_queue = []
        self.status_timer = AsyncTimer(
            self._timer_callback, timeout=STATUS_THROTTLE)
        # Start the timer to continuously check the queue.
        self.status_timer.start()

    def queue_status_update(self, status):
        """Queue a status update for the device"""
        self.status_queue.append(status)

    async def _timer_callback(self):
        if self.status_queue:
            self.status = self.status_queue.pop(0)
            # Potential check for status changes before setting
            self.update()

    async def onDestruction(self):
        self.status_timer.stop()
Note
All AsyncTimers must be stopped before destruction of the device. A typical place to do this is onDestruction!
Pipelining Channels¶
Fast or big data in Karabo is typically shared using Pipeline Channel connections. This section explores how to share data in this fashion.
The data is sent on output channels and received on input channels. An output channel can send data to several input channels on several devices, and likewise an input channel can receive data from many outputs.
Output Channels¶
Firstly, import the required classes:
from karabo.middlelayer import (
    AccessMode, Configurable, DaqDataType, Double, Hash, InputChannel,
    Node, OutputChannel, Slot, Type)
Then, define an output channel in your device:
output = OutputChannel(ChannelNode,
                       displayedName="Output",
                       description="Pipeline Output channel")
You’ll notice that we referenced ChannelNode. This is the schema of our output channel that defines what data we send and permits other devices to manage their expectations.
To define that schema, create a class that inherits from Configurable:
class DataNode(Configurable):
    daqDataType = DaqDataType.TRAIN

    doubleProperty = Double(
        defaultValue=0.0,
        accessMode=AccessMode.READONLY)


class ChannelNode(Configurable):
    data = Node(DataNode)
Notice that this class has a variable daqDataType defined. This is to enable the DAQ to triage the data. The type can be of either PULSE or TRAIN resolution and has to be encapsulated in the Node.
Now that the schema is defined, here’s how to send data over the output channel:
@Slot(displayedName="Send Pipeline Data")
async def sendPipeline(self):
self.output.schema.data.doubleProperty = 3.5
await self.output.writeData()
Alternatively, we can send a Hash without setting the property on the device:
@Slot(displayedName="Send Pipeline Raw Data")
async def sendPipelineRaw(self):
await self.output.writeRawData(Hash('data.doubleProperty', 3.5))
The output channel can notify all the clients with an end of stream message. Typically, this information is used to change the state of the input processing device.
@Slot(displayedName="Send EndOfStream")
async def sendEndOfStream(self):
await self.output.writeEndOfStream()
Input Channels¶
Receiving data from a Pipeline Channel is done by decorating a function with InputChannel:
@InputChannel(displayedName="Input")
async def input(self, data, meta):
print("Data", data)
print("Meta", meta)
The metadata contains information about the data, such as the source, whether the data is timestamped, and a timestamp if so.
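For instance, the metadata fields can be read directly in the handler; a minimal sketch, where the attribute names source and timestamp follow the proxy handler example later in this section:

@InputChannel(displayedName="Input")
async def input(self, data, meta):
    # `meta.source` names the sending channel, e.g. "deviceId:output"
    source = meta.source
    # Timestamp information of the data token
    timestamp = meta.timestamp.timestamp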
If the device developer is interested in the bare Hash of the data, one can set the raw option to True:
@InputChannel(raw=True, displayedName="Input")
async def input(self, data, meta):
    """ Very Important Processing """
For image data it is recommended to use the raw=False option, as the middlelayer device will automatically assign an NDArray to the ImageData, accessible via:
@InputChannel(displayedName="Input")
async def input(self, data, meta):
image = data.data.image
If the bare Hash is needed in the case of ImageData, it can be converted to an NDArray as:
from karabo.middlelayer import get_image_data

@InputChannel(raw=True, displayedName="Input")
async def input(self, data, meta):
    image = get_image_data(data)
It is possible to react on the endOfStream or the close signal from the output channel via:
@input.endOfStream
async def input(self, channel):
    """React on the end of stream of `channel`"""

@input.close
async def input(self, channel):
    """React on the close of the stream of `channel`"""
Policies¶
Different policies can be set at the device level for the behaviour to adopt when data is arriving too fast on the input channel, or when the consumer is too slow on the output channel. The available behaviours are:
queue: put the data in a queue;
drop: discard the data;
wait: create a background task that waits until the data can be sent;
queueDrop: cycle the data when the limit of the queue is hit
The default mode is drop for performance reasons.
The same policies can be set on input channels if they are too slow for the incoming data rate, but in copy mode only:
self.input.onSlowness = "drop"
Reference Implementation¶
A reference implementation can be found in pipelineMDL, where both receiving and sending data is shown.
Pipeline Proxy Example¶
In the previous section we learned that devices can have input and output channels. It is possible to access pipeline data with middlelayer proxies as well.
Output Proxies¶
Consider a remote device that has an OutputChannel with a schema equal to the one presented in the last section:
class DataNode(Configurable):
    daqDataType = DaqDataType.TRAIN

    doubleProperty = Double(
        defaultValue=0.0,
        accessMode=AccessMode.READONLY)


class ChannelNode(Configurable):
    data = Node(DataNode)


output = OutputChannel(
    ChannelNode,
    displayedName="Output",
    description="Pipeline Output channel")
Then it is possible to create a middlelayer proxy and connect to the channel with a channel policy drop via:
from karabo.middlelayer import connectDevice, waitUntilNew

proxy = await connectDevice("device")
# This is a non-blocking connect in a background task
proxy.output.connect()

# Inspect the data
while True:
    value = proxy.output.schema.data.doubleProperty
    if value > 100:
        break
    await waitUntilNew(proxy.output.schema.data.doubleProperty)

# Disconnect if you are not interested anymore; this saves network traffic
# which other channels can use.
proxy.output.disconnect()
As can be seen, it is possible to use waitUntilNew on the properties in the output proxy. Additionally, handlers can be registered for convenient use, before connecting:
from karabo.middlelayer import Hash, connectDevice, get_timestamp

proxy = await connectDevice("deviceId")

# Register handlers (the proxy keeps a strong reference to them)
async def data_handler(data, meta):
    # do something with the data hash
    assert isinstance(data, Hash)

    # meta is an object containing source and timestamp information
    source = meta.source  # string
    # get the timestamp object from the timestamp variable
    timestamp = meta.timestamp.timestamp
    # make sure the trainId is corrected
    timestamp = get_timestamp(meta.timestamp.timestamp)
    # do something


async def connect_handler(channel):
    """Connect stream handler of channel"""


async def eos_handler(channel):
    """End of stream handler of channel"""


async def close_handler(channel):
    """Close stream handler of channel"""


proxy.output.setDataHandler(data_handler)
proxy.output.setConnectHandler(connect_handler)
proxy.output.setEndOfStreamHandler(eos_handler)
proxy.output.setCloseHandler(close_handler)
proxy.output.connect()
Furthermore, from Karabo 2.16.X onwards, it is possible to reassign handlers after disconnecting. Using proxies for pipeline data is a very powerful feature, but sometimes only a few context-specific pipeline packets are needed. For this purpose, from Karabo 2.16.X onwards, the PipelineContext can be used.
Pipeline Context¶
This context represents a specific input channel connection to a Karabo device. It is not connected automatically, but may be connected using async with or with:
from karabo.middlelayer import PipelineContext, get_timestamp

channel = PipelineContext("deviceId:output")

async with channel:
    # wait for and retrieve exactly one data, metadata pair
    data, meta = await channel.get_data()
    source = meta.source
    timestamp = get_timestamp(meta.timestamp.timestamp)

with channel:
    await channel.get_data()
It is possible to ask for the connection status using is_alive() and to wait for the pipeline connection by awaiting wait_connected():
async with channel:
    if not channel.is_alive():
        await channel.wait_connected()

# Leaving the context will disconnect
assert not channel.is_alive()
However, awaiting a connection is already implicitly done when waiting for pipeline data to arrive in get_data().
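A sketch of collecting a handful of packets within a single connection, assuming the channel defined above and a hypothetical process function:

async with channel:
    for _ in range(5):
        data, meta = await channel.get_data()
        process(data)  # hypothetical processing of the data hash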
Images¶
Karabo has the strong capability of sending ImageData via the network. The middlelayer API provides this possibility as well.
Image Element¶
The Image element is a helper class to provide ImageData. Along with the raw pixel values it also stores useful metadata like encoding, bit depth or binning, and basic transformations like flip, rotation and ROI. This special Hash Type contains an NDArray element and is constructed as follows:
import numpy as np

from karabo.middlelayer import (
    Configurable, EncodingType, Image, ImageData)


class Device(Configurable):
    image = Image(
        data=ImageData(np.zeros(shape=(10, 10), dtype=np.uint64),
                       encoding=EncodingType.GRAY),
        displayedName="Image")
Hence, the Image element can be initialized with an ImageData KaraboValue.
Alternatively, the Image element can be initialized by providing shape, dtype and encoding:
image = Image(
    displayedName="Image",
    shape=(2600, 2000),
    dtype=UInt8,
    encoding=EncodingType.GRAY)
The dtype can be provided with a simple Karabo descriptor or the numpy dtype, e.g. numpy.uint8.
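For example, the same element with a numpy dtype; a short sketch assuming numpy is imported as np:

image = Image(
    displayedName="Image",
    shape=(2600, 2000),
    dtype=np.uint8,
    encoding=EncodingType.GRAY)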
Image Data¶
The Karabo ImageData is supposed to provide an encapsulated NDArray. This KaraboValue can estimate the associated attributes of the ImageData, such as binning, encoding, etc., from the input array. The minimum requirement to initialize it is a numpy array with dtype and shape.
import numpy as np
from karabo.middlelayer import ImageData
data = ImageData(np.zeros(shape=(10, 10), dtype=np.uint64))
Further attributes can be provided as keyword arguments on object creation, also at runtime. The ImageData can be set at runtime on an Image element. However, changing attributes at runtime will not alter the Schema information.
class ImageData(KaraboValue):
    def __init__(self, value, *args, binning=None, encoding=None,
                 rotation=None, roiOffsets=None, dimScales=None,
                 dimTypes=None, bitsPerPixel=None, flipX=False,
                 flipY=False, **kwargs):

binning [uint64]: Array or list of the binning of the image, e.g. [0, 0]
encoding [int32]: The encoding of the image, e.g. EncodingType.GRAY (enum)
rotation [int32]: The rotation of the image, either 0, 90, 180 or 270
roiOffsets [uint64]: Array or list of the roiOffsets, e.g. [0, 0]
dimScales [str]: Description of the dim scales
dimTypes [int32]: The dimension types array or list
bitsPerPixel: The bits per pixel
flipX: boolean, either True or False
flipY: boolean, either True or False
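A short sketch of providing attributes at creation time, using the keyword arguments listed above:

import numpy as np

from karabo.middlelayer import EncodingType, ImageData

data = ImageData(np.zeros(shape=(10, 10), dtype=np.uint64),
                 encoding=EncodingType.GRAY,
                 rotation=90, flipX=True)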
Pipeline Device Example: Images and Output Channel Schema Injection¶
Karabo has the strong capability of sending ImageData via the network. Since the data type of the image is sometimes not known, the corresponding schema has to be injected. Please find below a short example of how MDL can use image data for pipelining. It shows a trivial example of how schema injection may be utilized at runtime of a device for output channels with setOutputSchema.
This is available with Karabo 2.11.0
from asyncio import sleep

import numpy as np

from karabo.middlelayer import (
    AccessMode, Configurable, DaqDataType, Device, Image, Int32, Node,
    OutputChannel, Slot, State, UInt8, UInt32, background)


def channelSchema(dtype):
    """Return the output channel schema for a `dtype`"""

    class DataNode(Configurable):
        daqDataType = DaqDataType.TRAIN

        image = Image(displayedName="Image",
                      dtype=dtype,
                      shape=(800, 600))

    class ChannelNode(Configurable):
        data = Node(DataNode)

    return ChannelNode


class ImageMDL(Device):

    output = OutputChannel(
        channelSchema(UInt32),
        displayedName="Output")

    frequency = Int32(
        displayedName="Frequency",
        defaultValue=2)

    imageSend = UInt32(
        displayedName="Packets Sent",
        defaultValue=0,
        accessMode=AccessMode.READONLY)

    def __init__(self, configuration):
        super(ImageMDL, self).__init__(configuration)
        self._dtype = UInt32

    async def onInitialization(self):
        self.state = State.ON
        self._acquiring = False
        background(self._network_action())

    @Slot(allowedStates=[State.ACQUIRING])
    async def stop(self):
        self.state = State.ON
        self._acquiring = False

    @Slot(allowedStates=[State.ON])
    async def start(self):
        self.state = State.ACQUIRING
        self._acquiring = True

    @Slot(displayedName="Reset Counter")
    async def resetCounter(self):
        self.imageSend = 0

    @Slot(displayedName="Send EndOfStream", allowedStates=[State.ON])
    async def writeEndOfStream(self):
        await self.output.writeEndOfStream()

    @Slot(displayedName="Set Image data dtype")
    async def setImageDtype(self):
        """Example method to show how an output channel can be changed
        at runtime"""
        dtype = UInt8 if self._dtype == UInt32 else UInt32
        self._dtype = dtype
        schema = channelSchema(dtype)
        # provide key and new schema
        await self.setOutputSchema("output", schema)

    async def _network_action(self):
        while True:
            if self._acquiring:
                output = self.output.schema.data
                # Descriptor classes have a `numpy` property
                dtype = self._dtype.numpy
                image_array = np.random.randint(
                    0, 255, size=(800, 600),
                    dtype=dtype)
                output.image = image_array
                self.imageSend = self.imageSend.value + 1
                await self.output.writeData()
            await sleep(1 / self.frequency.value)
Broker Shortcut¶
Karabo devices are hosted on device servers. To communicate, messages are sent via the broker to either reconfigure other devices or call device slots.
However, it is possible to directly communicate with and control devices on the same device server via getLocalDevice.
This is available with Karabo 2.15.X
from karabo.middlelayer import Device, Int32, String, waitUntil


class Motor(Device):
    """This is a motor that has shared access to a controller talking
    to hardware"""
    controllerId = String()
    channelId = Int32(defaultValue=1)

    async def onInitialization(self):
        """This method is executed on instantiation"""
        def is_online():
            return self.getLocalDevice(self.controllerId.value) is not None

        await waitUntil(is_online)
        # A less readable equivalent using a lambda
        await waitUntil(lambda: self.getLocalDevice(
            self.controllerId.value) is not None)

        # Strong reference to the controller device
        controller = self.getLocalDevice(self.controllerId.value)
        # Call a function directly on the device object
        values = await controller.read_hardware_values(self.channelId)
Serialization¶
What is a Hash¶
The Hash is Karabo's container, used across all APIs. All data transferred over the network or saved to file by Karabo is in this format. There exist two implementations: one in C++, used by the C++ and Bound APIs, and the Middlelayer Python implementation. This document covers the serialization details of karabo.middlelayer.Hash.
There are various ways to create a Hash:
from karabo.middlelayer import Hash

value = 'a_string'
# Assign to an empty Hash
h = Hash()
h['key'] = value
# Construct from key-value arguments
h = Hash('key', value)
# Construct from a dict
dict_ = {'key': value}
h = Hash(dict_)
The Hash can be considered a supercharged OrderedDict. The big difference to normal Python containers is the dot-access method: the Hash has built-in knowledge about containing itself, so one can access sub-hashes via hash['key.subhash'].
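A short sketch of the dot access, assuming the Hash import above:

h = Hash()
h['key.subhash'] = 5
h['key']           # -> Hash([('subhash', 5)])
h['key.subhash']   # -> 5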
It also allows storing extra metadata, called attributes, linked to a datum. Most commonly, you will find the trainId as an attribute, telling when the datum was created. These attributes are also key-value pairs stored in a dictionary:
h.setAttribute('key', 'tid', 5)
h['key', 'source'] = 'mdl'

# It is possible to access a single attribute at a time:
h.getAttribute('key', 'tid')  # -> 5
h['key', 'source']            # -> 'mdl'

# Or all at once:
h.getAttributes('key')        # -> {'tid': 5, 'source': 'mdl'}
h['key', ...]                 # -> {'tid': 5, 'source': 'mdl'}
Note
There are two ways of accessing and setting attributes. One is setAttribute and getAttribute, made to match the C++ implementation; the other consists of using multiple keys and ellipses.
With this in mind, h['one', 'b'] accesses the b attribute, whereas h['one.b'] accesses the value b of the inner hash one.
Note
The tid attribute is used here on purpose: it is a special attribute representing the trainId, and is always a uint64.
XML Serialization¶
The Middlelayer API offers saveToFile and loadFromFile, which, given a Hash and a file name, will store or load the hash to/from XML:
from karabo.middlelayer import Hash as Mash
from karabo.middlelayer import saveToFile as save_mdl, loadFromFile as load_mdl
save_mdl(h, 'mash.xml')
This will result in an XML like the following:
<root KRB_Artificial="">
  <key KRB_Type="STRING" tid="KRB_UINT64:5" source="KRB_STRING:mdl">a_string</key>
</root>
As shown here, the tid and source are also stored as XML attributes of key. The entry for key specifies the data type (KRB_Type) and any attributes. These types (KRB_*) are specified using the types defined in the Framework, have their values separated by a colon, and are the same across APIs.
The root XML node is there as a marker to specify that the information is an encoded Hash.
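Loading restores the Hash together with its attributes; a minimal sketch using the aliases imported above:

h2 = load_mdl('mash.xml')
h2['key']       # -> 'a_string'
h2['key', ...]  # -> {'tid': 5, 'source': 'mdl'}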
Cross-API¶
As the format of a Hash is well defined, it is also possible to deserialize a Hash from another API:
from karabo.bound import Hash as Bash
from karabo.bound import saveToFile as save_bound, loadFromFile as load_bound

value = 'a_string'
bash = Bash('key', value)
bash.setAttribute('key', 'tid', 5)
bash.setAttribute('key', 'source', 'bound')

save_bound(bash, "bash.xml")

loaded = load_mdl("bash.xml")
type(loaded)        # -> karabo.middlelayer_api.hash.Hash
loaded              # -> Hash([('key', 'a_string')])
loaded['key', ...]  # -> {'tid': 5, 'source': 'bound'}
Note
These examples use both Python APIs, but the behaviour is the same with C++, which also provides saveToFile and loadFromFile. These examples work from and to any API.
Note
Although the two Python APIs provide identical functionality with similar names, their implementations differ greatly, as the Bound API uses C++ whilst the Middlelayer is pure Python, and their usage should not be mixed.
Deserializing a Hash coming from another API does work, but mixing the implementations for serialization does not!
Binary Serialization¶
Binary serialization is used to send data over the network. The Framework usually does the serialization, and developers needn't think of it.
The same hash will result in a binary object:
0x01 0x00 0x00 0x00 0x03 key 0x1c 0x00 0x00 0x00 0x02 0x00 0x00 0x00 0x03
tid 0x12 0x00 0x00 0x00 0x05 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x06 source
0x1c 0x00 0x00 0x00 0x03 0x00 0x00 0x00 mdl 0x08 0x00 0x00 0x00 a_string
Which is decomposed as follows:
0x01 0x00 0x00 0x00 # header, indicating how many entries in hash, here 1
0x03 key # the first byte defines the length of the key, here of length 3 (k, e, and y), followed by its value
0x1c 0x00 0x00 0x00 # the type of the value for `key`, a string
0x02 0x00 0x00 0x00 # 2 attributes!
0x03 tid # the length of the first attribute key, followed by its value
0x12 0x00 0x00 0x00 # the type of the `tid` attribute, uint64
0x05 0x00 0x00 0x00 0x00 0x00 0x00 0x00 # tid, with a value of 5
0x06 source # the length of the second attribute key, followed by its value
0x1c 0x00 0x00 0x00 # the type of the `source` attribute
0x03 0x00 0x00 0x00 mdl # the length of the value of `source` and the value itself †
0x08 0x00 0x00 0x00 # the length of the value for `key`
a_string # the value of the string for the `key` key.
†: The length field of the mdl value is a uint32, as opposed to the length fields of the keys, which are uint8, because it is a value; values may be longer than 255 bytes.
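The binary above can be reproduced and parsed from the middlelayer with encodeBinary and decodeBinary; a minimal sketch:

from karabo.middlelayer import Hash, decodeBinary, encodeBinary

h = Hash('key', 'a_string')
h['key', 'tid'] = 5
h['key', 'source'] = 'mdl'

blob = encodeBinary(h)  # the bytes decomposed above
restored = decodeBinary(blob)
assert restored['key', ...] == {'tid': 5, 'source': 'mdl'}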
Warning
A Hash can contain keys of any length. However, the binary serialization only allows keys up to 255 bytes. An error will be thrown for longer keys.
Cross-API¶
As with XML, all APIs understand the binary format:
from karabo.bound import BinarySerializerHash, Hash as Bash
from karabo.middlelayer import decodeBinary, encodeBinary

value = 'a_string'
bash = Bash('key', value)
bash.setAttribute('key', 'tid', 5)
bash.setAttribute('key', 'source', 'bound')

serializer = BinarySerializerHash.create(Bash('Bin'))
bound_binary = serializer.save(bash)  # Results in the binary explained above

loaded = decodeBinary(bound_binary)
type(loaded)        # -> karabo.middlelayer_api.hash.Hash
loaded              # -> Hash([('key', 'a_string')])
loaded['key', ...]  # -> {'tid': 5, 'source': 'bound'}
Going from Middlelayer to Bound would be:
mdl_binary = encodeBinary(h)

loaded = serializer.load(mdl_binary)
type(loaded)  # -> karathon.Hash
Table Element¶
In order to be serialized, a VectorHash needs to be put within a Hash first. If your device has a table called table as one of its properties, then it would be serialized as such:
h = Hash()
value, attrs = self.table.descriptor.toDataAndAttrs(self.table)
h['table'] = value
h['table', ...] = attrs
Then h can be serialized.
To restore it:
value = h['table']
attrs = h['table', ...]
table = self.table.descriptor.toKaraboValue(value, attrs)
setattr(self, 'table', table)
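A sketch of the full round trip through a file, using the saveToFile and loadFromFile helpers introduced in the XML section:

from karabo.middlelayer import loadFromFile, saveToFile

saveToFile(h, 'table.xml')
restored = loadFromFile('table.xml')

table = self.table.descriptor.toKaraboValue(
    restored['table'], restored['table', ...])
setattr(self, 'table', table)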