C++ API

The karabo::core Namespace

namespace karabo::core

Namespace for package core

Enums

enum Capabilities

Values:

PROVIDES_SCENES = (1u << 0)
PROVIDES_MACROS = (1u << 1)
PROVIDES_INTERFACES = (1u << 2)
enum Interfaces

Values:

Motor = (1u << 0)
MultiAxisMotor = (1u << 1)
Trigger = (1u << 2)
Camera = (1u << 3)
Processor = (1u << 4)
DeviceInstantiator = (1u << 5)
enum ServerFlags

Values:

DEVELOPMENT = (1u << 0)

Functions

const std::string karabo::core::DEFAULT_CONFIG_MANAGER_ID("KaraboConfigurationManager")

Variables

const int kMaxCompleteInitializationAttempts = 2500
template <typename T>
class BaseWorker
#include <Worker.hh>

WorkerBase class contains one queue: request and can start auxiliary thread that will run on opposite end of the queue: Main thread Queue Auxiliary thread Methods push(…) > request > receive(…)

Public Functions

BaseWorker(const std::function<void()> &callbackint timeout = -1, int repetition = -1, )

Construct worker with callback and time and repetition parameters

Parameters
  • callback: this function will be called periodically
  • timeout: time in milliseconds auxiliary thread is waiting on the request queue; 0 means nowait mode; ,-1 means waiting forever
  • repetition: -1 means cycling forever; >0 means number of cycles.

BaseWorker &set(const std::function<void()> &callbackint timeout = -1, int repetition = -1, )

Set parameters defining the behavior of the worker

Parameters
  • callback: function to be called will boolean parameter signaling repetition counter expiration
  • timeout: timeout for receiving from queue
  • repetition: repetition counter

BaseWorker &setTimeout(int timeout = -1)

Set parameters defining the behavior of the worker

Parameters
  • timeout: timeout for receiving from queue

BaseWorker &setRepetition(int repetition = -1)

Set parameters defining the behavior of the worker

Parameters
  • repetition: repetition counter

BaseWorker &start()

Starts auxiliary thread that works on far ends of the queues Default settings are “waiting forever” and “repeat forever”

BaseWorker &stop()

Stop thread activity. If “request” queue still has some entries they will be received before thread exits. After requesting a stop the new entries can not be put (just ignored) into request queue.

BaseWorker &abort()

This function stops thread immediately despite the fact like nonempty queue.

void join()

It will block until the auxiliary thread is joined.

void push(const T &t)

Call this function from the main thread to put new data block to the request queue

Parameters
  • t: data of type T to put into the request queue

Private Functions

void run()

Receive data block of type T from request queue. The behavior is defined by two parameters: m_timeout and m_repetition. In case of successful receiving the m_callback function will be called.

class Device
#include <Device.hh>

all Karabo devices derive from this class

The Device class is the base class for all Karabo devices. It provides for a standardized set of basic properties that all devices share and provides interaction with its configuration and properties.

Devices are defined by their expected parameters; a list of properties and commands that are known to the distributed system at static time. These parameters describe a devices Schema, which in turn describes the possible configurations of the device.

Inherits from karabo::xms::SignalSlotable

Subclassed by karabo::devices::DataLogger, karabo::devices::DataLoggerManager, karabo::devices::DataLogReader, karabo::devices::GuiServerDevice, karabo::devices::PropertyTest

Public Functions

Device(const karabo::data::Hash &configuration)

Construct a device with a given configuration. The configuration Hash may contain any of the following entries:

serverId: a string representing the server this device is running on. If not given the device assumes to run in stand-alone mode.

deviceId: a string representing this device’s id, part of the unique identifier in the distributed system. If not given it defaults to none.

Parameters
  • configuration:

~Device()

The destructor will reset the DeviceClient attached to this device.

void registerInitialFunction(const std::function<void()> &func)

Register a function to be called after construction

Can be called by each class of an inheritance chain. Functions will be called in order of registration.

void finalizeInternalInitialization(const karabo::net::Broker::Pointer &connection, bool consumeBroadcasts, const std::string &timeServerId)

This method is called to finalize initialization of a device. It is needed to allow user code to hook in after the base device constructor, but before the device is fully initialized.

Parameters
  • connection: The broker connection for the device.
  • consumeBroadcasts: If false, do not listen directly to broadcast messages (addressed to ‘*’). Whoever sets this to false has to ensure that broadcast messages reach the Device in some other way, otherwise the device will not work correctly.
  • timeServerId: The id of the time server to be used by the device - usually set by the DeviceServer.

DeviceClient &remote()

This function allows to communicate to other (remote) devices. Any device contains also a controller for other devices (DeviceClient) which is returned by this function.

Return
DeviceClient instance

template <class ValueType>
void set(const std::string &key, const ValueType &value)

Updates the state/properties of the device. This function automatically notifies any observers in the distributed system.

Parameters
  • key: A valid parameter of the device (must be defined in the expectedParameters function)
  • value: The corresponding value (of corresponding data-type)

template <class ItemType>
void setVectorUpdate(const std::string &key, const std::vector<ItemType> &updates, VectorUpdate updateType, const karabo::data::Timestamp &timestamp)

Concurrency safe update of vector property

Does not work for Hash (i.e. table element) due to Hash equality just checking similarity. Removing might be unreliable for VECTOR_FLOAT or VECTOR_DOUBLE due to floating point equality issues.

Parameters
  • key: of the vector property to update
  • updates: items to remove from property vector (starting at the front) or to add (at the end)
  • updateType: indicates update type - applied individually to all items in ‘updates’
  • timestamp: timestamp to assign to updated vector property (e.g. getTimestamp())

template <class ValueType>
void set(const std::string &key, const ValueType &value, const karabo::data::Timestamp &timestamp)

Updates the state of the device. This function automatically notifies any observers in the distributed system.

Any updates are validated against the device schema and rejected if they are not appropriate for the current device state or are of wrong type. During validation alarm bounds are evaluated and alarms on properties will be raised if alarm conditions are met. Additionally, the distributed system is notified of these alarms.

Parameters
  • key: A valid parameter of the device (must be defined in the expectedParameters function)
  • value: The corresponding value (of corresponding data-type)
  • timestamp: The time of the value change

void writeChannel(const std::string &channelName, const karabo::data::Hash &data)

Writes a hash to the specified channel. The hash internally must follow exactly the data schema as defined in the expected parameters.

If ‘data’ contains an ‘NDArray’, consider to use the overload of this method that has the ‘safeNDArray’ flag. If that flag can be set to ‘true’, data copies can be avoided:

writeChannel(channelName, data, getActualTimestamp(), true);

Thread safety: The ‘writeChannel(..)’ methods and ‘signalEndOfStream(..)’ must not be called concurrently for the same ‘channelName’.

Parameters
  • channelName: The output channel name
  • data: Hash with the data

void writeChannel(const std::string &channelName, const karabo::data::Hash &data, const karabo::data::Timestamp &timestamp, bool safeNDArray = false)

Writes a hash to the specified channel. The hash internally must follow exactly the data schema as defined in the expected parameters.

Thread safety: The ‘writeChannel(..)’ methods and ‘signalEndOfStream(..)’ must not be called concurrently for the same ‘channelName’.

Parameters
  • channelName: The output channel name
  • data: Hash with the data
  • timestamp: A user provided timestamp (if e.g. retrieved from h/w)
  • safeNDArray: Boolean that should be set to ‘true’ if ‘data’ contains any ‘NDArray’ and their data is not changed after this ‘writeChannel’. Otherwise, data will be copied if needed, i.e. when the output channel has to queue or serves inner-process receivers.

void signalEndOfStream(const std::string &channelName)

Signals an end-of-stream event (EOS) on the output channel identified by channelName Thread safety: The ‘writeChannel(..)’ methods and ‘signalEndOfStream(..)’ must not be called concurrently for the same ‘channelName’.

Parameters
  • channelName: the name of the output channel.

void set(const karabo::data::Hash &hash)

Updates the state/properties of the device with all key/value pairs given in the hash.

Any updates are validated against the device schema and rejected if they are not appropriate for the current device state or are of wrong type. During validation alarm bounds are evaluated and alarms on properties will be raised if alarm conditions are met. Additionally, the distributed system is notified of these alarms. For those paths in ‘hash’ which do not already have time stamp attributes assigned as tested by Timestamp::hashAttributesContainTimeInformation(hash.getAttributes(<path>))), the actual timestamp is chosen.

NOTE: This function will automatically and efficiently (only one message) inform any observers.

Parameters
  • hash: Hash of updated internal parameters (must be in current full schema, e.g. since declared in the expectedParameters function)

void set(const karabo::data::Hash &hash, const karabo::data::Timestamp &timestamp)

Updates the state of the device with all key/value pairs given in the hash

Any updates are validated against the device schema and rejected if they are not appropriate for the current device state or are of wrong type. During validation alarm bounds are evaluated and alarms on properties will be raised if alarm conditions are met. Additionally, the distributed system is notified of these alarms.

NOTE: This function will automatically and efficiently (only one message) inform any observers.

Parameters
  • hash: Hash of updated internal parameters (must be in current full schema, e.g. since declared in the expectedParameters function)
  • timestamp: to indicate when the set occurred - but is ignored for paths in ‘hash’ that already have time stamp attributes as tested by Timestamp::hashAttributesContainTimeInformation(hash.getAttributes(<path>)))

template <class ValueType>
void setNoValidate(const std::string &key, const ValueType &value)

Updates the state of the device with all key/value pairs given in the hash. In contrast to the set function, no validation is performed.

Parameters
  • key: identifying the property to update
  • value: updated value

template <class ValueType>
void setNoValidate(const std::string &key, const ValueType &value, const karabo::data::Timestamp &timestamp)

Updates the state of the device with all key/value pairs given in the hash. In contrast to the set function, no validation is performed.

Parameters
  • key: identifying the property to update
  • value: updated value
  • timestamp: optional timestamp to indicate when the set occurred.

void setNoValidate(const karabo::data::Hash &hash)

Updates the state of the device with all key/value pairs given in the hash. In contrast to the set function, no validation is performed.

NOTE: This function will automatically and efficiently (only one message) inform any observers.

Parameters
  • config: Hash of updated internal parameters (must be declared in the expectedParameters function)

void setNoValidate(const karabo::data::Hash &hash, const karabo::data::Timestamp &timestamp)

Updates the state of the device with all key/value pairs given in the hash. In contrast to the set function, no validation is performed.

NOTE: This function will automatically and efficiently (only one message) inform any observers.

Parameters
  • config: Hash of updated internal parameters (must be declared in the expectedParameters function)
  • timestamp: optional timestamp to indicate when the set occurred.

template <class T>
T get(const std::string &key) const

Retrieves the current value of any device parameter (that was defined in the expectedParameters function)

Return
value of the requested parameter
Parameters
  • key: A valid parameter of the device (must be defined in the expectedParameters function)

template <class T>
T getAs(const std::string &key) const

Retrieves the current value of any device parameter (that was defined in the expectedParameters function) The value is casted on the fly into the desired type. NOTE: This function is considerably slower than the simple get() functionality

Return
value of the requested parameter
Parameters
  • key: A valid parameter of the device (must be defined in the expectedParameters function)

karabo::data::Schema getFullSchema() const

Retrieves all expected parameters of this device

Return
Schema object containing all expected parameters

void appendSchema(const karabo::data::Schema &schema, const bool = false)

Append a schema to the existing device schema

Parameters
  • schema: to be appended - may also contain existing elements to overwrite their attributes like min/max values/sizes, alarm ranges, etc. If it contains Input-/OutputChannels, they are (re-)created. If previously an InputChannel existed under the same key, its data/input/endOfStream handlers are kept for the recreated InputChannel.
  • unused: parameter, kept for backward compatibility.

void updateSchema(const karabo::data::Schema &schema, const bool = false)

Replace existing schema descriptions by static (hard coded in expectedParameters) part and add additional (dynamic) descriptions. Previous additions will be removed.

Parameters
  • schema: additional, dynamic schema - may also contain existing elements to overwrite their attributes like min/max values/sizes, alarm ranges, etc. If it contains Input-/OutputChannels, they are (re-)created (and previously added ones removed). If previously an InputChannel existed under the same key, its data/input/endOfStream handlers are kept for the recreated InputChannel.
  • unused: parameter, kept for backward compatibility.

template <class AliasType>
AliasType getAliasFromKey(const std::string &key) const

Converts a device parameter key into its aliased key (must be defined in the expectedParameters function)

Return
Aliased representation of the parameter
Parameters
  • key: A valid parameter of the device (must be defined in the expectedParameters function)

template <class AliasType>
std::string getKeyFromAlias(const AliasType &alias) const

Converts a device parameter alias into the original key (must be defined in the expectedParameters function)

Return
The original name of the parameter
Parameters
  • key: A valid parameter-alias of the device (must be defined in the expectedParameters function)

template <class T>
const bool aliasHasKey(const T &alias) const

Checks if the argument is a valid alias of some key, i.e. defined in the expectedParameters function

Return
true if it is an alias found in one of three containers of parameters: “reconfigurable”, “initial” or “monitored”, otherwise false
Parameters
  • alias: Arbitrary argument of arbitrary type

bool keyHasAlias(const std::string &key) const

Checks if some alias is defined for the given key

Return
true if the alias exists
Parameters
  • key: in expectedParameters mapping

karabo::data::Types::ReferenceType getValueType(const std::string &key) const

Checks the type of any device parameter (that was defined in the expectedParameters function)

Return
The enumerated internal reference type of the value
Parameters
  • key: A valid parameter of the device (must be defined in the expectedParameters function)

karabo::data::Hash getCurrentConfiguration(const std::string &tags = "") const

Retrieves the current configuration. If no argument is given, all parameters (those described in the expected parameters section) are returned. A subset of parameters can be retrieved by specifying one or more tags.

Return
A Hash containing the current value of the selected configuration
Parameters
  • tags: The tags (separated by comma) the parameter must carry to be retrieved

karabo::data::Hash getCurrentConfigurationSlice(const std::vector<std::string> &paths) const

Retrieves a slice of the current configuration.

Return
Hash with the current values and attributes (e.g. timestamp) of the selected configuration
Parameters
  • paths: of the configuration which should be returned (as declared in expectedParameters) (method throws ParameterExcepton if a non-existing path is given)

karabo::data::Hash filterByTags(const karabo::data::Hash &hash, const std::string &tags) const

Return a tag filtered version of the input Hash. Tags are as defined in the device schema

Return
a filtered version of the input Hash.
Parameters
  • hash: to filter
  • tags: to filter by

const std::string &getServerId() const

Return the serverId of the server this device is running on

Return

const karabo::data::State getState()

Return a State object holding the current unified state of the device.

Return

void updateState(const karabo::data::State &currentState)

Update the state of the device, using “actual timestamp”.

Will also update the instanceInfo describing this device instance (if new or old State are ERROR).

Parameters
  • currentState: the state to update to

void updateState(const karabo::data::State &currentState, const karabo::data::Hash &other)

Update the state of the device, using “actual timestamp”.

Will also update the instanceInfo describing this device instance (if new or old State are ERROR).

Parameters
  • currentState: the state to update to
  • other: a Hash to set other properties in the same state update message, time stamp attributes to its paths have precedence over the actual timestamp

void updateState(const karabo::data::State &currentState, const karabo::data::Timestamp &timestamp)

Update the state of the device, using given timestamp.

Will also update the instanceInfo describing this device instance (if new or old State are ERROR).

Parameters
  • currentState: the state to update to
  • timestamp: time stamp to assign to the state property and the properties in ‘other’ (if the latter do not have specified timestamp attributes)

void updateState(const karabo::data::State &currentState, karabo::data::Hash other, const karabo::data::Timestamp &timestamp)

Update the state of the device, using given timestamp.

Will also update the instanceInfo describing this device instance (if new or old State are ERROR).

Parameters
  • currentState: the state to update to
  • other: a Hash to set other properties in the same state update message, time stamp attributes to its paths have precedence over the given ‘timestamp’
  • timestamp: time stamp to assign to the state property and the properties in ‘other’ (if the latter do not have specified timestamp attributes)

void execute(const std::string &command) const

Execute a command on this device

Parameters
  • command:

template <class A1>
void execute(const std::string &command, const A1 &a1) const

Execute a command with one argument on this device

Parameters
  • command:
  • a1:

template <class A1, class A2>
void execute(const std::string &command, const A1 &a1, const A2 &a2) const

Execute a command with two arguments on this device

Parameters
  • command:
  • a1:
  • a2:

template <class A1, class A2, class A3>
void execute(const std::string &command, const A1 &a1, const A2 &a2, const A3 &a3) const

Execute a command with three arguments on this device

Parameters
  • command:
  • a1:
  • a2:
  • a3:

template <class A1, class A2, class A3, class A4>
void execute(const std::string &command, const A1 &a1, const A2 &a2, const A3 &a3, const A4 &a4) const

Execute a command with four arguments on this device

Parameters
  • command:
  • a1:
  • a2:
  • a3:
  • a4:

karabo::data::AlarmCondition getAlarmCondition() const

Get the current alarm condition the device is in

Return

void setAlarmCondition(const karabo::data::AlarmCondition &condition, bool needsAcknowledging = false, const std::string &description = std::string())

Set the global alarm condition

Parameters
  • condition: to set
  • needsAcknowledging: if this condition will require acknowledgment on the alarm service
  • description: an optional description of the condition. Consider including remarks on how to resolve

const karabo::data::AlarmCondition &getAlarmCondition(const std::string &key, const std::string &sep = ".") const

Get the alarm condition for a specific property

Return
the alarm condition of the property
Parameters
  • key: of the property to get the condition for
  • sep: optional separator to use in the key path

void slotTimeTick(unsigned long long id, unsigned long long sec, unsigned long long frac, unsigned long long period)

A slot called by the device server if the external time ticks update to synchronize this device with the timing system.

Parameters
  • id: current train id
  • sec: current system seconds
  • frac: current fractional seconds
  • period: interval between subsequent ids in microseconds

virtual void onTimeTick(unsigned long long id, unsigned long long sec, unsigned long long frac, unsigned long long period)

A hook which is called if the device receives external time-server update, i.e. if slotTimeTick on the device server is called. Can be overwritten by derived classes.

Parameters
  • id: train id
  • sec: unix seconds
  • frac: fractional seconds (i.e. attoseconds)
  • period: interval between ids in microseconds

virtual void onTimeUpdate(unsigned long long id, unsigned long long sec, unsigned long long frac, unsigned long long period)

If the device receives time-server updates via slotTimeTick, this hook will be called for every id in sequential order. The time stamps (sec + frac) of subsequent ids might be identical - though they are usually spaced by period. Can be overwritten in derived classes.

Parameters
  • id: train id
  • sec: unix seconds
  • frac: fractional seconds (i.e. attoseconds)
  • period: interval between ids in microseconds

void appendSchemaMaxSize(const std::string &path, unsigned int value, bool emitFlag = true)

Append Schema to change/set maximum size information for path - if paths does not exist, throw exception

This is similar to the more general appendSchema, but dedicated to a common use case.

Caveat: This does not recreate an output channel if its schema is changed

Parameters
  • path: indicates the parameter which should be a Vector- or TableElement
  • value: is the new maximum size of the parameter
  • emitFlag: indicates if others should be informed about this Schema update. If this method is called for a bunch of paths, it is recommended to set this to true only for the last call.

Public Static Functions

void expectedParameters(karabo::data::Schema &expected)

The expected parameter section of the Device class, known at static time. The basic parameters described here are available for all devices, many of them being expert or admin visible only.

Parameters
  • expected: a Schema to which these parameters will be appended.

Protected Functions

karabo::data::Timestamp getActualTimestamp() const

Returns the actual timestamp. The Trainstamp part of Timestamp is extrapolated from the last values received via slotTimeTick (or zero if no time ticks received yet). To receive time ticks, the server of the device has to be connected to a time server.

Return
the actual timestamp

karabo::data::Timestamp getTimestamp(const karabo::data::Epochstamp &epoch) const

Returns the Timestamp for given Epochstamp. The Trainstamp part of Timestamp is extrapolated forward or backward from the last values received via slotTimeTick (or zero if no time ticks received yet). To receive time ticks, the server of the device has to be connected to a time server.

Return
the matching timestamp, consisting of epoch and the corresponding Trainstamp
Parameters
  • epoch: for that the time stamp is searched for

Private Functions

void setNoLock(const karabo::data::Hash &hash, const karabo::data::Timestamp &timestamp)

Internal method for set(Hash, Timestamp), requiring m_objectStateChangeMutex to be locked

void setNoValidateNoLock(const karabo::data::Hash &hash, const karabo::data::Timestamp &timestamp)

Internal version of setNoValidate(hash, timestamp) that requires m_objectStateChangeMutex to be locked

void initChannels(const karabo::data::Schema &schema, const std::string &topLevel = "")

Called to setup pipeline channels, will recursively go through the given schema, assuming it to be at least a part of the schema of the device. Needs to be called with m_objectStateChangeMutex being locked.

  • Parameters
    • schema: the schema to traverse
  • Parameters
    • topLevel: std::string: empty or existing path of full
  • schema of the device

void prepareOutputChannel(const std::string &path)

Create OutputChannel for given path and take care to set handlers needed Needs to be called with m_objectStateChangeMutex being locked.

Parameters
  • path:

void prepareInputChannel(const std::string &path)

Create InputChannel for given path and take care to set handlers needed Needs to be called with m_objectStateChangeMutex being locked.

Parameters
  • path:

void slotCallGuard(const std::string &slotName, const std::string &callee)

This function is called by SignalSlotable to verify if a slot may be called from remote. The function only checks slots that are mentioned in the expectedParameter section (“DeviceSlots”) The following checks are performed:

Parameters
  • slotName: name of the slot
  • callee: the calling remote, can be unknown

1) Is this device locked by another device? If the lockedBy field is non-empty and does not match the callee’s instance id, the call will be rejected

2) Is the slot callable from the current state, i.e. is the current state specified as an allowed state for the slot.

void slotClearLock()

Clear any lock on this device

karabo::data::Hash getTimeInfo()

Internal method to retrieve time information of this device.

void slotGetTime(const karabo::data::Hash&)

Returns the actual time information of this device.

This slot reply is a Hash with the following keys:

  • time and its attributes provide an actual timestamp with train Id information.
  • timeServerId the id of the time server configured for the DeviceServer of this device; “None” when there’s no time server configured.
  • reference and its attributes provide the latest timestamp information received from the timeserver.
Parameters
  • info: an unused (at least for now) Hash parameter that has been added to fit the generic slot call (Hash in, Hash out) of the GUI server/client protocol.

void slotGetSystemInfo(const karabo::data::Hash&)

Returns the actual system information of this device.

This slot reply is a Hash with the following keys:

  • user the actual user running this device
  • broker the current connected broker node
Parameters
  • info: an unused (at least for now) Hash parameter that has been added to fit the generic slot call (Hash in, Hash out) of the GUI server/client protocol.

and all keys provided by slotGetTime.

Private Members

karabo::data::Validator m_validatorIntern

Validators to validate…

karabo::data::Validator m_validatorExtern

…internal updates via ‘Device::set

std::shared_ptr<DeviceClient> m_deviceClient

…external updates via ‘Device::slotReconfigure’

class DeviceClient
#include <DeviceClient.hh>

This class can be used to (remotely) control devices of the distributed system Synchronous calls (i.e. get()) are in fact asynchronous under the hood.

The Karabo DeviceClient provides a high-level interface for common calls to (remote) devices in the distributed system. In principle functionality implemented in the DeviceClient can be fully implemented in the Device using low level SignalSlotable calls alone, but device developers are discouraged from this approach, especially if synchronous behavior is acceptable or even desired.

In the context of a Device the DeviceClient is available using the Device::remote() function; it then shares the SignalSlotable instance of the device, e.g. there is no instantiation overhead.

Inherits from std::enable_shared_from_this< DeviceClient >

Public Functions

DeviceClient(const std::string &instanceId = std::string(), bool implicitInit = true, const karabo::data::Hash &serviceDeviceIds = karabo::data::Hash())

Constructor which establishes an own connection to the communication system. This constructor is intended for stand-alone C++ device clients. Once we care about authentication, this has to be added here.

Parameters
  • instanceId: The id with which the client should participate in the system. If not unique or invalid, constructor will throw an exception. If empty (i.e. default), an id will be generated from host name and process id.
  • implicitInit: If true (default for backward compatibility - but NOT recommended!), the constructor will implicitly try to trigger a call to initialize() via the event loop. Since this can fail silently, it is strongly recommended to use implicitInit = false and call the initialize() method right after the constructor.
  • serviceDeviceIds: A hash with ids of core service devices; e.g, “dataLoggerManagerId” key and the value is the name of the DataLoggerManager the device client instance should use for data logging operations. Currently keys “dataLoggerManagerId” and “configurationManagerId” are supported.

DeviceClient(const std::shared_ptr<karabo::xms::SignalSlotable> &signalSlotable, bool implicitInit = true, const karabo::data::Hash &serviceDeviceIds = karabo::data::Hash())

Constructor using instantiated signalSlotable object (shared communication - take care that the signalSlotable is kept alive since the DeviceClient will only keep a weak pointer)

Parameters
  • signalSlotable: An instance of the SignalSlotable lass
  • implicitInit: If true (default for backward compatibility - but NOT recommended!), the constructor will implicitly try to trigger a call to initialize() via the event loop. Since this can fail silently, it is strongly recommended to use implicitInit = false and call the initialize() method right after the constructor.
  • serviceDeviceIds: A hash with ids of core service devices; e.g, “dataLoggerManagerId” key and the value is the name of the DataLoggerManager the device client instance should use for data logging operations. Currently keys “dataLoggerManagerId” and “configurationManagerId” are supported.

DeviceClient(const std::string &instanceId, const karabo::data::Hash &serviceDeviceIds)

Constructor aimed at cases where a specific DataLoggerManagerId is required. Requires an explicit call to DeviceClient::initialize() after the construction takes place.

Parameters
  • instanceId: The id with which the client should participate in the system. If not unique or invalid, constructor will throw an exception. If empty, an id will be generated from host name and process id.
  • serviceDeviceIds: A hash with ids of core service devices; e.g, “dataLoggerManagerId” key and the value is the name of the DataLoggerManager the device client instance should use for data logging operations. Currently keys “dataLoggerManagerId” and “configurationManagerId” are supported.

DeviceClient(const std::shared_ptr<karabo::xms::SignalSlotable> &signalSlotable, const karabo::data::Hash &serviceDeviceIds)

Constructor using instantiated signalSlotable object (shared communication - take care that the signalSlotable is kept alive since the DeviceClient will only keep a weak pointer) and aimed at cases where a specific DataLoggerManagerId is required. Requires an explicit call to DeviceClient::initialize() after the construction takes place.

Parameters
  • signalSlotable: An instance of the SignalSlotable lass
  • serviceDeviceIds: A hash with ids of core service devices; e.g, “dataLoggerManagerId” key and the value is the name of the DataLoggerManager the device client instance should use for data logging operations. Currently keys “dataLoggerManagerId” and “configurationManagerId” are supported.

void initialize()

Second constructor. It is strongly recommended to use the constructors with implicitInit = false and explicitely call initialize() after the construction.

const std::string &getInstanceId()

InstanceId of underlying communication object (i.e. SignalSlotable)

void setInternalTimeout(const unsigned int internalTimeout)

Sets the internal timeout for any request/response like communications

Parameters
  • internalTimeout: The default timeout in ms

int getInternalTimeout() const

Retrieves the currently set internal timeout

Return
default timeout in ms

void setAgeing(bool toggle)

Set ageing on or off (on by default)

Return

void setDeviceMonitorInterval(long int milliseconds)

Set interval to wait between subsequent (for the same instance) calls to handlers registered via registerDeviceMonitor. Changes received within that interval will be cached and, in case of several updates of the same property within the interval, only the most up-to-date value will be handled. If negative, switch off caching and call handler immediately.

std::pair<bool, std::string> exists(const std::string &instanceId)

Allows asking whether an instance is online in the current distributed system

Return
Parameters
  • boolean: indicating whether existing and hostname if exists

void enableInstanceTracking()

Enables tracking of new and departing device instances

The handlers registered with registerInstance[New|Gone|Updated]Monitor will be called accordingly. If the handler for instanceNew is registered before calling this method, it will be called for each device currently in the system.

NOTE: Use wisely! There is a performance cost to tracking all devices since it means subscribing to the heartbeats of all servers and devices in the system.

Hash getSystemInformation()

Returns the full information about the current (runtime) distributed system

Return
a Hash containing the full system description

Hash getSystemTopology()

Returns only the topology of the current system (no instance configurations or descriptions)

Return
Hash containing the topology of the runtime system

std::vector<std::string> getServers()

Retrieves all servers currently existing in the distributed system

Return
array of server ids

std::vector<std::string> getClasses(const std::string &deviceServer)

Retrieves all device classes (plugins) available on a given device server

Return
array of device classes
Parameters
  • deviceServer: device server id

std::vector<std::string> getDevices(const std::string &deviceServer)

Retrieves all devices (instances) available on a given device server

Return
array of device instanceIds
Parameters
  • deviceServer: device server id

std::vector<std::string> getDevices()

Retrieves all devices in the distributed system.

Return
array of device instanceIds

karabo::data::Schema getDeviceSchema(const std::string &instanceId)

Retrieves the full Schema (parameter description) of the given instance;

Return
full Schema
Parameters
  • instanceId: Device’s instance ID

karabo::data::Schema getDeviceSchemaNoWait(const std::string &instanceId)

Retrieves the full Schema (parameter description) of the given instance The call is non-blocking, if no Schema is currently available the return will be empty. However, the schema request will be sent and should lead to later arrival of a schema.

Return
full Schema
Parameters
  • instanceId: Device’s instance ID

karabo::data::Schema getActiveSchema(const std::string &instanceId)

Retrieves the currently active Schema (filtered by allowed states and allowed roles) of the given instance

Return
active Schema
Parameters
  • instanceId: Device’s instance ID

karabo::data::Schema getClassSchema(const std::string &serverId, const std::string &classId)

Retrieves a schema from static context of a loaded Device class plug-in. This schema represents a description of parameters possible to configure for instantiation. I.e. returns in fact a description of the constructor arguments to that device class.

Return
Schema describing parameters available at instantiation time
Parameters
  • serverId: instanceId of a deviceServer
  • classId: name of loaded class on the deviceServer (classId)

karabo::data::Schema getClassSchemaNoWait(const std::string &serverId, const std::string &classId)

Retrieves a schema from static context of a loaded Device class plug-in. This schema represents a description of parameters possible to configure for instantiation. This function can be used to pre-cache a schema for later usage. It returns an empty schema.

Return
an empty schem
Parameters
  • serverId: instanceId of a deviceServer
  • classId: name of loaded class on the deviceServer (classId)

std::vector<std::string> getProperties(const std::string &deviceId)

Retrieve the properties of a device at deviceId.

Return
a vector containing the property paths of the device
Parameters
  • deviceId: of the device to request information from

std::vector<std::string> getClassProperties(const std::string &serverId, const std::string &classId)

Retrieve the properties of a class loaded on a server

Return
vector containing the property paths of the class
Parameters
  • serverId: server to request information from
  • classId: of the class

std::vector<std::string> getCurrentlyExecutableCommands(const std::string &instanceId)

Retrieve a list of commands that may be currently executed on a device in the distributed system. Available commands are determined by device state and access rights.

Return
a vector containing the slot names of the commands that can be executed
Parameters
  • instanceId: of the device to ask for available commands

std::vector<std::string> getCurrentlySettableProperties(const std::string &instanceId)

Retrieve a list of properties that may be currently altered on a device in the distributed system. Available properties are determined by device state and access rights.

Return
a vector containing the slot names of the properties that can be altered.
Parameters
  • instanceId: of the device to ask for settable properties

Hash loadConfigurationFromFile(const std::string &filename)

Load a device configuration from a file

Return
a Hash containing the configuration
Parameters
  • filename:

std::pair<bool, std::string> instantiate(const std::string &serverInstanceId, const std::string &classId, const karabo::data::Hash &configuration = karabo::data::Hash(), int timeoutInSeconds = -1)

Attempt to instantiate a device of the specified class, on a remote server with a given initial configuration

Return
(ok, reply) pair where ok is true if no exception occurred and reply is the answer received from server
Parameters
  • serverInstanceId: of the server to instantiate the device on. Needs to have the device plugin available
  • classId: of the device to be instantiate
  • configuration: Hash which contains the initial device configuration. It must have one out of the two following forms: option 1:
    • key “classId” pointing to a string, option 2:
    • no classId specified: class id to be instantiated is taken from classId parameter option 3 (for backward compatibility - not recommended):
    • a single key (e.g. “myClassId”) representing the classId
    • the value for this key is a Hash with all the non-default properties
  • timeoutInSeconds: by default set to -1, which means block indefinitely, if a positive value an Exception is thrown if the device hasn’t been instantiated.

