Middlelayer Device with Proxies

MonitorMotor is a middle layer device documenting best practice for monitoring a single remote device, that is, a device written with either of the C++ or Python API.

In this example, the device will initialise a connection with a remote motor device, restart the connection if the remote device disappears or resets, and display the motorPosition integer property of that device.

This example introduces the concepts of Device, connectDevice, isAlive, waitUntilNew, and wait_for.

Start simple: A single proxy

We recapitulate our knowledge and start simple by creating our device class, inheriting from Device:

from karabo.middlelayer import Device, State

class MonitorMotor(Device):

    def __init__(self, configuration):
        super(MonitorMotor, self).__init__(configuration)

    async def onInitialization(self):
        self.state = State.INIT

Device is the base class for all middle layer devices. It inherits from Configurable and thus you can define expected parameters for it.

Connecting to the Remote Device

To connect to the remote device, we must have its control address. In this example, it is registered as “SA1_XTD9_MONO/MOTOR/X”.

We must first import the connectDevice() function:

from karabo.middlelayer import connectDevice, Device, State

REMOTE_ADDRESS = "SA1_XTD9_MONO/MOTOR/X"

Device are typically connected to only once during the initialisation, using karabo.middlelayer.connectDevice():

def __init__(self, configuration):
    super(MonitorRemote, self).__init__(configuration)
    self.remoteDevice = None

async def onInitialization(self):
    self.state = State.INIT
    self.status = "Waiting for external device"
    self.remoteDevice = await connectDevice(REMOTE_ADDRESS)
    self.status = "Connection established"
    self.state = State.STOPPED

This function keeps the connection open until explicitly closing it. For a more local and temporary usage, karabo.middlelayer.getDevice(), can be used:

with (await getDevice(REMOTE_ADDRESS)) as remote_device:
    print(remote_device.property)

Note

The async with statement is supported from Karabo 2.13.0 onwards.

Continuous Monitoring

You now have a connection to a remote device! You may start awaiting its updates by defining a slot and using the waitUntilNew function

from karabo.middlelayer import connectDevice, State, waitUntilNew
...

@Slot(displayedName="Start",
      description="Start monitoring the remote device",
      allowedStates={State.OFF})
async def start(self):
    self.state = State.ON
    while True:
        await waitUntilNew(self.remoteDevice.remoteValue)
        print(self.remoteDevice.remoteValue)

By awaiting the waitUnitNew() coroutine, a non-blocking wait for the updated value of the property is executed before proceeding to the print statement.

Note

It may happen that the remote device gets reinitialized, e.g. the underlying device of the proxy is gone, such as after a server restart. The proxy will automatically switch the state property to State.UNKNOWN once the device is gone and reestablish all connections when it comes back.

Grow stronger: Several proxies in a device

Now that a device can be remotely monitored, and the connection kept alive, let’s see how to connect to several devices at once, and then control them.

In this example, we will build upon the previous chapter and initialise several connections with three remote motor devices, get their positions, and set them to a specific position.

The concepts of gather, background are introduced here.

Multiple Connection Handling

In order to handle several devices, we must make a few changes to the watchdog and reconnection coroutines.

Let us define three motors we want to monitor and control:

By using asyncio.gather() and karabo.middlelayer.background(), we simultaneously execute all the tasks in devices_to_connect and await their outcomes.

Monitoring Multiple Sources

Monitoring multiple resources is done very much the same way as monitoring a single one, passing a list of devices as a starred expression:

async def monitorPosition(self):
    while True:
        positions_list = [dev.position for dev in self.devices]
        await waitUntilNew(*positions_list)

        motorPos1 = self.devices[0].position
        motorPos2 = self.devices[1].position
        motorPos3 = self.devices[2].position

Controlling Multiple Sources

Setting properties of a device is done directly by assigning the property a value, for instance:

self.remoteMotor.targetPosition = 42

This guarantees to set the property. It is possible, however, to do a blocking wait, using setWait():

await setWait(device, targetPosition=42)

