Advanced Features

The following section describes the advanced features of the middlelayer API.

Complete Futures

Synchronizing to the completion of a single karabo-future within a set of karabo-futures, or to the completion of all karabo-futures of the set, is supported by firstCompleted, firstException, and allCompleted. As the set may contain only a single karabo-future, it should not be surprising that all three functions have the same argument and return signatures. The karabo-futures are passed as positional or keyword arguments, and two optional keyword arguments (timeout and cancel_pending) modify the function behaviour. The functions return three dictionaries (done, pending, and error) specifying the status of the karabo-futures when the function returns.

The first example shows a wait, with timeout, for connections to be established to the devices specified in a clients list of arbitrary length. Note that all exceptions are already caught within allCompleted.

async def setup(self):
    self.state = State.STARTING
    self.devices = []

    done, pending, error = await allCompleted(
        *[connectDevice(deviceId) for deviceId in self.clients],
        cancel_pending=False,
        timeout=10)

    if pending:
        self.logger.warning('Missing clients {}'.format(
              ', '.join([self.clients[k] for k in pending])))
        for k, v in pending.items():
            v.cancel()

    elif error:
        self.logger.warning('Error creating client proxy {}'.format(
              ', '.join([self.clients[k] for k in error])))
        except_entry = [(k, v.__class__.__name__)
                      for k, v in error.items() if v is not None]
        pending_entry = [k for k, v in error.items() if v is None]

    else:
        for k, v in done.items():
            self.devices.append(v)
        return

    self.state = State.UNKNOWN


from collections import ChainMap

async def setup(self):
    self.state = State.STARTING

    fut = {deviceId: connectDevice(deviceId) for deviceId in self.clients}
    fut["timeout"] = 5
    devices, *fails = await allCompleted(**fut, cancel_pending=True)
    chain = ChainMap(*fails)
    if chain:
        status = (f"Not all proxies could be connected {chain.keys()}.")
        self.state = State.UNKNOWN
    else:
        self.devices = list(devices.values())

The second example waits indefinitely for left, right and back motor device nodes to reach new target positions.

@Slot(displayedName="Move",
       description="Move to absolute position specified by targetPosition.",
       allowedStates={State.STOPPED})
async def move(self):
    left, right, back = virtualToActual(
        self.Y.targetPosition,
        self.Pitch.targetPosition,
        self.Roll.targetPosition,
        self.Length,
        self.Width)

    self.left.targetPosition = left
    self.right.targetPosition = right
    self.back.targetPosition = back

    await allCompleted(moveL=self.left.move(),
                       moveR=self.right.move(),
                       moveB=self.back.move())

The following points should be remembered when using the support functions:

  • Functions wait indefinitely for their completion criteria unless the timeout keyword argument is set to the required timeout in seconds, in which case the function returns once the timeout expires (a TimeoutError is not thrown).

  • The returned done, pending and error dictionaries contain k,v pairs, where the key is the enumeration number (e.g. 0, 1, 2 and 3 in example 1) when the karabo-futures are specified as positional arguments, or the user-specified name (e.g. “moveL”, “moveR” and “moveB” in example 2) when they are specified as keyword arguments (see the sketch after this list).

  • The order of the karabo-futures in the returned done, pending and error dictionaries maintains the order of the karabo-futures in the calling arguments.

  • By default (cancel_pending=True), the functions cancel any pending karabo-futures and append the corresponding k,v entry (with v = None) to error before returning.

  • Karabo-futures which raise an exception have their k,v (v = Exception) entries returned in error. Example 1 shows how to build lists of the raised and cancelled karabo-futures in error.
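
As an illustration, firstCompleted shares this signature. A minimal sketch (the deviceIds motorA and motorB are hypothetical) that connects to whichever device answers first:

done, pending, error = await firstCompleted(
    connectDevice("motorA"), connectDevice("motorB"),
    timeout=5)
if done:
    # keys are the positional indices (0 or 1), values the proxies
    proxy = next(iter(done.values()))
# with the default cancel_pending=True, a cancelled karabo-future
# is reported in error with a value of None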

Async Timer

There are different ways in the middlelayer to perform continuous or repeated checks and procedures. Since Karabo 2.16.X, the AsyncTimer can be used for repeating tasks.

A striking possibility is to bunch device status updates. Not only does the middlelayer bunch updates, but the Karabo Gui Server device also throttles device updates. In the code snippet below, two examples of status throttling are depicted utilizing the AsyncTimer.

from karabo.middlelayer import AsyncTimer, Device

STATUS_THROTTLE = 0.5
STATUS_THROTTLE_MAX = 2


class StackedStatus(Device):
    """This is a device that has a lot of status updates.

    The `status` property is in the base device and commonly used to provide
    information to the operator what is happening.

    In this example the status updates are concatenated and sent out after
    a maximum snooze time of the timer `STATUS_THROTTLE_MAX`.

    The timer is a single shot timer.
    """

    async def onInitialization(self):
        self.stacked_status = []
        # Single shot timer example
        self.status_timer = AsyncTimer(
            self._timer_callback, timeout=STATUS_THROTTLE,
            flush_interval=STATUS_THROTTLE_MAX, single_shot=True)

    def post_status_update(self, status):
        """Cache a status and start the async timer"""
        self.stacked_status.append(status)
        # Start the timer, it will postpone by another `STATUS_THROTTLE`
        # if started already.
        self.status_timer.start()

    async def _timer_callback(self):
        self.status = "\n".join(self.stacked_status)
        self.stacked_status = []
        self.update()

    async def onDestruction(self):
        self.status_timer.stop()


class QueueStatus(Device):
    """This is a device that has a lot of status updates.

    In this example, a status list is continuously emptied with a status
    throttle time of `STATUS_THROTTLE`.
    """

    async def onInitialization(self):
        self.status_queue = []
        self.status_timer = AsyncTimer(
            self._timer_callback, timeout=STATUS_THROTTLE)
        # Start the timer to continuously check the queue.
        self.status_timer.start()

    def queue_status_update(self, status):
        """Queue a status update for the device"""
        self.status_queue.append(status)

    async def _timer_callback(self):
        if self.status_queue:
            self.status = self.status_queue.pop(0)
            # Potential check for status changes before setting
            self.update()

    async def onDestruction(self):
        self.status_timer.stop()

Note

All AsyncTimers must be stopped before destruction of the device. A typical place to do this is onDestruction!

Pipelining Channels

Fast or big data in Karabo is typically shared using Pipeline Channel connections. This section explores how to share data in such fashion.

The data is sent on output channels and received on input channels. An output channel can send data to several input channels on several devices, and likewise an input channel can receive data from many outputs.

Output Channels

Firstly, import the required classes:

from karabo.middlelayer import (
    AccessMode, Configurable, DaqDataType, Double, Hash, InputChannel,
    Node, OutputChannel, Slot, Type)

Then, define an output channel in your device:

output = OutputChannel(ChannelNode,
                       displayedName="Output",
                       description="Pipeline Output channel")

You’ll notice that we referenced ChannelNode. This is the schema of our output channel that defines what data we send and permits other devices to manage their expectations.

To define that schema, create a class that inherits from Configurable:

class DataNode(Configurable):
    daqDataType = DaqDataType.TRAIN

    doubleProperty = Double(
        defaultValue=0.0,
        accessMode=AccessMode.READONLY)

class ChannelNode(Configurable):
    data = Node(DataNode)

Notice that this class has a variable daqDataType defined. This is to enable the DAQ to triage the data. The type can be of either PULSE or TRAIN resolution and has to be encapsulated in the Node.
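
A pulse-resolved variant is declared the same way (a sketch, using the PULSE resolution named above):

class PulseDataNode(Configurable):
    daqDataType = DaqDataType.PULSE

    doubleProperty = Double(
        defaultValue=0.0,
        accessMode=AccessMode.READONLY)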

Now that the schema is defined, here’s how to send data over the output channel:

@Slot(displayedName="Send Pipeline Data")
async def sendPipeline(self):
    self.output.schema.data.doubleProperty = 3.5
    await self.output.writeData()

Alternatively, we can send a Hash without setting the property on the device:

@Slot(displayedName="Send Pipeline Raw Data")
async def sendPipelineRaw(self):
    await self.output.writeRawData(Hash('data.doubleProperty', 3.5))

The output channel can notify all the clients with an end of stream message. Typically, this information is used to change the state of the input processing device.

@Slot(displayedName="Send EndOfStream")
async def sendEndOfStream(self):
    await self.output.writeEndOfStream()

Input Channels

Receiving data from a Pipeline Channel is done by decorating a function with InputChannel:

@InputChannel(displayedName="Input")
async def input(self, data, meta):
    print("Data", data)
    print("Meta", meta)

The metadata contains information about the data, such as the source, whether the data is timestamped, and a timestamp if so.
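
For example, the source and a trainId-corrected timestamp can be read off the meta object (a sketch; get_timestamp is also demonstrated in the proxy section below):

from karabo.middlelayer import get_timestamp

@InputChannel(displayedName="Input")
async def input(self, data, meta):
    source = meta.source  # string identifying the output channel
    # timestamp with a corrected trainId
    timestamp = get_timestamp(meta.timestamp.timestamp)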

If the device developer is interested in the bare Hash of the data, one can set the raw option to True:

@InputChannel(raw=True, displayedName="Input")
async def input(self, data, meta):
    """ Very Important Processing """

For image data it is recommended to use the raw=False option, as the middlelayer device will automatically assign an NDArray to the ImageData, accessible via:

@InputChannel(displayedName="Input")
async def input(self, data, meta):
    image = data.data.image

If it is needed to use the bare Hash in the case of ImageData, it can be converted to NDArray as:

from karabo.middlelayer import get_image_data

@InputChannel(raw=True, displayedName="Input")
async def input(self, data, meta):
    image = get_image_data(data)

It is possible to react to the endOfStream or the close signal from the output channel via:

@input.endOfStream
async def input(self, channel):
    """React to the end of stream of `channel`"""

@input.close
async def input(self, channel):
    """React to the close of the output `channel`"""

Policies

Different policies can be set at the device level for the behaviour to adopt when data is arriving too fast on the input channel, or when the consumer is too slow on the output channel. The available behaviours are:

  • queue: put the data in a queue;

  • drop: discard the data;

  • wait: create a background task that waits until the data can be sent;

  • queueDrop: cycle the data when the limit of the queue is hit

The default mode is drop for performance reasons.

The policies are the same on input channels if they are too slow for the fed data rate, but they apply in copy mode only:

self.input.onSlowness = "drop"
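
For instance, the policy can be set once at initialization (a sketch, assuming an input channel named input as above):

async def onInitialization(self):
    # Cycle the queue instead of dropping new data outright
    self.input.onSlowness = "queueDrop"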

Reference Implementation

A reference implementation can be found in pipelineMDL, where both receiving and sending data is shown.

Pipeline Proxy Example

In the previous section we learned that devices can have input and output channels. It is possible to access pipeline data with middlelayer proxies as well.

Output Proxies

Consider a remote device that has an OutputChannel with the same schema as presented in the last section,

class DataNode(Configurable):
    daqDataType = DaqDataType.TRAIN

    doubleProperty = Double(
        defaultValue=0.0,
        accessMode=AccessMode.READONLY)

class ChannelNode(Configurable):
    data = Node(DataNode)


output = OutputChannel(
    ChannelNode,
    displayedName="Output",
    description="Pipeline Output channel")

Then it is possible to create a middlelayer proxy and connect to the channel with a channel policy drop via:

from karabo.middlelayer import connectDevice, waitUntilNew

proxy = await connectDevice("device")
# This is a non-blocking connect in a background task
proxy.output.connect()

# Inspect the data
while True:
    value = proxy.output.schema.data.doubleProperty
    if value > 100:
        break
    await waitUntilNew(proxy.output.schema.data.doubleProperty)

# Disconnect if you are not interested anymore, this saves network traffic
# which other channels can use.
proxy.output.disconnect()

As can be seen, it is possible to use waitUntilNew on the properties in the output proxy. Additionally, handlers can be registered for convenient use, before connecting:

from karabo.middlelayer import Hash, connectDevice, get_timestamp

proxy = await connectDevice("deviceId")
# Register handlers (strong references)

async def data_handler(data, meta):
    # do something with data hash
    assert isinstance(data, Hash)

    # meta is an object containing source and timestamp information
    source = meta.source # string
    # the timestamp object of the metadata
    timestamp = meta.timestamp.timestamp
    # get_timestamp provides a timestamp with a corrected trainId
    timestamp = get_timestamp(meta.timestamp.timestamp)
    # do something

async def connect_handler(channel):
   """Connect stream handler of channel"""

async def eos_handler(channel):
   """End of stream handler of channel"""

async def close_handler(channel):
   """Close stream handler of channel"""

proxy.output.setDataHandler(data_handler)
proxy.output.setConnectHandler(connect_handler)
proxy.output.setEndOfStreamHandler(eos_handler)
proxy.output.setCloseHandler(close_handler)
proxy.output.connect()

Furthermore, from Karabo 2.16.X onwards, it is possible to reassign handlers after disconnecting. Using proxies for pipeline data is a very powerful feature and sometimes it is only needed to get a few context-specific pipeline packages. For this purpose, from Karabo 2.16.X onwards, the PipelineContext can be used.

Pipeline Context

This context represents a specific input channel connection to a karabo device. It is not connected automatically, but may be connected using the async with or with statement:

from karabo.middlelayer import PipelineContext, get_timestamp

channel = PipelineContext("deviceId:output")
async with channel:
    # wait for and retrieve exactly one data, metadata pair
    data, meta = await channel.get_data()
    source = meta.source
    timestamp = get_timestamp(meta.timestamp.timestamp)

with channel:
    await channel.get_data()

It is possible to ask for the connection status using is_alive() and to wait for the pipeline connection by awaiting wait_connected():

async with channel:
    if not channel.is_alive():
        await channel.wait_connected()

# Leaving the context will disconnect
assert not channel.is_alive()

However, awaiting a connection is already implicitly done when waiting for pipeline data to arrive in get_data().
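
For example, a few context-specific packets can be gathered, and the connection is dropped on leaving the context (a sketch, assuming the doubleProperty schema from the previous sections):

values = []
async with channel:
    for _ in range(5):
        # get_data implicitly waits for the connection
        data, meta = await channel.get_data()
        values.append(data["data.doubleProperty"])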

Images

Karabo has the strong capability of sending ImageData over the network. The middlelayer API provides this possibility as well.

Image Element

The Image element is a helper class to provide ImageData.

Along with the raw pixel values, it also stores useful metadata like encoding, bit depth or binning, and basic transformations like flip, rotation and ROI.

This special Hash type contains an NDArray element and is constructed as follows:

import numpy as np
from karabo.middlelayer import (
    Configurable, EncodingType, Image, ImageData)

class Device(Configurable):

    image = Image(
        data=ImageData(np.zeros(shape=(10, 10), dtype=np.uint64),
                       encoding=EncodingType.GRAY),
        displayedName="Image")

Hence, the Image element can be initialized with an ImageData KaraboValue.

Alternatively, the Image element can be initialized by providing shape and dtype and the encoding:

image = Image(
    displayedName="Image",
    shape=(2600, 2000),
    dtype=UInt8,
    encoding=EncodingType.GRAY)

The dtype can be provided as a simple Karabo descriptor or as a numpy dtype, e.g. numpy.uint8.
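
For example, the element above can equivalently be declared with a numpy dtype (a sketch):

import numpy as np

image = Image(
    displayedName="Image",
    shape=(2600, 2000),
    dtype=np.uint8,  # equivalent to the UInt8 descriptor
    encoding=EncodingType.GRAY)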

Image Data

The Karabo ImageData is supposed to provide an encapsulated NDArray.

This KaraboValue can estimate the associated attributes of the ImageData, such as binning, encoding, etc., from the input array. The minimum requirement for initialization is a numpy array with dtype and shape.

import numpy as np
from karabo.middlelayer import ImageData

data = ImageData(np.zeros(shape=(10, 10), dtype=np.uint64))

Further attributes can be provided as keyword arguments on object creation, and also at runtime; see the sketch after the parameter list below. The ImageData can be set at runtime on an Image element. However, changing attributes at runtime will not alter the Schema information:

class ImageData(KaraboValue):
    def __init__(self, value, *args, binning=None, encoding=None,
                 rotation=None, roiOffsets=None, dimScales=None, dimTypes=None,
                 bitsPerPixel=None, flipX=False, flipY=False, **kwargs):
  • binning [uint64]: Array or list of the binning of the image, e.g. [0, 0]

  • encoding [int32]: The encoding of the image, e.g. EncodingType.GRAY (enum)

  • rotation [int32]: The rotation of the image, either 0, 90, 180 or 270

  • roiOffsets [uint64]: Array or list of the roiOffset, e.g. [0, 0]

  • dimScales [str]: Description of the dim scales

  • dimTypes [int32]: The dimension types array or list

  • bitsPerPixel: The bits per pixel

  • flipX: boolean, either True or False

  • flipY: boolean, either True or False
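
A minimal sketch passing a few of these attributes on creation:

import numpy as np
from karabo.middlelayer import EncodingType, ImageData

data = ImageData(
    np.zeros(shape=(10, 10), dtype=np.uint64),
    binning=[0, 0],
    encoding=EncodingType.GRAY,
    rotation=90,
    flipX=True)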

Pipeline Device Example: Images and Output Channel Schema Injection

Karabo has the strong capability of sending ImageData over the network. Since the data type of the image is sometimes not known beforehand, the corresponding schema has to be injected. Please find below a short example of how an MDL device can use image data for pipelining. It shows a trivial example of how schema injection may be utilized at runtime for output channels with setOutputSchema.

This is available with Karabo 2.11.0

from asyncio import sleep

import numpy as np
from karabo.middlelayer import (
    AccessMode, Configurable, DaqDataType, Device, Image, Int32, Node,
    OutputChannel, Slot, State, UInt8, UInt32, background)


def channelSchema(dtype):
    """Return the output channel schema for a `dtype`"""

    class DataNode(Configurable):
        daqDataType = DaqDataType.TRAIN

        image = Image(displayedName="Image",
                      dtype=dtype,
                      shape=(800, 600))

    class ChannelNode(Configurable):
        data = Node(DataNode)

    return ChannelNode


class ImageMDL(Device):
    output = OutputChannel(
        channelSchema(UInt32),
        displayedName="Output")

    frequency = Int32(
        displayedName="Frequency",
        defaultValue=2)

    imageSend = UInt32(
        displayedName="Packets Send",
        defaultValue=0,
        accessMode=AccessMode.READONLY)

    def __init__(self, configuration):
        super(ImageMDL, self).__init__(configuration)
        self._dtype = UInt32

    async def onInitialization(self):
        self.state = State.ON
        self._acquiring = False
        background(self._network_action())

    @Slot(allowedStates=[State.ACQUIRING])
    async def stop(self):
        self.state = State.ON
        self._acquiring = False

    @Slot(allowedStates=[State.ON])
    async def start(self):
        self.state = State.ACQUIRING
        self._acquiring = True

    @Slot(displayedName="Reset Counter")
    async def resetCounter(self):
        self.imageSend = 0

    @Slot(displayedName="Send EndOfStream", allowedStates=[State.ON])
    async def writeEndOfStream(self):
        await self.output.writeEndOfStream()

    @Slot(displayedName="Set Image data dtype")
    async def setImageDtype(self):
        """Example method to show how output channel can be changed on
        runtime"""
        dtype = UInt8 if self._dtype == UInt32 else UInt32
        self._dtype = dtype
        schema = channelSchema(dtype)
        # provide key and new schema
        await self.setOutputSchema("output", schema)

    async def _network_action(self):
        while True:
            if self._acquiring:
                output = self.output.schema.data
                # Descriptor classes have `numpy` property
                dtype = self._dtype.numpy
                image_array = np.random.randint(
                    0, 255, size=(800, 600),
                    dtype=dtype)
                output.image = image_array
                self.imageSend = self.imageSend.value + 1
                await self.output.writeData()

            await sleep(1 / self.frequency.value)

Broker Shortcut

Karabo devices are hosted on device servers. In order to communicate, messages are sent via the broker to either reconfigure other devices or call device slots.

However, it is possible to directly communicate and control devices on the same devices server via getLocalDevice.

This is available with Karabo 2.15.X

from karabo.middlelayer import Device, Int32, String, waitUntil


class Motor(Device):
    """This is a motor that has shared access to a controller talking
    to hardware"""

    controllerId = String()
    channelId = Int32(defaultValue=1)

    async def onInitialization(self):
        """This method is executed on instantiation"""

        def is_online():
            return self.getLocalDevice(self.controllerId.value) is not None

        await waitUntil(is_online)
        # A bit less readable example with a lambda
        await waitUntil(lambda: self.getLocalDevice(
            self.controllerId.value) is not None)
        # Strong reference to the controller device
        controller = self.getLocalDevice(self.controllerId.value)
        # Call a function directly on the device object
        values = await controller.read_hardware_values(self.channelId)

Serialization

What is a Hash

The Hash is Karabo’s container, across all APIs. All data transferred over the network or saved to file by Karabo is in this format. There exist two implementations: one in C++, used by the C++ and Bound APIs, and the Middlelayer Python implementation. This document covers the serialization details of karabo.middlelayer.Hash.

There are various ways to create a Hash:

from karabo.middlelayer import Hash

value = 'a_string'
h = Hash()
h['key'] = value

h = Hash('key', value)

dict_ = {'key': value}
h = Hash(dict_)

The Hash can be considered a supercharged OrderedDict. The big difference to normal Python containers is the dot-access method. The Hash has built-in knowledge that it may contain other Hashes: one can access subhashes via hash['key.subhash'].
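
For example:

h = Hash('key.subhash', 5)

h['key.subhash']
5

h['key']
Hash([('subhash', 5)])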

It also allows storing extra metadata, called attributes, linked to a datum.

Most commonly, you will find the trainId attribute, telling when the datum was created. These attributes are also key-value pairs stored in a dictionary:

h.setAttribute('key', 'tid', 5)
h['key', 'source'] = 'mdl'

# It is possible to access a single attribute at a time:
h.getAttribute('key', 'tid')
5

h['key', 'source']
'mdl'


# Or all at once:
h.getAttributes('key')
{'tid': 5, 'source': 'mdl'}

h['key', ...]
{'tid': 5, 'source': 'mdl'}

Note

There are two ways of accessing and setting attributes. One is setAttribute and getAttribute, made to match the C++ implementation; the other consists of using multiple keys and ellipses.

With this in mind, h['one', 'b'] accesses the b attribute of the key one, whereas h['one.b'] accesses the value b of the inner hash one.
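
A short sketch of the difference:

h = Hash('one.b', 1)   # inner hash 'one' holding the value 'b'
h['one', 'b'] = 2      # attribute 'b' on the key 'one'

h['one.b']
1

h['one', 'b']
2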

Note

the tid attribute is used here on purpose: it is a special attribute representing the trainId, and is always a uint64

XML Serialization

The Middlelayer API offers saveToFile and loadFromFile, which, given a Hash and a file name, will store or load the hash to XML:

from karabo.middlelayer import Hash as Mash
from karabo.middlelayer import saveToFile as save_mdl, loadFromFile as load_mdl

save_mdl(h, 'mash.xml')

This will result in an XML like the following:

<root KRB_Artificial="">
    <key KRB_Type="STRING", tid="KRB_UINT64:5" source="KRB_STRING:mdl">a_string</key>
</root>

As shown here, the tid and source are also stored as xml attributes of key. The definition of the entry for key specifies the data type (KRB_Type) and any attributes. These types (KRB_*) are specified using the types defined in the Framework, have their values separated by a colon, and are the same types across APIs.

The root xml node is there as a marker to specify that the information is an encoded Hash.
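
Loading the file back returns an equal Hash, attributes included:

loaded = load_mdl('mash.xml')

loaded['key']
'a_string'

loaded['key', ...]
{'tid': 5, 'source': 'mdl'}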

Cross-API

As the format of a Hash is well defined, it is also possible to deserialize a Hash from another API:

from karabo.bound import Hash as Bash
from karabo.bound import saveToFile as save_bound, loadFromFile as load_bound

value = 'a_string'
bash = Bash('key', value)
bash.setAttribute('key', 'tid', 5)
bash.setAttribute('key', 'source', 'bound')

save_bound(bash, "bash.xml")

loaded = load_mdl("bash.xml")

type(loaded)
karabo.middlelayer_api.hash.Hash

loaded
Hash([('key', 'a_string')])

loaded['key', ...]
{'tid': 5, 'source': 'bound'}

Note

These examples use both Python APIs, but the behaviour is the same with C++, which also provides saveToFile and loadFromFile. These examples work from and to any API.

Note

Although the two Python APIs provide identical functionalities with similar names, their implementations differ greatly, as the Bound API uses C++ whilst the Middlelayer API is pure Python, and their usage should not be mixed.

Deserializing a Hash coming from another API does work, but serializing a Hash object belonging to another API does not!

Binary Serialization

Binary serialization is used to send data over the network. The Framework usually does the serialization, and developers needn’t think of it.

The same hash will result in a binary object:

0x01 0x00 0x00 0x00 0x03 key 0x1c 0x00 0x00 0x00 0x02 0x00 0x00 0x00 0x03
tid 0x12 0x00 0x00 0x00 0x05 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x06 source
0x1c 0x00 0x00 0x00 0x03 0x00 0x00 0x00 mdl 0x08 0x00 0x00 0x00 a_string

Which is decomposed as follows:

0x01 0x00 0x00 0x00                           # header, indicating how many entries in hash, here 1
0x03 key                                      # the first byte defines the length of the key, here of length 3 (k, e, and y), followed by its value
0x1c 0x00 0x00 0x00                           # the type of the value for `key`, a string
0x02 0x00 0x00 0x00                           # 2 attributes!
    0x03 tid                                  # the length of the first attribute key, followed by its value
    0x12 0x00 0x00 0x00                       # the type of the `tid` attribute, uint64
    0x05 0x00 0x00 0x00 0x00 0x00 0x00 0x00   # tid, with a value of 5
    0x06 source                               # the length of the second attribute key, followed by its value
    0x1c 0x00 0x00 0x00                       # the type of the `source` attribute
    0x03 0x00 0x00 0x00 mdl                   # the length of the value of `source` and the value itself †
0x08 0x00 0x00 0x00                           # the length of the value for `key`
a_string                                      # the value of the string for the `key` key.

†: The length field of the mdl value is a uint32, as opposed to the length fields of the keys, which are uint8, because it describes a value.

Warning

A Hash can contain keys of any length. However, the binary serialization only allows keys up to 255 bytes. An error is thrown for longer keys.

Cross-API

As with xml, all APIs understand the binary format:

from karabo.bound import BinarySerializerHash, Hash as Bash
from karabo.middlelayer import decodeBinary, encodeBinary

value = 'a_string'
bash = Bash('key', value)
bash.setAttribute('key', 'tid', 5)
bash.setAttribute('key', 'source', 'bound')

serializer = BinarySerializerHash.create(Bash('Bin'))
bound_binary = serializer.save(bash)  # Results in the binary explained above

loaded = decodeBinary(bound_binary)

type(loaded)
karabo.middlelayer_api.hash.Hash

loaded
Hash([('key', 'a_string')])

loaded['key', ...]
{'tid': 5, 'source': 'bound'}

Going from Middlelayer to Bound would be:

mdl_binary = encodeBinary(h)
loaded = serializer.load(mdl_binary)

type(loaded)
karathon.Hash

Table Element

In order to be serialized, a VectorHash needs to be put within a hash first. If your device has a table called table as one of its properties, then it would be serialized as such:

h = Hash()
value, attrs = self.table.descriptor.toDataAndAttrs(self.table)
h['table'] = value
h['table', ...] = attrs

Then h can be serialized.
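
For instance, with the binary encoder shown above (a sketch):

from karabo.middlelayer import encodeBinary

encoded = encodeBinary(h)  # bytes, ready to be sent over the network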

To restore it:

value = h['table']
attrs = h['table', ...]

table = self.table.descriptor.toKaraboValue(value, attrs)
setattr(self, 'table', table)