std::pair<bool, std::string> instantiate(const std::string &serverInstanceId, const karabo::data::Hash &configuration, int timeoutInSeconds = -1)

Instantiate a device on a remote server

Return
Parameters
  • serverInstanceId: of the server to instantiate the device on. Needs to have the device plugin available
  • configuration: Hash which contains the initial device configuration. The ‘classId’ attribute must be present.
  • timeoutInSeconds: by default set to -1, which means block indefinitely, if a positive value an Exception is thrown if the device hasn’t been instantiated.

Hash formatConfigToInstantiate(const std::string &classId, const karabo::data::Hash &configuration)

Utility method that takes care of adding classId to configuration of device to be instantiated by instantiate and instantiateNoWait. If configuration does not have ‘classId’ key, this is added, with the value of classId parameter. Otherwise the configuration ‘classId’ value is used. In the latter case, if the value of classId parameter mismatches the one of ‘classId’ attribute of configuration a warning is thrown.

Return
configuration ready to be sent to device server
Parameters
  • classId: of the device to be instantiated.
  • configuration: of the device to be instantiated.

void instantiateNoWait(const std::string &serverInstanceId, const std::string &classId, const karabo::data::Hash &configuration = karabo::data::Hash())

Instantiate a device on a remote server. In contrast to DeviceClient::instantiate, this function returns immediately.

Parameters
  • serverInstanceId: of the server to instantiate the device on. Needs to have the device plugin available
  • classId: of the device to be instantiate
  • configuration: Hash which contains the initial device configuration. It must have one out of the two following forms: option 1:
    • key “classId” pointing to a string, option 2:
    • no classId specified: class id to be instantiated is taken from classId parameter option 3 (for backward compatibility - not recommended):
    • a single key (e.g. “myClassId”) representing the classId
    • the value for this key is a Hash with all the non-default properties

void instantiateNoWait(const std::string &serverInstanceId, const karabo::data::Hash &configuration)

Instantiate a device on a remote server. In contrast to DeviceClient::instantiate, this function returns immediately.

Return
Parameters
  • serverInstanceId: of the server to instantiate the device on. Needs to have the device plugin available
  • configuration: Hash which contains the initial device configuration. The ‘classId’ attribute must be present.

std::pair<bool, std::string> killDevice(const std::string &deviceId, int timeoutInSeconds = -1)

Kill a device in the distributed system and wait until it is actually dead

Return
Parameters
  • deviceId: of the device to kill
  • timeoutInSeconds: timeoutInSeconds by default set to -1, which means block indefinitely, if a positive value an Exception is thrown if the device hasn’t been killed.

void killDeviceNoWait(const std::string &deviceId)

Kill a device in the distributed system and return immediately

Return
Parameters
  • deviceId: of the device to kill

std::pair<bool, std::string> killServer(const std::string &serverId, int timeoutInSeconds = -1)

Kill a device server in the distributed system and all its associated devices. Waits til the server is dead.

Return
Parameters
  • serverId: of the server to kill
  • timeoutInSeconds: timeoutInSeconds timeoutInSeconds by default set to -1, which means block indefinitely, if a positive value an Exception is thrown if the device server hasn’t been killed.

void killServerNoWait(const std::string &serverId)

Kill a device server in the distributed system and all its associated devices. Returns immediately.

Return
Parameters
  • serverId: of the server to kill

karabo::data::Hash get(const std::string &instanceId)

Return the configuration Hash of an instance. The configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.

Return
a Hash holding the instance configuration
Parameters
  • instanceId: for which to return the configuration of

void get(const std::string &instanceId, karabo::data::Hash &hash)

Return the configuration Hash of an instance. The configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.

Parameters
  • instanceId: for which to return the configuration of
  • hash: reference to write configuration into

karabo::data::Hash getConfigurationNoWait(const std::string &deviceId)

Return the cached configuration if it is still valid, otherwise query an updated version but return an empty Hash.

Return
a Hash holding the instance configuration
Parameters
  • deviceId: for which to return the configuration of

bool hasAttribute(const std::string &instanceId, const std::string &key, const std::string &attribute, const char keySep = data::Hash::k_defaultSep)

Check if an attribute exists for a property on a given instance

Return
a boolean indicating if the attribute is present
Parameters
  • instanceId: to check on
  • key: path to the property to check if it has a given attribute
  • attribute: to check for
  • keySep: path separator

template <class T>
T get(const std::string &instanceId, const std::string &key, const char keySep = data::Hash::k_defaultSep)

Return a property from a remote instance. The instance configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.

Return
the current property value on the remote device TypeException if the templated type does not match the property type.
Parameters
  • instanceId: to retrieve the property from
  • key: identifying the property
  • keySep: path separator

template <class T>
void get(const std::string &instanceId, const std::string &key, T &value, const char keySep = data::Hash::k_defaultSep)

Return a property from a remote instance. The instance configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.

Parameters
  • instanceId: to retrieve the property from
  • key: identifying the property
  • value: reference to write the property value to
  • keySep: path separator TypeException if the templated type does not match the property type.

template <class T>
T getAs(const std::string &instanceId, const std::string &key, const char keySep = data::Hash::k_defaultSep)

Return a property from a remote instance casted to the template type. The instance configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.

Return
the current property value on the remote device TypeException if the property cannot be casted to the template type
Parameters
  • instanceId: to retrieve the property from
  • key: identifying the property
  • keySep: path separator

std::any getAsAny(const std::string &instanceId, const std::string &key, const char keySep = data::Hash::k_defaultSep)

Return a property from a remote instance as a std::any value. The instance configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.

Return
the current property value on the remote device as std::any type
Parameters
  • instanceId: to retrieve the property from
  • key: identifying the property
  • keySep: path separator

bool cacheLoggerMap(bool toggle)

Toggles caching of the DataLogger map on (true) and off (false). If set to true the logger map is always kept up to date, which speeds up repeated calls to DeviceClient::getProperyHistory.

Return
true if operation was successful
Parameters
  • toggle:

std::vector<karabo::data::Hash> getFromPast(const std::string &deviceId, const std::string &key, const std::string &from, std::string to = "", int maxNumData = 0)

Returns the history of a device property for a given period of time

Return
a vector of Hashes holding the property’s history. Each entry consists of a Hash with a key “v” holding the value of the appropriate type. For each entry “v” Karabo train and timestamp attributes are set which can be retrieved using the karabo::data::Timestamp::fromHashAttributes method.
Parameters
  • deviceId: of the device holding the property
  • key: path to the property on the device
  • from: karabo::data::Epochstamp in Iso8601 format signifying the start of the time interval to get the history from
  • to: karabo::data::Epochstamp in Iso8601 format signifying the end of the time interval to get the history from. If left empty default to now
  • maxNumData: maximum number of data points to retrieve, starting from the start of the interval

std::vector<karabo::data::Hash> getPropertyHistory(const std::string &deviceId, const std::string &key, const std::string &from, std::string to = "", int maxNumData = 0)

Returns the history of a device property for a given period of time

Return
a vector of Hashes holding the property’s history. Each entry consists of a Hash with a key “v” holding the value of the appropriate type. For each entry “v” Karabo train and timestamp attributes are set which can be retrieved using the karabo::data::Timestamp::fromHashAttributes method.
Parameters
  • deviceId: of the device holding the property
  • key: path to the property on the device
  • from: karabo::data::Epochstamp in Iso8601 format signifying the start of the time interval to get the history from
  • to: karabo::data::Epochstamp in Iso8601 format signifying the end of the time interval to get the history from. If left empty default to now
  • maxNumData: maximum number of data points to retrieve, starting from the start of the interval

std::string getDataLogReader(const std::string &deviceId)

Returns instanceId of data log reader for data of given device. Could be empty.

Return
Parameters
  • deviceId:

std::pair<karabo::data::Hash, karabo::data::Schema> getConfigurationFromPast(const std::string &deviceId, const std::string &timepoint)

Returns the device configuration and corresponding schema for a given point in time. Information for the nearest matching logged time is returned.

Return
a pair of the configuration Hash and corresponding device Schema
Parameters
  • deviceId: of the device to return the configuration for
  • timepoint: to return information for. Should be an iso8601 formatted string.

karabo::data::Hash listConfigurationFromName(const std::string &deviceId, const std::string &namePart = "")

Returns the configurations saved for a device under names that contain a given name part.

Return
a hash with the operation execution status and the list of configuration(s) and schema(s) in case of success. For the operation execution status, the returned hash has the keys “success” with a boolean value that indicates whether the the operation was successful and a key “reason” with a string value that will contain the reason for failure or will be empty in the case of success. The returned hash will also have a key “configs” whose value will be a vector of hashes with data about the configs that match the name part. If no configuration is saved for the device under a name that contains the namePart, the “configs” vector will be empty. Each hash in the “configs” vector contains the keys “name”, “timepoint”.
Parameters
  • deviceId: of the device whose named configuration(s) and schema(s) should be returned.
  • namePart: of the device configuration(s) and schema(s) to be returned. An empty namePart means returns all the named configuration(s) and schema(s)

karabo::data::Hash getConfigurationFromName(const std::string &deviceId, const std::string &name)

Returns the configuration and schema saved for a device under a given name.

Return
a hash with the operation execution status and the device configuration and schema in case of success. For the operation execution status, the returned hash has the keys “success” with a boolean value that indicates whether the the operation was successful and a key “reason” with a string value that will contain the reason for failure or will be empty in the case of success. The returned hash will also have a key “config” whose value will be a hash with the keys “name”, “timepoint”, “description”, “priority”, “user”, “config” and “schema” when a device configuration with the given name is found or an empty hash in case of failure or when no device configuration with the given name exists.
Parameters
  • deviceId: of the device whose named configuration and schema should be returned.
  • name: of the device configuration and schema to be returned.

karabo::data::Hash getLastConfiguration(const std::string &deviceId, int priority = 1)

Returns the most recently saved configuration for a device that has a given priority.

Return
a hash with the operation execution status and the device configuration and schema in case of success. For the operation execution status, the returned hash has the keys “success” with a boolean value that indicates whether the the operation was successful and a key “reason” with a string value that will contain the reason for failure or will be empty in the case of success. The returned hash will also have a key “config” whose value will be a hash with the keys “name”, “timepoint”, “description”, “priority”, “user”, “config” and “schema” when a device configuration with the given priority is found or an empty hash in case of failure or when no device configuration with the given priority exists.
Parameters
  • deviceId: of the device whose named configuration and schema should be returned.
  • priority: of the device configuration and schema to be returned.

std::pair<bool, std::string> saveConfigurationFromName(const std::string &name, const std::vector<std::string> &deviceIds)

Saves a collection of current device configurations (and the corresponding schemas) in the configuration database under a common name, user, priority and description.

Return
a pair with a success flag (true when the operation succeeds) in the first position and a reason failture description (empty in case of success) in the second position.
Parameters
  • name: to be assigned to the saved collection of device configurations (with schemas).
  • deviceIds: the devices whose current configurations (and schemas) are to be saved.

void registerInstanceChangeMonitor(const InstanceChangeThrottler::InstanceChangeHandler &callBackFunction, unsigned int throttlerIntervalMs = 500u, unsigned int maxChangesPerCycle = 100u)

Register a throttled callback handler to be triggered when a new device instance appears, updates its instance info record or goes away in the distributed system. The throtter that dispatches the instance changes events to the handler uses a given interval between its running cycles.

Parameters
  • callBackFunction: Function to be invoked with information about the instances changes events.
  • throttlerInterval: Interval, in milliseconds, between successive cycles of the throttle.
  • maxChangesPerCycle: Maximum number of instance changes to be dispatched per cycle of the throttler
    • upon reaching this limit the throttler immediately dispatches the changes, despite the elapsed time from the last cycle.

void flushThrottledInstanceChanges()

Flushes, asap, the throttled instance changes that are waiting to be dispatched.

void registerInstanceNewMonitor(const InstanceNewHandler &callBackFunction)

Register a callback handler to be triggered if a new instance appears in the distributed system.

Parameters
  • callBackFunction: which will receive the instanceInfo Hash

void registerInstanceUpdatedMonitor(const InstanceUpdatedHandler &callBackFunction)

Register a callback handler to be triggered if an instance receives a state update from the distributed system

Parameters
  • callBackFunction: which will receive the instanceInfo Hash

void registerInstanceGoneMonitor(const InstanceGoneHandler &callBackFunction)

Register a callback handler to be triggered if an instance disappears from the distributed system

Parameters
  • callBackFunction: receiving the instanceId and instanceInfo Hash

void registerSchemaUpdatedMonitor(const SchemaUpdatedHandler &callBackFunction)

Register a callback handler to be triggered if an instance receives a schema update from the distributed system Example:

Note
Currently, registering only a schema update monitor with an instance of a DeviceClient is not enough to have the registered call-back activated. A workaround for this is to also register a property monitor with the same instance of DeviceClient that has been used to register the schema update monitor.
Parameters
  • callBackFunction: receiving the instanceId and updated Schema

DeviceClient dc = std::shared_ptr<DeviceClient>(new DeviceClient()); dc->registerSchemaUpdateMonitor(fnSchemaUpdateHandler); dc->registerPropertyMonitor(“deviceId”, “property_to_monitor”, fnCallback);

void registerClassSchemaMonitor(const ClassSchemaHandler &callBackFunction)

Register a callback handler to be triggered if a new class appears on a device server

Parameters
  • callBackFunction: receiving the server id, class id and new class Schema

template <class ValueType>
bool registerPropertyMonitor(const std::string &instanceId, const std::string &key, const std::function<void(const std::string&, const std::string&, const ValueType&, const karabo::data::Timestamp&)> &callbackFunction)

Register a callback function to be triggered when a given property on a device in the distributed system updates

Return
true if the operation was successful
Parameters
  • instanceId: of the device to be monitored
  • key: path to the property to be monitored
  • callbackFunction: handling the update notification. It receives the device id, path, value and timestamp of the updated property

template <class ValueType, class UserDataType>
bool registerPropertyMonitor(const std::string &instanceId, const std::string &key, const std::function<void(const std::string&, const std::string&, const ValueType&, const karabo::data::Timestamp&, const std::any&)> &callbackFunction, const UserDataType &userData, )

Register a callback function to be triggered when a given property on a device in the distributed system updates. Additional user data may be passed to the callback

Return
true if the operation was successful
Parameters
  • instanceId: of the device to be monitored
  • key: path to the property to be monitored
  • callbackFunction: handling the update notification. It receives the device id, path, value and timestamp of the updated property as well as std::any userData.
  • userData: to be passed to the callback as std::any

void unregisterPropertyMonitor(const std::string &instanceId, const std::string &key)

Unregister a property monitor

Parameters
  • instanceId: to unregister the monitor from
  • key: path to the property to unregister from.

void registerDeviceMonitor(const std::string &instanceId, const std::function<void(const std::string&, const karabo::data::Hash&)> &callbackFunction)

Register a callback function to be triggered when a a device in the distributed system updates.

Parameters
  • instanceId: of the device to register to
  • callbackFunction: handling the update. It will receive the device instance id and the updated device configuration Hash

void registerDeviceForMonitoring(const std::string &deviceId)

Registers a device to have its configurations changes monitored.

Note
In order to receive notifications about configuration changes for any of the monitored devices, one needs to register handlers by calling registerDeviceMonitor (updates one by one - even if updates are throttled) or with registerDevicesMonitor (bulk updates).
Parameters
  • deviceId: of the device to be added to the set of monitored devices.

void registerDevicesMonitor(const DevicesChangedHandler &devicesChangedHandler)

Registers a handler for configuration changes for any of the monitored devices.

Note
* To register a device to be monitored, a call to registerDeviceForMonitoring must be made.
  • Throttling of device updates must be enabled via a call to setDeviceMonitorInterval with an argument greater than 0.
Parameters
  • devicesChangesHandler: callback function for configuration changes events for any monitored device.

void unregisterDeviceFromMonitoring(const std::string &deviceId)

Unregisters a device from configuration changes monitoring.

Parameters
  • deviceId: of the device to be removed from the set of monitored devices.

template <class UserDataType>
void registerDeviceMonitor(const std::string &instanceId, const std::function<void(const std::string&, const karabo::data::Hash&, const std::any&)> &callbackFunction, const UserDataType &userData, )

Register a callback function to be triggered when a a device in the distributed system updates. Additional user data may be passed to the callback

Parameters
  • instanceId: of the device to register to
  • callbackFunction: handling the update. It will receive the device instance id and the updated device configuration Hash as well as std::any userData.
  • userData: to be passed to the callback as std::any

void unregisterDeviceMonitor(const std::string &instanceId)

Unregister a device monitor.

Parameters
  • instanceId: to unregister the monitor from

bool registerChannelMonitor(const std::string &channelName, const InputChannelHandlers &handlers, const karabo::data::Hash &inputChannelCfg = karabo::data::Hash())

Register handlers to be called whenever the defined output channel receives data or end-of-stream (EOS). Internally, an InputChannel is created and configured using the cfg Hash and its connection status can be monitored via the ‘statusTracker’ of the handlers argument

Return
false if channel is already registered
Parameters
  • channelName: identifies the channel as a concatenation of the id of its devices, a colon (:) and the name of the output channel (e.g. A/COOL/DEVICE:output)
  • handlers: container for various handlers (handlers can be empty function pointers):
    • dataHandler std::function<void (const karabo::data::Hash&, const MetaData&)> to be called whenever data arrives
    • inputHandler std::function<void (const InputChannel::Pointer&)> to be called whenever data arrives
    • eosHandler std::function<void (const InputChannel::Pointer&)> called for EOS
    • statusTracker std::function<void(karabo::net::ConnectionStatus)> called whenever the connection status of the underlying InputChannel changes
  • inputChannelCfg: configures via InputChanel::create(..) - use default except you know what your are doing. For the expert: “connectedOutputChannels” will be overwritten

bool registerChannelMonitor(const std::string &instanceId, const std::string &channel, const karabo::xms::SignalSlotable::DataHandler &dataHandler, const karabo::data::Hash &inputChannelCfg = karabo::data::Hash(), const karabo::xms::SignalSlotable::InputHandler &eosHandler = karabo::xms::SignalSlotable::InputHandler(), const karabo::xms::SignalSlotable::InputHandler &inputHandler = karabo::xms::SignalSlotable::InputHandler())

Register handlers to be called whenever the defined output channel receives data or end-of-stream (EOS).

DEPRECATED - use interface with ‘InputChannelHandlers’ argument!

Return
false if channel is already registered
Parameters
  • instanceId: of the device having the output channel
  • channel: is name of the output channel
  • dataHandler: std::function<void (const karabo::data::Hash&, const MetaData&) to be called whenever data arrives
  • inputChannelCfg: configures via InputChanel::create(..) - use default except you know what your are doing for the expert: “connectedOutputChannels” will be overwritten
  • eosHandler: std::function<void (const InputChannel::Pointer&)> called for EOS if given
  • inputHandler: std::function<void (const InputChannel::Pointer&)> to be called whenever data arrives

bool registerChannelMonitor(const std::string &channelName, const karabo::xms::SignalSlotable::DataHandler &dataHandler, const karabo::data::Hash &inputChannelCfg = karabo::data::Hash(), const karabo::xms::SignalSlotable::InputHandler &eosHandler = karabo::xms::SignalSlotable::InputHandler(), const karabo::xms::SignalSlotable::InputHandler &inputHandler = karabo::xms::SignalSlotable::InputHandler())

Register handlers to be called whenever the defined output channel receives data or end-of-stream (EOS).

DEPRECATED - use interface with ‘InputChannelHandlers’ argument!

Return
false if channel is already registered
Parameters
  • channelName: identifies the channel as a concatenation of the id of its devices, a colon (:) and the name of the output channel (e.g. A/COOL/DEVICE:output)
  • dataHandler: std::function<void (const karabo::data::Hash&, const MetaData&) to be called whenever data arrives
  • inputChannelCfg: configures via InputChanel::create(..) - use default except you know what your are doing for the expert: “connectedOutputChannels” will be overwritten
  • eosHandler: std::function<void (const InputChannel::Pointer&)> called for EOS if given
  • inputHandler: std::function<void (const InputChannel::Pointer&)> to be called whenever data arrives

bool unregisterChannelMonitor(const std::string &instanceId, const std::string &channel)

Unregister monitoring of output channel

Return
false if channel was not registered
Parameters
  • instanceId: of the device having the output channel
  • channel: is name of the output channel

bool unregisterChannelMonitor(const std::string &channelName)

Unregister monitoring of output channel

Return
false if channel was not registered
Parameters
  • channelName: identifies the channel as a concatenation of the id of its devices, a colon (:) and the name of the output channel (e.g. A/COOL/DEVICE:output)

template <class T>
void set(const std::string &instanceId, const std::string &key, const T &value, int timeoutInSeconds = -1, const char keySep = data::Hash::k_defaultSep)

Set a remote property in the distributed system

Parameters
  • instanceId: of the device to set the property on
  • key: path to the property to set
  • value: to set
  • timeoutInSeconds: maximum timeout until set operation fails, set to -1 to wait forever
  • keySep: path separator

template <class T>
void setNoWait(const std::string &instanceId, const std::string &key, const T &value, const char keySep = data::Hash::k_defaultSep)

Set a remote property in the distributed system as a fire-and-forget operation. Warning: there is no guarantee and indication if the set succeeded!

Parameters
  • instanceId: of the device to set the property on
  • key: path to the property to set
  • value: to set
  • keySep: path separator

void set(const std::string &instanceId, const karabo::data::Hash &values, int timeoutInSeconds = -1)

Bulk-set remote properties in the distributed system

Parameters
  • instanceId: of the device to set the property on
  • values: a Hash containing the to be set value in a path structure indicating which properties to set
  • timeoutInSeconds: maximum timeout until set operation fails, set to -1 to wait forever

void setNoWait(const std::string &instanceId, const karabo::data::Hash &values)

Bulk-set remote properties in the distributed system as a fire-and-forget operation. Warning: there is no guarantee and indication if the set succeeded!

Parameters
  • instanceId: of the device to set the property on
  • values: a Hash containing the to be set value in a path structure indicating which properties to set

void executeNoWait(const std::string &deviceId, const std::string &command)

Executes a function on a device (an exposed via its Schema) and immediately returns (fire & forget)

Parameters
  • deviceId: The deviceId
  • command: Name of the command

template <typename… Args>
void execute(const std::string &deviceId, const std::string &command, int timeoutInSeconds = 3, const Args&... slotArgs)

Executes a function on a device synchronously (waits until the function finished)

Template Parameters
  • Args: Variadic template for the slot args (no arg is a particular case).
Parameters
  • deviceId: The devideId
  • command: The command
  • timeoutInSeconds: Timeout

template <typename R1, typename… Args>
R1 execute1(const std::string &deviceId, const std::string &slotName, int timeoutInSeconds = 3, const Args&... slotArgs)

Synchronously executes a slot that returns a single element response.

Return
A value of R1 type
Template Parameters
  • R1: Type of the response.
  • Args: Variadic template for the slot arguments.
Parameters
  • deviceId: Id of the device whose slot should be executed.
  • slotName: Name of the slot to execute.
  • timeoutInSeconds: Timeout for the slot execution.
  • slotArgs: Slot arguments.

template <typename R1, typename R2, typename… Args>
std::tuple<R1, R2> execute2(const std::string &deviceId, const std::string &slotName, int timeoutInSeconds = 3, const Args&... slotArgs)

Synchronously executes a slot that returns a two element tuple as a response.

Note
a tuple, instead of a pair, is used as the return value for uniformity with the other executeN methods.
Return
std::tuple<R1, R2> with the results of the slot execution.
Template Parameters
  • R1: Type of first element of the resulting pair.
  • R2: Type of second element of the resulting pair.
  • Args: Variadic template for the slot arguments.
Parameters
  • deviceId: Id of the device whose slot should be executed.
  • slotName: Name of the slot to execute.
  • timeoutInSeconds: Timeout for the slot execution.
  • slotArgs: Slot arguments.

template <typename R1, typename R2, typename R3, typename… Args>
std::tuple<R1, R2, R3> execute3(const std::string &deviceId, const std::string &slotName, int timeoutInSeconds = 3, const Args&... slotArgs)

Synchronously executes a slot that returns a three element tuple as a response.

Return
std::tuple<R1, R2, R3> Tuple with the results of the slot execution.
Template Parameters
  • R1: Type of first element of the resulting tuple.
  • R2: Type of second element of the resulting tuple.
  • R3: Type of third element of the resulting tuple.
  • Args: Variadic template for the slot arguments.
Parameters
  • deviceId: Id of the device whose slot should be executed.
  • slotName: Name of the slot to execute.
  • timeoutInSeconds: Timeout for the slot execution.
  • slotArgs: Slot arguments.

template <typename R1, typename R2, typename R3, typename R4, typename… Args>
std::tuple<R1, R2, R3, R4> execute4(const std::string &deviceId, const std::string &slotName, int timeoutInSeconds = 3, const Args&... slotArgs)

Synchronously executes a slot that returns a four element tuple as a response.

Return
std::tuple<R1, R2, R3, R4> Tuple with the results of the slot execution.
Template Parameters
  • R1: Type of first element of the resulting tuple.
  • R2: Type of second element of the resulting tuple.
  • R3: Type of third element of the resulting tuple.
  • R4: Type of fourth element of the resulting tuple.
  • Args: Variadic template for the slot arguments.
Parameters
  • deviceId: Id of the device whose slot should be executed.
  • slotName: Name of the slot to execute.
  • timeoutInSeconds: Timeout for the slot execution.
  • slotArgs: Slot arguments.

karabo::data::Hash getOutputChannelSchema(const std::string &deviceId, const std::string &outputChannelName)

Request the data schema for an output channel as a Hash containing relevant information

Return
a Hash containing the output channel’s data schema
Parameters
  • deviceId:
  • outputChannelName:

std::vector<std::string> getOutputChannelNames(const std::string &deviceId)

Get the list of all output channel names of the remote device.

Return
vector containing output channel names
Parameters
  • deviceId:

karabo::core::Lock lock(const std::string &deviceId, bool recursive = false, int timeout = -1)

Request locking of device at deviceId. Throws a karabo::util::LockException in case the lock cannot be acquired in the given timeout

Return
a Lock object, holding the lock to deviceId.
Parameters
  • deviceId: the device to be locked
  • recursive: if true, recursive locks on this device are allowed
  • timeout: timeout during which we try to acquire the lock. Set to -1 to wait try indefinitely, 0 to only try once, otherwise give integer seconds to wait.

Protected Types

typedef std::map<std::string, int> InstanceUsage

Map of devices that we are connected to with timer stating their age since last access. Before C++14 not an unordered_map since we want to erase while looping over it

Protected Functions

karabo::data::Hash prepareTopologyEntry(const std::string &path, const karabo::data::Hash &instanceInfo) const

Prepare a topology entry for the runtime system description

Parameters
  • path: the path created with prepareTopologyPath using instanceId and instanceInfo
  • instanceInfo: The instanceInfo Hash received from the broadcast

void mortalize(const std::string &deviceId)

Unmark deviceId from staying connected all the time without ageing.

Also clears a zombie (marked by negative age) from m_instanceUsage and thus locks m_instanceUsageMutex. That means, unlike immortalize(..) and isImortal(..), mortalize(..) must not be called under protection of m_instanceUsageMutex.

bool eraseFromRuntimeSystemDescription(const std::string &path)

returns true if path could be removed

data::Hash getSectionFromRuntimeDescription(const std::string &section) const

Get section (e.g. “device”) from runtime description. Returns empty Hash if section does not exist.

std::string findInstanceSafe(const std::string &instanceId) const

Find full path of ‘instanceId’ in m_runtimeSystemDescription, empty if path does not exist.

Protected Attributes

karabo::data::Hash m_runtimeSystemDescription

server + <serverId> type host version status deviceClasses + classes + <classId> + description SCHEMA configuration HASH description SCHEMA configuration HASH

device + <deviceId> type host version status classId serverId + fullSchema => SCHEMA configuration => HASH activeSchema + <stateName> => SCHEMA

DevicesChangedHandler m_devicesChangesHandler

Handler for all monitored devices configuration updates during last interval.

boost::asio::steady_timer m_signalsChangedTimer

defines whether aging is running or not

std::mutex m_loggerMapMutex

map of collected signalChanged

Private Types

typedef std::map<std::string, std::set<std::string>> SignalChangedMap

keys are instance IDs, values are a sets of properties that changed

Private Functions

std::string findInstance(const std::string &instanceId) const

As findInstanceSafe, but to be called under protection of m_runtimeSystemDescriptionMutex.

void doSendSignalsChanged(const SignalChangedMap &signalChangedMap)

Actually process data in ‘signalChangedMap’ - try/catch should be outside.

bool connectNeeded(const std::string &instanceId)

Marks ‘instanceId’ as used. Returns true if explicit “connect” call should still be done for it.

void initServiceDeviceIds(const karabo::data::Hash &serviceDeviceIds)

Internal helper method to initialize the service device ids members of the DeviceClient instance.

Parameters
  • serviceDeviceIds: A hash with ids of core service devices; e.g, “dataLoggerManagerId” key and the value is the name of the DataLoggerManager the device client instance should use for data logging operations. Currently keys “dataLoggerManagerId” and “configurationManagerId” are supported. If a supported key is missing, the default ID for the service device type is used.

std::vector<std::pair<std::string, karabo::data::Hash>> findAndEraseDevicesAsGone(const std::string &serverId)

Helper for _slotInstanceGone for servers

Finds all devices that belong to given server, removes them from m_runtimeSystemDescription and returns pairs of their deviceIds and instanceInfo. Requires protection of m_runtimeSystemDescriptionMutex.

void treatInstanceAsGone(const std::string &instanceId, const karabo::data::Hash &instanceInfo)

Helper for _slotInstanceGone

Does all needed action

  • despite of removal from m_runtimeSystemDescription
  • and despite of special treatment of devices on the server if instance is a server

struct InputChannelHandlers
#include <DeviceClient.hh>

Container of handlers for InputChannel, to be passed to

bool registerChannelMonitor(const std::string& channelName, const InputChannelHandlers& handlers, const karabo::data::Hash& inputChannelCfg = karabo::data::Hash());

See documentation of that method for meaning of various handlers.

Public Functions

InputChannelHandlers(const karabo::xms::SignalSlotable::DataHandler &data, const karabo::xms::SignalSlotable::InputHandler &eos = karabo::xms::SignalSlotable::InputHandler(), const std::function<void(karabo::net::ConnectionStatus)> &status = std::function<void(karabo::net::ConnectionStatus)>())

Construct with all handlers except input handler (could be specified afterwards)

InputChannelHandlers(const karabo::xms::SignalSlotable::InputHandler &input, const karabo::xms::SignalSlotable::InputHandler &eos = karabo::xms::SignalSlotable::InputHandler(), const std::function<void(karabo::net::ConnectionStatus)> &status = std::function<void(karabo::net::ConnectionStatus)>())

Construct with all handlers except data handler (could be specified afterwards)

class DeviceServer
#include <DeviceServer.hh>

The DeviceServer class hosts device instances.

The DeviceServer class hosts device instances. It monitors the system for new class plugins appearing and notifies the distributed system of these and their static information.

Inherits from karabo::xms::SignalSlotable

Public Functions

DeviceServer(const karabo::data::Hash &config)

The constructor expects a configuration Hash. The following configuration options are supported:

  • serverId: a string giving the server’s id
  • init: a json string containing configurations for device that are to be automatically started by the server
  • connection: a Hash containing the connection information for a karabo::net::Broker
  • pluginDirectory: a path to the plugin directory for this device server
  • heartbeatInterval: interval in seconds at which this server sends heartbeats to the distributed system
    Parameters

bool isRunning() const

Check if the device server is running

Return

void autostartDevices()

It just launches devices that marked to be started by server automatically.

Public Static Functions

void expectedParameters(karabo::data::Schema&)

Static properties of the device server

Parameters
  • to: inject these properties to

Private Functions

std::tuple<std::string, std::string, data::Hash> prepareInstantiate(const data::Hash &configuration)

Helper to create input passed to instantiate. Returns a tuple of the deviceId, the classId and the configuration.

void instantiate(const std::string &deviceId, const std::string &classId, const data::Hash &config, const SignalSlotable::AsyncReply &asyncReply)

Helper for instantiateDevices - e.g. provides the (async) reply for slotStartDevice.

void slotTimeTick(unsigned long long id, unsigned long long sec, unsigned long long frac, unsigned long long period)

A slot called by the time-server to synchronize this device with the timing system.

Parameters
  • id: current train id
  • sec: current system seconds
  • frac: current fractional seconds
  • period: interval between subsequent ids in microseconds

void timeTick(const boost::system::error_code ec, unsigned long long newId)

Helper function for internal time ticker deadline timer to provide internal clock that calls ‘onTimeUpdate’ for every id even if slotTimeTick is called less often.

Parameters
  • ec: error code indicating whether deadline timer was cancelled
  • id: current train id

class InstanceChangeThrottler
#include <InstanceChangeThrottler.hh>