It may be desirable to do so, when the parameter needs to be set before further action should be taken. In this example, setting the desired target position is done with setWait such that we proceed to moving the motor only after the device has acknowledged the new target position.

As with properties, functions are directly called. To move the motor to the aforementioned position, await the move() function:

await self.remoteMotor.move()

Once the parameters are set, karabo.middlelayer.background() can be used to run the task:

background(self.remoteMotor.move())

This will create a KaraboFuture object of which the status can easily be tracked or cancelled.

As with reconnections, expending this methodology to cover several devices is done using gather():

async def moveSeveral(self, positions):
    futures = []

    for device, position in zip(self.devices, positions):
        await setWait(device, targetPosition=position)
        futures.append(device.move())

    await gather(*futures)

Exception Handling with Multiple Sources

A problem that now arises is handling exception should one of the motors develop an unexpected behaviour or, more commonly, a user cancelling the task. Cancellation raises an asyncio.CancelledError, thus extending the above function with a try-except:

async def moveSeveral(self, positions):
    futures = []
    for device, position in zip(self.devices, positions):
        await setWait(device, targetPosition=position)
        futures.append(device.move())
    try:
        await gather(*futures)
        await self.guardian_yield(self.devices)
    except CancelledError:
        toCancel = [device.stop() for device in self.devices
                    if device.state == State.MOVING]
        await gather(*toCancel)

Note

Note that the appropriate policy to adopt is left to the device developer.

The try-except introduces a guardian_yield() function. This is required in order to remain within the try statement, such that any cancellation happening whilst executing the futures, will be caught by the except.

The suggested solution for the guardian yield is to wait until all the device go from their busy state (State.MOVING) to their idle (State.ON) as follows:

async def guardian_yield(self, devices):
    await waitUntil(lambda: all(dev.state == State.ON for dev in devices))

Device Nodes

Next to use connectDevice() or getDevice() it is possible in middle layer device development to use a DeviceNode.

Note

DeviceNodes are MANDATORY properties and can only be set before instantiation, e.g. are INITONLY!

In contrast to a regular Access.MANDATORY definition, a DeviceNode also does not accept an empty string. Defining a remote device is done as follows:

motor1Proxy = DeviceNode(displayedName="RemoteDevice",
                         description="Remote motor 1")

motor2Proxy = DeviceNode(displayedName="RemoteDevice",
                         description="Remote motor 2")

motor3Proxy = DeviceNode(displayedName="RemoteDevice",
                         description="Remote motor 3")

However, a DeviceNode must connect within a certain timeout (default is 2 seconds ) to the configured remote device, otherwise the device holding a DeviceNode shuts itself down.

Accessing, setting, and waiting for updates from DeviceNode is similar to what was done previously.

Monitoring the position of these three motors is done so:

positions_list = [dev.position for dev in [motor1Proxy,
                                           motor2Proxy,
                                           motor3Proxy]]
await waitUntilNew(*positions_list)

Setting a parameter is a simple assignment but calling a function needs to be awaited:

self.motor1Proxy.targetPosition = 42
await self.motor1Proxy.move()

Doing so for several devices is done using gather():

async def moveSeveral(self, positions):
    futures = []

    for device, position in zip(self.devices, positions):
        await setWait(device, targetPosition=position)
        futures.append(device.move())

    await gather(*futures)

Function and parameter calls are now exactly as they were when using getDevice() or connectDevice(), but now details regarding the connection to remote devices are left to the middle layer API.

The DeviceNode holds a proxy to a remote device. During initialization, the DeviceNodes try to establish a proxy connection. Once connected, the deviceId of the remote device can be represented, but internally the proxy is retrieved. For this, a connection must be established. If this cannot be guaranteed within a certain time, the device holding the device nodes will shut itself down.

A timeout parameter of up to 5 seconds can be provided.

motor1Proxy = DeviceNode(displayedName="RemoteDevice",
                         description="Remote motor 1",
                         timeout=4.5)

The following devices implement the functionalities described above in a working environment, and can be considered reference implementations:

  • fastValve is a middle layer device interfacing with several remote devices

    through the use of DeviceNode