Receives instance new, update and gone messages and dispatches them to an interested part in “cycles” spaced by a given interval. Also takes care of removing redundant message sequences.

The second level hashes are the values of the root hash. Their keys are the types of the instances whose changes are on the third level hashes. The keys are the contents of the InstanceInfo.Type field in the instance change data. Typical values for second level keys would be “device”, “server” and “macro”.

Note
The hash is composed of three level of hashes. The root hash has the types of the instances changes as its keys . The possible values for this first level keys are “new”, “gone” and “update”. The three keys will always be present in the root hash, even when a particular cycle has no change of the given type to dispatch.

The third level hashes are the values of the second level hashes. Their keys will be the instanceIds in the instance change data. Those keys can be either a deviceId, a serverId or any other kind of Id, depending on the type of the instance. The the third level hashes will be the ones resulting from calling the InstanceChangeEntryEncoder function passed as an argument to the Throttler with the InstanceId and InstanceInfo in the instance change data. For “new” and “update” changes the third level hash will be an empty hash with the input InstanceInfo fields as attributes. For “gone” changes the third level hash will not be empty and will have the same lay-out as the input InstanceInfo hash.

Inherits from std::enable_shared_from_this< InstanceChangeThrottler >

Public Functions

void submitInstanceNew(const std::string &instanceId, const karabo::data::Hash &instanceInfo)

Submits an instance new change for dispatching by the throttler.

Parameters
  • instanceId: The id of the instance the new change refers to.
  • instanceInfo: Information about the instance the new change refers to.

void submitInstanceUpdate(const std::string &instanceId, const karabo::data::Hash &instanceInfo)

Submits an instance update change for dispatching by the throttler.

Parameters
  • instanceId: The id of the instance the update change refers to.
  • instanceInfo: Information about the instance the update change refers to.

void submitInstanceGone(const std::string &instanceId, const karabo::data::Hash &instanceInfo)

Submits an instance gone change for dispatching by the throttler.

Parameters
  • instanceId: The id of the instance the gone change refers to.
  • instanceInfo: Information about the instance the gone change refers to.

unsigned int cycleIntervalMs() const

The interval, in milliseconds, between cycles of the throttler.

unsigned int maxChangesPerCycle() const

The maximum number of instance changes entries to be dispatched per throttler cycle. If this limit is reached before the throttler interval elapses, a cycle is started immediately to dispatch the changes to the registered handler.

std::string getInstChangeTypeStr(InstChangeType changeType) const

Returns the string representation of a given InstChangeType value.

void flush()

Flushes the throttler by making it dispatch the instance changes it has stored asap.

Note
this is a wrapper for the private flushThrottler(bool) method.

Public Static Functions

std::shared_ptr<InstanceChangeThrottler> createThrottler(const InstanceChangeHandler &instChangeHandler, unsigned int cycleIntervalMs = 500u, unsigned int maxChangesPerCycle = 100)

InstanceChangeThrottler factory.

Return
A shared pointer to an InstanceChangeThrottler.
Note
The Throttler only has a private constructor; every instantiation of a Throttler must come from this factory method. It takes care of initializing the newly instantiated Throttler.
Note
instChangeEntryEncoder has been added to allow the Throttler to call DeviceClient::prepareTopologyEntry without directly knowing DeviceClient.
Parameters
  • instChangeHandler: The handler for instance change events dispatched by the Throttler.
  • cycleIntervalMs: The interval in milliseconds between throttler cycles.
  • maxChangesPerCycle: The maximum number of instance changes entries to be dispatched per throttler cycle. If this limit is reached before the throttler interval elapses, a cycle is started immediately to dispatch the changes to the handler.

Private Functions

Hash instNewUpdateEncoder(const std::string &instanceId, const karabo::data::Hash &instanceInfo) const

Encodes the instanceInfo hash into the format that the Throttler uses internally for changes of type NEW and UPDATE.

Return
a hash whose only key is the instanceId, with the keys/values in instanceInfo as attributes and an empty hash as the only value.
Parameters
  • instanceId: the id of the instance the change is about.
  • instanceInfo: the instanceInfo hash as used by the karabo GUI.

karabo::data::Hash instGoneEncoder(const std::string &instanceId, const karabo::data::Hash &instanceInfo) const

Encodes the instanceInfo hash into the format that the Throttler uses internally for changes of type GONE.

Return
a hash whose only key is the instanceId and whose only value is the instanceInfo hash.
Parameters
  • instanceId: the id of the instance the change is about.
  • instanceInfo: the instanceInfo hash as used by the karabo GUI.

void addChange(InstChangeType changeType, const std::string &instanceId, const karabo::data::Hash &instanceInfo)

Adds an instance change to m_instChanges.

As part of the addition, performs some optimizations to the set of events already in the hash. It can happen that the new change actually “cancels” a set of changes that had been previously added. An example: an ‘instance gone’ event can “cancel” all the ‘instance new’ and ‘instance update’ events related to the same instance; in this scenario, the ‘addition’ of the gone event would actually consist of the removal of the other events related to the same instance.

Parameters
  • changeType:
  • instanceId:
  • instanceInfo:

void runThrottlerCycleAsync(const boost::system::error_code &e)

Throttler cycle execution. For each cycle, the throttler dispatches the instances changes hash.

Parameters
  • e: Error code passed by the boost::asio::deadline_timer ticking mechanism.

void kickNextThrottlerCycleAsync()

Schedules the next throttler event dispatching cycle.

void flushThrottler(bool kickNextCycle = true)

Flushes the throttler by running its dispatching loop immediately.

Note
Assumes that the mutex for acessing instanceChange data is acquired by a caller (either the direct caller or another caller down the activation stack).
Parameters
  • if: true, the next throttler cycle is scheduled after the flush completes.

Private Members

karabo::data::Hash m_instChanges

A Hash with all the instances changes to be dispatched by the Throttler in its next cycle.

Description for the Hash format can be found in the class documentation.

class Lock

Public Functions

Lock(std::weak_ptr<karabo::xms::SignalSlotable> sigSlot, const std::string &deviceId, bool recursive = false)

Create a lock on a device. Throws a karabo::util::LockException if the lock cannot be acquired

Parameters
  • sigSlot: a SignalSlotable instance to use for locking the remote device
  • deviceId: the deviceId of the device to lock
  • recursive: allow recursive locking if true

Lock(const Lock &other)

Copy construction is disabled

Parameters
  • other:

Lock(Lock &&other)

Move construction will invalidate the lock being moved.

Parameters
  • other:

~Lock()

The destructor unlocks the device the lock is held on if the lock is valid. It is called explictly if the lock was stolen and will then throw a karabo::util::LockException

void lock(bool recursive = false) const

Reacquire a lock if this lock was previously unlocked

Parameters
  • recursive: allow recursive locking if true

void unlock() const

Unlock this lock

bool valid() const

Returns if this lock is currently valid. Note that the locked device will be queried through the distributed system when asking for lock validity.

Private Functions

void lock_impl(bool recursive) const

Perform locking. Calling this function leads to the following remote calls:

1) check if we are allowed to lock: the lockedBy field on the remote device is either empty, or if recursive == true contains the lock requestor’s device id

2) request locking, e.g. set the lockedBy field. This can still fail if another device locked in between

3) check if we are the lock holder: lockedBy should now contain our device id

Parameters
  • recursive: allow recursive locking if true

void unlock_impl() const

Simply calls the clearLock slot on the locked device if we are the lock-holder

struct QueueWorker

Inherits from karabo::core::BaseWorker< karabo::data::Hash::Pointer >

class Runner
#include <Runner.hh>

The Runner class starts device-servers in the distributed system.

The Runner class instantiates device-servers in the distributed system. It parses command line arguments to deduce configuration.

Public Static Functions

DeviceServer::Pointer instantiate(int argc, const char **argv)

Instantiates a device server taking command line arguments into account

Users of this function must check the returned pointer for validity. The pointer may be empty in case the help option is given.

Return
Pointer to device server instance (may be empty)
Parameters
  • argc: Number of commandline arguments
  • argv: String array of commandline options

struct Worker
#include <Worker.hh>

A worker that passes any data received in its queue to a callback function working asynchronously working in a separate thread.

Inherits from karabo::core::BaseWorker< bool >

Public Functions

Worker(const std::function<void()> &callbackint delay = -1, int repetitions = -1, )

Instantiate a worker with a callback function to work on data. See Worker::WorkerBase for options

Parameters
  • callback:
  • delay:
  • repetitions:

The karabo::devices Namespace

namespace karabo::devices

Namespace for package core

Typedefs

typedef nlohmann::json json
using karabo::devices::BeginTemporarySessionHandler = typedef std::function<void(const BeginTemporarySessionResult&)>
using karabo::devices::ExpirationHandler = typedef std::function<void(const ExpiredTemporarySessionInfo&)>

Handler for expired temporary session events.

using karabo::devices::EminentExpirationHandler = typedef std::function<void(const EminentExpirationInfo&)>

Handler for “temporary session about to expire” events.

using karabo::devices::AsyncHandler = typedef std::function<void()>
using karabo::devices::InfluxResponseHandler = typedef std::function<void(const karabo::net::HttpResponse&)>

Enums

enum RejectionType

Values:

TOO_MANY_ELEMENTS = 0
VALUE_STRING_SIZE
PROPERTY_WRITE_RATE
SCHEMA_WRITE_RATE
FAR_AHEAD_TIME

Functions

template <class T>
void addToSetOrCreate(Hash &h, const std::string &key, const T &object)

Helper function used below: Add object to set<T> at key’s position in h - if no such key exists, create one

static void trim_vector_elements(std::vector<std::string> &v)

Variables

constexpr karabo::data::Schema::AccessLevel MAX_LOGIN_ACCESS_LEVEL = karabo::data::Schema::AccessLevel::EXPERT
constexpr karabo::data::Schema::AccessLevel MAX_TEMPORARY_SESSION_ACCESS_LEVEL = karabo::data::Schema::AccessLevel::EXPERT
constexpr unsigned int CHECK_TEMPSESSION_EXPIRATION_INTERVAL_SECS = 5U
const unsigned int defVectorMaxSize = 100
struct BeginTemporarySessionResult

Inherits from karabo::net::OneTimeTokenAuthorizeResult

class DataLogger
#include <DataLogger.hh>

A DataLogger device is assigned devices in the distributed system and logs their slow control data.

DataLoggers are managed by the karabo::devices::DataLoggerManager.

Each is able to log any number of devices. This list can be specified at instantiation, but can also dynamically changed by the slots slotTagDeviceToBeDiscontinued and slotAddDevicesToBeLogged. When the logger is ready to log data, its state changes from INIT to NORMAL.

Inherits from karabo::core::Device

Subclassed by karabo::devices::FileDataLogger, karabo::devices::InfluxDataLogger

Protected Functions

void initializeLoggerSpecific()

Do some actions here that may require asynchronous logic … and, finally, startConnection() should be called This function may be overridden by derived classes but at the end the ‘startConnection’ function should be called as a last step of initialization

bool removeFrom(const std::string &str, const std::string &vectorProp)

Helper to remove an element from a vector<string> element - needs protection by m_perDeviceDataMutex. Note that if the same element is in the vector more than once, only the first one is removed.

Return
whether could be removed
Parameters
  • str: the element to remove
  • vectorProp: the key of the vector<string> element

bool appendTo(const std::string &str, const std::string &vectorProp)

Helper to add an element to a vector<string> element - needs protection by m_perDeviceDataMutex. Note that if the same element is already in, it will not be added again.

Return
whether it was added (i.e. false if ‘str’ was already in the vectorProperty
Parameters
  • str: the element to add
  • vectorProp: the key of the vector<string> element

void preDestruction()

Override preDestruction from Device class

Private Functions

void slotTagDeviceToBeDiscontinued(const std::string &reason, const std::string &deviceId)

FIXME: Update text This tags a device to be discontinued, three cases have to be distinguished

(a) Regular shut-down of the device (wasValidUpToNow = true, reason = ‘D’) (b) Silent death of the device (wasValidUpToNow = true, reason = ‘D’) (c) Start-up of this (DataLogger) device whilst the device was alive (wasValidUpToNow = false, reason = ‘L’)

This slot will be called by the DataLoggerManager

void handleConfigConnected(const DeviceData::Pointer &data, const std::shared_ptr<std::atomic<unsigned int>> &counter)

Helper for connecting to signalChanged.

void updateTableAndFlush(const std::shared_ptr<SignalSlotable::AsyncReply> &aReplyPtr)

Flush data in file hierarchy or to the database tables

Parameters
  • aReplyPtr: if pointer to an AsyncReply that (if non-empty) has to be called without argument when done

virtual void flushImpl(const std::shared_ptr<SignalSlotable::AsyncReply> &aReplyPtr) = 0

“Flush” data accumulated in the internal cache to the external storage (file, database,…)

bool allowLock() const

This device may not be locked

Return
false

class DataLoggerManager
#include <DataLoggerManager.hh>

The DataLoggerManager manages device loggers in the distributed system.

In the Karabo distributed system two types of data archiving exist:

  • run associated archival of scientific data through the DAQ system
  • constant device state and slow control logging using the data logger services

This device manages the data loggers used in the second logging scenario. It is the central configuration point to set via its expected parameters the

  • flushInterval: at which loggers flush their data to disk
  • maximumFileSize: of log files after which a new log file chunk is created
  • directory: the directory into which loggers should write their data
  • serverList: a list of device servers which each runs one logger. Each device in the distributed system is assigned to one logger. They are added to the loggers on these servers in a round robin fashion, allowing for load balancing. Assignment is made permanent in a loggermap.xml file that is regularly written to disk. This allows to distribute the servers in the serverList to be distributed among several hosts and still have fixed places for reading the data back.

Inherits from karabo::core::Device

Private Functions

std::pair<bool, std::string> checkSummary()

Assemble summary from m_checkStatus and clear for next use. The boolean returned tells whether the status requires operator attention.

void printLoggerData() const

Print internal cash of logger status. Needs to be protected by m_strand.

void forceDeviceToBeLogged(const std::string &deviceId)

If deviceId’s logging status is fishy, re-add to its logger. Needs to be protected by m_strand.

Parameters
  • deviceId: the fishy device

void slotGetLoggerMap()

Request the current mapping of loggers to servers. The reply contains a Hash where the the logger id is the key and the server id the value.

std::string loggerServerId(const std::string &deviceId, bool addIfNotYetInMap)

Get id of server that should run logger for given device that should be logged

Return
the server id - can be empty if addIfNotYetInMap == false
Parameters
  • deviceId: the device that should be logged
  • addIfNotYetInMap: whether to create a server/logger relation in the logger map in case it does not yet exist for deviceId

std::string serverIdToLoggerId(const std::string &serverId) const

Get id of DataLogger running on server with id ‘serverId’

std::string loggerIdToServerId(const std::string &loggerId) const

Get id of server that should run logger with id ‘loggerId’

bool allowLock() const

This device may not be locked

Return
false

void evaluateBlockedOnStrand(const karabo::data::Hash &oldList, const karabo::data::Hash &newList)

Evaluate old and new configuration in order to possibly start/stop archiving for some devices/device classes.

Parameters
  • oldList: old configuration for blocked devices/classes
  • newList: new configuration for blocked devices/classes

std::vector<karabo::data::Hash> makeLoggersTable()

create a vector of hashes that can be loaded in a table from the Hash with the mapping device -> logger

Private Members

karabo::data::Hash m_checkStatus

1st level keys: entries in m_serverList, 2nd level: “state”, “backlog”, “beingAdded” and “devices”

std::unordered_map<std::string, std::set<std::string>> m_knownClasses

Keep track of all important stuff during check.

karabo::net::Strand::Pointer m_strand

to be accessed on the strand

const std::string m_blockListFile

Hash with ‘deviceIds’ and ‘classIds’ entries.

class DataLogReader
#include <DataLogReader.hh>

DataLogReader devices read archived information from the Karabo data loggers.

DataLogReader devices read archived information from the Karabo data loggers. They are managed by karabo::devices::DataLoggerManager devices. Calls to them should usually not happen directly, but rather through a karabo::core::DeviceClient and it’s karabo::core::DeviceClient::getPropertyHistory and karabo::core::DeviceClient::getConfigurationFromPast methods.

Inherits from karabo::core::Device

Subclassed by karabo::devices::FileLogReader, karabo::devices::InfluxLogReader

Protected Functions

void slotGetPropertyHistory(const std::string &deviceId, const std::string &property, const karabo::data::Hash &params)

Use this slot to get the history of a given property The slot replies a vector of Hashes where each entry consists of a Hash with a key “v” holding the value of the property at timepoint signified in “v“‘s attributes using a format compatible with karabo::data::Timestamp::fromHashAttributes

Parameters
  • deviceId: for which to get the history for
  • property: path to the property for which to get the history from
  • params: Hash containing optional limitations for the query:
    • from: iso8601 timestamp indicating the start of the interval to get history from
    • to: iso8601 timestamp indicating the end of the interval to get history from
    • maxNumData: maximum number of data points to retrieve starting from start

void slotGetConfigurationFromPast(const std::string &deviceId, const std::string &timepoint)

Request the configuration Hash and schema of a device at a given point at time. Depending on the device status and on the availability of logged data, the configuration and schema returned will be:

  1. If the device was online and logging data at the given timepoint, the configuration and the schema will be the ones that were active at the timepoint;
  2. If the device was offline at the given timepoint, but there is data logged for it before the timepoint, the last active configuration and schema before that timepoint will be returned;
  3. If the device was offline at the given timepoint and there’s no data logged before the timepoint, an empty configuration and an empty schema will be returned.

The slot replies with a tuple of 4 values. The first two are the Hash and Schema objects containing the configuration Hash and the corresponding device schema for timepoint. The third is a boolean whose true value indicates the device was online and actively logging data at the timepoint. The fourth value is the string form of the timepoint for the configuration returned and will be the latest timestamp among the timestamps of the properties in the configuration returned.

Parameters
  • deviceId: of the device to get the configuration from
  • timepoint: in iso8601 format for which to get the information

An important note: if no configuration is found for the device at the timepoint (or before the timepoint), the third value in the reply will be false and the fourth will be the string form of the Epoch (01/01/1970 at 00:00:00).

void onOk()

helper functions to handle state transition in derived classes: onOk: sets the State to ON

const string onException(const std::string &message)

helper functions to handle state transition in derived classes: onException: sets the State to ERROR, logs the exception trace to status and to the Karabo Logs

Return
the exception trace
Parameters
  • message: a string to be prepended to the trace

struct DeviceData

Inherits from std::enable_shared_from_this< DeviceData >

Subclassed by karabo::devices::FileDeviceData, karabo::devices::InfluxDeviceData

Public Functions

virtual void handleChanged(const karabo::data::Hash &config, const std::string &user) = 0

Called when configuration updates arrive for logging

Parameters
  • config: a Hash with the updates and their timestamps
  • the: user responsible for this update - if any

virtual void handleSchemaUpdated(const karabo::data::Schema &schema, const karabo::data::Timestamp &stamp) = 0

Called when a Schema update arrive for logging

Parameters
  • schema: - the new one
  • stamp: - the timestamp to be assigned for that update

void getPathsForConfiguration(const karabo::data::Hash &configuration, const karabo::data::Schema &schema, std::vector<std::string> &paths) const

Retrieves the paths of the leaf nodes in a given configuration. The paths are returned in ascending order of their corresponding nodes timestamps.

Note
karabo::devices::DataLogReader depends on the configuration items being properly sorted in time to retrieve configuration changes.
Parameters
  • configuration: A configuration with the nodes corresponding to the paths.
  • schema: The schema for the configuration hash.
  • paths: The paths of the leaf nodes in the configuration, sorted by nodes timestamps.

class FileDataLogger

Inherits from karabo::devices::DataLogger

Private Functions

void flushImpl(const std::shared_ptr<SignalSlotable::AsyncReply> &aReplyPtr)

“Flush” data accumulated in the internal cache to the external storage (file, database,…)

struct FileDeviceData

Inherits from karabo::devices::DeviceData

Public Functions

void handleChanged(const karabo::data::Hash &config, const std::string &user)

Called when configuration updates arrive for logging

Parameters
  • config: a Hash with the updates and their timestamps
  • the: user responsible for this update - if any

bool updatePropsToIndex()

Helper function to update data.m_idxprops, returns whether data.m_idxprops changed.

void ensureFileClosed()

Helper to ensure archive file is closed. Must only be called from functions posted on ‘data.m_strand’.

std::pair<bool, size_t> ensureFileOpen()

Helper to ensure archive file (m_configStream) is open. Must only be called from functions posted on ‘data.m_strand’.

Return
pair of * whether it is a new file (in contrast to a re-opened existing one)
  • current file position, size_t(-1) tells that file could not be opened (permissions?)

void handleSchemaUpdated(const karabo::data::Schema &schema, const karabo::data::Timestamp &stamp)

Called when a Schema update arrive for logging

Parameters
  • schema: - the new one
  • stamp: - the timestamp to be assigned for that update

struct FileLoggerIndex
#include <FileLogReader.hh>

A compound for representing indexes in text file logged data.

class FileLogReader
#include <FileLogReader.hh>

A reader for data logs stored in text files by the class karabo::devices::FileDataLogger.

Inherits from karabo::devices::DataLogReader

Private Functions

void readToHash(karabo::data::Hash &hashOut, const std::string &path, const karabo::data::Timestamp &timestamp, const std::string &type, const std::string &value) const

Internal helper: Place ‘value’ interpreted as ‘type’ (and with given ‘timestamp’) into ‘hashOut’ at ‘path’.

std::pair<bool, FileLoggerIndex> findLoggerIndexTimepoint(const std::string &deviceId, const std::string &timepoint)

Retrieves, from the logger index, the event of type “device became online” that is closest, but not after a given timepoint. The retrieved logger index event can be used as a starting point for sweeping the device log for the last known given configuration at that timepoint.

Return
a pair whose ‘first’ is a boolean that indicates whether configuration was active at the timepoint (true) or whether it is a configuration from the most recent activation of the device prior to the timepoint because the device was not active logging at the timepoint. The pair’s ‘second’ value is the logger index of the given device that is the closest “device became online” event that is not after the given timepoint.
Parameters
  • deviceId: the device whose logger index event should be retrieved.
  • timepoint: the timepoint that will be used as the reference to find the logger index event.

FileLoggerIndex findNearestLoggerIndex(const std::string &deviceId, const karabo::data::Epochstamp &timepoint, const bool before)

Find logger closest index from archive_index.txt file that is before/after (according to ‘before’) ‘timepoint’. If there is none before (after) but that is asked for, take the one just after (before).

size_t findPositionOfEpochstamp(std::ifstream &f, double stamp, size_t left, size_t right, bool preferBefore)

Find index of that MetaData::Record in ‘f’ (between indices ‘left’ and ‘right’) that matches the Epochstamp ‘stamp’. In case no exact match (within 1 ms) is found, ‘preferBefore’ decides whether the index with a smaller or larger time stamp is returned.

void extractTailOfArchiveIndex(const std::string &tail, FileLoggerIndex &entry) const

Helper to extract DataLoggerIndex values out of the tail of a line in archive_index.txt. The tail is everything after event, timestampAsIso8061 and timestampAsDouble. The entry has to be partly filled (m_event and m_epoch) and partly serves as output (m_train, m_position, m_user and m_fileindex). Works for lines written to archive_index.txt by >= 1.5

class GuiServerDevice
#include <GuiServerDevice.hh>

The GuiServerDevice mediates between GUI clients and the distributed system.

The GuiServerDevice acts as a mediator between the distributed system and GUI clients, which connect to it through (tcp) channels. The device centrally manages updates from the distributed system and pushes them to the clients. Conversely, it handles requests by clients and passes them on to devices in the distributed system.

Inherits from karabo::core::Device

Private Functions

void initUsersActionsLog()

Initializes the user actions log.

The log contains entries describing the writing actions the GUI Server performed upon request of a user. It is separated from the remaining device server logs.

void logUserAction(const WeakChannelPointer &channel, const std::string &entryText)

Adds an entry with a given text to the user actions log.

Parameters
  • channel: The TCP Channel connecting the GUI Server to the GUI Client that originated the action execution request.
  • entryText: A description of the action (and possibly its parameters)

void loggerMapConnectedHandler()

Wrapping requestNoWait

void postReconfigure()

Called if configuration changed from outside.

void startDeviceInstantiation()

Starts the deadline timer which throttles device instantiation.

void startNetworkMonitor()

Starts the deadline timer which triggers network stats collection

void startMonitorConnectionQueues(const karabo::data::Hash &currentSuspects)

Starts the deadline timer which monitors connection queues

Parameters
  • currentSuspects: Hash with pending message counts - keys are bad client addresses

void collectNetworkStats(const boost::system::error_code &error)

Perform network stats collection

void deferredDisconnect(WeakChannelPointer channel)

writes a message to the specified channel with the given priority

Parameters
  • channel:
  • message:
  • prio: Deferred disconnect handler.

void safeAllClientsWrite(const karabo::data::Hash &message, int prio = LOSSLESS)

writes message to all channels connected to the gui-server device

Parameters
  • message:
  • prio:

void sendLoginErrorAndDisconnect(const karabo::net::Channel::Pointer &channel, const std::string &userId, const std::string &cliVersion, const std::string &errorMsg)

Sends a login error message to the client currently connected and closes the connection after a time interval elapses.

Parameters
  • channel: the channel the client to be notified and disconnected is connected.
  • userId: the id of the user whose login attempt failed.
  • cliVersion: the version of the GUI client attempting to login.
  • errorMsg: the error message to be sent to the client.

void onError(const karabo::net::ErrorCode &e, WeakChannelPointer channel)

an error specified by ErrorCode e occurred on the given channel. After an error the GUI-server will attempt to disconnect this channel.

Parameters
  • e:
  • channel:

bool violatesReadOnly(const std::string &type, const karabo::data::Hash &info)

validate the incoming type and info hash if a readOnly command is requested to be executed

Return
bool whether the request violates read-only restrictions
Parameters
  • type:
  • info:

bool isProjectLoadingReplyType(const std::string &replyType)

Checks whether a given reply type requested by a GUI Client is for a request involved in the Load Project operation.

Return
bool is the reply type involved in the Load Project operation?
Parameters
  • replyType: the string that specifies the reply type.

bool violatesClientConfiguration(const std::string &type, WeakChannelPointer channel)

validates the client configuration

currently only validating the type versus the client version.

Return
bool whether the request violates client validation
Parameters
  • type:
  • channel:

void onGuiInfo(const karabo::data::Hash &hash)

an info further specified by hash occurred on a connection to a GUI client. The GUI-server will attempt to forward the info to the debug channel of the GUI client.

Parameters
  • hash:

void onConnect(const karabo::net::ErrorCode &e, karabo::net::Channel::Pointer channel)

connects a client on to the GUI server on channel. The channel is registered with two priority handlers: remove oldest and loss-less. The onRead and onError handlers are registered to handle incoming data and faults on the channel. Both upon successful completion and exceptions in the process the acceptor socket of the GUI-server is re-registered so that new client connections may be established.

Parameters
  • e: holds an error code if any error occurs when calling this slot
  • channel:

void registerConnect(const karabo::util::Version &version, const karabo::net::Channel::Pointer &channel, const std::string &userId = "", const std::string &oneTimeToken = "")

Creates an internal ChannelData structure mapped to the TCP Channel in charge of the connection between the GUI Client and the GUI Server. Also updates the number of connected clients property of the GUI Server.

Called after a successful user authorization based on a one-time token (when the GUI Server requires user authentication).

For GUI Servers that don’t require user authentication / authorization, it’s called right after the message of “type” “login” is received by the server and the client’s version is verified as one being supported by the server. The client’s version verification is also performed by a GUI Server that requires authentication (right before the token validation).

Note
The access level is not being saved in the internal ChannelData structure because all the access level enforcement is currently client side only. If any enforcement is required on the server side, the access level information must be stored in ChannelData and updated whenever the user downgrades his/her access level on the GUI client.
Parameters
  • version: the version of the connected GUI client
  • channel: the TCP channel for the connection being registered
  • userId: the ID of the user logged in the connected GUI Client
  • oneTimeToken: the one-time token resulting from the user authentication previously triggered by GUI client - the token is used by the GUI Server to validate the authentication and to authorize the user (send the user’s login access level back to the GUI Client).

void onWaitForLogin(const karabo::net::ErrorCode &e, const karabo::net::Channel::Pointer &channel, karabo::data::Hash &info)

Handler for login messages expected to be sent by a GUI Client right after it establishes a TCP connection to the GUI Server.

Keeps discarding and logging warnings for any message whose “type” is not “login”. When such an unexpected message is received, the GUI server binds this handler again to the TCP channel.

When a message of “type” of “login” is received, its handling is delegated to onLogin(const karabo::net::ErrorCode&, const karabo::net::Channel::Pointer&, karabo::data::Hash&)

Parameters
  • e: holds an error code if the eventloop cancels this task or the channel is closed
  • channel: the TCP channel for the recently established connection with a GUI client
  • info: a Hash containing the message “type” and, for “login” messages, some info related to the login process, like the version of the connected GUI client and some user related info, like the userID or the oneTimeToken the GUI Client received upon successfully authenticating the user logging in.

void onLogin(const karabo::net::Channel::Pointer &channel, const karabo::data::Hash &info)

Handles a login request of a user on a GUI client. If the login credentials are valid, the current system topology is sent to the client.

Note
for clients >= 2.20 a login message with no OneTimeToken is interpreted by an authenticated GUI Server as a request for a read-only session. The GUI Server will respond to such messages with Access Level OBSERVER and the read-only flag set to true. For login messages with OneTimeToken the read-only flag will be always set to false and the Access Level will be the one returned by the Karabo Authentication Server.
Parameters
  • channel: the TCP channel between the GUI client and the GUI server
  • info: Hash with information needed to validate the login request.

void onTokenAuthorizeResult(const WeakChannelPointer &channel, const std::string &userId, const karabo::util::Version &cliVersion, const std::string &oneTimeToken, const bool isLoginOverLogin, const karabo::net::OneTimeTokenAuthorizeResult &authResult)

Handles the result of the authorize one-time token operation performed as part of a GUI client login on behalf of an authenticated user.

Note
a login over an existing user session has the potentially “desired side-effect” of just “refreshing” an existing user session, by updating its start time. This is useful when the maximum retention time for a token session is about to expire - the user can be instructed to refresh his/her login to keep going on the same GUI Client session. This is the primary reason for the GUI Server not caring if the login over login corresponds to a user change or not.
Parameters
  • channel: the communication channel established with the GUI client logging in.
  • userId: the ID of the user on whose behalf the login is being made.
  • cliVersion: the version of the GUI client logging in.
  • oneTimeToken: the one-time token sent by the GUI client logging in.
  • isLoginOverLogin: is the token being authorized in the context of a login over an existing user session (true) or of a login that is starting a completely new user session?
  • authResult: the result of the one-time token authorization operation to be handled.

void onTemporarySessionExpiration(const ExpiredTemporarySessionInfo &info)

Handles a temporary session expired event communicated by the internal instance of the GuiServerTemporarySessionManager.

The expiration is handled by sending a message of type “onTemporarySessionExpired” to the client associated with the expired token. The message carries a Hash with paths “expiredToken” and “expirationTime”.

Parameters
  • info: data about the expired temporary session.

void onEndTemporarySessionNotice(const EminentExpirationInfo &info)

Handles a “temporary session about to expire” event.

The eminent temporary session end is handled by sending a message of type “onEndTemporarySessionNotice” to the client associated with the about to expire token. The message carries a Hash with paths “toExpireToken” and “secondsToExpiration”.

Parameters
  • info: data about the temporary session about to expire.

void onRead(const karabo::net::ErrorCode &e, WeakChannelPointer channel, karabo::data::Hash &info, const bool readOnly)

handles incoming data in the Hash info from channel. The further actions are determined by the contents of the type property in info. Valid types and there mapping to methods are given in the following table:

onRead allowed types
type resulting method call
reconfigure onReconfigure
execute onExecute
getDeviceConfiguration onGetDeviceConfiguration
getDeviceSchema onGetDeviceSchema
getClassSchema onGetClassSchema
initDevice onInitDevice
killServer onKillServer
killDevice onKillDevice
startMonitoringDevice onStartMonitoringDevice
stopMonitoringDevice onStopMonitoringDevice
getPropertyHistory onGetPropertyHistory
getConfigurationFromPast onGetConfigurationFromPast
subscribeNetwork onSubscribeNetwork
requestNetwork onRequestNetwork
info onGuiInfo
requestGeneric onRequestGeneric
subscribeLogs <no action anymore>
setLogPriority onSetLogPriority
beginTemporarySession onBeginTemporarySession
endTemporarySession onEndTemporarySession

Both upon successful completion of the request or in case of an exception the onRead function is bound to the channel again, maintaining the connection of the client to the gui-server.

Parameters
  • e: holds an error code if the eventloop cancel this task or the channel is closed
  • channel:
  • info:
  • readOnly:

void setTimeout(karabo::xms::SignalSlotable::Requestor &requestor, const karabo::data::Hash &input, const std::string &instanceKey)

Sets the appropriate timeout to a Requestor

If input has a “timeout” key, set the maximum value of that and the gui server timeout on the requestor, except if input.get<std::string>(instanceKey) is one instance of the classes in “ignoreTimeoutClasses”.

void forwardReconfigureReply(bool success, WeakChannelPointer channel, const karabo::data::Hash &input)

Callback helper for onReconfigure

Parameters
  • success: whether call succeeded
  • channel: who requested the call
  • input: will be copied to the key input of the reply message

void forwardHashReply(bool success, WeakChannelPointer channel, const karabo::data::Hash &info, const karabo::data::Hash &reply)

Callback helper for generic actions called by the gui server.

Parameters
  • success: whether call succeeded
  • channel: who requested the call
  • info: the input info Hash
  • reply: the reply from the remote device or an empty Hash on failure

void onRequestGeneric(WeakChannelPointer channel, const karabo::data::Hash &info)

Request a generic action internally. Generic interface to call slots that take a single Hash as argument and reply with a single Hash.

Parameters
  • channel: from which the request originates
  • info: is a Hash that should contain the slot information.
    • type: requestGeneric
    • instanceId: the instanceId to be called
    • slot: the slot name of the instance
    • empty: if this property is provided, the input Hash is not bounced back
    • replyType (optional): the value of the key type in the reply to the client
    • timeout (optional) [s]: account for the slot call a specified timeout in seconds!
    • args: The Hash containing the parameters for the slot call

The forwardHashReply method is used to relay information to the gui client.

Returns:

In the default case, the return Hash is composed as follows::

  • success: boolean to indicate if the generic request was successful
  • reason: information on the error if not successful otherwise empty
  • type: if specified in the input Hash, the replyType is used otherwise requestGeneric
  • request: the full input Hash information, including args
  • reply: The reply Hash of the instanceId

.. note: If the info Hash from the client provides an empty property, an empty Hash is sent back to the client instead of the input Hash.

void onReconfigure(WeakChannelPointer channel, const karabo::data::Hash &info)

Calls the Device::onReconfigure slot on the device specified in info.

The info should have the following entries:

  • string at deviceId defines the target device
  • Hash at configuration is the configuration update to apply
  • bool at reply: if given and true, success or failure will be reported back to channel by a message of type reconfigureReply that contains
    • input: the Hash given here as info
    • success: bool whether reconfiguration succeeded
    • failureReason: string with failure reason
  • optional int at timeout: if a reply should be reported back, defines seconds of timeout. In case timeout is missing, timeout errors will report success as true but provides a failureReason mentioning the timeout

Parameters
  • channel: to potentially send “reconfigureReply”
  • info:

void onBeginTemporarySession(WeakChannelPointer channel, const karabo::data::Hash &info)

Handles a message of type “beginTemporarySession” by starting a temporary session on top of the current user-authenticated session (if there’s one). The session begining is an asynchronous operation whose completion (either successful or not) will be handled by the onBeginTemporarySessionResult method.

Parameters
  • channel: the TCP channel connecting to the client that requested the begining of the temporary session.
  • info: a Hash which is supposed to contain an “temporarySessionToken” whose value is a one-time token that must be successfully authorized for the temporary session to be started.

void onBeginTemporarySessionResult(WeakChannelPointer channel, karabo::data::Schema::AccessLevel levelBeforeTemporarySession, const BeginTemporarySessionResult &result)

Handles the result of an “beginTemporarySession” request sent by a connected client.

Parameters
  • channel: the TCP channel connecting to the client that requested the temporary session that will be used to send a message of type “onBeginTemporarySession” with the begin operation results back to the client.
  • levelBeforeTemporarySession: sent by the client as part of the begin temporary session request to be sent back when the temporary session ends.
  • result: the results of the begin temporary session operation that will be sent back to the client.

void onEndTemporarySession(WeakChannelPointer channel, const karabo::data::Hash &info)

Handles a message of type “endTemporarySession” by ending the current temporary session (if there’s one). The end of the session is performed synchronously (there’s no I/O involved) and its results are transmitted back to the client through a message of type “onEndTemporarySession”.

Note
the hash with the results of the ending operation sent back to the requesting client has the fields “success”, “reason” and “temporarySessionToken” (an echo of the token provided in the request).
Parameters
  • channel: the TCP channel connecting to the client that requested the end of the temporary session. Will be used to send the response back to the client.
  • info: a Hash which is supposed to contain an “temporarySessionToken” whose value is a one-time token that must match the one associated to the temporary session being terminated.

void forwardExecuteReply(bool success, WeakChannelPointer channel, const karabo::data::Hash &input)

Callback helper for onExecute

Parameters
  • success: whether call succeeded
  • channel: who requested the call
  • input: will be copied to the key input of the reply message

void onExecute(WeakChannelPointer channel, const karabo::data::Hash &info)

Calls a command slot on a specified device.

The info should have the following entries:

  • string at deviceId defines the target device
  • string at command is the slot to call
  • bool at reply: if given and true, success or failure will be reported back to channel by a message of type executeReply that contains
    • input: the Hash given here as info
    • success: bool whether execution succeeded
    • failureReason: string with failure reason
  • optional int at timeout: if a reply should be reported back, defines seconds of timeout. In case timeout is missing, timeout errors will report success as true but provides a failureReason mentioning the timeout
    Parameters
    • channel:
    • info:

void onInitDevice(WeakChannelPointer channel, const karabo::data::Hash &info)

Enqueues a future device instantiation. The relevant information will be stored in m_pendingDeviceInstantiations and initSingleDevice will take care of the actual instantiation when it is called by the instantiation timer.

Parameters
  • channel:
  • info:

void initSingleDevice(const boost::system::error_code &error)

Instructs the server at serverId to try initializing the device at deviceId as given in info. The reply from the device server is registered to the initReply callback.

NOTE: This should only be called by m_deviceInitTimer

Parameters
  • error:

void initReply(WeakChannelPointer channel, const std::string &givenDeviceId, const karabo::data::Hash &givenConfig, bool success, const std::string &message, bool isFailureHandler)

is the callback for the onInitDevice method. It is called upon reply from the device server handling the initialization request. The reply is passed to the calling channel in form of a hash message with type=initReply, deviceId, success and message fields.

Parameters
  • channel:
  • givenDeviceId:
  • givenConfig:
  • success:
  • message:

void onGetDeviceConfiguration(WeakChannelPointer channel, const karabo::data::Hash &info)

requests the current device configuration for deviceId specified in info and sends it back in a hash message on channel. The message contains the following fields: type=deviceConfiguration, deviceId and configuration. The configuration is retrieved using the device client interface.

Parameters
  • channel:
  • info:

void onKillServer(WeakChannelPointer channel, const karabo::data::Hash &info)

instructs the server specified by serverId in info to shutdown.

Parameters
  • info:

void onKillDevice(WeakChannelPointer channel, const karabo::data::Hash &info)

instructs the device specified by deviceId in info to shutdown.

Parameters
  • info:

void onStartMonitoringDevice(WeakChannelPointer channel, const karabo::data::Hash &info)

Registers a monitor on the device specified by deviceId in info Upon changes of device properties they will be forwarded to channel from a handler for changes in configurations of monitored devices that is kept internally by the gui-server.

Only one channel per client is maintained for passing monitoring information and only one monitor is registered by the gui-server for any number of clients monitoring deviceId.

After successful registration the current device configuration is returned by calling onGetDeviceConfiguration for channel.

Parameters
  • channel:
  • info:

void onStopMonitoringDevice(WeakChannelPointer channel, const karabo::data::Hash &info)

De-registers the client connected by channel from the device specified by deviceId in info. If this is the last channel monitoring deviceId the device is removed from the set of devices monitored by the device-client.

Parameters
  • channel:
  • info:

void onGetClassSchema(WeakChannelPointer channel, const karabo::data::Hash &info)

requests a class schema for the classId on the server specified by serverId in info. This is done through the device client. A hash reply is sent out over channel containing type=classSchema, serverId, classId and schema.

Parameters
  • channel:
  • info:

void onGetDeviceSchema(WeakChannelPointer channel, const karabo::data::Hash &info)

requests a device schema for the device specified by deviceId in info. This is done through the device client. A hash reply is sent out over channel containing type=deviceSchema, deviceId, and schema.

Parameters
  • channel:
  • info:

void onGetPropertyHistory(WeakChannelPointer channel, const karabo::data::Hash &info)

requests the history for a property on deviceId in the time range t0 and t1 as specified in info. Additional the maximum number of data points may be specified in maxNumData. The request is asynchronously sent to the device logger logging information for deviceId. The reply from the logger is then forwarded to the client on channel using the propertyHistory history callback.

Parameters
  • channel:
  • info:

void propertyHistory(WeakChannelPointer channel, bool success, const std::string &deviceId, const std::string &property, const std::vector<karabo::data::Hash> &data)

Callback for onGetPropertyHistory. It forwards the history reply in data for the property on deviceId to the client connected on channel. The hash reply is of the format type=propertyHistory, deviceId, property, success, data and failureReason which states the failure reason if any.

Parameters
  • channel:
  • success: whether the request succeeded
  • deviceId:
  • property:
  • data:

void onGetConfigurationFromPast(WeakChannelPointer channel, const karabo::data::Hash &info)

Request configuration for a device at point in time time as specified in info. The info hash can as well have a preview boolean which is send back to the client. The request is asynchronously sent to the device logger logging information for deviceId. The reply from the logger is then forwarded to the client on channel using the configurationFromPast history callback in case of success or configurationFromPastError for failures.

void configurationFromPast(WeakChannelPointer channel, const std::string &deviceId, const std::string &time, const bool &preview, const karabo::data::Hash &config, const karabo::data::Schema&, const bool configAtTimepoint, const std::string &configTimepoint)

Success callback for onGetDeviceConfiguration

void configurationFromPastError(WeakChannelPointer channel, const std::string &deviceId, const std::string &time)

Failure callback for onGetDeviceConfiguration

std::string getDataReaderId(const std::string &deviceId) const

Helper for history retrieval functions

Return
id of DataLogReader device to ask for history
Parameters
  • deviceId: of the device whose history is searched for

void onSubscribeNetwork(WeakChannelPointer channel, const karabo::data::Hash &info)

registers the client connected on channel to a pipe-lined processing channel identified by channelName in info in case subscribe is true. In case the pipe-lined processing channel is already connected to the gui-server no further action is taken. Otherwise, a new connection is opened, set to copy and dropping behaviour in case the gui-server is busy, and with a maximum update frequency as defined by the delayOnInput property of the gui server. Network data from the pipe-lined processing connection is handled by the onNetworkData callback.

In this way only one connection to a given pipe-lined processing channel is maintained, even if multiple gui-clients listen to it. The gui-server thus acts as a kind of hub for pipe-lined processing onto gui-clients.

If subscribe is set to false, the connection is removed from the list of registered connections, but is kept open.

Parameters
  • channel:
  • info:

void onSubscribeLogs(WeakChannelPointer channel, const karabo::data::Hash &info)

Kept to reply back that log subscription not supported anymore after 2.16.X

Parameters
  • channel:
  • info:

void onSetLogPriority(WeakChannelPointer channel, const karabo::data::Hash &info)

sets the Log priority on a server. The info hash should contain a priority string and a instanceId string.

Parameters
  • channel:
  • info:

void forwardSetLogReply(bool success, WeakChannelPointer channel, const karabo::data::Hash &input)

Callback helper for onSetLogPriority

Parameters
  • success: whether call succeeded
  • channel: who requested the call
  • input: will be copied to the key input of the reply message

void onRequestNetwork(WeakChannelPointer channel, const karabo::data::Hash &info)

Receives a message from the GUI client that it processed network data from an output channel with name channelName in the info Hash.

Parameters
  • channel:
  • info:

void onNetworkData(const std::string &channelName, const karabo::data::Hash &data, const karabo::xms::InputChannel::MetaData &meta)

handles data from the pipe-lined processing channels the gui-server is subscribed to and forwards it to the relevant client channels, which have connected via onSubscribeNetwork. The incoming data is forwarded to all channels connected to this pipe-lined processing channel using the following hash message format: type=networkData, name is the channel name and data holding the data.

Parameters
  • channelName: name of the InputChannel that provides these data
  • data: the data coming from channelName
  • meta: corresponding meta data

void sendSystemTopology(WeakChannelPointer channel)

sends the current system topology to the client connected on channel. The hash reply contains type=systemTopology and the systemTopology.

Parameters
  • channel:

void instanceNewHandler(const karabo::data::Hash &topologyEntry)

sends the current system topology to the client connected on channel. The hash reply contains type=systemVersion and the systemVersion.

Parameters
  • channel:

void instanceChangeHandler(const karabo::data::Hash &instChangeData)

Handles events related to instances: new instance, instance updated, instance gone.

: Its signature matches karabo::core::InstanceChangeThrottler::InstanceChangeHandler).

void devicesChangedHandler(const karabo::data::Hash &what)

Acts upon incoming configuration updates from one or more devices. It is called back by a monitor registered on the device client. The reconfiguration contained in the what hash is forwarded to any channels connected to the monitor by onStartMonitoringDevice.

The message type of the hash sent out is type=”deviceConfigurations”. The hash has a second first level key, named “configurations”, whose value is a hash with the deviceIds as keys and the configuration changes for the corresponding deviceId as values.

Parameters
  • what: A hash containing all the configuration changes that happened to one or more monitored devices since the last update. Each node under the key “configurations” has the ‘deviceId’ as key and the changed configurations as a value of type Hash.

void slotGetClientSessions(const karabo::data::Hash &options)

Retrieve information about the current client sessions of the GUI server.

The reply is a hash with a single key, “clientSessions”, whose value is a vector of hashes with one hash per existing client connection. The hash for each connection has the following keys:

. "clientVersion": string with the version of the connected
  client;

. "sessionStartTime": UTC timestamp of session start time as
  an ISO8601 string;

. "sessionToken": one-time token for the client session.
  Will be empty if the GUI Server is not in authenticated
  mode;

. "temporarySessionStartTime": UTC timestamps of temporary
  session start time as an ISO8601 string. If there's no
  active temporary session on top of the client session, an
  empty string is returned;

. "temporarySessionToken": one-time token for the temporary
  session (an empty string if there's no active temporary
  session).
Parameters
  • options: Hash with a single key “onlyWithTempSession” and a boolean value that when “true” makes the slot include only client sessions with active temporary sessions in the reply.

void slotProjectUpdate(const karabo::data::Hash &info, const std::string &instanceId)

Called from projectManagers to notify about updated Projects

Parameters
  • info: the info hash containing the information about the updated projects
  • instanceId: the instance id of the project manager device

void slotDumpToLog()

Slot to dump complete debug info to log file

Same info as received from ‘slotDumpDebugInfo’ with empty input Hash

void slotNotify(const karabo::data::Hash &info)

Slot to send a notification message to all clients connected - replies empty Hash

Parameters
  • info: a Hash with following keys
    • ”message”: a string containing the notification type
    • ”contentType”: a string defining the type of notification as the GUI client understands it
      • ”banner” means message will go to the GUI banner. Therefore it will be stored in the “bannerMessage” property of the GuiServerDevice and sent to any client that connects.
      • other types will likely just be shown in a pop-up window of the client

void slotBroadcast(const karabo::data::Hash &info)

Slot to send a Hash to the GUI clients connected - replies empty Hash

WARNING: No checks are performed on this slot. This slot can possibly disconnect all clients. Do not use it unless you understand the risks.

Parameters
  • info: a Hash with at least the following keys.
    • ”message”: a Hash that will be sent to the client(s). It should contain a “type” string.
    • ”clientAddress”: a string containing the GUI client address as coded in the slotDumpDebugInfo results. If the value for this key is an empty string, all clients will be notified.

void requestScene(const karabo::data::Hash &info)

Slot to provide scene

Parameters
  • info: Hash with key “name” that provides string identifying which scene

karabo::data::Hash getDebugInfo(const karabo::data::Hash &info)

Helper for ‘slotDumpToLog’ and ‘slotDumpDebugInfo’

void slotDisconnectClient(const std::string &client)

Slot to force disconnection of client. Reply is whether specified client found.

Parameters
  • client: string to identify client, as can be received via slotDumpDebugInfo(Hash(“clients”, 0))

void typeAndInstanceFromTopology(const karabo::data::Hash &topologyEntry, std::string &type, std::string &instanceId)

Returns the instance type and instance id from a topology entry

Parameters
  • topologyEntry: a Hash of the topology format
  • type: string which will afterwards contain type
  • instanceId: string which will be filled with the instance id

bool allowLock() const

This device may not be locked

Return
false

void registerPotentialProjectManager(const karabo::data::Hash &topologyEntry)

Checks if an instance at instanceId is a ProjectManager. If so, register it to the list of known project services

Parameters
  • topologyEntry: the topology Hash, from which the class of instanceId will be deduced

std::vector<std::string> getKnownProjectManagers() const

Return a list of project services known to this GUI server

Return

bool checkProjectManagerId(WeakChannelPointer channel, const std::string &deviceId, const std::string &type, const std::string &reason)

Check if a given project manager identified by id is known in the distributed system

Return
true if the project manager id exists in the distributed system
Parameters
  • channel: to forward a failure message to if not
  • deviceId: of the project manager device
  • type: of the request

std::string getChannelAddress(const karabo::net::Channel::Pointer &channel) const

Utility for getting a “name” from client connections.

bool skipExecutionTimeout(const std::string &deviceId)

Helper Function to identify whether a device belongs to the timeout violation list TODO: remove this once “fast slot reply policy” is enforced

returns true if a .timeout() should be skipped on execution requestor

void recalculateTimingOutDevices(const karabo::data::Hash &topologyEntry, const std::vector<std::string> &timingOutClasses, bool clearSet)

Helper Function to recalculate the list of timeout violating devices from the list of offending classes TODO: remove this once “fast slot reply policy” is enforced

Private Static Attributes

const std::string m_errorDetailsDelim

In reported failure reasons, this delimiter comes between short message and details like a trace.

class GuiServerTemporarySessionManager
#include <GuiServerTemporarySessionManager.hh>

Manages temporary sessions on top of user-authenticated GUI Sessions.

Takes care of authorizing one-time temporary session tokens to start temporary sessions and of communicating temporary sessions about to expire or already expired.

Inherits from std::enable_shared_from_this< GuiServerTemporarySessionManager >

Public Functions

GuiServerTemporarySessionManager(const std::string &topic, const std::string &authServerUrl, unsigned int temporarySessionDurationSeconds, unsigned int temporarySessionEndNoticeSeconds, EminentExpirationHandler onEminentExpiration, ExpirationHandler onExpiration)

Construct a new Gui Server Temporary Session Manager object.

Parameters
  • topic: the Karabo topic against which temporary session tokens will be authorized.
  • authServerUrl: the URL of the authentication server to use for authorizing one-time temporary session tokens.
  • temporarySessionDurationSeconds: the duration, in seconds, to be enforced for temporary sessions.
  • temporarySessionEndNoticeSeconds: the time in advance, in seconds, to communicate about an eminent end of temporary session event.
  • onEminentExpiration: handler for temporary sessions about to expire.
  • onExpiration: handler for expired temporary sessions.

void beginTemporarySession(const std::string &temporarySessionToken, const BeginTemporarySessionHandler &onBeginTemporarySession)

Asynchronously starts a new temporary session for a given one-time temporary session token.

Note
Calls the registered BeginTemporarySessionHandler with the results of the beginTemporarySession operation.
Parameters
  • temporarySessionToken: the one-time temporary session token to be authorized and bound to the started temporary session.
  • onBeginTemporarySession: handler for begin temporary session events (either successful or failed).

EndTemporarySessionResult endTemporarySession(const std::string &temporarySessionToken)

Synchronously terminates a temporary session referenced by a given temporary session token.

Return
a structure with the endTemporarySession operation results.
Note
an error due to a beginTemporarySession token not found isn’t necessarily an error from the GUI client point of view. In the unlikely scenario of an endTemporarySession request that reaches the GUI server while the expiration check that will detect the expiration of the same token is already running, the end temporary session request will “fail” with a “token not found” message. It is up to the GUI client to decide what to do in such cases - maybe keep track of an “over the wire” end temporary session request token and ignore any error related to it if an expiration notification is received for that token between the request dispatch and the arrival of its response.
Parameters
  • temporarySessionToken: the one-time temporary session token bound to the session to be terminated.

Private Functions

void checkTemporarySessionsExpirations(const boost::system::error_code &error)

Checks the currently active temporary sessions removing the expired ones after invoking the registered expiration handlers for each of them.

Parameters
  • error: an error code sent by boost::asio that if different from 0 indicates that the timer pulse that should invoke this check at some future point has been cancelled.

void onTokenAuthorizeResult(const std::string &temporarySessionToken, const BeginTemporarySessionHandler &onBeginTemporarySession, const karabo::net::OneTimeTokenAuthorizeResult &authResult)

Handles the result of a temporary session token authorization request, updating the internal state of the manager and communicating the outcome of the begin temporary session request to the external requestor.

Parameters
  • temporarySessionToken: the one-time temporary session token whose authorization was requested.
  • onBeginTemporarySession: handler for begin temporary session events (either successful or failed).
  • authResult: the result of the authorization of the temporary session token provided by the external caller of the begin temporary session operation.

void scheduleNextExpirationsCheck()

Schedules the next expiration check if there’s any escalation to be checked.

Note
this method must be called with the m_tempSessionsMutex locked.

class IndexBuilderService
#include <FileLogReader.hh>

A singleton class for building logger indices from logger files. It calls karabo-idxbuild with a list of command line arguments.

Inherits from std::enable_shared_from_this< IndexBuilderService >

Public Functions

void buildIndexFor(const std::string &commandLineArguments)

Build an index by calling karabo-idxbuild with the supplied command line arguments

Parameters
  • commandLineArguments:

Public Static Functions

IndexBuilderService::Pointer getInstance()

Return a pointer to a singleton instance of IndexBuilderService. If no instance exists one is created.

Return

Private Functions

bool allowLock() const

This device may not be locked

Return
false

class InfluxDataLogger

Inherits from karabo::devices::DataLogger

Public Functions

void preDestruction()

Override preDestruction from Device class

Protected Functions

void initializeLoggerSpecific()

Do some actions here that may require asynchronous logic … and, finally, startConnection() should be called This function may be overridden by derived classes but at the end the ‘startConnection’ function should be called as a last step of initialization

void flushImpl(const std::shared_ptr<SignalSlotable::AsyncReply> &aReplyPtr)

“Flush” data accumulated in the internal cache to the external storage (file, database,…)

struct InfluxDeviceData

Inherits from karabo::devices::DeviceData

Public Functions

void handleChanged(const karabo::data::Hash &config, const std::string &user)

Called when configuration updates arrive for logging

Parameters
  • config: a Hash with the updates and their timestamps
  • the: user responsible for this update - if any

void login(const karabo::data::Hash &configuration, const std::vector<std::string> &sortedPaths)

Helper to store logging start event

Parameters
  • configuration: full device configuration received when logging starts
  • sortedPaths: full paths of configuration, sorted by increasing timestamp

void handleSchemaUpdated(const karabo::data::Schema &schema, const karabo::data::Timestamp &stamp)

Called when a Schema update arrive for logging

Parameters
  • schema: - the new one
  • stamp: - the timestamp to be assigned for that update

unsigned int newPropLogRate(const std::string &propPath, karabo::data::Epochstamp currentStamp, std::size_t currentSize)

Calculates what the value of the property logging rate of the device will be when the logging of a value with a given size and a given timestamp is taken into account.

Return
The updated value of the property logging rate, in bytes/sec, taking the logging of the value into account.
Parameters
  • prop: The path of the property whose current logging rate will be evaluated.
  • currentStamp: The current property update timestamp.
  • currentSize: The size for the new data to be logged - this is used along with the other records in the current log rating window to calculate the new value for the property logging rate.

unsigned int newSchemaLogRate(std::size_t schemaSize)

Calculates what the value of the schema logging rate of the device will be when the logging of a schema with a given size is taken into account. As schemas currently don’t have associated time information, the current system time is used for all the timing references.

Return
The updated value of the schema logging rate, in bytes/sec, taking the logging of the schema into account.
Parameters
  • schemaSize: The size for the new schema to be logged - this is used along with the other records in the current log rating window to calculate the new value for the schema logging rate.

bool logNewSchema(const std::string &schemaDigest, const std::vector<char> &schemaArchive)

Logs a new schema into the corresponding device’s __SCHEMA measurement. It is assumed that the verification of the uniquiness of the device schema has already been verified based on its digest.

Return
true If the new schema has been successfuly submitted for logging.
Return
false If the logging of the new schema has not been submitted for logging. Currently, this happens if logging the new schema would be above the allowed schema logging rate threshold for a device.
Parameters
  • schemaDigest: The digest (assumed unique) of the new schema to be saved.
  • schemaArchive: The serialised schema to be saved

void logRejectedData(const std::vector<RejectedData> &rejects, unsigned long long ts)

Logs the given set of rejected data in the BAD__DATA measurement and to the Karabo log. To avoid spanning of the Karabo log, log is emmitted for each device only once in a period of 30 secs.

Parameters
  • rejects: The rejected data to be logged.
  • ts: An epoch with the precision expected in the InfluxDb.

void logRejectedDatum(const RejectedData &rejects)

Logs the given rejected data record in the BAD__DATA measurement and to the Karabo log. To avoid spanning of the Karabo log, log is emmitted for each device only once in a period of 30 secs.

Parameters
  • rejects: The rejected data to be logged.

struct LoggingRecord
#include <InfluxDataLogger.hh>

The size, in characters, and the epoch seconds of a device log entry saved to Influx.

Used for calculating the logging rates associated to a device.

class InfluxLogReader

Inherits from karabo::devices::DataLogReader

Private Functions

void asyncDataCountForProperty(const std::shared_ptr<PropertyHistoryContext> &context)

Triggers the retrieval of the number of data points for a given device property during a time interval.

Parameters
  • context:

void onDataCountForProperty(const karabo::net::HttpResponse &dataCountResp, const std::shared_ptr<PropertyHistoryContext> &ctxt)

Handles the retrieval of the number of data points for an ongoing GetPropertyHistory process. Responsible for invoking the appropriate async method for retrieving the property values depending on the number of data points received.

Parameters
  • dataCountResponse:
  • context:

void asyncGetPropertyValues(const std::shared_ptr<PropertyHistoryContext> &ctxt)

Triggers the retrieval of the property values in an ongoing GetPropertyHistory process.

Parameters
  • context:

void asyncGetPropertyValuesMean(const std::shared_ptr<PropertyHistoryContext> &ctxt)

Triggers the retrieval of the property values mean in an ongoing GetPropertyHistory process. This is used when the number of available data points for the property is larger than the maximum requested by the slot caller and all values are scalar numbers. The UINT64 properties are included in this despite being reinterpreted as INT64 on the backend and possibly returning incorrect data.

Parameters
  • context:

void asyncGetPropertyValuesSamples(const std::shared_ptr<PropertyHistoryContext> &ctxt)

Triggers the retrieval of the property values samples in an ongoing GetPropertyHistory process. This is used when the number of available data points for the property is larger than the maximum requested by the slot caller.

Parameters
  • context:

void onPropertyValues(const karabo::net::HttpResponse &valuesResp, const std::string &columnPrefixToRemove, const std::shared_ptr<PropertyHistoryContext> &ctxt)

Handles the retrieval of the values of a property in an ongoing GetPropertyHistory process. Responsible for transforming the json formatted values received from InfluxDbClient into a vector of hashes suitable to be returned to the slot caller. Also responsible for replying to the slot caller.

Parameters
  • valuesResp:
  • columnPrefixToRemove:
  • context:

void onMeanPropertyValues(const karabo::net::HttpResponse &valuesResp, const std::shared_ptr<PropertyHistoryContext> &ctxt)

Handles the retrieval of the values of a property in an ongoing GetPropertyHistory process. Responsible for transforming the json formatted values received from InfluxDbClient into a vector of hashes suitable to be returned to the slot caller. This function extends the functionality of onPropertyValues while keeping the property history protocol.

Also responsible for replying to the slot caller.

Parameters
  • valuesResp:
  • context:

std::string unescapeLoggedString(const std::string &loggedStr)

Unescapes a logged string. A logged string has its new lines mangled, then its double slashes escaped and then its double quotes escaped. This functions applies those transformations in the reverse order.

Return
The unescaped original string.
Parameters
  • loggedStr: The string as it has been escaped by the Influx Logger.

bool preHandleHttpResponse(const karabo::net::HttpResponse &httpResponse, const karabo::xms::SignalSlotable::AsyncReply &asyncReply)

Performs an initial common handling of an HTTP response received by the Log Reader.

In the InfluxDb client <-> server communication context, any response with a status code greater or equal to 300 is considered an error and will be completely handled by this method. A specific status code, 503, indicates that the InfluxDb server was not available and puts the Log Reader in ERROR state. Any other error puts the Log Reader in ON state.

The error handling consists of sending the appropriate error reply to the caller of the InfluxLogReader slot affected by the error and of optionally disconnecting the InfluxDbClient used by the slot.

Return
true if the preHandle method completely processed the HttpResponse and no further action from the Log Reader is needed. This is the case for responses with status codes indicating errors. false if the response should still be processed by the response handler that called preHandleHttpResponse.
Parameters
  • httpResponse: the response that potentially indicates an error.
  • asyncReply: the reply to be sent to the caller of the slot where the error happened.

karabo::data::Epochstamp toEpoch(unsigned long long timeFromInflux) const

Convert a time point from influx to karabo Epochstamp

karabo::data::Hash buildInfluxClientConfig(const std::string &dbUrlForSlot) const

Builds and returns the configuration Hash for instantiating an InfluxDbClient to be used in the execution of one of the slots supported by the reader.

Return
the configuration Hash for the InfluxDbClient.
Parameters
  • dbUrlForSlot: the URL to be used in the configuration - each slot can use a different database URL.

class PropertyTest
#include <PropertyTest.hh>

The PropertyTest device includes all types Karabo knows about in it’s expected parameter section. It is a test device to assure changes to the framework do not result in broken types.

Inherits from karabo::core::Device

Private Functions

void orderTest_slotStart()

The order test started with this slot works as follows:

  • ’stringProperty’ defines the ‘other’ PropertyTest device supposed to send messages to us
  • ’int32Property’ defines how many messages it should send
  • the number of messages and our own id are transferred to the other device
  • we connect our ‘slotCount’ to the other’s ‘signalCount’
  • we call the other’s ‘slotStartCount’ which will trigger sending messages to us, alternating between direct calls to our ‘slotCount’ and emitting ‘signalCount’ with count arguments starting from 0
  • we keep track of all counts received and their order (so do not run with billions of counts!)
  • end of messaging is signaled to us via a call with count = -1
  • we publish the number of received messages and those counts (well, up to 1000 only) that are not in order as “orderTest.receivedCounts” and “orderTest.nonConsecutiveCounts”, respectively.

struct PropFileInfo
#include <FileLogReader.hh>

A compound structure holding data on an logger archive file.

The karabo::io Namespace

Warning

doxygennamespace: Cannot find namespace “karabo::io” in doxygen xml output for project “KARABO” from directory: /usr/src/app/checkouts/readthedocs.org/user_builds/karabo/checkouts/latest/doc/.build/html/reference/xml

The karabo::net Namespace

namespace karabo::net

Namespace for package net

Typedefs

typedef boost::system::error_code ErrorCode
typedef std::function<void(const ErrorCode&)> ErrorHandler
using karabo::net::tcp = typedef boost::asio::ip::tcp
using karabo::net::errorCode = typedef boost::beast::error_code
using karabo::net::flatBuffer = typedef boost::beast::flat_buffer
using karabo::net::getRequest = typedef boost::beast::http::request<boost::beast::http::empty_body>
using karabo::net::HttpHeader = typedef boost::beast::http::field
using karabo::net::HttpHeaders = typedef boost::beast::http::fields
using karabo::net::HttpResponseHandler = typedef std::function<void(const boost::beast::http::response<boost::beast::http::string_body>&)>
using karabo::net::postRequest = typedef boost::beast::http::request<boost::beast::http::string_body>
using karabo::net::response = typedef boost::beast::http::response<boost::beast::http::string_body>
using karabo::net::resolver = typedef boost::asio::ip::tcp::resolver
using karabo::net::results_type = typedef boost::asio::ip::tcp::resolver::results_type
using karabo::net::tcp_stream = typedef boost::beast::tcp_stream
using karabo::net::ssl_stream = typedef boost::beast::ssl_stream<boost::beast::tcp_stream>
using karabo::net::ssl_context = typedef boost::asio::ssl::context
using karabo::net::verb = typedef boost::beast::http::verb
using karabo::net::InfluxResponseHandler = typedef std::function<void(const HttpResponse&)>
using karabo::net::InfluxConnectedHandler = typedef std::function<void(bool)>
typedef std::shared_ptr<std::vector<char>> VectorCharPointer
typedef std::shared_ptr<Channel> ChannelPointer
using karabo::net::AuthOneTimeTokenHandler = typedef std::function<void(const OneTimeTokenAuthorizeResult&)>
using karabo::net::AsyncHandler = typedef std::function<void(const boost::system::error_code)>

Enums

enum AmqpCppErrc

Values:

eCreateChannelError = 1000
eCreateExchangeError
eCreateQueueError
eBindQueueError
eCreateConsumerError
eUnbindQueueError
eDrop
eMessageDrop
enum AsyncStatus

Values:

PENDING = 0
FAILED = -1
DONE = 1
enum ConnectionStatus

Values:

DISCONNECTED = 0
CONNECTING
CONNECTED
DISCONNECTING

Functions

boost::system::error_code make_error_code(AmqpCppErrc e)
std::ostream &operator<<(std::ostream &os, const HttpResponse &o)
InfluxDbClient::Pointer buildInfluxReadClient()

Instantiates an InfluxDbClient that connects to an InfluxDb reading node.

The connection parameters for the InfluxDb reading node are obtained via the following environment variables:

KARABO_INFLUXDB_QUERY_URL KARABO_INFLUXDB_DBNAME (with fallback to the Broker domain) KARABO_INFLUXDB_QUERY_USER KARABO_INFLUXDB_QUERY_PASSWORD

Return
A std::shared_ptr to the built InfluxDbClient instance.

std::ostream &operator<<(std::ostream &os, const NetworkInterface &ni)

Send a string representation of the NetworkInterface object to a stream. (Mostly for debug purposes)

static karabo::data::Hash extendHeaderForBufferSets(const karabo::data::Hash &hdr, const std::vector<karabo::data::BufferSet::Pointer> &body)

Helper to extend header for writing header and BufferSet pointers.

Return
extended header

std::string bareHostName()

Return the bare host name after stripping domain (exflxxx12345.desy.de => exflxxx12345)

Return

void runProtected(std::shared_ptr<boost::asio::io_service> service, const std::string &category, const std::string &errorMessage, unsigned int delayInMilliSec = 100)

Wrapper around boost::asio::io_service::run that catches exceptions, logs them as errors and continues after some delay.

Parameters
  • service: shared pointer to the io_service
  • category: the category used for logging
  • errorMessage: will be part of the logged error
  • delayInMilliSec: is the delay after each catch

std::tuple<std::string, std::string> parseGenericUrl(const std::string &url)

Parses a URL and returns a tuple.

The URL must of format: <scheme>:<scheme-dependent-part>

Return
tuple containing scheme and scheme dependent part
Parameters
  • url: A well formed URL

std::tuple<std::string, std::string, std::string, std::string, std::string> parseUrl(const std::string &url)

Parses a HTTP-like URL and returns a tuple.

The URL must of format: <scheme>://<domain>:<port>/<path>?<query>

Return
tuple containing scheme, domain, port, path and query
Parameters
  • url: A well formed URL

string urlencode(const std::string &value)
std::string getIpFromCIDRNotation(const std::string &addressRange)

Returns an IP string from a Classless Inter-Domain Routing specification

e.g. the string 192.168.0.0/24 represents the IP range between 192.168.0.0 and 192.168.0.255.

The function will ignore loopback interface and interfaces that are down. Only IP4 specifications are implemented.

Return
an IP address matching the input range or the input string if the input string does not specify a network range or if it does not match any external active interface

Variables

const size_t kDefaultQueueCapacity = 5000
class AmqpBroker

Inherits from karabo::net::Broker

Public Functions

Broker::Pointer clone(const std::string &instanceId)

The function creates broker communication object with the new identity by cloning the parent object. Concrete meaning of cloning strategy is an implementation detail.

Return
new broker communication object
Parameters
  • instanceId: - unique ID

void connect()

This function establishes connection to the broker otherwise throws exception

void disconnect()

Close broker connection

bool isConnected() const

Predicate to check broker connection status

Return
true if connection is open

std::string getBrokerUrl() const

Get active URI used for establishing connection to broker

Return
uri like “mqtt://localhost:1883”

std::string getBrokerType() const

Get type string identifying broker. Example: “AmqpBroker”

Return
the type defined by active uri

boost::system::error_code subscribeToRemoteSignal(const std::string &signalInstanceId, const std::string &signalFunction)

Establish logical signal-slot connection between 2 devices that is required by used protocol for registration

Parameters
  • signalInstanceId: device instance ID of a signal
  • signalFunction: signal name

boost::system::error_code unsubscribeFromRemoteSignal(const std::string &signalInstanceId, const std::string &signalFunction)

Close logical signal-slot connection. De-registration in broker specific API.

Parameters
  • signalInstanceId:
  • signalFunction:

void subscribeToRemoteSignalAsync(const std::string &signalInstanceId, const std::string &signalFunction, const AsyncHandler &completionHandler)

Establish signal-slot connection asynchronously

Parameters
  • signalInstanceId:
  • signalFunction:
  • completionHandler: this callback is called when complete

void unsubscribeFromRemoteSignalAsync(const std::string &signalInstanceId, const std::string &signalFunction, const AsyncHandler &completionHandler)

Unsubscribe from (remote) signal asynchronously

Parameters
  • signalInstanceId:
  • signalFunction:
  • completionHandler:

void startReading(const consumer::MessageHandler &handler, const consumer::ErrorNotifier &errorNotifier = consumer::ErrorNotifier())

AMQP subscription: subscribe to the following exchanges… “m_domain.slots” with routingKey m_instanceId “m_domain.global_slots”

Parameters
  • handler: - success handler
  • errorNotifier: - error handler

void stopReading()

Stop processing messages coming via main path

void startReadingHeartbeats(const consumer::MessageHandler &handler, const consumer::ErrorNotifier &errorNotifier = consumer::ErrorNotifier())

Heartbeat is used for tracking instances (tracking all instances or no tracking at all)

AMQP subscription Subscribe to the exchange “m_domain.signals” with the routing key: “*.signalHeartbeat” heartbeats of all known connections

Parameters
  • handler: - success handler
  • errorNotifier: - error handler

void write(const std::string &topic, const karabo::data::Hash::Pointer &header, const karabo::data::Hash::Pointer &body)

Write message to broker, blocks until written

Parameters
  • topic: Either the “domain” as passed to the Broker base class, the “domain” with the suffix “_beats”, or “karaboGuiDebug”
  • header: of the message - must contain
  • body: of the message

Public Static Functions

void expectedParameters(karabo::data::Schema &s)

AmqpBroker operates currently with the following set of …

Signals are sent to the exchange …

exchange = <domain>.signals routing_key = <signalInstanceId>.<signalName> < selector queue = <m_instanceId> < common queue

the signals are emitted to the exchange bound via routing_key to the queue. The slotInstanceIds should subscribe to the AMQP::topic type exchange with the ‘routing_key’ and queue = <slotInstanceId>

Special case of above signals… signalHeartbeat … exchange = <domain>.signals routing_key = <signalInstanceId>.signalHeartbeat queue = <m_instanceId>

Calls, commands, requests, replies are sent to

exchange = <domain>.slots routing_key = <slotInstanceId> queue = <m_instanceId> < common queue

all requests/calls/replies to the device send to this exchange The further message dispatching to slots is provided by using info in message header.

Broadcast messages should be sent to …

exchange = <domain>.global_slots routing_key = “” queue = <m_instanceId>

there is a way of implementing “broadcast” messages like in JmsBroker. In JMS it was enough to use “|*|” in header’s slotInstanceIds. In AMQP we have to be subscribed to such exchange (to receive broadcast messages). Known global slots: slotInstanceNew to announce the new device in Karabo network slotInstanceUpdated to announce the device info to be updated slotInstanceGone to announce device death, slotPing to trigger sending their status by all devices received such message

GUI debug

exchange = <domain>.karaboGuiDebug routing_key = “” queue = <as gui=”” debug=”” listener=”” defines>=”“>

GUI debugging channel

void defaultQueueArgs(AMQP::Table &args)

Fill argument with default AMQP message queue creation arguments

class AmqpClient
#include <AmqpClient.hh>

Class that exposes an AMQP client.

It will create a unique queue and consume from it exclusively and with automatic acknowledgment. Its queue name will start with the given instanceId and will potentially be suffixed by some characters to ensure uniqueness.

To actually receive messages via the handlers specified in the constructor, the client has to subscribe to exchanges, potentially with routing keys to select messages on the broker side.

Note
: This client does not know about Karabo “domains” (a.k.a. “topics”), i.e. exchanges and queues created are “shared” among all clients connected to the same broker.

Inherits from std::enable_shared_from_this< AmqpClient >

Public Types

enum ChannelStatus

Channel status tells what should be the next step to do in channel preparation

Values:

REQUEST
CREATE
CREATE_QUEUE
CREATE_CONSUMER
READY
enum ExchangeStatus

Exchange status tells about the status of a known exchange

Values:

DECLARING
READY
enum SubscriptionStatus

Subscription status tells in which status a registered subscription currently is

Values:

PENDING
CHECK_EXCHANGE
DECLARE_EXCHANGE
BIND_QUEUE
READY
UNBIND_QUEUE

Public Functions

AmqpClient(AmqpConnection::Pointer connection, std::string instanceId, AMQP::Table queueArgs, ReadHandler readHandler)

Create client with raw data interface from connection

Parameters
  • connection: the connection, all internal data access will run in its io context
  • instanceId: the client id - will usually be the name of the queue that will be subscribed
  • queueArgs: the arguments passed to queue creation
  • readHandler: a read handler for all received messages (if an invalid function, must call setReadHandler before the first subscription)

void setReadHandler(ReadHandler readHandler)

(Re-)set the read handler that will be called for all received messages

Parameters
  • readHandler: A valid read function (karabo::data::ParameterException if not valid)

void asyncSubscribe(const std::string &exchange, const std::string &routingKey, AsyncHandler onSubscriptionDone)

Asynchronously subscribes client

If subscription is reported to have failed, it will be tried again

Parameters
  • exchange: name of AMQP exchange that will be created if not yet existing
  • routingKey: the AMQP routing key
  • onSubscriptionDone: a valid handler called in AMQP io context (so please no mutex inside, please) when subscription established or failed

void asyncUnsubscribe(const std::string &exchange, const std::string &routingKey, AsyncHandler onUnsubscriptionDone)

Asynchronously unsubscribes client

Note: Success will be reported for an unsubscription from exchange/routing key that it was not subscribed before

Parameters
  • exchange: name of AMQP exchange that will be unsubscribed from
  • routingKey: the AMQP routing key to unsubscribe from
  • onUnsubscriptionDone: a valid handler called in AMQP io context (so please no mutex inside, please) when unsubscription succeeded or failed

void asyncUnsubscribeAll(AsyncHandler onUnsubscriptionsDone)

Asynchronously unsubscribes client from all subscriptions

Parameters
  • onUnsubscriptionDone: a valid handler called in AMQP io context (so please no mutex inside, please) when all unsubscription requests are done. If any of them failed, the error code passed is the one of the last failure

void asyncPublish(const std::string &exchange, const std::string &routingKey, const std::shared_ptr<std::vector<char>> &data, AsyncHandler onPublishDone)

Asynchronously publish data

Parameters
  • exchange: the exchange…
  • routingKey: …and the routingkey for the data
  • data: a raw data container fo the message to be published (must be non-zero pointer)
  • onPublishDone: handler called in AMQP io context (so please no mutex inside, please) when data published

void reviveIfReconnected()

Revice after connection was lost and re-established

Means to recreate channel, redo all subscriptions and publish postponed messages

To be called if AmqpConnection is connected again after connection loss Must be called within io context of AmqpConnection

Private Functions

void asyncPrepareChannel(AsyncHandler onChannelPrepared)

Prepare m_channel until it is ChannelStatus::READY

Must be called in the io context of the AmqpConnection

Parameters
  • onChannelPrepared: handler called when m_channel READY or if failure on the way

void moveChannelState()

Helper to move the created channel through its states, asynchronously calling itself. If READY (or failure), call and erase the m_channelPreparationCallback

void doPublish(const std::string &exchange, const std::string &routingKey, const std::shared_ptr<std::vector<char>> &data, const AsyncHandler &onPublishDone)

Helper to publish, must run in io context and only when channel is READY and exchange declared

void queueMessage(PostponedMessage &&message)

Queue message (or drop if queueu too long), must run in io context

void publishPostponed()

Helper to publish postponed messages until first found with an exchange that is not yet declared

Private Members

std::queue<PostponedMessage> m_postponedPubMessages

Messages postponed since channel not yet ready or exchange not yet declared.

class AmqpConnection
#include <AmqpConnection.hh>

Wraps the AMQP::TcpConnection and the single threaded io context where all calls to the amqp library must run.

AmqpConnection

Inherits from std::enable_shared_from_this< AmqpConnection >

Public Types

enum State

Connection states

Values:

eUnknown = 2000
eStarted
eNotConnected
eConnectionDone
eConnectionReady
eConnectionClosed
eConnectionError
eConnectionLost
using ChannelCreationHandler = std::function<void(const std::shared_ptr<AMQP::Channel>&, const std::string &errMsg)>

Handler for asyncCreateChannel

Either returns the channel or (if returned channel pointer is empty) state the failure reason.

Public Functions

AmqpConnection(std::vector<std::string> urls)

Constructing a connection and starting the thread of the io context

Parameters
  • urls: vector of broker urls to try to connect to in asyncConnect (throws karabo::util::NetworkException if vector is empty)

std::string getCurrentUrl() const

Return currently used broker URL (either already connected to it or the currently/next tried one)

bool isConnected() const

Whether connection established

std::string connectionInfo() const

Various info about internal connection (for debug logs)

void asyncConnect(AsyncHandler onComplete)

Asynchronously connect to any of the broker addresses passed to the constructor.

Addresses will be tried in the order they have been passed. Can be called from any thread.

Parameters
  • onComplete: AsyncHAndler called (not from within asyncConnect) about success or failure of connection attempt. If all addresses failed, the error code passed is the one of the last address passed to the constructor. The handler (if valid) will be called from within the internal io context, but not within the scope of asyncConnect.

void asyncCreateChannel(ChannelCreationHandler onComplete)

Trigger creation of an amqp channel and return it via the handler.

If not connected yet, try to connect first. Note that if connection lost, channel creation will not be tried again, but failure is reported.

Can be called from any thread.

Parameters
  • onComplete: A valid (!) ChannelCreationHandler that will be called from within the internal io context, but not within the scope of asyncCreateChannel.

void registerForReconnectInfo(std::weak_ptr<AmqpClient> client)

Register client to be informed about re-established connection after connection loss

void cleanReconnectRegistrations()

Clean clients registered to receive reconnect info, i.e. remove all dangling weak pointers

Can e.g. be called in the destructor of a client that registered before.

template <typename CompletionToken>
void post(CompletionToken &&task) const

Post a task to the io context

The task must not contain blocking code since otherwise the thread running the AMQP communication is blocked.

template <typename CompletionToken>
void dispatch(CompletionToken &&task) const

Detach a task to the io context, i.e. run it now or later depending on which thread we are in

The task must not contain blocking code since otherwise the thread running the AMQP communication is blocked.

Private Functions

void doAsyncConnect()

Helper to asyncConnect iterating over urls until success or all urls tried. Then calls m_onConnectionComplete.

Prerequisite: m_urlIndex < m_urls.size()

void informReconnection(const boost::system::error_code &ec)

Must run in io context

void triggerReconnection()

Must run in io context

const char *stateString(AmqpConnection::State state)

Convert State to a string (or rather const char*)

Private Members

std::set<std::weak_ptr<AmqpClient>, std::owner_less<std::weak_ptr<AmqpClient>>> m_registeredClients

Track clients to inform about reconnections.

class AmqpHashClient
#include <AmqpHashClient.hh>

Class that wraps around AmqpClient to provide a message interface with Hash header and body.

Deserialisation of incoming messages is done via a karabo::net::Strand, i.e. a running karabo::net::EventLoop is needed.

Inherits from std::enable_shared_from_this< AmqpHashClient >

Public Functions

void asyncSubscribe(const std::string &exchange, const std::string &routingKey, AsyncHandler onSubscriptionDone)

Asynchronously subscribes client by just forwarding to AmqpClient::asyncSubscribe

==> See docs of that.

void asyncUnsubscribe(const std::string &exchange, const std::string &routingKey, AsyncHandler onUnsubscriptionDone)

Asynchronously unsubscribes client by just forwarding to AmqpClient::asyncUnsubscribe

==> See docs of that.

void asyncUnsubscribeAll(AsyncHandler onUnsubscriptionDone)

Asynchronously unsubscribes client from all subscriptions by just forwarding to AmqpClient::asyncUnsubscribeAll

==> See docs of that.

void asyncPublish(const std::string &exchange, const std::string &routingKey, const data::Hash::Pointer &header, const data::Hash::Pointer &body, AsyncHandler onPublishDone)

Asynchronously publish data from header and body

Hashes are serialised such that AmqpClient::asyncPublish can be use internally. ==> See docs of that.

Public Static Functions

AmqpHashClient::Pointer create(AmqpConnection::Pointer connection, std::string instanceId, AMQP::Table queueArgs, HashReadHandler readHandler, ErrorReadHandler errorReadHandler)

Create client with message interface based on two Hashes (header and body).

Parameters
  • connection: the connection, all internal data access will run in its io context
  • instanceId: the client id - will usually be the name of the queue that will be subscribed
  • queueArgs: the arguments passed to queue creation
  • readHandler: a valid read handler for all received messages
  • errorReadHandler: a valid handler called when a received message could not be processed, e.g. due to serialisation problems

Private Functions

AmqpHashClient(AmqpConnection::Pointer connection, std::string instanceId, AMQP::Table queueArgs, HashReadHandler readHandler, ErrorReadHandler errorReadHandler)

Internal constructor, use static create instead: raw clients read handler has to be set after construction

void onRead(const std::shared_ptr<std::vector<char>> &data, const std::string &exchange, const std::string &routingKey)

Handler passed to raw client (i.e. runs in io context of connection).

Post arguments for deserialzation on the respective strand that runs in Karabo event loop

void deserialize(const std::shared_ptr<std::vector<char>> &data, const std::string &exchange, const std::string &routingKey)

Deserializes ‘data’ input into Hash for header and body, adds exchange and key to the header and calls handler passed to constructor

class Broker

Inherits from std::enable_shared_from_this< Broker >

Subclassed by karabo::net::AmqpBroker

Public Functions

virtual Broker::Pointer clone(const std::string &instanceId) = 0

The function creates broker communication object with the new identity by cloning the parent object. Concrete meaning of cloning strategy is an implementation detail.

Return
new broker communication object
Parameters
  • instanceId: - unique ID

virtual void connect() = 0

This function establishes connection to the broker otherwise throws exception

virtual void disconnect() = 0

Close broker connection

virtual bool isConnected() const = 0

Predicate to check broker connection status

Return
true if connection is open

virtual std::string getBrokerUrl() const = 0

Get active URI used for establishing connection to broker

Return
uri like “mqtt://localhost:1883”

virtual std::string getBrokerType() const = 0

Get type string identifying broker. Example: “AmqpBroker”

Return
the type defined by active uri

const std::string &getInstanceId() const

Get current instance ID associated with this broker object

Return
instanceId

const std::string &getDomain() const

Get the domain this broker is communicating to

Return
domain

void setConsumeBroadcasts(bool consumeBroadcasts)

Set flag defining the way how to handle broadcast messages. It influences on subscription to such messages, i.e. has to be called before startReading(..)

Parameters
  • consumeBroadcasts: true means subscription

virtual boost::system::error_code subscribeToRemoteSignal(const std::string &signalInstanceId, const std::string &signalFunction) = 0

Establish logical signal-slot connection between 2 devices that is required by used protocol for registration

Parameters
  • signalInstanceId: device instance ID of a signal
  • signalFunction: signal name

virtual boost::system::error_code unsubscribeFromRemoteSignal(const std::string &signalInstanceId, const std::string &signalFunction) = 0

Close logical signal-slot connection. De-registration in broker specific API.

Parameters
  • signalInstanceId:
  • signalFunction:

virtual void subscribeToRemoteSignalAsync(const std::string &signalInstanceId, const std::string &signalFunction, const AsyncHandler &completionHandler) = 0

Establish signal-slot connection asynchronously

Parameters
  • signalInstanceId:
  • signalFunction:
  • completionHandler: this callback is called when complete

virtual void unsubscribeFromRemoteSignalAsync(const std::string &signalInstanceId, const std::string &signalFunction, const AsyncHandler &completionHandler) = 0

Unsubscribe from (remote) signal asynchronously

Parameters
  • signalInstanceId:
  • signalFunction:
  • completionHandler:

virtual void startReading(const consumer::MessageHandler &handler, const consumer::ErrorNotifier &errorNotifier = consumer::ErrorNotifier()) = 0

Set up handlers for processing messages arriving via main communication path

Parameters
  • handler: - read handler
  • errorNotifier: - error handler

virtual void stopReading() = 0

Stop processing messages coming via main path

virtual void startReadingHeartbeats(const consumer::MessageHandler &handler, const consumer::ErrorNotifier &errorNotifier) = 0

Set up handlers for processing heartbeat messages arriving via special path.

Heartbeat is used for tracking instances (tracking all instances or no tracking at all)

Parameters
  • handler: - read message handler
  • errorNotifier: - error handler

virtual void write(const std::string &topic, const karabo::data::Hash::Pointer &header, const karabo::data::Hash::Pointer &body) = 0

Send message to broker

Parameters
  • topic:
  • header:
  • body:
  • priority:
  • timeToLive:

Public Static Functions

std::vector<std::string> brokersFromEnv()

Specifies the string of broker URLs from the environment variable KARABO_BROKER. If KARABO_BROKER is not defined, uses a hard coded fallback.

std::string brokerTypeFromEnv()

Specifies the broker type as the protocol of the broker URLs defined by brokersFromEnv(). Throws LogicException if broker addresses specified with different types or without protocol.

std::string brokerTypeFrom(const std::vector<std::string> &urls)

Specifies the broker type as the protocol of the given broker URLs. Throws LogicException if broker addresses specified with different types or without protocol.

std::string brokerDomainFromEnv()

Specify broker domain (i.e. topic for JmsBroker) from environment variables.

First source is KARABO_BROKER_TOPIC, as a fall back the environment variables LOGNAME, USER, LNAME and USERNAME are checked in that order.

class Channel
#include <Channel.hh>

Represents a communication channel used for p2p messaging on a connection to a remote instance. This is only an interface, see TcpChannel for a concrete implementation using the tcp protocol.

Inherits from std::enable_shared_from_this< Channel >

Subclassed by karabo::net::TcpChannel

Public Functions

virtual Connection::Pointer getConnection() const = 0

Return a pointer to the connection this channels belongs to

Return

virtual size_t readSizeInBytes()

Synchronously reads the message’s size. Will block until a message arrives on the socket.

Return
Size in bytes of incoming TCP message

virtual std::string consumeBytesAfterReadUntil(const size_t nBytes)

Synchronously reads size bytes and return them as a string. The reading will block until the bytes are read.

Note
reads up nBytes expecting no header. To be used ONLY after a readAsyncStringUntil operation in case some bytes must be read after readAsyncStringUntil has been used.
Parameters
  • size: This number of bytes will be copied into data

virtual void read(char *data, const size_t &size)

Synchronously reads size bytes into data. The reading will block until the data record is read.

Parameters
  • data: Pre-allocated contiguous block of memory
  • size: This number of bytes will be copied into data

virtual void read(std::vector<char> &data)

This function reads into a vector of chars The reading will block until the data record is read.

Parameters
  • data: A vector which will be updated accordingly

virtual void read(std::string &data)

This function reads into a string The reading will block until the data record is read. CAVEAT: As string is not guaranteed to be represented by a contiguous block of memory this function will always introduce a copy under the hood.

Parameters
  • data: A string which will be updated accordingly

virtual void read(karabo::data::Hash &data)

This function reads into a hash. The reading will block until the data record is read. The reading will block until the data record is read.

Parameters
  • data: Hash object which will be updated

virtual void read(karabo::data::Hash &header, char *data, const size_t &size)

Synchronously reads size bytes from socket into data and provides a header. The reading will block until the data record is read.

Parameters
  • header: Hash object which will be updated to contain header information
  • data: Pre-allocated contiguous block of memory
  • size: This number of bytes will be copied into data

virtual void read(karabo::data::Hash &header, std::vector<char> &data)

This function reads into a header and a vector of chars. The reading will block until the data record is read.

Parameters
  • header: Hash object which will be updated to contain header information
  • data: A vector which will be updated accordingly

virtual void read(karabo::data::Hash &header, std::string &data)

This function reads into header and a string The reading will block until the data record is read. CAVEAT: As string is not guaranteed to be represented by a contiguous block of memory this function will always introduce a copy under the hood.

Parameters
  • data: A string which will be updated accordingly

virtual void read(karabo::data::Hash &header, karabo::data::Hash &data)

This function reads into a header hash and a data hash. The reading will block until the data record is read. The reading will block until the data record is read.

Parameters
  • header: Hash object which will be updated to contain header information
  • data: Hash object which will be updated to contain data information

virtual void readAsyncSizeInBytes(const ReadSizeInBytesHandler &handler)

In case a message arrived, handler will be called back The handler will inform about the number of bytes going to come in

Parameters
  • handler: Call-back function of signature: void (Channel::Pointer, const size_t&)

virtual void readAsyncRaw(char *data, const size_t &size, const ReadRawHandler &handler)

Asynchronously reads size number of bytes into pre-allocated data buffer A handler can be registered to inform about completion of writing NOTE: This function only makes sense calling after having used “readAsyncSizeInBytes”, which gives a chance to correctly pre-allocated memory in user-space.

Parameters
  • data: Pre-allocated contiguous block of memory
  • size: This number of bytes will be copied into data
  • handler: Call-back function of signature: void (Channel::Pointer)

virtual void readAsyncStringUntil(const std::string &terminator, const ReadStringHandler &handler)

Read a string until terminator string is found. (No header is expected).

Parameters
  • terminator: when this string found, read is done
  • handler: handler with signature ReadStringHandler, i.e. void (const boost::system::error_code&, std::string&) is called. second handler parameter is the read string with terminator stripped away

virtual void readAsyncVector(const ReadVectorHandler &handler)

Asynchronously reads data into a vector<char>. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const std::vector<char>&)

virtual void readAsyncString(const ReadStringHandler &handler)

Asynchronously reads data into a string. All memory management is done by the API. NOTE: A string in general is not storing data contiguously. Thus, an additional copy under the hood is needed which makes this interface slightly slower.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const std::string&)

virtual void readAsyncHash(const ReadHashHandler &handler)

Asynchronously reads data into a hash. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&)

virtual void readAsyncHashPointer(const ReadHashPointerHandler &handler)

Asynchronously reads data into a hash. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&)

virtual void readAsyncVectorPointer(const ReadVectorPointerHandler &handler)

Asynchronously reads data into a vector<char>. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const std::vector<char>&)

virtual void readAsyncHashVector(const ReadHashVectorHandler &handler)

Asynchronously reads data into a hash header and a vector<char>. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&, const std::vector<char>&)

virtual void readAsyncHashString(const ReadHashStringHandler &handler)

Asynchronously reads data into a hash header and a string. All memory management is done by the API. NOTE: A string in general is not storing data contiguously. Thus, an additional copy under the hood is needed which makes this interface slightly slower.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&, const std::string&)

virtual void readAsyncHashHash(const ReadHashHashHandler &handler)

Asynchronously reads data into a hash header and a hash body. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const karabo::data::Hash&)

virtual void readAsyncHashPointerHashPointer(const ReadHashPointerHashPointerHandler &handler)

Asynchronously reads data into a hash header and a hash body. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const karabo::data::Hash&)

virtual void readAsyncHashVectorPointer(const ReadHashVectorPointerHandler &handler)

Asynchronously reads data into a hash header and a vector<char>. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const std::vector<char>&)

virtual void readAsyncHashVectorBufferSetPointer(const ReadHashVectorBufferSetPointerHandler &handler)

Asynchronously reads data into a hash header and into a vector of BufferSet pointers. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const std::vector<karabo::data::BufferSet::Pointer>&)

virtual void write(const char *data, const size_t &size)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • data: Pointer to a contiguous block of memory that should be written
  • size: This number of bytes will be written

virtual void write(const std::vector<char> &data)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • data: vector of chars containing the data to be written

virtual void write(const std::string &data)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • data: string, where each character represents on byte of data to be written

virtual void write(const karabo::data::Hash &data)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • data: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types

virtual void write(const karabo::data::Hash &header, const char *data, const size_t &size)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • header: containing metadata for the data being written
  • data: Pointer to a contiguous block of memory that should be written
  • size: This number of bytes will be written

virtual void write(const karabo::data::Hash &header, const std::vector<char> &data)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • header: containing metadata for the data being written
  • data: vector of chars containing the data to be written

virtual void write(const karabo::data::Hash &header, const std::vector<karabo::data::BufferSet::Pointer> &body)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • header: containing metadata for the data being written and BufferSet’s layout
  • body: vector of BufferSet pointers

virtual void write(const karabo::data::Hash &header, std::shared_ptr<const std::vector<char>> &data)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • header: containing metadata for the data being written
  • data: vector of chars containing the data to be written, passed as a shared pointer

virtual void write(const karabo::data::Hash &header, const std::string &data)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • header: containing metadata for the data being written
  • data: string, where each character represents on byte of data to be written

virtual void write(const karabo::data::Hash &header, const karabo::data::Hash &body)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • header: containing metadata for the data being written
  • data: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types

virtual void writeAsyncRaw(const char *data, const size_t &size, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • data: Pointer to a contiguous block of memory that should be written
  • size: This number of bytes will be written
  • handler: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

virtual void writeAsyncVector(const std::vector<char> &data, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • data: vector of chars containing the data to be written
  • handler: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

virtual void writeAsyncVectorPointer(const std::shared_ptr<std::vector<char>> &data, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • data: vector of chars containing the data to be written, passed as a shared pointer
  • handler: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

virtual void writeAsyncHash(const karabo::data::Hash &data, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • data: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types
  • handler: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

virtual void writeAsyncHashRaw(const karabo::data::Hash &header, const char *data, const size_t &size, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • header: containing metadata for the data being written
  • data: Pointer to a contiguous block of memory that should be written
  • size: This number of bytes will be written
  • handler: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

virtual void writeAsyncHashVector(const karabo::data::Hash &header, const std::vector<char> &data, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • header: containing metadata for the data being written
  • data: vector of chars containing the data to be written
  • handler: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

virtual void writeAsyncHashVectorPointer(const karabo::data::Hash &header, const std::shared_ptr<std::vector<char>> &data, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • header: containing metadata for the data being written
  • data: vector of chars containing the data to be written, passed as a shared pointer
  • handler: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

virtual void writeAsyncHashHash(const karabo::data::Hash &header, const karabo::data::Hash &data, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • header: containing metadata for the data being written
  • body: data contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types
  • handler: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

virtual void writeAsyncHashVectorBufferSetPointer(const karabo::data::Hash &header, const std::vector<karabo::data::BufferSet::Pointer> &body, const WriteCompleteHandler &handler)

Write header and vector<BufferSet::Pointer> asynchronously.

Upon write completion a handler function is called. Data inside the buffers must not be changed or deleted before this handler is called. Special care is needed if any Hash that had been serialised into the buffers contained an NDArray: The raw data of the array will be shared between the BufferSet and the Hash. Deletion of the Hash is safe, though.

Parameters
  • header: containing metadata for the data being written
  • body: data as a vector of BufferSet::Pointer
  • handler: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

virtual size_t dataQuantityRead()

Returns the number of bytes read since the last call of this method

virtual size_t dataQuantityWritten()

Returns the number of bytes written since the last call of this method

virtual void setTimeoutSyncRead(int milliseconds)

Set a timeout in when synchronous reads timeout if the haven’t been handled

Parameters
  • milliseconds:

virtual void close() = 0

Close this channel

virtual bool isOpen() = 0

Check if this channel is open

Return

virtual void writeAsync(const char *data, const size_t &size, int prio = 4)

Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion

Parameters
  • data: Pointer to a contiguous block of memory that should be written
  • size: This number of bytes will be written
  • prio: the priority of this write operation

virtual void writeAsync(const std::vector<char> &data, int prio = 4)

Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion

Parameters
  • data: vector of chars containing the data to be written
  • prio: the priority of this write operation

virtual void writeAsync(const std::shared_ptr<std::vector<char>> &data, int prio = 4)

Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion

Parameters
  • data: vector of chars containing the data to be written, passed as a shared pointer
  • prio: the priority of this write operation

virtual void writeAsync(const std::string &data, int prio = 4)

Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion

Parameters
  • data: passed as a string were each character represents one byte of the message
  • prio: the priority of this write operation

virtual void writeAsync(const karabo::data::Hash &data, int prio = 4, bool copyAllData = true)

Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion

Parameters
  • data: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types
  • prio: the priority of this write operation
  • copyAllData: When false, raw data (ByteArray) inside an NDArray won’t be copied. For other kind of data, the value of this flag is ignored and a copy will take place.

virtual void writeAsync(const karabo::data::Hash &header, const char *data, const size_t &size, int prio = 4)

Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion

Parameters
  • data: Pointer to a contiguous block of memory that should be written
  • header: containing metadata for the data being written
  • size: This number of bytes will be written
  • prio: the priority of this write operation

virtual void writeAsync(const karabo::data::Hash &header, const std::vector<char> &data, int prio = 4)

Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion

Parameters
  • header: containing metadata for the data being written
  • data: vector of chars containing the data to be written
  • prio: the priority of this write operation

virtual void writeAsync(const karabo::data::Hash &header, const std::shared_ptr<std::vector<char>> &data, int prio = 4)

Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion

Parameters
  • header: containing metadata for the data being written
  • data: vector of chars containing the data to be written, passed as a shared pointer
  • prio: the priority of this write operation

virtual void writeAsync(const karabo::data::Hash &header, const std::string &data, int prio = 4)

Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion

Parameters
  • header: containing metadata for the data being written
  • data: passed as a string were each character represents one byte of the message
  • prio: the priority of this write operation

virtual void writeAsync(const karabo::data::Hash &header, const karabo::data::Hash &data, int prio = 4, bool copyAllData = true)

Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion

Parameters
  • header: containing metadata for the data being written
  • data: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types
  • prio: the priority of this write operation.
  • copyAllData: When false, raw data (ByteArray) inside an NDArray won’t be copied. For other kind of data, the value of this flag is ignored and a copy will take place.

virtual void setAsyncChannelPolicy(int priority, const std::string &new_policy, const size_t capacity = 0)

Set the policy of how data is queue on this channel for the queue of the given priority. Policies are:

  • “LOSSLESS”: all data is queue and the queue increases in size within incoming data
  • ”REJECT_NEWEST”: if the queue’s fixed capacity is reached new data is rejected
  • ”REMOVE_OLDEST”: if the queue’s fixed capacity is the oldest data is rejected

NOTE: This method can potentially modify the capacity of a queue which is in use! This is undefined behavior. Users are encouraged to only call this method when intializing a Channel object instance.

Parameters
  • priority: of the queue to set the policy for
  • policy: to set for this queue
  • capacity: is an optional capacity for the queue

class Connection
#include <Connection.hh>

This class serves as the interface for all connections. A connection is only established upon call of the start() function.

Inherits from std::enable_shared_from_this< Connection >

Subclassed by karabo::net::TcpConnection

Public Functions

virtual ChannelPointer start() = 0

Starts the connection

virtual int startAsync(const ConnectionHandler &handler)

Starts the connection asynchronously

Parameters
  • handler: A callback with the following signature: void myHandler(ErrorCode, ChannelPointer)

virtual void stop() = 0

Stops the connection

virtual ChannelPointer createChannel() = 0

This function creates a “channel” for the given connection.

Return
Pointer to Channel

class ConnectionHandler
#include <AmqpConnection.hh>

Declare our custom ConnectionHandler to implement their callbacks. Every callback delegates processing to externally assigned callback if it was set. ConnectionHandler object calls its callbacks as follows… while connecting… onAttached, onConnected, onReady in case of error … onError, (onLost), onDetached in case of normal closure … onClosed, onDetached.

onAttached is called if new connection is associated with the handler onConnected is called if physical (TCP) connection is successfully established. onReady is called if login is successful onClosed is called in case of normal connection closure onError is called if errors encountered on connection onLost is called if onError was called and earlier onConnected was called. onDetached id called as a last step of connection loss

Inherits from LibBoostAsioHandler

class EventLoop
#include <EventLoop.hh>

Karabo’s central event loop. Asynchronous events are passed throughout the distributed system by posting to the loop.

Public Static Functions

void addThread(const int nThreads = 1)

Add a number of threads to the event loop, increasing the number of thread available to handle events posted to the loop

Parameters
  • nThreads:

void removeThread(const int nThreads = 1)

Remove a number of threads from the event loop, reducing the number of threads available to handle events posted to the loop

Parameters
  • nThreads:

template <class Function>
void post(Function &&func, unsigned int delayMs = 0)

Post a task on the underlying io event loop for later execution

Parameters
  • func: a functor not taking any argument, but with any return type
  • delayMs: execution will be delayed by given time (in milliseconds)

boost::asio::io_context &getIOService()

Return the Eventloop’s underlying boost::asio::io_service

Return

void work()

Start the event loop and block until EventLoop::stop() is called.

The system signals SIGINT and SIGTERM will be caught and trigger the following actions:

  • a signal handler set via setSignalHandler is called,
  • and EventLoop::stop() is called.

Must not be called in parallel to itself or to run().

If one or more tasks are in deadlock and thus their threads cannot be joined at the end, a karabo::data::TimeoutException is thrown.

void run()

Start the event loop and block until all work posted to its io service is completed or until EventLoop::stop() is called.

Must not be called in parallel to itself or to work().

If one or more tasks are in deadlock and thus their threads cannot be joined at the end, a karabo::data::TimeoutException is thrown.

void stop()

Stop the event loop, canceling any remaining work, i.e. unblocking run()

size_t getNumberOfThreads()

Return the number of threads currently available to the event loop for distributing work

Return

void setSignalHandler(const SignalHandler &handler)

Set the handler to be called if a system signal is caught.

See work() about which signals are caught.

Parameters
  • handler: function with signature ‘void (int signal)’

static bool setCatchExceptions(bool flag)

Public flag to change behaviour of catching exceptions in threads

By default, flag is true and the event loop runs its threads such that any exception is caught and logged as error. If flag is false, exceptions are rethrown after logging them.

Return
previous value of flag
Parameters
  • flag: that should be valid now

Private Functions

void clearThreadPool()

Clears the thread pool and joins the threads

If joining fails repeatedly, throws karabo::data::TimeoutException.

class HttpClient

Public Functions

HttpClient(const std::string &baseURL, bool verifyCerts = false)

Creates a web client capable of submitting GET and POST requests to a given URL, over a secure or a plain connection.

Parameters
  • baseURL: the base URL to be prepended to each request path.
  • verifyCerts: when set to false, allows self generated server certificates when connecting securely (by bypassing certificate origin verification).

class Impl

Implementation of the WebClient class.

class HttpRequestRunner

Inherits from std::enable_shared_from_this< HttpRequestRunner >

class HttpsRequestRunner

Inherits from std::enable_shared_from_this< HttpsRequestRunner >

class InfluxDbClient
#include <InfluxDbClient.hh>

This class uses HTTP protocol for communications with InfluxDB server. The protocol follows request/response pattern and before sending next request the response from the current one should be received. Only one request/response session per connection is allowed. To follow this rule the internal queue is used. Any request (functor) first is pushed into internal queue and then checked the internal state if some current request/response session is ongoing. If not, the next request is popped from front side of internal queue and executed. The internal flag (state) is raised. When response callback is called it checks if the internal queue has next entry and if so this entry is popped and executed. If not, the internal flag is lowered. For the time being the internal queue has no limits defined so it is possible that if the client cannot cope with input load rate some overflow condition can be encountered. The practice should show how we can handle these problems.

Inherits from std::enable_shared_from_this< InfluxDbClient >

Public Functions

void startDbConnectIfDisconnected(const InfluxConnectedHandler &hook = InfluxConnectedHandler())

Check if connection is lost and try to re-establish connection to InfluxDB server

Parameters
  • hook: function that will be called when connection is established

bool isConnected()

Returns true if connection is established to InfluxDB server

std::string influxVersion()

The version of the InfluxDb server the client is connected to.

Return
std::string the connected InfluxDb server version (empty if no server is currently connected).

std::string serverUrl()

The url of the InfluxDb server the client is connected to (or supposed to connect to).

Return
std::string the InfluxDb server url.

void queryDb(const std::string &statement, const InfluxResponseHandler &action)

HTTP request “GET /query …” to InfluxDB server is registered in internal queue. Can be called with connection to InfluxDB or without. Blocking if no connection exists. Otherwise non-blocking.

Parameters
  • statement: is SELECT expression.
  • action: callback: void(const HttpResponse&) is called when response comes from InfluxDB server

void postQueryDb(const std::string &statement, const InfluxResponseHandler &action)

HTTP request “POST /query …” to InfluxDB server is registered in internal queue.

Parameters
  • statement: SELECT, SHOW, DROP and others QL commands
  • action: callback: void(const HttpResponse&) is called when response comes from InfluxDB server

void flushBatch(const InfluxResponseHandler &respHandler = InfluxResponseHandler())

Flushes the contents of the write buffer to the InfluxDb.

Parameters
  • respHandler: If defined, the handler function will be called with the response sent by Influx after it accepted and processed the current batch of updates. If not defined, the flushBatch will work in a call-and-forget mode.

void getPingDb(const InfluxResponseHandler &action)

HTTP request “GET /ping …” to InfluxDB server is registered in internal queue.

Parameters
  • action: callback: void(const HttpResponse&) is called when response comes from InfluxDB server

bool connectWait(std::size_t millis)

Returns true if connection is established in “millis” time range, otherwise timeout condition comes up and returns false.

Return
true if connection established, or false in case of timeout
Parameters
  • millis: time in milliseconds to wait for connection to be established

Public Static Functions

std::string generateUUID()

Returns UUID used as Request-ID for HTTP requests

Private Functions

void writeDb(const std::string &message, const std::string &requestId)

Writing HTTP request

Parameters
  • message: formed in compliance of HTTP protocol Malformed requests resulting in response code 4xx
  • requestId: unique id of the HTTP request to be sent to Influx.

void onDbConnect(const karabo::net::ErrorCode &ec, const karabo::net::Channel::Pointer &channel, const InfluxConnectedHandler &hook)

Low-level callback called when connection to InfluxDB is established

void onDbRead(const karabo::net::ErrorCode &ec, const std::string &data)

Low-level callback called when reading is done

void onDbWrite(const karabo::net::ErrorCode &ec, std::shared_ptr<std::vector<char>> p)

Low-level callback called when writing into networks interface is done

void postWriteDb(const std::string &batch, const InfluxResponseHandler &action)

HTTP request “POST /write …” to InfluxDB server is registered in internal queue.

Parameters

void postQueryDbTask(const std::string &statement, const InfluxResponseHandler &action)

Actual “POST /query …” is accomplished. Non-blocking call. The connection to InfluxDB has to be established before this call

void getPingDbTask(const InfluxResponseHandler &action)

Actual “GET /ping …” is accomplished. Non-blocking call. The connection to InfluxDB has to be established before this call

void postWriteDbTask(const std::string &batch, const InfluxResponseHandler &action)

Actual “POST /write …” is accomplished. Non-blocking call, Connection should be established before this call

void queryDbTask(const std::string &statement, const InfluxResponseHandler &action)

Actual “GET /query …” is accomplished. If no connection to DB, this call is blocked until the connection is established. Otherwise the call is non-blocking.

void onResponse(const HttpResponse &o, const InfluxResponseHandler &action)

Generic wrap callback is called and call in turn the user “action”.

void tryNextRequest(std::unique_lock<std::mutex> &requestQueueLock)

Try to take the next request from internal queue and execute it. Set internal state to be “active” if it was not. Helper function.

Parameters
  • requestQueueLock: must be locked scoped_lock of m_requestQueueMutex, will be unlocked afterwards

void sendToInfluxDb(const std::string &msg, const InfluxResponseHandler &action, const std::string &requestId)

Send HTTP request to InfluxDb. Helper function.

Wraps the given InfluxResponseHandler within a callback to onResponse. It will be up to onResponse to call the action InfluxResponseHandler and keep the consumption of requests submitted to the InfluxDbClient going.

std::string getRawBasicAuthHeader()

Gets the raw form of the http Authorization header with values of dbUser and dbPassword separated by a colon and base64 encoded.

Return
The raw form of the Authorization header.

void handleHttpReadError(const std::string &errMsg, const std::string &requestId, bool logAsError = true)

Handle unrecoverable read and parsing errors while processing HTTP responses from Influx.

The recovery involves recycling the network connection, as there is no way to recover synchronism in the read operation within the current connection after those kind of errors happen. Also generates an HTTP response with status code 700 and an error message to communicate to users of the InfluxDbClient instance.

Parameters
  • errMsg: the error message to be put in the generated 700 coded http response.
  • requestId: the unique identifier of the HTTP request whose response could not be processed (needed to update the internal bookeeping of the InfluxDb client).
  • logEsError: if true (default), log as info, else as error

class LosslessQueue
#include <Queues.hh>

The LosslessQueue implements a queue that guarantees to preserve messages.

Inherits from karabo::net::Queue

Subclassed by karabo::net::RejectNewestQueue

Public Functions

size_t size()

Return the size of the queue, i.e. the number of messages it holds

Return

size_t max_size()

Return the maximum allowed size of this queue

Return

void set_capacity(size_t capacity)

Set the capacity in terms of messages this queue can hold

Parameters
  • capacity:

size_t capacity()

Return this queues message capacity

Return

void clear()

Clear this queue

bool empty()

Check if this queue is empty, i.e. size is 0

Return

bool full()

Check if this queue is full, i.e. if it has reached its maximum capacity

Return

void resize(size_t new_size)

Resize the queue to a new size

Parameters
  • new_size:

const Message::Pointer &front()

Return the first element in the queue

Return

void push_back(const Message::Pointer &entry)

Add an element to the end of the queue, increases the size by one

Parameters
  • entry:

void pop_front()

Pop the first element from the queue, decreases the size by one

class Message
#include <Queues.hh>

This class represents a message in the distributed Karabo system.

Public Functions

const karabo::data::BufferSet::Pointer &body() const

Return the body of the message

Return

const VectorCharPointer &header() const

Return the header of the message

Return

class NetworkInterface

Public Functions

NetworkInterface(const std::string &name_or_ip, bool exclude_loopback = true)

Construct a NetworkInterface object from an interface or IP address.

const std::string &name() const

Return the interface name for the object (for instance, ‘lo’ or ‘enp4s0’)

const std::string &presentationIP() const

Return the presentation address for the object.

The presentation address is the IP address (four numbers between 0 and 255, separated with ‘.’)

Private Functions

void constructFromInterface(const struct ifaddrs *ifa)

Fill the object fields with the information specified in the ifaddrs object passed as parameter.

struct OneTimeTokenAuthorizeResult
#include <UserAuthClient.hh>

The results of a one-time token validation / authorization.

Subclassed by karabo::devices::BeginTemporarySessionResult

class Queue
#include <Queues.hh>

This class defines the interface for message queues in the Karabo distributed system.

Subclassed by karabo::net::LosslessQueue, karabo::net::RemoveOldestQueue

Public Functions

virtual size_t size() = 0

Return the size of the queue, i.e. the number of messages it holds

Return

virtual size_t max_size() = 0

Return the maximum allowed size of this queue

Return

virtual void set_capacity(size_t capacity) = 0

Set the capacity in terms of messages this queue can hold

Parameters
  • capacity:

virtual size_t capacity() = 0

Return this queues message capacity

Return

virtual void clear() = 0

Clear this queue

virtual bool empty() = 0

Check if this queue is empty, i.e. size is 0

Return

virtual bool full() = 0

Check if this queue is full, i.e. if it has reached its maximum capacity

Return

virtual void resize(size_t new_size) = 0

Resize the queue to a new size

Parameters
  • new_size:

virtual const Message::Pointer &front() = 0

Return the first element in the queue

Return

virtual void push_back(const Message::Pointer &entry) = 0

Add an element to the end of the queue, increases the size by one

Parameters
  • entry:

virtual void pop_front() = 0

Pop the first element from the queue, decreases the size by one

class RejectNewestQueue
#include <Queues.hh>

The RejectNewestQueue implements a queue that will reject new entries when it has reached its maximum capacity.

Inherits from karabo::net::LosslessQueue

Public Functions

void set_capacity(size_t capacity)

Set the capacity in terms of messages this queue can hold

Parameters
  • capacity:

size_t capacity()

Return this queues message capacity

Return

size_t max_size()

Return the maximum allowed size of this queue

Return

void push_back(const Message::Pointer &entry)

Add an element to the end of the queue, increases the size by one

Parameters
  • entry:

class RemoveOldestQueue
#include <Queues.hh>

The RemoveOldestQueue implements a queue that removes the oldest element in the queue when it has reached is maximum capacity and a new element is pushed to it.

Inherits from karabo::net::Queue

Public Functions

size_t size()

Return the size of the queue, i.e. the number of messages it holds

Return

size_t max_size()

Return the maximum allowed size of this queue

Return

void set_capacity(size_t capacity)

Set the capacity in terms of messages this queue can hold

Parameters
  • capacity:

size_t capacity()

Return this queues message capacity

Return

void clear()

Clear this queue

bool empty()

Check if this queue is empty, i.e. size is 0

Return

bool full()

Check if this queue is full, i.e. if it has reached its maximum capacity

Return

void resize(size_t new_size)

Resize the queue to a new size

Parameters
  • new_size:

const Message::Pointer &front()

Return the first element in the queue

Return

void push_back(const Message::Pointer &entry)

Add an element to the end of the queue, increases the size by one

Parameters
  • entry:

void pop_front()

Pop the first element from the queue, decreases the size by one

struct RemoveThreadException
#include <EventLoop.hh>

An exception that is thrown if a thread cannot be removed from the EventLoop

Inherits from std::exception

class Strand
#include <Strand.hh>

A poor man’s substitute for boost::asio::strand because that does not guarantee that handlers posted on different strands can run in parallel (“strand collision”). Compared to boost::asio::strand, this

  • lacks dispatch: we usually do not want that in Karabo since it allows the handler to be called now in this scope
  • lacks running_in_this_thread: probably not too important
  • has a more restrictive wrap: would be useful to support more, but a proper implementation would also need dispatch

Every handler posted will be put into a FIFO queue and the FIFO will be emptied in the background by posting the handlers to the given boost::asio::io_context (either from net::EventLoop, passed in constructor, or defined by setContext(..)).

NOTE: Do not create a Strand on the stack, but do it on the heap using the Configurator:

auto stack = karabo::data::Configurator<Strand>::create(Hash(…));

Otherwise ‘enable_shared_from_this’ does not work which is needed to guarantee (via usage of karab::util::bind_weak) that an internal Strand method is executed on the event loop when the Strand is already destructed.

Inherits from std::enable_shared_from_this< Strand >

Public Functions

Strand(boost::asio::io_context &ioContext)

Contructor only kept for backward compatibility.

Better use karabo::data::Configurator<Strand>::create(“Strand”, Hash())

Strand(const karabo::data::Hash &config)

Construct the Strand.

The boost::asio::io_context of the karabo::net::EventLoop will be used.

Best use this constructor indirectly via karabo::data::Configurator<Strand>::create(“Strand”, cfg) which will validate cfg and create the Strand properly on the heap.

Keys of cfg are “maxInARow” (unsigned int) and “guaranteeToRun” (bool), see expectedParameters.

void setContext(boost::asio::io_context &ioContext)

Set the context to which the handlers are to be posted.

No concurrency protection: Must be called directly after Strand creation, before it is used.

void post(const std::function<void()> &handler)

Post a handler to the io_context with the guarantee that it is not executed before any other handler posted before has finished. Handlers posted on different Strands can always be run in parallel.

Note that “guaranteeToRun” flag of the constructor determines what happens with yet unhandled handlers when the Strand is destructed.

Parameters
  • handler: function without arguments and return value - will be copied

void post(std::function<void()> &&handler)

Post a handler to the io_context with the guarantee that it is not executed before any other handler posted before has finished. Handlers posted on different Strands can always be run in parallel.

Note that “guaranteeToRun” flag of the constructor determines what happens with yet unhandled handlers when the Strand is destructed.

Parameters
  • handler: function without arguments and return value as r-value reference - will be moved to avoid a copy

std::function<void()> wrap(std::function<void()> handler)

This function is used to create a new handler function object that, when invoked, will pass the wrapped handler to the Strand’s post function (instead of using dispatch as boost::io_service::strand::wrap does).

Return
A function object that, when invoked, passes the wrapped handler to the Strand’s post function.
Parameters
  • handler: The handler to be wrapped. The strand will make a copy of the handler object. Compared to boost::io_service::strand::wrap, the handler signature is much more restricted, i.e. must be void().

boost::asio::io_context &getContext() const

This function may be used to obtain the io_context object that the strand uses to post handlers.

Return
A reference to the io_context of the Strand. Ownership is not transferred to the caller.

boost::asio::io_context &get_io_service() const

Deprecated.

Use getContext() instead.

Private Functions

void startRunningIfNeeded()

Helper for post - to be called under protection of m_tasksMutex!

void run()

Helper to run one task after another until tasks queue is empty.

class TcpChannel

Inherits from karabo::net::Channel

Public Functions

Connection::Pointer getConnection() const

Pointer to connection of this channel - empty if that is not alive anymore

size_t readSizeInBytes()

Synchronously reads the TCP message’s size. Will block until a message arrives on the socket.

Return
Size in bytes of incoming TCP message

void readAsyncSizeInBytes(const ReadSizeInBytesHandler &handler)

In case a TCP message arrived, handler will be called back The handler will inform about the number of bytes going to come in The handler must have the following signature: void handler(Channel::Pointer, const size_t&)

Parameters
  • handler: Callback with signature: void (Channel::Pointer, const size_t&)

std::string consumeBytesAfterReadUntil(const size_t nBytes)

Synchronously reads size bytes and return them as a string. The reading will block until the bytes are read.

Note
reads up nBytes expecting no header. This method should ONLY be used in association with readAsyncStringUntil - they consume from the same boost::asio::streamBuffer and should not be used in the same context of the other read* methods.
Parameters
  • size: This number of bytes will be copied into data

void read(char *data, const size_t &size)

Synchronously reads size bytes from TCP socket into data.

Parameters
  • data: Pre-allocated contiguous block of memory
  • size: This number of bytes will be copied into data

void read(std::vector<char> &data)

This function reads from a channel into vector of chars The reading will block until the data record is read. The vector will be updated accordingly (must not be pre-allocated before)

Return
void

void read(std::shared_ptr<std::vector<char>> &data)

This function reads from a channel into shared pointer of vector of chars The reading will block until the data record is read. The shared pointer of vector will be updated accordingly (must not be pre-allocated before)

Return
void

void read(karabo::data::Hash &data)

This function reads from a channel into vector of chars The reading will block until the data record is read. The size of data record is the first 4 bytes in a channel stream. The hash will be updated accordingly.

Return
void

void read(karabo::data::Hash &header, char *data, const size_t &size)

Synchronously reads size bytes from socket into data and provides a header.

Parameters
  • header: Hash object which will be updated to contain header information
  • data: Pre-allocated contiguous block of memory
  • size: This number of bytes will be copied into data

void read(karabo::data::Hash &header, std::vector<char> &data)

This function reads into a header and a vector of chars. The reading will block until the data record is read.

Parameters
  • header: Hash object which will be updated to contain header information
  • data: A vector which will be updated accordingly

void read(karabo::data::Hash &header, std::shared_ptr<std::vector<char>> &data)

This function reads into a header and shared pointer of vector of chars. The reading will block until the data record is read.

Parameters
  • header: Hash object which will be updated to contain header information
  • data: A shared pointer of a vector which will be updated accordingly

void read(karabo::data::Hash &header, karabo::data::Hash &data)

This function reads into a header hash and a data hash. The reading will block until the data record is read. The reading will block until the data record is read.

Parameters
  • header: Hash object which will be updated to contain header information
  • data: Hash object which will be updated to contain data information

void readAsyncRaw(char *data, const size_t &size, const ReadRawHandler &handler)

Asynchronously reads size number of bytes into pre-allocated data buffer A handler can be registered to inform about completion of writing

Parameters
  • data: Pre-allocated contiguous block of memory
  • size: This number of bytes will be copied into data
  • handler: Function of signature: <void (Channel::Pointer)> which will be call-backed upon read completion

void readAsyncStringUntil(const std::string &terminator, const ReadStringHandler &handler)

wrapper around boost::asio::async_read_until

Read a string until terminator string is found. (No header is expected).

Note
This method should ONLY be used in association with consumeBytesAfterReadUntil - they consume from the same boost::asio::streambuf and should not be used in the same context of the other read* methods.
Parameters
  • terminator: when this string found, read is done
  • handler: handler with signature ReadStringHandler, i.e. void (const boost::system::error_code&, std::string&) is called. second handler parameter is the read string including the terminator

void readAsyncString(const ReadStringHandler &handler)

Asynchronously reads data into a string. All memory management is done by the API. NOTE: A string in general is not storing data contiguously. Thus, an additional copy under the hood is needed which makes this interface slightly slower.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const std::string&)

void readAsyncVector(const ReadVectorHandler &handler)

Asynchronously reads data into a vector<char>. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const std::vector<char>&)

void readAsyncHash(const ReadHashHandler &handler)

Asynchronously reads data into a hash. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&)

void readAsyncHashPointer(const ReadHashPointerHandler &handler)

Asynchronously reads data into a hash. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&)

void readAsyncVectorPointer(const ReadVectorPointerHandler &handler)

Asynchronously reads data into a vector<char>. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const std::vector<char>&)

void readAsyncHashVector(const ReadHashVectorHandler &handler)

Asynchronously reads data into a hash header and a vector<char>. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&, const std::vector<char>&)

void readAsyncHashString(const ReadHashStringHandler &handler)

Asynchronously reads data into a hash header and a string. All memory management is done by the API. NOTE: A string in general is not storing data contiguously. Thus, an additional copy under the hood is needed which makes this interface slightly slower.

Parameters
  • handler: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&, const std::string&)

void readAsyncHashHash(const ReadHashHashHandler &handler)

Asynchronously reads data into a hash header and a hash body. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const karabo::data::Hash&)

void readAsyncHashPointerHashPointer(const ReadHashPointerHashPointerHandler &handler)

Asynchronously reads data into a hash header and a hash body. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const karabo::data::Hash&)

void readAsyncHashVectorPointer(const ReadHashVectorPointerHandler &handler)

Asynchronously reads data into a hash header and a vector<char>. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const std::vector<char>&)

void readAsyncHashVectorBufferSetPointer(const ReadHashVectorBufferSetPointerHandler &handler)

Asynchronously reads data into a hash header and into a vector of BufferSet pointers. All memory management is done by the API.

Parameters
  • handler: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const std::vector<karabo::data::BufferSet::Pointer>&)

void write(const char *data, const size_t &size)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • data: Pointer to a contiguous block of memory that should be written
  • size: This number of bytes will be written

void write(const karabo::data::Hash &header, const char *data, const size_t &size)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • header: containing metadata for the data being written
  • data: Pointer to a contiguous block of memory that should be written
  • size: This number of bytes will be written

void write(const karabo::data::Hash &data)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • data: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types

void write(const karabo::data::Hash &header, const std::vector<karabo::data::BufferSet::Pointer> &body)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • header: containing metadata for the data being written and BufferSet’s layout
  • body: vector of BufferSet pointers

void write(const karabo::data::Hash &header, const karabo::data::Hash &body)

Synchronous write. The function blocks until all bytes are written.

Parameters
  • header: containing metadata for the data being written
  • data: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types

void writeAsyncRaw(const char *data, const size_t &size, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • data: Pointer to a contiguous block of memory that should be written
  • size: This number of bytes will be written
  • handler: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

void writeAsyncVector(const std::vector<char> &data, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • data: vector of chars containing the data to be written
  • handler: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

void writeAsyncHash(const karabo::data::Hash &data, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • data: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types
  • handler: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

void writeAsyncHashRaw(const karabo::data::Hash &header, const char *data, const size_t &size, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • header: containing metadata for the data being written
  • data: Pointer to a contiguous block of memory that should be written
  • size: This number of bytes will be written
  • handler: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

void writeAsyncHashVector(const karabo::data::Hash &header, const std::vector<char> &data, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • header: containing metadata for the data being written
  • data: vector of chars containing the data to be written
  • handler: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

void writeAsyncHashVectorBufferSetPointer(const karabo::data::Hash &header, const std::vector<karabo::data::BufferSet::Pointer> &body, const WriteCompleteHandler &handler)

Write header and vector<BufferSet::Pointer> asynchronously.

Upon write completion a handler function is called. Data inside the buffers must not be changed or deleted before this handler is called. Special care is needed if any Hash that had been serialised into the buffers contained an NDArray: The raw data of the array will be shared between the BufferSet and the Hash. Deletion of the Hash is safe, though.

Parameters
  • header: containing metadata for the data being written
  • body: data as a vector of BufferSet::Pointer
  • handler: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

void writeAsyncHashHash(const karabo::data::Hash &header, const karabo::data::Hash &data, const WriteCompleteHandler &handler)

Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called

Parameters
  • header: containing metadata for the data being written
  • body: data contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types
  • handler: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.

size_t dataQuantityRead()

Returns the number of bytes read since the last call of this method

size_t dataQuantityWritten()

Returns the number of bytes written since the last call of this method

void close()

Close this channel

bool isOpen()

Check if this channel is open

Return

karabo::data::Hash queueInfo()

Records the sizes of the write queues in a Hash. Useful for debugging devices with multiple channels open (like the GuiServerDevice…)

std::string remoteAddress() const

Address of the remote endpoint

void writeAsync(const char *data, const size_t &size, int prio)

Writes a copy from the data array.

void writeAsync(const std::vector<char> &data, int prio)

Writes a copy of the data vector.

void writeAsync(const std::shared_ptr<std::vector<char>> &data, int prio)

Sends the vector pointed by data. The data vector must not be changed after the call to writeAsync.

void writeAsync(const std::string &data, int prio)

Writes a copy of the data string.

void writeAsync(const karabo::data::Hash &data, int prio, bool copyAllData)

When copyAllData is false, elements of type NDArray in the hash won’t be copied before being sent.

void writeAsync(const karabo::data::Hash &header, const char *data, const size_t &size, int prio)

Writes copies of the header hash and the data array.

void writeAsync(const karabo::data::Hash &header, const std::vector<char> &data, int prio)

Writes copies of the header hash and of the data vector.

void writeAsync(const karabo::data::Hash &header, const std::shared_ptr<std::vector<char>> &data, int prio)

Writes a copy of the header hash. Sends the vector pointed by data, not a copy of it. The data vector must not be changed after the call to writeAsync.

void writeAsync(const karabo::data::Hash &header, const std::string &data, int prio)

Writes copies of the header hash and of the data string.

void writeAsync(const karabo::data::Hash &header, const karabo::data::Hash &data, int prio, bool copyAllData)

When copyAllData is false, elements of type NDArray in the body hash won’t be copied before being sent. copyAllData doesn’t influence the handling of the header hash.

void setAsyncChannelPolicy(int priority, const std::string &new_policy, const size_t capacity = 0)

Set the policy of how data is queue on this channel for the queue of the given priority. Policies are:

  • “LOSSLESS”: all data is queue and the queue increases in size within incoming data
  • ”REJECT_NEWEST”: if the queue’s fixed capacity is reached new data is rejected
  • ”REMOVE_OLDEST”: if the queue’s fixed capacity is the oldest data is rejected

NOTE: This method can potentially modify the capacity of a queue which is in use! This is undefined behavior. Users are encouraged to only call this method when intializing a Channel object instance.

Parameters
  • priority: of the queue to set the policy for
  • policy: to set for this queue
  • capacity: is an optional capacity for the queue

Public Static Functions

karabo::data::Hash getChannelInfo(const std::shared_ptr<karabo::net::TcpChannel> &ptr)

This function returns low level info about connection like … “localIp”, “localPort”, “remoteIp”, “remotePort” that constitute active connection. in form of Hash container

Return
Hash with 4 key/value pairs
Parameters

Private Functions

void bytesAvailableHandler(const boost::system::error_code &e)

Internal default handler

Parameters
  • channel:

void onSizeInBytesAvailable(const ErrorCode &error, const ReadSizeInBytesHandler &handler)

This function calls the corresponding handler

Parameters
  • handler:
  • error:

void onVectorBufferSetPointerAvailable(const ErrorCode &error, size_t length, const std::vector<karabo::data::BufferSet::Pointer> &buffers, const ReadVectorBufferSetPointerHandler &handler)

Internal handler called after filling the buffer set

Parameters
  • error: error code
  • length: number of bytes read == total size of buffer set
  • buffers: vector of buffer set pointers with the data
  • handler: to be called

void byteSizeAvailableHandler(const size_t byteSize)

Internal default handler

Parameters
  • byteSize:

karabo::data::BufferSet::Pointer bufferSetFromString(const std::string &str)

Creates a buffer set with the given string stored in its sole buffer.

Return
shared_ptr to the buffer set with the string stored.
Note
actually places a copy of the string into the buffer set.
Parameters
  • str: the string to be stored in the buffer set.

karabo::data::BufferSet::Pointer bufferSetFromPointerToChar(const char *data, size_t size)

Creates a buffer set with contents of a given buffer of chars stored in its sole buffer.

Return
shared_ptr to the buffer set with the input buffer contents stored.
Note
actually places a copy of the contents of the input buffer into the buffer set.
Parameters
  • data: a pointer to the first char in the input sequence.

karabo::data::BufferSet::Pointer bufferSetFromVectorCharPointer(const VectorCharPointer &dataVect)

Creates a buffer set with characters in a given vector of chars stored in its sole buffer.

Return
shared_ptr to the buffer set with the character in the vector stored.
Parameters
  • data: a pointer to the vector of chars to be stored in the buffer set.

karabo::data::BufferSet::Pointer bufferSetFromHash(const karabo::data::Hash &data, bool copyAllData)

Creates a buffer set with a given hash stored in its sole buffer.

Return
pBuffSet a shared pointer that will be pointed to the newly created buffer set with the hash.
Parameters
  • data: the hash to be stored in the buffer set.
  • copyAllData: if false no copy of any NDArray internal to the hash will be made upon storing the hash in the bufferset (the buffer set will actually become one of the “owners” of the NDArray).

unsigned int storeCompleteHandler(const WriteCompleteHandler &handler)

Helper to store async write completion handler

Return
index under which handler is stored in internal map
Parameters
  • handler: write completion handler to be stored

void applySocketKeepAlive()

Helper to apply the TCP keep-alive settings to the socket if configured to do so.

Requires that m_socketMutex is locked.

class TcpConnection
#include <TcpConnection.hh>

a class for handling tcp connections

This class serves as the interface for all connections. A connection is only established upon call of the start() function. It is a factory class and thus can be configured using its expected parameters

Inherits from karabo::net::Connection

Public Functions

Channel::Pointer start()

Starts the connection

int startAsync(const ConnectionHandler &slot)

Starts the connection asynchronously providing a slot

void stop()

Closes the connection

ChannelPointer createChannel()

This function creates a “channel” for the given connection.

Return
Pointer to Channel

class UserAuthClient

Public Functions

void authorizeOneTimeToken(const std::string &token, const std::string &topic, const AuthOneTimeTokenHandler &handler)

Validate and authorize, asynchronously, a given one-time token against a given topic.

Parameters
  • token: the token to be validated and authorized.
  • topic: the topic against which the user linked to a valid token will be authorized.
  • handler: the handler to be called when the token is processed.

namespace consumer

Typedefs

using karabo::net::consumer::MessageHandler = typedef std::function<void(karabo::data::Hash::Pointer, karabo::data::Hash::Pointer)>
using karabo::net::consumer::ErrorNotifier = typedef std::function<void(Error, const std::string& description)>

Enums

enum Error

Values:

drop = 0
type

messages have been dropped

unknown

status reported is not specially treated or unknown

message of wrong type (i.e. non binary format, serialisation failure, …) received and dropped

The karabo::xms Namespace

namespace karabo::xms

Namespace for package xms

Namespace for package io

Typedefs

typedef InputChannelElement INPUT_CHANNEL_ELEMENT
typedef InputChannelElement INPUT_CHANNEL
typedef std::function<void(const std::vector<karabo::data::Hash>&)> ShowConnectionsHandler
typedef std::function<void(const std::vector<unsigned long long>&, const std::vector<unsigned long long>&)> ShowStatisticsHandler
typedef std::function<std::string(const std::vector<std::string>&)> SharedInputSelector
typedef OutputChannelElement OUTPUT_CHANNEL_ELEMENT
typedef OutputChannelElement OUTPUT_CHANNEL

Functions

Hash getHeartbeatInfo(const Hash &instanceInfo)

Variables

const int kMaxServerInitializationAttempts = 2000
const int msPingTimeoutInIsValidInstanceId = 1000

Milliseconds of timeout when asking for validity of my id at startup:

const char *beatsTopicSuffix = "_beats"
const int channelReconnectIntervalSec = 6
const int getOutChannelInfoTimeoutMsec = 3000
class InputChannel
#include <InputChannel.hh>

The InputChannel class is used to receive data from pipelined processing OutputChannels.

The InputChannel class is used to receive data from pipelined processing OutputChannels. It additionally supports receiving meta data associated with each data token read. Specifically, the meta information contains source information, i.e. what produced the data token, and timing information, e.g. train ids.

void onInput(const InputChannel::Pointer& input) {

     for (unsigned int i = 0; i != input->size(); ++i) {
          Hash h;
          const InputChannel::MetaData& meta = input->read(i);
          std::cout<<"Source: <<meta.getSource()<<std::endl;
          std::cout<<"TrainId: <<meta.getTimestamp().getTrainId()<<std::endl;
     }
}

Inherits from std::enable_shared_from_this< InputChannel >

Public Functions

InputChannel(const karabo::data::Hash &config)

If this object is constructed using the factory/configuration system this method is called

See
expectedParameters) and default-filled configuration
Parameters
  • input: Validated (

void reconfigure(const karabo::data::Hash &config, bool allowMissing = true)

Reconfigure InputChannel Disconnects any previous “connectedOutputChannels” if not in config[“connectedOutputChannels”].

Parameters
  • config: as needed by the constructor
  • allowMissing: if true, lack of keys “dataDistribution”, “minData”, “onSlowness”, “delayOnInput” and “respondToEndOfStream” is OK and their respective previous configuration is kept, if false, an exception is thrown when these keys are missing in config

void registerInputHandler(const InputHandler &ioInputHandler)

Register handler to be called when new data has arrived. For each index i from 0 to < size(), data and meta data can be received via read(i, dataHash) and readMetaData(i, metaData), respectively.

Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when data arrives, i.e. this registration must not be called if (being) connected to any output channel.

void registerDataHandler(const DataHandler &ioDataHandler)

Register handler to be called for each data item that arrives.

Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when data arrives, i.e. this registration must not be called if (being) connected to any output channel.

void registerEndOfStreamEventHandler(const InputHandler &endOfStreamEventHandler)

Register handler to be called when connected output channels inform about end-of-stream. If connected to more than one output channel, the handler is called if the last of them sends the end-of-stream signal.

Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when the end-of-stream signal arrives, i.e. this registration must not be called if (being) connected to any outpu channel.

InputChannel::Handlers getRegisteredHandlers() const

Get handlers registered for data, input and end-of-stream handling.

Do not call concurrrently with the corresponding register[Data|Input|EndOfStreamEvent]Handler() methods.

size_t dataQuantityRead()

Returns the number of bytes read since the last call of this method

See karabo::util::TcpChannel::dataQuantityRead()

size_t dataQuantityWritten()

Returns the number of bytes written since the last call of this method

See karabo::util::TcpChannel::dataQuantityWritten()

std::map<std::string, karabo::data::Hash> getConnectedOutputChannels()

Returns a map of between “output channel string” and “output channel info” Hash outputChannelString (STRING) represented like “instanceId:channelName” outputChannelInfo contains connection parameters or is empty, depending on connection state. This contains all output channels that the InputChannel is configured for, irrespective whether currently connected or not.

Return
map.

std::unordered_map<std::string, karabo::net::ConnectionStatus> getConnectionStatus()

Provide a map between the output channels that are configured and their connection status.

Return
map

const InputChannel::MetaData &read(karabo::data::Hash &data, size_t idx = 0)

Read data from the InputChannel - to be called inside an InputHandler callback

Kept for backward compatibility only since internally the data is copied! Use one of the other read methods instead.

Return
meta data associated to the data token. Lifetime of the object corresponds to live time of the InputHandler callback.
Parameters
  • data: reference that will hold the data
  • idx: of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens

karabo::data::Hash::Pointer read(size_t idx = 0)

Read data from the InputChannel - to be called inside an InputHandler callback

Return
the data as a pointer
Parameters
  • idx: of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens

karabo::data::Hash::Pointer read(size_t idx, MetaData &source)

Read data and meta data from the InputChannel - to be called inside an InputHandler callback

Return
the data as a pointer
Parameters
  • idx: of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens
  • source: reference that will hold the meta data

size_t size()

Number of data tokens - to be called inside an InputHandler callback

void connect(const karabo::data::Hash &outputChannelInfo, const std::function<void(const karabo::net::ErrorCode&)> &handler = std::function<void(const karabo::net::ErrorCode&)>())

Asynchronously connect this input channel to the output channel described by the first argument

Parameters
  • outputChannelInfo: Hash with three keys
    • ”outputChannelString”: a string matching one of the configured “connectedOutputChannels”
    • ”connectionType”: a string - currently only “tcp” supported
    • ”hostname”: a string telling which host/ip address to connect to
    • ”port”: an unsigned int telling the port
    • ”memoryLocation: string “remote” or “local” to tell whether other end is in another process or can share memory
  • handler: indicates asynchronously (like via EventLoop::post) the success of the connection request

void disconnect(const std::string &connectionString)

Disconnect and clean internals

Parameters
  • connectionString: One of the “connectedOutputChannels” given at construction

void updateOutputChannelConfiguration(const std::string &outputChannelString, const karabo::data::Hash &config = karabo::data::Hash())

Update list of output channels that can be connected

Parameters
  • outputChannelString: string that can later be used as key “outputChannelString” of Hash argument to connect
  • config: kept for backward compatibility

const std::vector<InputChannel::MetaData> &getMetaData() const

Get the current meta data for input data available on this input channel. Validity time of the object corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.

Return

std::vector<unsigned int> sourceToIndices(const std::string &source) const

Return the list of indices of the data tokens (for read(index) ) for a given source identifier. Multiple indices may be returned if the same source was appended more than once in one batch write. Indices increase monotonically in insertion order of the write operations. Validity time of the indices corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.

Return
Parameters
  • source:

std::vector<unsigned int> trainIdToIndices(unsigned long long trainId) const

Return the list of indices of the data tokens (for read(index) ) for a given train id. Multiple indices may be returned if the same source was appended more than once in one batch write. Indices increase monotonically in insertion order of the write operations. Validity time of the indices corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.

Return
Parameters
  • source:

const InputChannel::MetaData &indexToMetaData(unsigned int index) const

Return the data source identifier pertinent to a data token at a given index. Validity time of the object corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.

Return
Parameters
  • index:
Exceptions
  • A: KARABO_LOGIC_EXCEPTION of there is no meta data available for the given index.

Public Static Functions

void expectedParameters(karabo::data::Schema &expected)

Necessary method as part of the factory/configuration system

Parameters
  • expected: [out] Description of expected parameters for this object (Schema)

Private Functions

void disconnectImpl(const std::string &outputChannelString)

Disconnect internals - needs protection by m_outputChannelsMutex

Parameters
  • outputChannelString: One of the “connectedOutputChannels” given at construction

void prepareData()

Prepares data and metadata from the active chunk.

prepareData assumes that it has exclusive access to the m_activeChunk member variable when it’s called. Is up to the caller to guarantee that assumption.

Private Members

InputHandler m_inputHandler

Callback on available data (per InputChannel)

DataHandler m_dataHandler

Callback on available data (per item in InputChannel)

std::unordered_map<std::string, std::pair<unsigned int, std::function<void(const karabo::net::ErrorCode&)>>> m_connectionsBeingSetup

All ‘outputChannelString’ for that a connection attempt is currently ongoing, with their handlers and ids

struct Handlers
#include <InputChannel.hh>

Container for InputChannel handlers that concern data handling.

Public Functions

Handlers(const DataHandler &data, const InputHandler &eos = InputHandler())

Construct with data and end-of-stream handlers - input handler could be specified afterwards

Handlers(const InputHandler &input, const InputHandler &eos = InputHandler())

Construct with input and end-of-stream handlers - data handler could be specified afterwards

class Memory
#include <Memory.hh>

The Memory class is an internal utility for InputChannel and OutputChannel to provide static shared memory.

Public Static Functions

void read(karabo::data::Hash &data, const size_t dataIdx, const size_t channelIdx, const size_t chunkIdx)

Read the contents of a single Hash out of the cache. The passed in Hash will be cleared first.

Parameters
  • data:
  • dataIdx:
  • channelIdx:
  • chunkIdx:

Memory::DataPointer read(const size_t dataIdx, const size_t channelIdx, const size_t chunkIdx)

Read the contents of a single Hash out of the cache. A pointer tag_of a newly created Hash will be returned.

Parameters
  • dataIdx:
  • channelIdx:
  • chunkIdx:

void write(const karabo::data::Hash &data, const size_t channelIdx, const size_t chunkIdx, const MetaData &metaData, bool copyAllData = true)

Write the contents of a single Hash into the cache. The Hash will be serialized before control is returned to the caller. Note that the data of an NDArray inside the Hash will not be copied, i.e. the Memory internal buffer will point to the same memory as the NDArray, except if copyAllData = true. it is safe to mutate the Hash after writing it.

Parameters
  • data: input
  • channelIdx: where to store the serialised data
  • chunkIdx: where to store the serialised data
  • metaData: of the data
  • copyAllData: defines whether all data (incl. NDArray data) is copied into the internal buffer (default: true)

void assureAllDataIsCopied(const size_t channelIdx, const size_t chunkIdx)

Ensure that the data of given chunk is not shared with anyone else, i.e. copy data if needed.

Parameters
  • channelIdx:
  • chunkIdx:

const std::vector<Memory::MetaData> &getMetaData(const size_t channelIdx, const size_t chunkIdx)

Return a vector of MetaData objects for the data tokens in the bucket identified by channelIdx and chunkIdx.

Return
Parameters
  • channelIdx:
  • chunkIdx:

class MetaData

Inherits from Hash

Public Functions

MetaData(const std::string &source, const karabo::data::Timestamp &timestamp)

Constructor to directly set meta data entries

Parameters
  • source: an identifier of the data producer
  • timestamp: a timestamp relevant for this data token.

void setSource(const std::string &source)

Set data source, i.e. identifier of the data producer

Parameters
  • source:

const std::string &getSource() const

Get data source, i.e. identifier of the data producer

Return

void setTimestamp(const karabo::data::Timestamp &timestamp)

Set the timestamp relevant to this data token

Parameters
  • timestamp:

const karabo::data::Timestamp getTimestamp() const

Get the timestamp relevant to this data token

Return

class OutputChannel
#include <OutputChannel.hh>

An OutputChannel for passing data to pipelined processing.

The OutputChannel class is used for writing data to pipelined processing inputs. It supports tracking of meta data for each data token written to it. Specifically, it e.g. allows for keeping track of data producers, here called sources, and timing and train information. Meta data information enables aggregation of multiple data source into one output channel interaction with a remote host, as well as aggregation of multiple train-related data of the same source. A mixture of both scenarios is possible.

An example of these use cases

OutputChannel::Pointer output = ... //

Hash data1;
....
OutputChannel::MetaData meta1("THIS/IS/SOURCE/A/channel1", karabo::data::Timestamp());
output->write(data1, meta1)

Hash data2_10;
....
OutputChannel::MetaData meta2_10("THIS/IS/SOURCE/B/channel2", timestampForTrain10);
output->write(data2_10, meta2)
OutputChannel::MetaData meta2_11("THIS/IS/SOURCE/B/channel2", timestampForTrain11);
output->write(data2_11, meta2_11)

Hash data_this_source;
...
// not passing any meta data to write will default the source to [deviceId]/[channelName]
// and the timestamp to the current timestamp
output->write(data_this_source);

// now actually send over the network
output->update();

Inherits from std::enable_shared_from_this< OutputChannel >

Public Functions

OutputChannel(const karabo::data::Hash &config)

If this object is constructed using the factory/configuration system this method is called.

The initialize() method must not be called if constructed this way.

Deprecated: Tcp server initialization is triggered, but there is no control when and whether it succeeded. So better use the constructor with additional int argument (and set it to zero).

See
expectedParameters) and default-filled configuration
Parameters
  • config: Validated (

OutputChannel(const karabo::data::Hash &config, int autoInit)

Recommended constructor, allowing guaranteed-to-work initialization.

The recommended way to call it is via the Configurator and with autoInit == 0, followed by calling initialize():

Hash config(<here state=”” non-default=”” config=”” parameters>=”“>); OutputChannel::Pointer output = Configurator<OutputChannel>::create(“OutputChannel”, cfg, 0); output->initialize();

Caveat: Make sure you do not pass a ‘bool’ instead of an ‘int’ as argument to create(..) since then the other constructor is chosen and the value of the ‘bool’ determines whether to validate cfg or not.

See
expectedParameters) and default-filled configuration
Parameters
  • config: Validated (
Parameters
  • autoInit: If set to 0 (strongly recommended), the constructor does not yet try to initiate the TCP server initialization and the initialize() method has to be called as “second constructor”. The advantage is that the initialization cannot fail on busy systems and one has control when the server is available for remote connections. If autoInit != 0, this constructor behaves as the other constructor and initialize() must not be called.

void initialize()

“Second constructor”, to be called after construction with second argument autoInit == 0.

Initializes the underlying Tcp server connection and makes it available for others.

May throw a karabo::util::NetworkException, e.g. if a non-zero port was defined in the input configuration and that is not available since used by something else.

karabo::data::Hash getInitialConfiguration() const

returns the initial readonly configuration parameters

Returns a Hash containing the initial information that should not be updated via ShowConnectionHandler and ShowStatisticsHandler. Currently only the address key is included.

std::string getInstanceIdName() const

Concatenation of instance id and name

bool hasRegisteredCopyInputChannel(const std::string &instanceId) const

Check whether an InputChannel with given id is registered to receive all data

i.e. an InputChannel with “dataDistribution == copy”

Return
bool whether InputChannel of specified type is connected
Parameters

bool hasRegisteredSharedInputChannel(const std::string &instanceId) const

Check whether an InputChannel with given id is registered to receive a share of the data

i.e. an InputChannel with “dataDistribution == shared”

Return
bool whether InputChannel of specified type is connected
Parameters

void write(const karabo::data::Hash &data, const Memory::MetaData &metaData, bool = false)

Writes a Hash containing data to the output channel. Sending to the network happens when update() is called.

Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.

Parameters
  • data: input Hash object
  • metaData: a MetaData object containing meta data for this data token.

Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.

void write(const karabo::data::Hash &data, bool = false)

Writes a Hash containing data to the output channel. Sending to the network happens when update() is called. Metadata is initialized to default values. Namely the sending devices device id and the output channel’s name are used as data source.

Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.

Parameters
  • data: input Hash object

Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.

void write(const karabo::data::Hash::Pointer &data, const Memory::MetaData &metaData)

Writes a Hash containing data to the output channel. Sending to the network happens when update() is called. Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.

Parameters
  • data: shared pointer to input Hash object
  • metaData: a MetaData object containing meta data for this data token.

Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.

void write(const karabo::data::Hash::Pointer &data)

Writes a Hash containing data to the output channel. Sending to the network happens asynchronously. Metadata is initialized to default values. Namely the sending devices device id and the output channel’s name are used as data source. Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.

Parameters
  • data: shared pointer to input Hash object

Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.

void update(bool safeNDArray = false)

Update the output channel, i.e. send all data over the wire that was previously written by calling write(…). This is a synchronous method, i.e. blocks until all data is actually sent (or dropped or queued).

Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.

Parameters
  • safeNDArray: boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are ‘safe’, i.e. their memory will not be referred to elsewhere after update is finished. Default is ‘false’, ‘true’ can avoid safety copies of NDArray content when data is queued or sent locally.

void asyncUpdate(bool safeNDArray = false, std::function<void()> &&writeDoneHandler = []() {})

Semi-asynchronously update the output channel, i.e. start asynchronous sending of data over the wire that was previously written to the output channel’s‘ buffer by calling write(…), but block as long as required to really start sending. The start of sending data is delayed

  • for any connected input channel that is currently not ready to receive more data, but is configured with “dataDistribution” as “copy” and with “onSlowness” as “wait”,
  • or if none of the connected input channels that are configured with “dataDistribution” as “shared” are currently ready to receive data and if this output channel is configured with “noInputShared” as “wait”.

Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.

Parameters
  • safeNDArray: boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are ‘safe’, i.e. their memory will not be referred to elsewhere before ‘readyHandler’ is called. Default is ‘false’, ‘true’ can avoid safety copies of NDArray content when data is queued or sent locally.
  • writeDoneHandler: callback when data (that is not queued) has been sent and thus even NDArray data inside it can be re-used again (except if safeNDArray was set to ‘true’ in which case its memory may still be used in a queue).

void asyncUpdateNoWait(std::function<void()> &&readyForNextHandlerstd::function<void()> &&writeDoneHandlerbool safeNDArray, )

Expert method

Asynchronously update the output channel, i.e. asynchronously send all data over the wire that was previously written by calling write(…) without any blocking.

This method must not be called again before either ‘readyForNextHandler’ or ‘writeDoneHandler’ have been called. If next data should be sent, but neither handler has been called yet, one has to block or skip the data. In the latter case, the wish of a connected input channel that is configured to make the output “wait” if not ready, is ignored (policy violation).

Both handlers have to be valid function pointers.

TODO: Provide a handler called when sending data is completed, including any queued data, and thus NDArray data can be re-used again even if safeNDArray=false (i.e. buffers could be re-used).

Parameters
  • readyForNextHandler: callback when asyncUpdateNoWait may be called again (this can only be delayed if any blocking input channel [“wait”] is connected)
  • writeDoneHandler: callback when sending is finished (as confirmed by Tcp) or stopped due to disconnection, or data is internally queued. So now all NDArray inside the Hash passed to write(..) before can be re-used again (except if safeNDArray was set to ‘true’ in which case its memory may still be used in a queue)
  • safeNDArray: boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are ‘safe’, i.e. their memory will not be referred to elsewhere after update is finished. False triggers a data copy if data needs to be queued.

Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.

void signalEndOfStream()

Synchronously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream.

Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.

void asyncSignalEndOfStream(std::function<void()> &&readyHandler)

Asynchonously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream.

Thread safety: All the ‘write(..)’ methods, ‘update()’/’asyncUpdate(cb)’ and ‘signalEndOfStream()’/’asyncSignalEndOfStream(cb)’ must not be called concurrently.

Parameters
  • readyHandler: callback when notification has been sent or queued

void registerSharedInputSelector(SharedInputSelector &&selector)

Register handler that selects which of the connected input channels that have dataDistribution = “shared” is to be served.

The handler will be called during update(..)/asyncUpdateNoWait with the ids of the connected “shared” input channels (e.g. “deviceId:input”) as argument. The returned channel id will receive the data. If an empty string or an unknown id is returned, the data will be dropped.

Parameters
  • selector: takes vector<string> as argument and returns string

void disable()

Shut down all underlying connections, object will not be usable afterwards.

Needed if stray shared pointers may be kept somewhere.

Public Static Functions

void expectedParameters(karabo::data::Schema &expected)

Necessary method as part of the factory/configuration system

Parameters
  • expected: [out] Description of expected parameters for this object (Schema)

Private Functions

void eraseOldChannel(InputChannels &channelContainer, const std::string &instanceId, const karabo::net::Channel::Pointer &newChannel) const

Erase instance with ‘instanceId’ from ‘channelContainer’ if existing - if same as ‘newChannel’, do not close

void pushShareNext(const std::string &instanceId)

Helper to indicate that given shared input is ready to receive more data

Requires m_registeredInputsMutex to be locked

std::string popShareNext()

Helper to provide id of shared input that is ready to receive more data

Requires m_registeredInputsMutex to be locked

bool isShareNextEmpty() const

Helper to tell whether none of the shared inputs is ready to receive more data

Requires m_registeredInputsMutex to be locked

bool hasSharedInput(const std::string &instanceId)

Helper to query whether given shared input is ready to receive more data

Requires m_registeredInputsMutex to be locked

void eraseSharedInput(const std::string &instanceId)

Helper to indicate that given shared input is currently not ready to receive more data

Requires m_registeredInputsMutex to be locked

void pushCopyNext(const std::string &instanceId)

Helper to indicate that given copy input is ready to receive more data

Requires m_registeredInputsMutex to be locked

bool eraseCopyInput(const std::string &instanceId)

Erase instance from container of copy channels that are ready to receive data

Requires m_registeredInputsMutex to be locked

Return
whether instanceId could be removed (i.e. was actually ready to receive)
Parameters
  • instanceId:

bool updateChunkId()

helper to set new m_chunkId

Return
true if new m_chunkId is valid (i.e. not equal m_invalidChunkId)

void ensureValidChunkId(std::unique_lock<std::mutex> &lockOfRegisteredInputsMutex)

helper for asyncUpdate() to ensure that at the end m_chunkId is valid - may block a while

Parameters
  • lockOfRegisteredInputsMutex: a scoped_lock locking m_registeredInputsMutex (may be unlocked and locked again during execution)

void asyncPrepareCopy(unsigned int chunkId, std::vector<karabo::data::Hash *> &toSendImmediately, std::vector<karabo::data::Hash *> &toQueue, std::vector<karabo::data::Hash *> &toBlock)

Figure out how to treat copy inputs, return via appending to reference arguments

Requires m_registeredInputsMutex to be locked

void asyncPrepareDistribute(unsigned int chunkId, std::vector<karabo::data::Hash *> &toSendImmediately, std::vector<karabo::data::Hash *> &toQueue, std::vector<karabo::data::Hash *> &toBlock, bool &queue, bool &block)

Figure out how to treat shared inputs, return via (appending to) reference arguments

Requires m_registeredInputsMutex to be locked

bool asyncPrepareDistributeEos(unsigned int chunkId, std::vector<karabo::data::Hash *> &toSendImmediately, std::vector<karabo::data::Hash *> &toQueue, std::vector<karabo::data::Hash *> &toBlock)

Figure out how to send EndOfStream for shared outputs, return via reference arguments

Requires m_registeredInputsMutex to be locked

Return
whether to queue for shared queue

void asyncPrepareDistributeSelected(unsigned int chunkId, std::vector<karabo::data::Hash *> &toSendImmediately, std::vector<karabo::data::Hash *> &toQueue, std::vector<karabo::data::Hash *> &toBlock)

Figure out how to treat shared inputs if sharedInputSelector is registered

Requires m_registeredInputsMutex to be locked

void asyncPrepareDistributeLoadBal(unsigned int chunkId, std::vector<karabo::data::Hash *> &toSendImmediately, std::vector<karabo::data::Hash *> &toQueue, std::vector<karabo::data::Hash *> &toBlock, bool &queue, bool &block)

Figure out how to treat shared inputs when load-balancing

Requires m_registeredInputsMutex to be locked

void resetSendOngoing(const std::string &instanceId)

Helper that sets the sendOngoing flag to false for given instanceId

void asyncSendOne(unsigned int chunkId, InputChannelInfo &channelInfo, std::function<void()> &&doneHandler)

Helper to asynchronously send chunk data to channel in given channelInfo

Parameters
  • chunkId: The chunk to send
  • channelInfo: Container with info about channel to send to
  • doneHandler: Callback when sending done or failed

void awaitUpdateFuture(std::future<void> &fut, const char *which)

Helper for waiting for future that in case of long delay adds a thread to unblock

Throws TimeoutException if not unblocked after two minutes.

std::string debugId() const

Provide a string identifying this output channel (useful in DEBUG logging)

class Signal

Public Functions

template <typename… Args>
void setSignature()

Use like setSignature<int, data::Hash, std::string>() to ensure that any emitted signal has to take arguments of these three types in that order.

bool registerSlot(const std::string &slotInstanceId, const std::string &slotFunction)

Register a slot to receive an emitted signal

Return
bool whether freshly registered (false means: was already registered)
Parameters
  • slotInstanceId: id of the instance of the slot
  • slotFunction: name of the slot

bool unregisterSlot(const std::string &slotInstanceId, const std::string &slotFunction = "")

Undo registration of a slot

Return
bool whether slot registration could be undone, i.e. false if slot was not registered
Parameters
  • slotInstanceId: instance id of the slot to be removed
  • slotFunction: the slot - if empty string, remove all registered slots of slotInstanceId

void setTopic(const std::string &topic)

This function allows to use a specific topic to which all messages are emitted If the setter is not called, the topic of SignalSlotable will be used NOTE: The idea is to keep a door open for a later change where each emit will use a topic identical to the signal name. In that case the setter can just be removed.

Parameters
  • topic: The topic name

class SignalSlotable
#include <SignalSlotable.hh>

The SignalSlotable class. This class implements the so-called “Signal-Slot” design pattern orginally termed by the Qt-Gui framework. However, signals and slots are not restricted to a local application but can be connected and triggered across the network. This allows for programming with network components in the same intuitive (event-driven) way as Qt allows to do with its local components (e.g. widgets).

Moreover does this implementation (unlike Qt) not require any pre-processing. Another additional feature is the ability to setup new signals and/or slots at runtime.

Furthermore, this class implements functions for the common request/response patterns.

For a full documentation of the signal-slot component see the documentation in the software-guide.

Inherits from std::enable_shared_from_this< SignalSlotable >

Subclassed by karabo::core::Device, karabo::core::DeviceServer

Public Types

typedef std::function<void()> AsyncErrorHandler

An AsyncErrorHandler takes no argument, but it will be called such that it can rethrow and then catch exceptions. The caught exception indicates the failure reason, e.g.:

void asyncErrorHandler () { try { throw; } catch (std::exception& e) { // or any other exception type - or several catch statements KARABO_LOG_FRAMEWORK_WARN << “Probem when trying to do something: ” << e.what(); } }

Public Functions

SignalSlotable()

This constructor does nothing. Call init() afterwards for setting up.

SignalSlotable(const std::string &instanceId, const karabo::net::Broker::Pointer &connection, const int heartbeatInterval = 30, const karabo::data::Hash &instanceInfo = karabo::data::Hash())

Creates a functional SignalSlotable object using an existing connection.

Don’t call init() afterwards.

Parameters
  • instanceId: The future instanceId of this object in the distributed system
  • connection: An existing broker connection
  • heartbeatInterval: The interval (in s) in which a heartbeat is emitted
  • instanceInfo: A hash containing any important additional information

SignalSlotable(const std::string &instanceId, const karabo::data::Hash &brokerConfiguration = karabo::data::Hash(), const int heartbeatInterval = 30, const karabo::data::Hash &instanceInfo = karabo::data::Hash())

Creates a function SignalSlotable object allowing to configure the broker connection.

Don’t call init() afterwards.

Parameters
  • instanceId: The future instanceId of this object in the distributed system
  • brokerConfiguration: A single keyed Hash where the key is the broker type and the Hash at that key is the configuration for the respective broker type (a given instanceId it will be replaced by the first constructor argument). Can be empty or can contain an empty Hash() at the single key, i.e. will be expanded from defaults.
  • heartbeatInterval: The interval (in s) in which a heartbeat is emitted
  • instanceInfo: A hash containing any important additional information

void init(const std::string &instanceId, const karabo::net::Broker::Pointer &connection, const int heartbeatInterval, const karabo::data::Hash &instanceInfo, bool consumeBroadcasts = true)

Initializes the SignalSlotable object (only use in conjunction with empty constructor).

Parameters
  • instanceId: The future instanceId of this object in the distributed system
  • connection: An existing broker connection
  • heartbeatInterval: The interval (in s) in which a heartbeat is emitted
  • instanceInfo: A hash containing any important additional information
  • consumeBroadcasts: if true (default), receive messages addressed to everybody (i.e. to ‘*’) on its own. If false, some other mechanism has to ensure to deliver these.

void start()

This function starts the communication.

After a call to this non-blocking function the object starts listening to messages. The uniqueness of the instanceId is validated (throws SignalSlotException if not unique) and if successful the object registers with a call to “slotInstanceNew” to the distributed system.

bool eraseTrackedInstance(const std::string &instanceId)

Erase instance from container of tracked instances

To be called if one is tracking instances and is sure that the given instance is not alive anymore (e.g. if another instance in the same process is dead as well). If erroneously called, the next arriving heartbeat of the instance will trigger an instanceNew event

Return
whether instanceId was tracked before
Parameters
  • instanceId: that shall be treated as not alive anymore

std::vector<string> getAvailableSignals(const std::string &instanceId, int timeout = 100)

This is a synchronous call with timeout in milliseconds return vector of device signals.

Return
vector of device’s signal names
Parameters
  • instanceId: of the device
  • timeout: in milliseconds

std::vector<string> getAvailableSlots(const std::string &instanceId, int timeout = 100)

This is a synchronous call with timeout in milliseconds return vector of device slots.

Return
vector of device’s slot names
Parameters
  • instanceId: of the device
  • timeout: in milliseconds

const std::string &getUserName() const

Retrieves currently logged in username (empty if not logged in)

Return
string username

const std::string &getInstanceId() const

Access to the identification of the current instance using signals and slots

Return
instanceId

void updateInstanceInfo(const karabo::data::Hash &update, bool remove = false)

Update and publish the instanceInfo

Parameters
  • update: a Hash containing new or updated keys - or keys to remove
  • remove: if false (default), merge ‘update’ to existing instance info, otherwise subtract it

const SignalSlotable::SlotInstancePointer &getSenderInfo(const std::string &slotFunction)

This function must only be called within a slotFunctions body. It returns the current object handling the callback which provides more information on the sender.

Return
instance of a Slot object (handler object for this callback)
Parameters
  • slotFunction: The string-ified name of the slotFunction you are currently in

bool connect(const std::string &signalInstanceId, const std::string &signalSignature, const std::string &slotInstanceId, const std::string &slotSignature)

This function tries to establish synchronously a connection between a signal and a slot, identified both by their respective instance IDs and signatures. Moreover, this SignalSlotable obeys (throughout its full lifetime or until “disconnect” is called with the same arguments) the responsibility to keep this connection alive, i.e. to reconnect if either signal or slot instance come back after they have shutdown or if they come up the first time.

Return
whether connection is already successfully established
Parameters
  • signalInstanceId: is the instance ID of the signal (if empty use this instance)
  • signalSignature: is the signature of the signal
  • slotInstanceId: is the instance ID of the slot (if empty use this instance)
  • slotSignature: is the signature of the slot

bool connect(const std::string &signal, const std::string &slot)

This function tries to establish a connection between a signal and a slot as “connect” with four arguments does, so see there for more details. If signal or slot instance IDs are not specified, they are interpreted as local and automatically assigned a “self” instanceId

Return
whether connection is already succesfully established
Parameters
  • signal: <signalInstanceId>:<signalSignature>
  • slot: <slotInstanceId>:<slotSignature>

void asyncConnect(const std::string &signalInstanceId, const std::string &signalSignature, const std::string &slotInstanceId, const std::string &slotSignature, const std::function<void()> &successHandler = std::function<void()>()const AsyncErrorHandler &failureHandler = AsyncErrorHandler(), int timeout = 0, )

This function tries to establish asynchronously a connection between a signal and a slot, identified both by their respective instance IDs and signatures. Moreover, this SignalSlotable obeys (throughout its full lifetime or until “disconnect” is called with the same arguments) the responsibility to keep this connection alive, i.e. to reconnect if either signal or slot instance come back after they have shutdown or if they come up the first time.

Parameters
  • signalInstanceId: is the instance ID of the signal (if empty use this instance)
  • signalSignature: is the signature of the signal
  • slotInstanceId: is the instance ID of the slot (if empty use this instance)
  • slotSignature: is the signature of the slot
  • successHandler: is called when connection is established (maybe be empty [=default])
  • failureHandler: is called when connection could not be established, in the same way as an Requestor::AsyncErrorHandler - if Signal or Slot do not exist, the exception is a SignalSlotException
  • timeout: in milliseconds for internal async requests - non-positive (default) means the very long default timeout

void asyncConnect(const std::vector<SignalSlotConnection> &signalSlotConnections, const std::function<void()> &successHandler = std::function<void()>()const AsyncErrorHandler &failureHandler = AsyncErrorHandler(), int timeout = 0, )

This function tries to establish asynchronously a connection between several signals and slots.

One of the two handlers will be called exactly once. The failureHandler will be called if any signal slot connection failed, no matter whether other connections succeeded or not.

Parameters
  • signalSlotConnections: e.g. vector<SignalSlotConnection>{SignalSlotConnection(“sigInst”, “signal”, “slotInst”, “slot”), …}
  • successHandler: is called when all connections are established (maybe be empty [=default])
  • failureHandler: is called when any of the connections could not be established, no matter whether the others failed or not, in the same way as a Requestor::AsyncErrorHandler.
  • timeout: in milliseconds for internal async requests - non-positive (default) means the very long default timeout

bool disconnect(const std::string &signalInstanceId, const std::string &signalFunction, const std::string &slotInstanceId, const std::string &slotFunction)

Disconnects a slot from a signal, identified both by their respective instance IDs and signatures. In case the connection was established by this instance, also erase it from the list of connections that have to re-established in case signal or slot instances come back after a shutdown.

Return
whether connection is successfully stopped, e.g. false if there was no such connection or if remote signal instance ID did not confirm in time
Parameters
  • signalInstanceId: is the instance ID of the signal (if empty use this instance)
  • signalSignature: is the signature of the signal
  • slotInstanceId: is the instance ID of the slot (if empty use this instance)
  • slotSignature: is the signature of the slot

void asyncDisconnect(const std::string &signalInstanceId, const std::string &signalFunction, const std::string &slotInstanceId, const std::string &slotFunction, const std::function<void()> &successHandler = std::function<void()>()const AsyncErrorHandler &failureHandler = AsyncErrorHandler(), int timeout = 0, )

This function tries to disconnect a previously established connection between a signal and a slot. These two are identified both by their respective instance IDs and signatures. In case the connection was established by this instance, the function also erases it from the list of connections that have to be re-established in case signal or slot instances come back after a shutdown.

Parameters
  • signalInstanceId: is the instance ID of the signal (if empty use this instance)
  • signalSignature: is the signature of the signal
  • slotInstanceId: is the instance ID of the slot (if empty use this instance)
  • slotSignature: is the signature of the slot
  • successHandler: is called when connection is successfully stopped (maybe be empty [=default])
  • failureHandler: is called when the disconnection failed (maybe be empty [=default])
  • timeout: in milliseconds for internal async requests - non-positive (default) means the very long default timeout

template <typename… Args>
void emit(const std::string &signalFunction, const Args&... args) const

Emits a signal, i.e. publishes the given payload Emitting a signal is a fire-and-forget activity. The function returns immediately.

Parameters
  • signalFunction: The name of the previously registered signal
  • args...: A variadic number of arguments to be published

template <typename… Args>
void call(const std::string &instanceId, const std::string &functionName, const Args&... args) const

Calls a (remote) function. Calling a remote function is a fire-and-forget activity. The function returns immediately after sending the message.

Parameters
  • instanceId: Instance to be called
  • functionName: Function on instance to be called (must be a registered slot)
  • args: Arguments with which to call the slot

template <typename… Args>
void reply(const Args&... args)

Place the reply of a slot call

To be used inside a method registered as a slot. The reply is not directly sent, but it is registered to be sent once all methods registered to the slot (usually only one) have finished execution. So if called several times in a slot, the last call defines the actual reply.

If this method is not called inside a slot, an “empty” reply will be send without arguments. But note that Device::updateState(const State s, …) implicitly calls reply(s.name()).

See about AsyncReply to avoid blocking the thread in case reply values are known only later, e.g. after some IO operations.

Parameters
  • args: 0 to 4 objects of the types known to serialisation, e.g. float, vector<long long>, Hash,…

void registerSlot(const std::function<void()> &slotconst std::string &funcName, )

Register a new slot function for a slot. A new slot is generated if so necessary. It is checked that the signature of the new slot is the same as an already registered one.

InputChannel::Pointer createInputChannel(const std::string &channelName, const karabo::data::Hash &config, const DataHandler &onDataAvailableHandler = DataHandler(), const InputHandler &onInputAvailableHandler = InputHandler(), const InputHandler &onEndOfStreamEventHandler = InputHandler(), const InputChannel::ConnectionTracker &connectTracker = InputChannel::ConnectionTracker())

Create and register an InputChannel together with handlers

Return
the created InputChannel - better do not store it anywhere, it can be received via getInputChannel(channelName)
Parameters
  • channelName: name of the channel, e.g. its path in the schema
  • config: is a Hash with a Hash at ‘channelName’ which will be passed to InputChannel::create
  • onDataAvailableHandler: is a DataHandler called for each data item coming through the pipeline
  • onInputAvailableHandler: is an InputHandler called when new data arrives - user has to loop over all items
  • onEndOfStreamEventHandler: is an InputHandler called when EOS is received
  • connectTracker: will be called whenever the connection status of the created channel changes

bool removeInputChannel(const std::string &channelName)

Remove the InputChannel created via createInputChannel

Return
true if such a channel existed and could be removed
Parameters
  • channelName: identifies the channel (first argument that was given to createInputChannel)

OutputChannel::Pointer createOutputChannel(const std::string &channelName, const karabo::data::Hash &config, const OutputHandler &onOutputPossibleHandler = OutputHandler())

Create an OutputChannel under the given name

If there is already one for than name, that one (and thus all other copies of its shared_ptr) will be disabled to disconnect any connection.

Return
pointer to created channel - do not store anywhere! If needed, retrieve again via getOutputChannel(channelName).
Parameters
  • channelName: the name for the channel
  • config: must have a Hash at key channelName - that is passed (after removeal of the “schema” key) to Configurator<OutputChannel>::create
  • onOutputPossibleHandler: ?

bool removeOutputChannel(const std::string &channelName)

Remove the OutputChannel created via createOutputChannel

Before removal, it (and thus all other copies of its shared_ptr) will be disabled to disconnect any connection.

Return
true if such a channel existed and could be removed
Parameters
  • channelName: identifies the channel (first argument that was given to createOutputChannel)

OutputChannel::Pointer getOutputChannel(const std::string &name)

Access pointer to OutputChannel with given name. Throws ParameterException if no such output channel.

Return
OutpuChannel::Pointer
Parameters
  • name: of output channel (e.g. path in expectedParameters)

OutputChannel::Pointer getOutputChannelNoThrow(const std::string &name)

Access pointer to OutputChannel with given name.

Return
OutputChannel::Pointer - empty if no channel of that name
Parameters
  • name: of output channel (e.g. path in expectedParameters)

InputChannel::Pointer getInputChannel(const std::string &name)

Access pointer to InputChannel with given name. Throws ParameterException if no such input channel.

Return
InputChannel::Pointer

InputChannel::Pointer getInputChannelNoThrow(const std::string &name)

Access pointer to InputChannel with given name.

Return
InputChannel::Pointer - empty if no channel of that name
Parameters
  • name: of input channel (e.g. path in expectedParameters)

void connectInputChannel(const InputChannel::Pointer &channel, int trails = 8)

Deprecated, use asyncConnectInputChannel!

Connects an input channel to those as defined on the input channel’s configuration. The function is asynchronous, but gives no feedback about success or failure.

void asyncConnectInputChannel(const InputChannel::Pointer &channel, const std::function<void(bool)> &handler, const std::vector<std::string> &outputChannelsToIgnore = std::vector<std::string>(), )

Connect input channel to output channels defined in its configuration.

Proper asynchronous implementation with feedback handler

Parameters
  • channel: pointer to InputChannel
  • handler: to report success or failure. In the latter case the argument is false and more information about the failure can be retrieved via try { throw; } catch (const std::exception&e) { const std::string reason(e.what());} in the same way as in SignalSlotable::AsyncErrorHandler
  • outputChannelsToIgnore: outputChannels that shall not be connected, e.g. because they are already connected (defaults to empty vector)

void connectInputChannels(const boost::system::error_code &e)

Trigger connection of all not connected input channel connections

Re-triggers itself regularly via internal m_channelConnectTimer

Parameters
  • e: do nothing if evaluates to true

Protected Functions

string fetchInstanceId(const std::string &signalOrSlotId) const

Parses out the instanceId part of signalId or slotId

Return
A string representing the instanceId
Parameters
  • signalOrSlotId:

void registerBroadcastHandler(std::function<void(const karabo::data::Hash::Pointer &header, const karabo::data::Hash::Pointer &body)> handler)

Register a handler to be called for every received message that is addressed to everybody. NOTE: This is not thread safe - call before SignalSlotable::start starts receiving messages.

Parameters
  • handler: with header and body (as Hash::Pointer) of the message

Private Functions

template <typename… Args>
SignalSlotable::SignalInstancePointer addSignalIfNew(const std::string &signalFunction)

Helper for registerSignal: If signalFunction is not yet known, creates a signal corresponding to the template argument signature and adds it to the internal container. Otherwise an empty pointer is returned.

Return
pointer to new Signal or empty pointer
Parameters
  • signalFunction:

void ensureInstanceIdIsValid(const std::string &instanceId)

If instanceId has invalid characters, throws SignalSlotException.

void ensureInstanceIdIsUnique(const std::string &instanceId)

If instanceId not unique in system, throws SignalSlotException.

std::tuple<karabo::data::Hash::Pointer, std::string, bool> registerAsyncReply()

Internal method to provide info for AsyncReply object

Return
tuple of slot header, slot name and whether it is a global slot call

void reconnectSignals(const std::string &newInstanceId)

Calls connect for all signal-slot connections that involve ‘newInstanceId’ (be it on signal or slot side) and for which this instance is responsible, i.e. its “connect” has been called for this connection before (with or without immediate success). Calling “disconnect” stops this responsibility.

void registerForShortcutMessaging()

Register myself for short-cut messaging (i.e. bypass broker if in same process). Must not be called before instance ID is checked to be unique in overall system.

void deregisterFromShortcutMessaging()

Deregister myself from short-cut messaging.

void registerNewSlot(const std::string &funcName, SlotInstancePointer instance)

Register a new slot instance under name funcName. This will raise an error if the slot already exists.

void slotConnectToSignal(const std::string &signalFunction, const std::string &slotInstanceId, const std::string &slotFunction)

Register signal-slot connection on signal side.

void slotSubscribeRemoteSignal(const std::string &signalInstanceId, const std::string &signalFunction)

Slot to subscribe to remote signal.

void slotUnsubscribeRemoteSignal(const std::string &signalInstanceId, const std::string &signalFunction)

Slot to un-subscribe from remote signal.

bool instanceHasSlot(const std::string &slotInstanceId, const std::string &unmangledSlotFunction)

True if instance with ID ‘slotInstanceId’ has slot ‘slotFunction’. Internally uses “slotHasSlot” for remote instances, but shortcuts if ID is the own one. Always true if ‘slotInstanceId == “*”’ (i.e. global slot).

void slotHasSlot(const std::string &unmangledSlotFunction)

Slot to tell whether instance has a slot of given name.

void handleInputConnected(bool success, const std::string &channel, const std::shared_ptr<std::mutex> &mut, const std::shared_ptr<std::vector<karabo::net::AsyncStatus>> &status, size_t i, size_t numOutputsToIgnore)

helper for connectInputChannels()

bool tryToUnregisterSlot(const std::string &signalFunction, const std::string &slotInstanceId, const std::string &slotFunction)

Try to undo registration of a slot “slotInstanceId.slotFunction”. Thread-safe, locks m_signalSlotInstancesMutex.

Return
bool true if signal existed and given slot was registered before
Parameters
  • signalFunction: name of local signal
  • slotInstanceId: instance id that carries the slot
  • slotFunction: the slot - if empty, all registered slots of slotInstanceId

void addReceiveAsyncErrorHandles(const std::string &replyId, const std::shared_ptr<boost::asio::steady_timer> &timer, const AsyncErrorHandler &errorHandler)

For the given replyId of a ‘request.receiveAsync’, register error handling, i.e. the timer for timeout and the handler for remote exceptions.

Private Static Functions

void callErrorHandler(const AsyncErrorHandler &handler, const std::string &message)

Helper that calls ‘handler’ such that it can do

try { throw; } catch (const SignalSlotException &e) { <action>}

Parameters
  • message: text given to the SignalSlotException

class AsyncReply
#include <SignalSlotable.hh>

A functor to place an asynchronous reply during slot execution.

  • Create only inside a slot call of a SignalSlotable
  • Can be copied around as wished
  • Call either the operator or the error(..) method exactly once for one of the copies.
  • Call these methods from another context/thread, i.e. between creation and use of its methods you have to leave the thread at least once)
  • Must not be used once the SignalSlotable object that created it is (being) destructed (e.g. protect by bind_weak to member functions of the SignalSlotable)

Public Functions

AsyncReply(SignalSlotable *signalSlotable)

Construct functor for an asynchronous reply. Create only within a slot call of a SignalSlotable

Parameters
  • signalSlotable: pointer to the SignalSlotable whose slot is currently executed (usually: this)

template <typename… Args>
void operator()(const Args&... args) const

Place the reply - almost like using SignalSlotable::reply in the synchronous case. The difference is that here the reply is immediately sent and cannot be overwritten by a following call.

Parameters
  • args: 0-4 objects of the types known to serialisation, e.g. float, vector<long long>, Hash,…

void error(const std::string &message, const std::string &details = std::string()) const

If a proper reply cannot be placed, please use this to reply an error

Parameters
  • message: is the short text for the RemoteException
  • details: further details, usually an exception trace (e.g. data::Exception::detailedMsg()), default is empty for backward compatibility reasons

class Requestor

Private Functions

std::pair<karabo::data::Hash::Pointer, karabo::data::Hash::Pointer> receiveResponseHashes()

Receives the reply for the current request, returning shared pointers to the reply’s header and body hashes.

Return
A pair with a shared pointer to the header hash of the reply in the first position and a shared pointer to the body in the second.

void registerErrorHandler(const AsyncErrorHandler &errorHandler)

Register handler for errors in async requests, e.g. timeout or remote exception.

void getSignalInstanceId(const karabo::data::Hash::Pointer &header, std::string &result)

Extracts the value of the SignalInstanceId path in a response header hash.

Parameters
  • header: response header hash from where the value will be extracted
  • result: the string that will be assigned the value of the SignalInstanceId path

struct SignalSlotConnection
#include <SignalSlotable.hh>

A structure to keep information of a signal-slot connection.

class Slot

Subclassed by karabo::xms::SlotN< Ret, Args >

Public Functions

karabo::data::Hash::Pointer getHeaderOfSender() const

Return message header that triggered calling this slot Valid while callRegisteredSlotFunctions is processed.

class SLOT_ELEMENT

Inherits from karabo::xms::SlotElementBase< SLOT_ELEMENT >

template <class A1>
class SLOT_ELEMENT1

Inherits from karabo::xms::SlotElementBase< SLOT_ELEMENT1< A1 > >

template <class Derived>
class SlotElementBase

Inherits from karabo::data::GenericElement< Derived >

Public Functions

Derived &allowedStates(const std::vector<karabo::data::State> &value)

The allowedStates method serves for setting up allowed states for the element

Return
reference to the Element (to allow method’s chaining)
Parameters
  • states: A string describing list of possible states.
  • sep: A separator symbol used for parsing previous argument for list of states

template <typename Ret, typename… Args>
class SlotN

Inherits from karabo::xms::Slot

Public Functions

virtual void doCallRegisteredSlotFunctions(const karabo::data::Hash &body)

To be called under protection of m_registeredSlotFunctionsMutex

Parameters
  • body: a Hash with up to four keys “a1” to “a4” with the expected types behind

class Statics
#include <Statics.hh>

The Statics class.

Public Functions

Statics()

Default constructor.

virtual ~Statics()

Destructor.

The karabo::util Namespace

namespace karabo::util

Namespace for package packageName

Typedefs

using karabo::util::InfluxResultSet = typedef std::pair< std::vector<std::string>, std::vector<std::vector<boost::optional<std::string> >> >
using karabo::util::JsonType = typedef std::optional<nlohmann::json::value_t>

Functions

template <class T>
void addPointerToHash(Hash &hash, const std::string &path, T *const &value, const Dims &dims, const char separator = Hash::k_defaultSep)
template <class T>
void getPointerFromHash(const Hash &hash, const std::string &path, T *&value, Dims &dims, const char separator = data::Hash::k_defaultSep)
void setDims(Hash &hash, const std::string &path, const Dims &dims, const char separator = Hash::k_defaultSep)
data::Epochstamp stringDoubleToEpochstamp(const std::string &timestampAsDouble)

Convert an std::string that represents a double of the seconds since Unix epoch to an Epochstamp

void getLeaves(const data::Hash &configuration, const data::Schema &schema, std::vector<std::string> &result, const char separator)
void getLeaves_r(const data::Hash &hash, const data::Schema &schema, std::vector<std::string> &result, std::string prefix, const char separator, const bool fullPaths)
void parseSingleJsonResult(const nl::json &respObj, InfluxResultSet &influxResult, const std::string &columnPrefixToRemove)
void jsonResultsToInfluxResultSet(const std::string &jsonResult, InfluxResultSet &influxResult, const std::string &columnPrefixToRemove)

Utility function to convert a string into an InfluxResultSet

One or multiple concatenated JSON objects containing the results of an InfluxDB query are decoded and filled into a InfluxResultSet object.

Parameters
  • jsonResult: : the string containing the JSON object(s)
  • influxResult: : the result
  • columnPrefixToRemove: : remove this prefix from the column names. InfluxQL selector functions (e.g. SAMPLE) are prepended to the column name. Use this argument to remove said prefixes.
Exceptions
  • karabo::util::NotSupportedException: in case the column mismatch nlohmann::json exceptions in case of malformatted JSON objects.

boost::optional<std::string> jsonValueAsString(nl::json value)
std::string toInfluxDurationUnit(const TIME_UNITS &karaboDurationUnit)
std::string epochAsMicrosecString(const data::Epochstamp &ep)
void getLeaves(const karabo::data::Hash &configuration, const karabo::data::Schema &schema, std::vector<std::string> &result, const char separator = karabo::data::Hash::k_defaultSep)
void getLeaves_r(const karabo::data::Hash &hash, const karabo::data::Schema &schema, std::vector<std::string> &result, std::string prefix, const char separator, const bool fullPaths)
std::string toInfluxDurationUnit(const karabo::data::TIME_UNITS &karaboDurationUnit)
std::string epochAsMicrosecString(const karabo::data::Epochstamp &ep)
boost::optional<std::string> jsonValueAsString(nlohmann::json value)

Utility function to convert a json object.

Return
a boost::optional<std::string> a null boost::optional<std::string> matches a null JSON value
Parameters
  • value:

void processJson(const nlohmann::json &j, Hash &result)
void processJsonObject(nlohmann::json::const_iterator &it, Hash &result)
void processJsonArray(nlohmann::json::const_iterator &it, Hash &result)
void processJsonValue(nlohmann::json::const_iterator &it, Hash &result)
JsonType getArrayType(const nlohmann::json &j)
karabo::data::Hash jsonToHash(const std::string &j)

Converts a JSON string representation into a Hash object.

This function parses a JSON string and constructs a corresponding Hash object representing the JSON structure. The JSON string is expected to follow the JSON standard format.

JSON Arrays of Mixed types are unsupported.

JSON types are mapped to C++ types as below:

  • JSON Integer -> long long
  • JSON Decimal -> double
  • JSON empty array -> empty std::vector<std::string>

Return
A Hash object representing the parsed JSON structure.
Parameters
  • jsonString: A string containing the JSON representation to be converted.
Exceptions
  • KARABO_PARAMETER_EXCEPTION: if the provided JSON string is invalid or cannot be parsed into a Hash object.

Hash generateAutoStartHash(const karabo::data::Hash &initHash)

Generates an auto-start configuration Hash based on the provided initialization Hash.

This function takes an initialization Hash representing the initial configuration of components and constructs an auto-start configuration Hash based on it.

The initialization Hash is expected to have been generated from the JSON for the init string used to autostart devices on a karabo cpp server.

An example conversion should look like this:

initHash: ‘data_logger_manager_1’ + ‘classId’ => DataLoggerManager STRING ‘serverList’ => karabo/dataLogger STRING ‘schema_printer1’ + ‘classId’ => SchemaPrinter STRING

Above transforms to autoStart hash: ‘autoStart’ @ [0] ‘DataLoggerManager’ + ‘deviceId’ => data_logger_manager_1 STRING ‘serverList’ => karabo/dataLogger STRING [1] ‘SchemaPrinter’ + ‘deviceId’ => schema_printer1 STRING

Return
A Hash object representing the auto-start configuration based on the provided initialization Hash.
Parameters
  • initHash: The initialization Hash containing the initial configuration of components.

karabo::data::Hash generateAutoStartHash(const karabo::data::Hash &initHash)

Generates an auto-start configuration Hash based on the provided initialization Hash.

This function takes an initialization Hash representing the initial configuration of components and constructs an auto-start configuration Hash based on it.

The initialization Hash is expected to have been generated from the JSON for the init string used to autostart devices on a karabo cpp server.

An example conversion should look like this:

initHash: ‘data_logger_manager_1’ + ‘classId’ => DataLoggerManager STRING ‘serverList’ => karabo/dataLogger STRING ‘schema_printer1’ + ‘classId’ => SchemaPrinter STRING

Above transforms to autoStart hash: ‘autoStart’ @ [0] ‘DataLoggerManager’ + ‘deviceId’ => data_logger_manager_1 STRING ‘serverList’ => karabo/dataLogger STRING [1] ‘SchemaPrinter’ + ‘deviceId’ => schema_printer1 STRING

Return
A Hash object representing the auto-start configuration based on the provided initialization Hash.
Parameters
  • initHash: The initialization Hash containing the initial configuration of components.

template <typename Ret, typename… Args, typename Obj>
std::function<Ret(Args...)> karabo::util::exec_weak_impl(Ret(Obj::*)(Args...) const f, const Obj * o)

Provides a wrapper with the same signature as f, but securing shared ownership of an object of type Obj before execution of f. Class Obj needs to derive somewhere in its inheritance tree from enable_shared_from_this();

Return
a wrapped version of f.
Parameters
  • f: a const member function, can have any argument types and any return value
  • o: a pointer to an object that has a member function f

template <typename Ret, typename… Args, typename Obj>
std::function<Ret(Args...)> karabo::util::exec_weak_impl(Ret(Obj::*)(Args...) f, Obj * o)

Provides a wrapper with the same signature as f, but securing shared ownership of an object of type Obj before execution of f. Class Obj needs to derive somewhere in its inheritance tree from enable_shared_from_this();

Return
a wrapped version of f.
Parameters
  • f: a non-const member function, can have any argument types and any return value
  • o: a pointer to an object that has a member function f

template <typename F, typename Obj, typename… P>
auto bind_weak(const F &f, Obj *const o, const P... p)

Weakly binds member function f to an object of class Obj, but assures shared ownership of the object while f is executed. This means that during the lifetime of calling f, the object cannot be destroyed, but destruction is not blocked if f is not being executed but only bound. Class Obj needs to derive from std::enable_shared_from_this and the object pointer o has to be held by a shared_ptr. This means that you cannot use bind_weak within the constructor of Obj nor for objects constructed on the stack. Note that f may have any default constructable return type: If the bound functor will be called when the object is already destroyed, the functor returns a default constructed object of the return type.

Below is an example of how to bind to a boost::asio interface.

void Device::executeStepFunction(int arg, const boost::system::error_code& error) { …. m_timer.async_wait(bind_weak(&Device::executeStepFunction, this, arg + 1, _1)); …. }

Return
: bound functor, compatible with boost bind.
Parameters
  • f: function to bind, give as &Foo::bar
  • o: pointer to object to bind to
  • p: parameters as one would give to std::bind. Placeholders are fully supported.

template <typename F, typename Tuple>
void call(F f, Tuple &&t)

Call a function f with arguments unpacked from a std::tuple

Parameters
  • f:
  • t:

template <class… Ts>
void unpack(const Hash &hash, Ts&... args)

Unpack the hash (typically coming from the network) into the parameters given by reference.

template <typename… Args>
auto unpack(const karabo::data::Hash &h)

Unpack parameters into a tuple holding only references

Return
std::tuple<Args&…>
Parameters
  • h: Hash with keys a1, a2, etc. encoding function arguments

template <class… Ts>
void pack(Hash &hash, const Ts&... args)

Pack the parameters into a hash for transport over the network.

Parameters
  • hash: Will be filled with keys a1, a2, etc. and associated values
  • args: Any type and number of arguments to associated to hash keys

std::ostream &operator<<(std::ostream &os, const TimeProfiler &profiler)
bool operator==(const karabo::util::Version &v1, const karabo::util::Version &v2)
bool operator>(const karabo::util::Version &v1, const karabo::util::Version &v2)
bool operator!=(const karabo::util::Version &v1, const karabo::util::Version &v2)
bool operator>=(const karabo::util::Version &v1, const karabo::util::Version &v2)
bool operator<(const karabo::util::Version &v1, const karabo::util::Version &v2)
bool operator<=(const karabo::util::Version &v1, const karabo::util::Version &v2)

Variables

char const *const INFLUX_DURATION_UNIT = "u"
unsigned int const INFLUX_PRECISION_FACTOR = 1000000
char const *const DATALOGMANAGER_ID = "Karabo_DataLoggerManager_0"
char const *const DATALOGGER_PREFIX = "DataLogger-"
char const *const DATALOGREADER_PREFIX = "DataLogReader-"
char const *const DATALOG_LINE_REGEX = "^([TZ0-9\\.]+)\\|([0-9\\.]+)\\|([0-9]+)\\|(.+)\\|([A-Z][0-9A-Z_]+)\\|(.*)\\|([a-z0-9_]*)\\|([A-Z]+)$"
char const *const DATALOG_LOGOUT_REGEX = "^([TZ0-9\\.]+)\\|([0-9\\.]+)\\|([0-9]+)\\|\\.\\|(![\\s\\S])\\|(.*)\\|([a-z0-9_]*)\\|([A-Z]+)$"
char const *const DATALOG_INDEX_LINE_REGEX = "^([A-Z=\\+\\-]+)[\\s]+([TZ0-9\\.]+)[\\s]+([0-9\\.]+)[\\s]+(.+)$"
char const *const DATALOG_INDEX_TAIL_REGEX = "^([0-9]+)[\\s]+([0-9]+)[\\s]+([a-z0-9_\\.]*)[\\s]+([0-9]+)$"
char const *const DATALOG_NEWLINE_MANGLE = ".KRB_NEWLINE."
const unsigned int MAX_INFLUX_VALUE_LENGTH = 921600u
int key = 0
template <typename From, typename To, typename = void>
struct can_static_cast

Inherits from false_type

template <typename From, typename To>
template<>
struct can_static_cast<From, To, std::void_t<decltype(static_cast<To>(std::declval<From>()))>>

Inherits from true_type

template <typename Base, typename Derived>
struct is_virtual_base_of

Inherits from std::conjunction< std::is_base_of< Base, Derived >, std::negation< can_static_cast< Base *, Derived *> > >

struct MetaData
#include <DataLogUtils.hh>

A structure defining meta data as used by the data loggers

struct MetaSearchResult
#include <DataLogUtils.hh>

A structure defining meta data as used by the data logger’s search results

class PluginLoader
#include <PluginLoader.hh>

The PluginLoader class.

template <typename, typename>
struct static_or_dyn_cast
#include <MetaTools.hh>

Helper functions to compile-time distinguish if a dynamic_cast is needed.

class TimeProfiler

Public Functions

TimeProfiler(const std::string &name)

Constructor creates a profiler with a given name

Parameters
  • name: profiler’s name

const std::string &getName() const

Returns the profiler name

Return
std::string profiler’s name

void open()

Initialize the profiler internal structure

void close()

Finalize the profiler internal structure

void startPeriod()

Start a new unnamed period (i.e. detail) and append it to the current open period. Unnamed periods are leaves, thus do cover other sub-periods.

void startPeriod(const std::string &periodname)

Start an new period with the given name. Named periods can be nested, i.e. named periods can cover other named and anonymous periods

Parameters
  • periodname: period’s name

void stopPeriod()

Close the last open period

void stopPeriod(const std::string &periodname)

Stops period “periodname” and all nested periods

Parameters
  • periodname: period’s name

const TimePeriod getPeriod(const std::string &periodname) const

Return the time period period “periodname” as Hash

Return
TimePeriod object
Parameters
  • periodname: period’s name

const TimePeriod getPeriod() const

Returns the overall profiler period, i.e. from open to close.

Return
TimePeriod object

const Hash &getPeriodAsHash(const std::string &periodname) const

Return the time period period “periodname” as Hash

Return
Hash object
Parameters
  • periodname: period’s name

const Hash &getPeriodAsHash() const

Returns the overall profiler period.

Return
Hash object

operator karabo::data::Hash()

Serialize the profiler into Hash object.

std::string format(const std::string &fmt, int level = std::numeric_limits<int>::max()) const

Serialize the profiler into string using specific time format.

Return
string object holding the string representation
Parameters
  • format: time format
  • level: deepest level

std::string sql() const

Serialize the profiler as SQL insert query, in preparation to be inserted into database.

Return
string object holding the SQL query string

void serialize(std::ostream &os, int level = std::numeric_limits<int>::max()) const

Serialize the profiler into ostream object using default time format, i.e X.Y (where X is total seconds, and Y is fraction in nanoseconds)

Parameters
  • os: output stream object
  • format: time format
  • level: deepest level

class Version
#include <Version.hh>

A class providing versioning information for the Karabo framework.

Public Functions

Version(const std::string &version)

Creates an Version object from a string.

The version string should match a Major.Minor.Patch flavor Alpha, Beta, Release Candidates and Post-releases should be labeled following the PEP440 guidelines.

Parameters
  • version:

Public Static Functions

const Version &getKaraboVersion()

Gets a Version object of the curent Karabo’s Framework

Return
Version object

std::string getVersion()

Returns a string describing the current version of the Framework Equivalent of calling. karabo::util::Version::getKaraboVersion().getString();

Return
std::string

namespace detail

Functions

void unpack_r(const Hash &hash, char i)
template <class Tfirst, class… Trest>
void unpack_r(const Hash &hash, char i, Tfirst &first, Trest&... rest)
void pack_r(Hash &hash, char i)
template <class Tfirst, class… Trest>
void pack_r(Hash &hash, char i, const Tfirst &first, const Trest&... rest)