C++ API¶
The karabo::core Namespace¶
-
namespace
karabo
::
core
¶ Namespace for package core
Enums
-
enum
Capabilities
¶ Values:
-
PROVIDES_SCENES
= (1u << 0)¶
-
PROVIDES_MACROS
= (1u << 1)¶
-
PROVIDES_INTERFACES
= (1u << 2)¶
-
Functions
-
const std::string karabo::core::DEFAULT_CONFIG_MANAGER_ID("KaraboConfigurationManager")
Variables
-
const int
kMaxCompleteInitializationAttempts
= 2500¶
- template <typename T>
-
class
BaseWorker
¶ - #include <Worker.hh>
WorkerBase class contains one queue: request and can start auxiliary thread that will run on opposite end of the queue: Main thread Queue Auxiliary thread Methods push(…) > request > receive(…)
Public Functions
-
BaseWorker
(const std::function<void()> &callbackint timeout = -1, int repetition = -1, )¶ Construct worker with callback and time and repetition parameters
- Parameters
callback
: this function will be called periodicallytimeout
: time in milliseconds auxiliary thread is waiting on the request queue; 0 means nowait mode; ,-1 means waiting foreverrepetition
: -1 means cycling forever; >0 means number of cycles.
-
BaseWorker &
set
(const std::function<void()> &callbackint timeout = -1, int repetition = -1, )¶ Set parameters defining the behavior of the worker
- Parameters
callback
: function to be called will boolean parameter signaling repetition counter expirationtimeout
: timeout for receiving from queuerepetition
: repetition counter
-
BaseWorker &
setTimeout
(int timeout = -1)¶ Set parameters defining the behavior of the worker
- Parameters
timeout
: timeout for receiving from queue
-
BaseWorker &
setRepetition
(int repetition = -1)¶ Set parameters defining the behavior of the worker
- Parameters
repetition
: repetition counter
-
BaseWorker &
start
()¶ Starts auxiliary thread that works on far ends of the queues Default settings are “waiting forever” and “repeat forever”
-
BaseWorker &
stop
()¶ Stop thread activity. If “request” queue still has some entries they will be received before thread exits. After requesting a stop the new entries can not be put (just ignored) into request queue.
-
BaseWorker &
abort
()¶ This function stops thread immediately despite the fact like nonempty queue.
-
void
join
()¶ It will block until the auxiliary thread is joined.
-
void
push
(const T &t)¶ Call this function from the main thread to put new data block to the request queue
- Parameters
t
: data of type T to put into the request queue
Private Functions
-
void
run
()¶ Receive data block of type T from request queue. The behavior is defined by two parameters: m_timeout and m_repetition. In case of successful receiving the m_callback function will be called.
-
-
class
Device
¶ - #include <Device.hh>
all Karabo devices derive from this class
The Device class is the base class for all Karabo devices. It provides for a standardized set of basic properties that all devices share and provides interaction with its configuration and properties.
Devices are defined by their expected parameters; a list of properties and commands that are known to the distributed system at static time. These parameters describe a devices Schema, which in turn describes the possible configurations of the device.
Inherits from karabo::xms::SignalSlotable
Subclassed by karabo::devices::DataLogger, karabo::devices::DataLoggerManager, karabo::devices::DataLogReader, karabo::devices::GuiServerDevice, karabo::devices::PropertyTest
Public Functions
-
Device
(const karabo::data::Hash &configuration)¶ Construct a device with a given configuration. The configuration Hash may contain any of the following entries:
serverId: a string representing the server this device is running on. If not given the device assumes to run in stand-alone mode.
deviceId: a string representing this device’s id, part of the unique identifier in the distributed system. If not given it defaults to none.
- Parameters
configuration
:
-
~Device
()¶ The destructor will reset the DeviceClient attached to this device.
-
void
registerInitialFunction
(const std::function<void()> &func)¶ Register a function to be called after construction
Can be called by each class of an inheritance chain. Functions will be called in order of registration.
-
void
finalizeInternalInitialization
(const karabo::net::Broker::Pointer &connection, bool consumeBroadcasts, const std::string &timeServerId)¶ This method is called to finalize initialization of a device. It is needed to allow user code to hook in after the base device constructor, but before the device is fully initialized.
- Parameters
connection
: The broker connection for the device.consumeBroadcasts
: If false, do not listen directly to broadcast messages (addressed to ‘*’). Whoever sets this to false has to ensure that broadcast messages reach the Device in some other way, otherwise the device will not work correctly.timeServerId
: The id of the time server to be used by the device - usually set by the DeviceServer.
-
DeviceClient &
remote
()¶ This function allows to communicate to other (remote) devices. Any device contains also a controller for other devices (DeviceClient) which is returned by this function.
- Return
- DeviceClient instance
- template <class ValueType>
-
void
set
(const std::string &key, const ValueType &value)¶ Updates the state/properties of the device. This function automatically notifies any observers in the distributed system.
- Parameters
key
: A valid parameter of the device (must be defined in the expectedParameters function)value
: The corresponding value (of corresponding data-type)
- template <class ItemType>
-
void
setVectorUpdate
(const std::string &key, const std::vector<ItemType> &updates, VectorUpdate updateType, const karabo::data::Timestamp ×tamp)¶ Concurrency safe update of vector property
Does not work for Hash (i.e. table element) due to Hash equality just checking similarity. Removing might be unreliable for VECTOR_FLOAT or VECTOR_DOUBLE due to floating point equality issues.
- Parameters
key
: of the vector property to updateupdates
: items to remove from property vector (starting at the front) or to add (at the end)updateType
: indicates update type - applied individually to all items in ‘updates’timestamp
: timestamp to assign to updated vector property (e.g. getTimestamp())
- template <class ValueType>
-
void
set
(const std::string &key, const ValueType &value, const karabo::data::Timestamp ×tamp)¶ Updates the state of the device. This function automatically notifies any observers in the distributed system.
Any updates are validated against the device schema and rejected if they are not appropriate for the current device state or are of wrong type. During validation alarm bounds are evaluated and alarms on properties will be raised if alarm conditions are met. Additionally, the distributed system is notified of these alarms.
- Parameters
key
: A valid parameter of the device (must be defined in the expectedParameters function)value
: The corresponding value (of corresponding data-type)timestamp
: The time of the value change
-
void
writeChannel
(const std::string &channelName, const karabo::data::Hash &data)¶ Writes a hash to the specified channel. The hash internally must follow exactly the data schema as defined in the expected parameters.
If ‘data’ contains an ‘NDArray’, consider to use the overload of this method that has the ‘safeNDArray’ flag. If that flag can be set to ‘true’, data copies can be avoided:
writeChannel(channelName, data, getActualTimestamp(), true);
Thread safety: The ‘writeChannel(..)’ methods and ‘signalEndOfStream(..)’ must not be called concurrently for the same ‘channelName’.
- Parameters
channelName
: The output channel namedata
: Hash with the data
-
void
writeChannel
(const std::string &channelName, const karabo::data::Hash &data, const karabo::data::Timestamp ×tamp, bool safeNDArray = false)¶ Writes a hash to the specified channel. The hash internally must follow exactly the data schema as defined in the expected parameters.
Thread safety: The ‘writeChannel(..)’ methods and ‘signalEndOfStream(..)’ must not be called concurrently for the same ‘channelName’.
- Parameters
channelName
: The output channel namedata
: Hash with the datatimestamp
: A user provided timestamp (if e.g. retrieved from h/w)safeNDArray
: Boolean that should be set to ‘true’ if ‘data’ contains any ‘NDArray’ and their data is not changed after this ‘writeChannel’. Otherwise, data will be copied if needed, i.e. when the output channel has to queue or serves inner-process receivers.
-
void
signalEndOfStream
(const std::string &channelName)¶ Signals an end-of-stream event (EOS) on the output channel identified by channelName Thread safety: The ‘writeChannel(..)’ methods and ‘signalEndOfStream(..)’ must not be called concurrently for the same ‘channelName’.
- Parameters
channelName
: the name of the output channel.
-
void
set
(const karabo::data::Hash &hash)¶ Updates the state/properties of the device with all key/value pairs given in the hash.
Any updates are validated against the device schema and rejected if they are not appropriate for the current device state or are of wrong type. During validation alarm bounds are evaluated and alarms on properties will be raised if alarm conditions are met. Additionally, the distributed system is notified of these alarms. For those paths in ‘hash’ which do not already have time stamp attributes assigned as tested by Timestamp::hashAttributesContainTimeInformation(hash.getAttributes(<path>))), the actual timestamp is chosen.
NOTE: This function will automatically and efficiently (only one message) inform any observers.
- Parameters
hash
: Hash of updated internal parameters (must be in current full schema, e.g. since declared in the expectedParameters function)
-
void
set
(const karabo::data::Hash &hash, const karabo::data::Timestamp ×tamp)¶ Updates the state of the device with all key/value pairs given in the hash
Any updates are validated against the device schema and rejected if they are not appropriate for the current device state or are of wrong type. During validation alarm bounds are evaluated and alarms on properties will be raised if alarm conditions are met. Additionally, the distributed system is notified of these alarms.
NOTE: This function will automatically and efficiently (only one message) inform any observers.
- Parameters
hash
: Hash of updated internal parameters (must be in current full schema, e.g. since declared in the expectedParameters function)timestamp
: to indicate when the set occurred - but is ignored for paths in ‘hash’ that already have time stamp attributes as tested by Timestamp::hashAttributesContainTimeInformation(hash.getAttributes(<path>)))
- template <class ValueType>
-
void
setNoValidate
(const std::string &key, const ValueType &value)¶ Updates the state of the device with all key/value pairs given in the hash. In contrast to the set function, no validation is performed.
- Parameters
key
: identifying the property to updatevalue
: updated value
- template <class ValueType>
-
void
setNoValidate
(const std::string &key, const ValueType &value, const karabo::data::Timestamp ×tamp)¶ Updates the state of the device with all key/value pairs given in the hash. In contrast to the set function, no validation is performed.
- Parameters
key
: identifying the property to updatevalue
: updated valuetimestamp
: optional timestamp to indicate when the set occurred.
-
void
setNoValidate
(const karabo::data::Hash &hash)¶ Updates the state of the device with all key/value pairs given in the hash. In contrast to the set function, no validation is performed.
NOTE: This function will automatically and efficiently (only one message) inform any observers.
- Parameters
config
: Hash of updated internal parameters (must be declared in the expectedParameters function)
-
void
setNoValidate
(const karabo::data::Hash &hash, const karabo::data::Timestamp ×tamp)¶ Updates the state of the device with all key/value pairs given in the hash. In contrast to the set function, no validation is performed.
NOTE: This function will automatically and efficiently (only one message) inform any observers.
- Parameters
config
: Hash of updated internal parameters (must be declared in the expectedParameters function)timestamp
: optional timestamp to indicate when the set occurred.
- template <class T>
-
T
get
(const std::string &key) const¶ Retrieves the current value of any device parameter (that was defined in the expectedParameters function)
- Return
- value of the requested parameter
- Parameters
key
: A valid parameter of the device (must be defined in the expectedParameters function)
- template <class T>
-
T
getAs
(const std::string &key) const¶ Retrieves the current value of any device parameter (that was defined in the expectedParameters function) The value is casted on the fly into the desired type. NOTE: This function is considerably slower than the simple get() functionality
- Return
- value of the requested parameter
- Parameters
key
: A valid parameter of the device (must be defined in the expectedParameters function)
-
karabo::data::Schema
getFullSchema
() const¶ Retrieves all expected parameters of this device
- Return
- Schema object containing all expected parameters
-
void
appendSchema
(const karabo::data::Schema &schema, const bool = false)¶ Append a schema to the existing device schema
- Parameters
schema
: to be appended - may also contain existing elements to overwrite their attributes like min/max values/sizes, alarm ranges, etc. If it contains Input-/OutputChannels, they are (re-)created. If previously an InputChannel existed under the same key, its data/input/endOfStream handlers are kept for the recreated InputChannel.unused
: parameter, kept for backward compatibility.
-
void
updateSchema
(const karabo::data::Schema &schema, const bool = false)¶ Replace existing schema descriptions by static (hard coded in expectedParameters) part and add additional (dynamic) descriptions. Previous additions will be removed.
- Parameters
schema
: additional, dynamic schema - may also contain existing elements to overwrite their attributes like min/max values/sizes, alarm ranges, etc. If it contains Input-/OutputChannels, they are (re-)created (and previously added ones removed). If previously an InputChannel existed under the same key, its data/input/endOfStream handlers are kept for the recreated InputChannel.unused
: parameter, kept for backward compatibility.
- template <class AliasType>
-
AliasType
getAliasFromKey
(const std::string &key) const¶ Converts a device parameter key into its aliased key (must be defined in the expectedParameters function)
- Return
- Aliased representation of the parameter
- Parameters
key
: A valid parameter of the device (must be defined in the expectedParameters function)
- template <class AliasType>
-
std::string
getKeyFromAlias
(const AliasType &alias) const¶ Converts a device parameter alias into the original key (must be defined in the expectedParameters function)
- Return
- The original name of the parameter
- Parameters
key
: A valid parameter-alias of the device (must be defined in the expectedParameters function)
- template <class T>
-
const bool
aliasHasKey
(const T &alias) const¶ Checks if the argument is a valid alias of some key, i.e. defined in the expectedParameters function
- Return
- true if it is an alias found in one of three containers of parameters: “reconfigurable”, “initial” or “monitored”, otherwise false
- Parameters
alias
: Arbitrary argument of arbitrary type
-
bool
keyHasAlias
(const std::string &key) const¶ Checks if some alias is defined for the given key
- Return
- true if the alias exists
- Parameters
key
: in expectedParameters mapping
-
karabo::data::Types::ReferenceType
getValueType
(const std::string &key) const¶ Checks the type of any device parameter (that was defined in the expectedParameters function)
- Return
- The enumerated internal reference type of the value
- Parameters
key
: A valid parameter of the device (must be defined in the expectedParameters function)
-
karabo::data::Hash
getCurrentConfiguration
(const std::string &tags = "") const¶ Retrieves the current configuration. If no argument is given, all parameters (those described in the expected parameters section) are returned. A subset of parameters can be retrieved by specifying one or more tags.
- Return
- A Hash containing the current value of the selected configuration
- Parameters
tags
: The tags (separated by comma) the parameter must carry to be retrieved
-
karabo::data::Hash
getCurrentConfigurationSlice
(const std::vector<std::string> &paths) const¶ Retrieves a slice of the current configuration.
- Return
- Hash with the current values and attributes (e.g. timestamp) of the selected configuration
- Parameters
paths
: of the configuration which should be returned (as declared in expectedParameters) (method throws ParameterExcepton if a non-existing path is given)
-
karabo::data::Hash
filterByTags
(const karabo::data::Hash &hash, const std::string &tags) const¶ Return a tag filtered version of the input Hash. Tags are as defined in the device schema
- Return
- a filtered version of the input Hash.
- Parameters
hash
: to filtertags
: to filter by
-
const std::string &
getServerId
() const¶ Return the serverId of the server this device is running on
- Return
-
const karabo::data::State
getState
()¶ Return a State object holding the current unified state of the device.
- Return
-
void
updateState
(const karabo::data::State ¤tState)¶ Update the state of the device, using “actual timestamp”.
Will also update the instanceInfo describing this device instance (if new or old State are ERROR).
- Parameters
currentState
: the state to update to
-
void
updateState
(const karabo::data::State ¤tState, const karabo::data::Hash &other)¶ Update the state of the device, using “actual timestamp”.
Will also update the instanceInfo describing this device instance (if new or old State are ERROR).
- Parameters
currentState
: the state to update toother
: a Hash to set other properties in the same state update message, time stamp attributes to its paths have precedence over the actual timestamp
-
void
updateState
(const karabo::data::State ¤tState, const karabo::data::Timestamp ×tamp)¶ Update the state of the device, using given timestamp.
Will also update the instanceInfo describing this device instance (if new or old State are ERROR).
- Parameters
currentState
: the state to update totimestamp
: time stamp to assign to the state property and the properties in ‘other’ (if the latter do not have specified timestamp attributes)
-
void
updateState
(const karabo::data::State ¤tState, karabo::data::Hash other, const karabo::data::Timestamp ×tamp)¶ Update the state of the device, using given timestamp.
Will also update the instanceInfo describing this device instance (if new or old State are ERROR).
- Parameters
currentState
: the state to update toother
: a Hash to set other properties in the same state update message, time stamp attributes to its paths have precedence over the given ‘timestamp’timestamp
: time stamp to assign to the state property and the properties in ‘other’ (if the latter do not have specified timestamp attributes)
-
void
execute
(const std::string &command) const¶ Execute a command on this device
- Parameters
command
:
- template <class A1>
-
void
execute
(const std::string &command, const A1 &a1) const¶ Execute a command with one argument on this device
- Parameters
command
:a1
:
- template <class A1, class A2>
-
void
execute
(const std::string &command, const A1 &a1, const A2 &a2) const¶ Execute a command with two arguments on this device
- Parameters
command
:a1
:a2
:
- template <class A1, class A2, class A3>
-
void
execute
(const std::string &command, const A1 &a1, const A2 &a2, const A3 &a3) const¶ Execute a command with three arguments on this device
- Parameters
command
:a1
:a2
:a3
:
- template <class A1, class A2, class A3, class A4>
-
void
execute
(const std::string &command, const A1 &a1, const A2 &a2, const A3 &a3, const A4 &a4) const¶ Execute a command with four arguments on this device
- Parameters
command
:a1
:a2
:a3
:a4
:
-
karabo::data::AlarmCondition
getAlarmCondition
() const¶ Get the current alarm condition the device is in
- Return
-
void
setAlarmCondition
(const karabo::data::AlarmCondition &condition, bool needsAcknowledging = false, const std::string &description = std::string())¶ Set the global alarm condition
- Parameters
condition
: to setneedsAcknowledging
: if this condition will require acknowledgment on the alarm servicedescription
: an optional description of the condition. Consider including remarks on how to resolve
-
const karabo::data::AlarmCondition &
getAlarmCondition
(const std::string &key, const std::string &sep = ".") const¶ Get the alarm condition for a specific property
- Return
- the alarm condition of the property
- Parameters
key
: of the property to get the condition forsep
: optional separator to use in the key path
-
void
slotTimeTick
(unsigned long long id, unsigned long long sec, unsigned long long frac, unsigned long long period)¶ A slot called by the device server if the external time ticks update to synchronize this device with the timing system.
- Parameters
id
: current train idsec
: current system secondsfrac
: current fractional secondsperiod
: interval between subsequent ids in microseconds
-
virtual void
onTimeTick
(unsigned long long id, unsigned long long sec, unsigned long long frac, unsigned long long period)¶ A hook which is called if the device receives external time-server update, i.e. if slotTimeTick on the device server is called. Can be overwritten by derived classes.
- Parameters
id
: train idsec
: unix secondsfrac
: fractional seconds (i.e. attoseconds)period
: interval between ids in microseconds
-
virtual void
onTimeUpdate
(unsigned long long id, unsigned long long sec, unsigned long long frac, unsigned long long period)¶ If the device receives time-server updates via slotTimeTick, this hook will be called for every id in sequential order. The time stamps (sec + frac) of subsequent ids might be identical - though they are usually spaced by period. Can be overwritten in derived classes.
- Parameters
id
: train idsec
: unix secondsfrac
: fractional seconds (i.e. attoseconds)period
: interval between ids in microseconds
-
void
appendSchemaMaxSize
(const std::string &path, unsigned int value, bool emitFlag = true)¶ Append Schema to change/set maximum size information for path - if paths does not exist, throw exception
This is similar to the more general appendSchema, but dedicated to a common use case.
Caveat: This does not recreate an output channel if its schema is changed
- Parameters
path
: indicates the parameter which should be a Vector- or TableElementvalue
: is the new maximum size of the parameteremitFlag
: indicates if others should be informed about this Schema update. If this method is called for a bunch of paths, it is recommended to set this to true only for the last call.
Public Static Functions
-
void
expectedParameters
(karabo::data::Schema &expected)¶ The expected parameter section of the Device class, known at static time. The basic parameters described here are available for all devices, many of them being expert or admin visible only.
- Parameters
expected
: a Schema to which these parameters will be appended.
Protected Functions
-
karabo::data::Timestamp
getActualTimestamp
() const¶ Returns the actual timestamp. The Trainstamp part of Timestamp is extrapolated from the last values received via slotTimeTick (or zero if no time ticks received yet). To receive time ticks, the server of the device has to be connected to a time server.
- Return
- the actual timestamp
-
karabo::data::Timestamp
getTimestamp
(const karabo::data::Epochstamp &epoch) const¶ Returns the Timestamp for given Epochstamp. The Trainstamp part of Timestamp is extrapolated forward or backward from the last values received via slotTimeTick (or zero if no time ticks received yet). To receive time ticks, the server of the device has to be connected to a time server.
- Return
- the matching timestamp, consisting of epoch and the corresponding Trainstamp
- Parameters
epoch
: for that the time stamp is searched for
Private Functions
-
void
setNoLock
(const karabo::data::Hash &hash, const karabo::data::Timestamp ×tamp)¶ Internal method for set(Hash, Timestamp), requiring m_objectStateChangeMutex to be locked
-
void
setNoValidateNoLock
(const karabo::data::Hash &hash, const karabo::data::Timestamp ×tamp)¶ Internal version of setNoValidate(hash, timestamp) that requires m_objectStateChangeMutex to be locked
-
void
initChannels
(const karabo::data::Schema &schema, const std::string &topLevel = "")¶ Called to setup pipeline channels, will recursively go through the given schema, assuming it to be at least a part of the schema of the device. Needs to be called with m_objectStateChangeMutex being locked.
- Parameters
schema
: the schema to traverse
- Parameters
topLevel
: std::string: empty or existing path of full
- schema of the device
-
void
prepareOutputChannel
(const std::string &path)¶ Create OutputChannel for given path and take care to set handlers needed Needs to be called with m_objectStateChangeMutex being locked.
- Parameters
path
:
-
void
prepareInputChannel
(const std::string &path)¶ Create InputChannel for given path and take care to set handlers needed Needs to be called with m_objectStateChangeMutex being locked.
- Parameters
path
:
-
void
slotCallGuard
(const std::string &slotName, const std::string &callee)¶ This function is called by SignalSlotable to verify if a slot may be called from remote. The function only checks slots that are mentioned in the expectedParameter section (“DeviceSlots”) The following checks are performed:
- Parameters
slotName
: name of the slotcallee
: the calling remote, can be unknown
1) Is this device locked by another device? If the lockedBy field is non-empty and does not match the callee’s instance id, the call will be rejected
2) Is the slot callable from the current state, i.e. is the current state specified as an allowed state for the slot.
-
void
slotClearLock
()¶ Clear any lock on this device
-
karabo::data::Hash
getTimeInfo
()¶ Internal method to retrieve time information of this device.
-
void
slotGetTime
(const karabo::data::Hash&)¶ Returns the actual time information of this device.
This slot reply is a Hash with the following keys:
time
and its attributes provide an actual timestamp with train Id information.timeServerId
the id of the time server configured for the DeviceServer of this device; “None” when there’s no time server configured.reference
and its attributes provide the latest timestamp information received from the timeserver.
- Parameters
info
: an unused (at least for now) Hash parameter that has been added to fit the generic slot call (Hash in, Hash out) of the GUI server/client protocol.
-
void
slotGetSystemInfo
(const karabo::data::Hash&)¶ Returns the actual system information of this device.
This slot reply is a Hash with the following keys:
user
the actual user running this devicebroker
the current connected broker node
- Parameters
info
: an unused (at least for now) Hash parameter that has been added to fit the generic slot call (Hash in, Hash out) of the GUI server/client protocol.
and all keys provided by
slotGetTime
.
Private Members
-
karabo::data::Validator
m_validatorIntern
¶ Validators to validate…
-
karabo::data::Validator
m_validatorExtern
¶ …internal updates via ‘Device::set’
-
std::shared_ptr<DeviceClient>
m_deviceClient
¶ …external updates via ‘Device::slotReconfigure’
-
-
class
DeviceClient
¶ - #include <DeviceClient.hh>
This class can be used to (remotely) control devices of the distributed system Synchronous calls (i.e. get()) are in fact asynchronous under the hood.
The Karabo DeviceClient provides a high-level interface for common calls to (remote) devices in the distributed system. In principle functionality implemented in the DeviceClient can be fully implemented in the Device using low level SignalSlotable calls alone, but device developers are discouraged from this approach, especially if synchronous behavior is acceptable or even desired.
In the context of a Device the DeviceClient is available using the Device::remote() function; it then shares the SignalSlotable instance of the device, e.g. there is no instantiation overhead.
Inherits from std::enable_shared_from_this< DeviceClient >
Public Functions
-
DeviceClient
(const std::string &instanceId = std::string(), bool implicitInit = true, const karabo::data::Hash &serviceDeviceIds = karabo::data::Hash())¶ Constructor which establishes an own connection to the communication system. This constructor is intended for stand-alone C++ device clients. Once we care about authentication, this has to be added here.
- Parameters
instanceId
: The id with which the client should participate in the system. If not unique or invalid, constructor will throw an exception. If empty (i.e. default), an id will be generated from host name and process id.implicitInit
: If true (default for backward compatibility - but NOT recommended!), the constructor will implicitly try to trigger a call to initialize() via the event loop. Since this can fail silently, it is strongly recommended to use implicitInit = false and call the initialize() method right after the constructor.serviceDeviceIds
: A hash with ids of core service devices; e.g, “dataLoggerManagerId” key and the value is the name of the DataLoggerManager the device client instance should use for data logging operations. Currently keys “dataLoggerManagerId” and “configurationManagerId” are supported.
Constructor using instantiated signalSlotable object (shared communication - take care that the signalSlotable is kept alive since the DeviceClient will only keep a weak pointer)
- Parameters
signalSlotable
: An instance of the SignalSlotable lassimplicitInit
: If true (default for backward compatibility - but NOT recommended!), the constructor will implicitly try to trigger a call to initialize() via the event loop. Since this can fail silently, it is strongly recommended to use implicitInit = false and call the initialize() method right after the constructor.serviceDeviceIds
: A hash with ids of core service devices; e.g, “dataLoggerManagerId” key and the value is the name of the DataLoggerManager the device client instance should use for data logging operations. Currently keys “dataLoggerManagerId” and “configurationManagerId” are supported.
-
DeviceClient
(const std::string &instanceId, const karabo::data::Hash &serviceDeviceIds)¶ Constructor aimed at cases where a specific DataLoggerManagerId is required. Requires an explicit call to DeviceClient::initialize() after the construction takes place.
- Parameters
instanceId
: The id with which the client should participate in the system. If not unique or invalid, constructor will throw an exception. If empty, an id will be generated from host name and process id.serviceDeviceIds
: A hash with ids of core service devices; e.g, “dataLoggerManagerId” key and the value is the name of the DataLoggerManager the device client instance should use for data logging operations. Currently keys “dataLoggerManagerId” and “configurationManagerId” are supported.
Constructor using instantiated signalSlotable object (shared communication - take care that the signalSlotable is kept alive since the DeviceClient will only keep a weak pointer) and aimed at cases where a specific DataLoggerManagerId is required. Requires an explicit call to DeviceClient::initialize() after the construction takes place.
- Parameters
signalSlotable
: An instance of the SignalSlotable lassserviceDeviceIds
: A hash with ids of core service devices; e.g, “dataLoggerManagerId” key and the value is the name of the DataLoggerManager the device client instance should use for data logging operations. Currently keys “dataLoggerManagerId” and “configurationManagerId” are supported.
-
void
initialize
()¶ Second constructor. It is strongly recommended to use the constructors with implicitInit = false and explicitely call initialize() after the construction.
-
const std::string &
getInstanceId
()¶ InstanceId of underlying communication object (i.e. SignalSlotable)
-
void
setInternalTimeout
(const unsigned int internalTimeout)¶ Sets the internal timeout for any request/response like communications
- Parameters
internalTimeout
: The default timeout in ms
-
int
getInternalTimeout
() const¶ Retrieves the currently set internal timeout
- Return
- default timeout in ms
-
void
setAgeing
(bool toggle)¶ Set ageing on or off (on by default)
- Return
-
void
setDeviceMonitorInterval
(long int milliseconds)¶ Set interval to wait between subsequent (for the same instance) calls to handlers registered via registerDeviceMonitor. Changes received within that interval will be cached and, in case of several updates of the same property within the interval, only the most up-to-date value will be handled. If negative, switch off caching and call handler immediately.
-
std::pair<bool, std::string>
exists
(const std::string &instanceId)¶ Allows asking whether an instance is online in the current distributed system
- Return
- Parameters
boolean
: indicating whether existing and hostname if exists
-
void
enableInstanceTracking
()¶ Enables tracking of new and departing device instances
The handlers registered with registerInstance[New|Gone|Updated]Monitor will be called accordingly. If the handler for instanceNew is registered before calling this method, it will be called for each device currently in the system.
NOTE: Use wisely! There is a performance cost to tracking all devices since it means subscribing to the heartbeats of all servers and devices in the system.
-
Hash
getSystemInformation
()¶ Returns the full information about the current (runtime) distributed system
- Return
- a Hash containing the full system description
-
Hash
getSystemTopology
()¶ Returns only the topology of the current system (no instance configurations or descriptions)
- Return
- Hash containing the topology of the runtime system
-
std::vector<std::string>
getServers
()¶ Retrieves all servers currently existing in the distributed system
- Return
- array of server ids
-
std::vector<std::string>
getClasses
(const std::string &deviceServer)¶ Retrieves all device classes (plugins) available on a given device server
- Return
- array of device classes
- Parameters
deviceServer
: device server id
-
std::vector<std::string>
getDevices
(const std::string &deviceServer)¶ Retrieves all devices (instances) available on a given device server
- Return
- array of device instanceIds
- Parameters
deviceServer
: device server id
-
std::vector<std::string>
getDevices
()¶ Retrieves all devices in the distributed system.
- Return
- array of device instanceIds
-
karabo::data::Schema
getDeviceSchema
(const std::string &instanceId)¶ Retrieves the full Schema (parameter description) of the given instance;
- Return
- full Schema
- Parameters
instanceId
: Device’s instance ID
-
karabo::data::Schema
getDeviceSchemaNoWait
(const std::string &instanceId)¶ Retrieves the full Schema (parameter description) of the given instance The call is non-blocking, if no Schema is currently available the return will be empty. However, the schema request will be sent and should lead to later arrival of a schema.
- Return
- full Schema
- Parameters
instanceId
: Device’s instance ID
-
karabo::data::Schema
getActiveSchema
(const std::string &instanceId)¶ Retrieves the currently active Schema (filtered by allowed states and allowed roles) of the given instance
- Return
- active Schema
- Parameters
instanceId
: Device’s instance ID
-
karabo::data::Schema
getClassSchema
(const std::string &serverId, const std::string &classId)¶ Retrieves a schema from static context of a loaded Device class plug-in. This schema represents a description of parameters possible to configure for instantiation. I.e. returns in fact a description of the constructor arguments to that device class.
- Return
- Schema describing parameters available at instantiation time
- Parameters
serverId
: instanceId of a deviceServerclassId
: name of loaded class on the deviceServer (classId)
-
karabo::data::Schema
getClassSchemaNoWait
(const std::string &serverId, const std::string &classId)¶ Retrieves a schema from static context of a loaded Device class plug-in. This schema represents a description of parameters possible to configure for instantiation. This function can be used to pre-cache a schema for later usage. It returns an empty schema.
- Return
- an empty schem
- Parameters
serverId
: instanceId of a deviceServerclassId
: name of loaded class on the deviceServer (classId)
-
std::vector<std::string>
getProperties
(const std::string &deviceId)¶ Retrieve the properties of a device at deviceId.
- Return
- a vector containing the property paths of the device
- Parameters
deviceId
: of the device to request information from
-
std::vector<std::string>
getClassProperties
(const std::string &serverId, const std::string &classId)¶ Retrieve the properties of a class loaded on a server
- Return
- vector containing the property paths of the class
- Parameters
serverId
: server to request information fromclassId
: of the class
-
std::vector<std::string>
getCurrentlyExecutableCommands
(const std::string &instanceId)¶ Retrieve a list of commands that may be currently executed on a device in the distributed system. Available commands are determined by device state and access rights.
- Return
- a vector containing the slot names of the commands that can be executed
- Parameters
instanceId
: of the device to ask for available commands
-
std::vector<std::string>
getCurrentlySettableProperties
(const std::string &instanceId)¶ Retrieve a list of properties that may be currently altered on a device in the distributed system. Available properties are determined by device state and access rights.
- Return
- a vector containing the slot names of the properties that can be altered.
- Parameters
instanceId
: of the device to ask for settable properties
-
Hash
loadConfigurationFromFile
(const std::string &filename)¶ Load a device configuration from a file
- Return
- a Hash containing the configuration
- Parameters
filename
:
-
std::pair<bool, std::string>
instantiate
(const std::string &serverInstanceId, const std::string &classId, const karabo::data::Hash &configuration = karabo::data::Hash(), int timeoutInSeconds = -1)¶ Attempt to instantiate a device of the specified class, on a remote server with a given initial configuration
- Return
- (ok, reply) pair where ok is true if no exception occurred and reply is the answer received from server
- Parameters
serverInstanceId
: of the server to instantiate the device on. Needs to have the device plugin availableclassId
: of the device to be instantiateconfiguration
: Hash which contains the initial device configuration. It must have one out of the two following forms: option 1:- key “classId” pointing to a string, option 2:
- no classId specified: class id to be instantiated is taken from classId parameter option 3 (for backward compatibility - not recommended):
- a single key (e.g. “myClassId”) representing the classId
- the value for this key is a Hash with all the non-default properties
timeoutInSeconds
: by default set to -1, which means block indefinitely, if a positive value an Exception is thrown if the device hasn’t been instantiated.
-
std::pair<bool, std::string>
instantiate
(const std::string &serverInstanceId, const karabo::data::Hash &configuration, int timeoutInSeconds = -1)¶ Instantiate a device on a remote server
- Return
- Parameters
serverInstanceId
: of the server to instantiate the device on. Needs to have the device plugin availableconfiguration
: Hash which contains the initial device configuration. The ‘classId’ attribute must be present.timeoutInSeconds
: by default set to -1, which means block indefinitely, if a positive value an Exception is thrown if the device hasn’t been instantiated.
-
Hash
formatConfigToInstantiate
(const std::string &classId, const karabo::data::Hash &configuration)¶ Utility method that takes care of adding classId to configuration of device to be instantiated by instantiate and instantiateNoWait. If configuration does not have ‘classId’ key, this is added, with the value of classId parameter. Otherwise the configuration ‘classId’ value is used. In the latter case, if the value of classId parameter mismatches the one of ‘classId’ attribute of configuration a warning is thrown.
- Return
- configuration ready to be sent to device server
- Parameters
classId
: of the device to be instantiated.configuration
: of the device to be instantiated.
-
void
instantiateNoWait
(const std::string &serverInstanceId, const std::string &classId, const karabo::data::Hash &configuration = karabo::data::Hash())¶ Instantiate a device on a remote server. In contrast to DeviceClient::instantiate, this function returns immediately.
- Parameters
serverInstanceId
: of the server to instantiate the device on. Needs to have the device plugin availableclassId
: of the device to be instantiateconfiguration
: Hash which contains the initial device configuration. It must have one out of the two following forms: option 1:- key “classId” pointing to a string, option 2:
- no classId specified: class id to be instantiated is taken from classId parameter option 3 (for backward compatibility - not recommended):
- a single key (e.g. “myClassId”) representing the classId
- the value for this key is a Hash with all the non-default properties
-
void
instantiateNoWait
(const std::string &serverInstanceId, const karabo::data::Hash &configuration)¶ Instantiate a device on a remote server. In contrast to DeviceClient::instantiate, this function returns immediately.
- Return
- Parameters
serverInstanceId
: of the server to instantiate the device on. Needs to have the device plugin availableconfiguration
: Hash which contains the initial device configuration. The ‘classId’ attribute must be present.
-
std::pair<bool, std::string>
killDevice
(const std::string &deviceId, int timeoutInSeconds = -1)¶ Kill a device in the distributed system and wait until it is actually dead
- Return
- Parameters
deviceId
: of the device to killtimeoutInSeconds
: timeoutInSeconds by default set to -1, which means block indefinitely, if a positive value an Exception is thrown if the device hasn’t been killed.
-
void
killDeviceNoWait
(const std::string &deviceId)¶ Kill a device in the distributed system and return immediately
- Return
- Parameters
deviceId
: of the device to kill
-
std::pair<bool, std::string>
killServer
(const std::string &serverId, int timeoutInSeconds = -1)¶ Kill a device server in the distributed system and all its associated devices. Waits til the server is dead.
- Return
- Parameters
serverId
: of the server to killtimeoutInSeconds
: timeoutInSeconds timeoutInSeconds by default set to -1, which means block indefinitely, if a positive value an Exception is thrown if the device server hasn’t been killed.
-
void
killServerNoWait
(const std::string &serverId)¶ Kill a device server in the distributed system and all its associated devices. Returns immediately.
- Return
- Parameters
serverId
: of the server to kill
-
karabo::data::Hash
get
(const std::string &instanceId)¶ Return the configuration Hash of an instance. The configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.
- Return
- a Hash holding the instance configuration
- Parameters
instanceId
: for which to return the configuration of
-
void
get
(const std::string &instanceId, karabo::data::Hash &hash)¶ Return the configuration Hash of an instance. The configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.
- Parameters
instanceId
: for which to return the configuration ofhash
: reference to write configuration into
-
karabo::data::Hash
getConfigurationNoWait
(const std::string &deviceId)¶ Return the cached configuration if it is still valid, otherwise query an updated version but return an empty Hash.
- Return
- a Hash holding the instance configuration
- Parameters
deviceId
: for which to return the configuration of
-
bool
hasAttribute
(const std::string &instanceId, const std::string &key, const std::string &attribute, const char keySep = data::Hash::k_defaultSep)¶ Check if an attribute exists for a property on a given instance
- Return
- a boolean indicating if the attribute is present
- Parameters
instanceId
: to check onkey
: path to the property to check if it has a given attributeattribute
: to check forkeySep
: path separator
- template <class T>
-
T
get
(const std::string &instanceId, const std::string &key, const char keySep = data::Hash::k_defaultSep)¶ Return a property from a remote instance. The instance configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.
- Return
- the current property value on the remote device TypeException if the templated type does not match the property type.
- Parameters
instanceId
: to retrieve the property fromkey
: identifying the propertykeySep
: path separator
- template <class T>
-
void
get
(const std::string &instanceId, const std::string &key, T &value, const char keySep = data::Hash::k_defaultSep)¶ Return a property from a remote instance. The instance configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.
- Parameters
instanceId
: to retrieve the property fromkey
: identifying the propertyvalue
: reference to write the property value tokeySep
: path separator TypeException if the templated type does not match the property type.
- template <class T>
-
T
getAs
(const std::string &instanceId, const std::string &key, const char keySep = data::Hash::k_defaultSep)¶ Return a property from a remote instance casted to the template type. The instance configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.
- Return
- the current property value on the remote device TypeException if the property cannot be casted to the template type
- Parameters
instanceId
: to retrieve the property fromkey
: identifying the propertykeySep
: path separator
-
std::any
getAsAny
(const std::string &instanceId, const std::string &key, const char keySep = data::Hash::k_defaultSep)¶ Return a property from a remote instance as a std::any value. The instance configuration is internally cached, so it does not necessarily result in a query to the distributed system if the device configuration has not changed since the last query.
- Return
- the current property value on the remote device as std::any type
- Parameters
instanceId
: to retrieve the property fromkey
: identifying the propertykeySep
: path separator
-
bool
cacheLoggerMap
(bool toggle)¶ Toggles caching of the DataLogger map on (true) and off (false). If set to true the logger map is always kept up to date, which speeds up repeated calls to DeviceClient::getProperyHistory.
- Return
- true if operation was successful
- Parameters
toggle
:
-
std::vector<karabo::data::Hash>
getFromPast
(const std::string &deviceId, const std::string &key, const std::string &from, std::string to = "", int maxNumData = 0)¶ Returns the history of a device property for a given period of time
- Return
- a vector of Hashes holding the property’s history. Each entry consists of a Hash with a key “v” holding the value of the appropriate type. For each entry “v” Karabo train and timestamp attributes are set which can be retrieved using the karabo::data::Timestamp::fromHashAttributes method.
- Parameters
deviceId
: of the device holding the propertykey
: path to the property on the devicefrom
: karabo::data::Epochstamp in Iso8601 format signifying the start of the time interval to get the history fromto
: karabo::data::Epochstamp in Iso8601 format signifying the end of the time interval to get the history from. If left empty default to nowmaxNumData
: maximum number of data points to retrieve, starting from the start of the interval
-
std::vector<karabo::data::Hash>
getPropertyHistory
(const std::string &deviceId, const std::string &key, const std::string &from, std::string to = "", int maxNumData = 0)¶ Returns the history of a device property for a given period of time
- Return
- a vector of Hashes holding the property’s history. Each entry consists of a Hash with a key “v” holding the value of the appropriate type. For each entry “v” Karabo train and timestamp attributes are set which can be retrieved using the karabo::data::Timestamp::fromHashAttributes method.
- Parameters
deviceId
: of the device holding the propertykey
: path to the property on the devicefrom
: karabo::data::Epochstamp in Iso8601 format signifying the start of the time interval to get the history fromto
: karabo::data::Epochstamp in Iso8601 format signifying the end of the time interval to get the history from. If left empty default to nowmaxNumData
: maximum number of data points to retrieve, starting from the start of the interval
-
std::string
getDataLogReader
(const std::string &deviceId)¶ Returns instanceId of data log reader for data of given device. Could be empty.
- Return
- Parameters
deviceId
:
-
std::pair<karabo::data::Hash, karabo::data::Schema>
getConfigurationFromPast
(const std::string &deviceId, const std::string &timepoint)¶ Returns the device configuration and corresponding schema for a given point in time. Information for the nearest matching logged time is returned.
- Return
- a pair of the configuration Hash and corresponding device Schema
- Parameters
deviceId
: of the device to return the configuration fortimepoint
: to return information for. Should be an iso8601 formatted string.
-
karabo::data::Hash
listConfigurationFromName
(const std::string &deviceId, const std::string &namePart = "")¶ Returns the configurations saved for a device under names that contain a given name part.
- Return
- a hash with the operation execution status and the list of configuration(s) and schema(s) in case of success. For the operation execution status, the returned hash has the keys “success” with a boolean value that indicates whether the the operation was successful and a key “reason” with a string value that will contain the reason for failure or will be empty in the case of success. The returned hash will also have a key “configs” whose value will be a vector of hashes with data about the configs that match the name part. If no configuration is saved for the device under a name that contains the namePart, the “configs” vector will be empty. Each hash in the “configs” vector contains the keys “name”, “timepoint”.
- Parameters
deviceId
: of the device whose named configuration(s) and schema(s) should be returned.namePart
: of the device configuration(s) and schema(s) to be returned. An empty namePart means returns all the named configuration(s) and schema(s)
-
karabo::data::Hash
getConfigurationFromName
(const std::string &deviceId, const std::string &name)¶ Returns the configuration and schema saved for a device under a given name.
- Return
- a hash with the operation execution status and the device configuration and schema in case of success. For the operation execution status, the returned hash has the keys “success” with a boolean value that indicates whether the the operation was successful and a key “reason” with a string value that will contain the reason for failure or will be empty in the case of success. The returned hash will also have a key “config” whose value will be a hash with the keys “name”, “timepoint”, “description”, “priority”, “user”, “config” and “schema” when a device configuration with the given name is found or an empty hash in case of failure or when no device configuration with the given name exists.
- Parameters
deviceId
: of the device whose named configuration and schema should be returned.name
: of the device configuration and schema to be returned.
-
karabo::data::Hash
getLastConfiguration
(const std::string &deviceId, int priority = 1)¶ Returns the most recently saved configuration for a device that has a given priority.
- Return
- a hash with the operation execution status and the device configuration and schema in case of success. For the operation execution status, the returned hash has the keys “success” with a boolean value that indicates whether the the operation was successful and a key “reason” with a string value that will contain the reason for failure or will be empty in the case of success. The returned hash will also have a key “config” whose value will be a hash with the keys “name”, “timepoint”, “description”, “priority”, “user”, “config” and “schema” when a device configuration with the given priority is found or an empty hash in case of failure or when no device configuration with the given priority exists.
- Parameters
deviceId
: of the device whose named configuration and schema should be returned.priority
: of the device configuration and schema to be returned.
-
std::pair<bool, std::string>
saveConfigurationFromName
(const std::string &name, const std::vector<std::string> &deviceIds)¶ Saves a collection of current device configurations (and the corresponding schemas) in the configuration database under a common name, user, priority and description.
- Return
- a pair with a success flag (true when the operation succeeds) in the first position and a reason failture description (empty in case of success) in the second position.
- Parameters
name
: to be assigned to the saved collection of device configurations (with schemas).deviceIds
: the devices whose current configurations (and schemas) are to be saved.
-
void
registerInstanceChangeMonitor
(const InstanceChangeThrottler::InstanceChangeHandler &callBackFunction, unsigned int throttlerIntervalMs = 500u, unsigned int maxChangesPerCycle = 100u)¶ Register a throttled callback handler to be triggered when a new device instance appears, updates its instance info record or goes away in the distributed system. The throtter that dispatches the instance changes events to the handler uses a given interval between its running cycles.
- Parameters
callBackFunction
: Function to be invoked with information about the instances changes events.throttlerInterval
: Interval, in milliseconds, between successive cycles of the throttle.maxChangesPerCycle
: Maximum number of instance changes to be dispatched per cycle of the throttler- upon reaching this limit the throttler immediately dispatches the changes, despite the elapsed time from the last cycle.
-
void
flushThrottledInstanceChanges
()¶ Flushes, asap, the throttled instance changes that are waiting to be dispatched.
-
void
registerInstanceNewMonitor
(const InstanceNewHandler &callBackFunction)¶ Register a callback handler to be triggered if a new instance appears in the distributed system.
- Parameters
callBackFunction
: which will receive the instanceInfo Hash
-
void
registerInstanceUpdatedMonitor
(const InstanceUpdatedHandler &callBackFunction)¶ Register a callback handler to be triggered if an instance receives a state update from the distributed system
- Parameters
callBackFunction
: which will receive the instanceInfo Hash
-
void
registerInstanceGoneMonitor
(const InstanceGoneHandler &callBackFunction)¶ Register a callback handler to be triggered if an instance disappears from the distributed system
- Parameters
callBackFunction
: receiving the instanceId and instanceInfo Hash
-
void
registerSchemaUpdatedMonitor
(const SchemaUpdatedHandler &callBackFunction)¶ Register a callback handler to be triggered if an instance receives a schema update from the distributed system Example:
- Note
- Currently, registering only a schema update monitor with an instance of a DeviceClient is not enough to have the registered call-back activated. A workaround for this is to also register a property monitor with the same instance of DeviceClient that has been used to register the schema update monitor.
- Parameters
callBackFunction
: receiving the instanceId and updated Schema
DeviceClient dc = std::shared_ptr<DeviceClient>(new DeviceClient()); dc->registerSchemaUpdateMonitor(fnSchemaUpdateHandler); dc->registerPropertyMonitor(“deviceId”, “property_to_monitor”, fnCallback);
-
void
registerClassSchemaMonitor
(const ClassSchemaHandler &callBackFunction)¶ Register a callback handler to be triggered if a new class appears on a device server
- Parameters
callBackFunction
: receiving the server id, class id and new class Schema
- template <class ValueType>
-
bool
registerPropertyMonitor
(const std::string &instanceId, const std::string &key, const std::function<void(const std::string&, const std::string&, const ValueType&, const karabo::data::Timestamp&)> &callbackFunction)¶ Register a callback function to be triggered when a given property on a device in the distributed system updates
- Return
- true if the operation was successful
- Parameters
instanceId
: of the device to be monitoredkey
: path to the property to be monitoredcallbackFunction
: handling the update notification. It receives the device id, path, value and timestamp of the updated property
- template <class ValueType, class UserDataType>
-
bool
registerPropertyMonitor
(const std::string &instanceId, const std::string &key, const std::function<void(const std::string&, const std::string&, const ValueType&, const karabo::data::Timestamp&, const std::any&)> &callbackFunction, const UserDataType &userData, )¶ Register a callback function to be triggered when a given property on a device in the distributed system updates. Additional user data may be passed to the callback
- Return
- true if the operation was successful
- Parameters
instanceId
: of the device to be monitoredkey
: path to the property to be monitoredcallbackFunction
: handling the update notification. It receives the device id, path, value and timestamp of the updated property as well as std::any userData.userData
: to be passed to the callback as std::any
-
void
unregisterPropertyMonitor
(const std::string &instanceId, const std::string &key)¶ Unregister a property monitor
- Parameters
instanceId
: to unregister the monitor fromkey
: path to the property to unregister from.
-
void
registerDeviceMonitor
(const std::string &instanceId, const std::function<void(const std::string&, const karabo::data::Hash&)> &callbackFunction)¶ Register a callback function to be triggered when a a device in the distributed system updates.
- Parameters
instanceId
: of the device to register tocallbackFunction
: handling the update. It will receive the device instance id and the updated device configuration Hash
-
void
registerDeviceForMonitoring
(const std::string &deviceId)¶ Registers a device to have its configurations changes monitored.
- Note
- In order to receive notifications about configuration changes for any of the monitored devices, one needs to register handlers by calling registerDeviceMonitor (updates one by one - even if updates are throttled) or with registerDevicesMonitor (bulk updates).
- Parameters
deviceId
: of the device to be added to the set of monitored devices.
-
void
registerDevicesMonitor
(const DevicesChangedHandler &devicesChangedHandler)¶ Registers a handler for configuration changes for any of the monitored devices.
- Note
- * To register a device to be monitored, a call to registerDeviceForMonitoring must be made.
- Throttling of device updates must be enabled via a call to setDeviceMonitorInterval with an argument greater than 0.
- Parameters
devicesChangesHandler
: callback function for configuration changes events for any monitored device.
-
void
unregisterDeviceFromMonitoring
(const std::string &deviceId)¶ Unregisters a device from configuration changes monitoring.
- Parameters
deviceId
: of the device to be removed from the set of monitored devices.
- template <class UserDataType>
-
void
registerDeviceMonitor
(const std::string &instanceId, const std::function<void(const std::string&, const karabo::data::Hash&, const std::any&)> &callbackFunction, const UserDataType &userData, )¶ Register a callback function to be triggered when a a device in the distributed system updates. Additional user data may be passed to the callback
- Parameters
instanceId
: of the device to register tocallbackFunction
: handling the update. It will receive the device instance id and the updated device configuration Hash as well as std::any userData.userData
: to be passed to the callback as std::any
-
void
unregisterDeviceMonitor
(const std::string &instanceId)¶ Unregister a device monitor.
- Parameters
instanceId
: to unregister the monitor from
-
bool
registerChannelMonitor
(const std::string &channelName, const InputChannelHandlers &handlers, const karabo::data::Hash &inputChannelCfg = karabo::data::Hash())¶ Register handlers to be called whenever the defined output channel receives data or end-of-stream (EOS). Internally, an InputChannel is created and configured using the cfg Hash and its connection status can be monitored via the ‘statusTracker’ of the handlers argument
- Return
- false if channel is already registered
- Parameters
channelName
: identifies the channel as a concatenation of the id of its devices, a colon (:) and the name of the output channel (e.g. A/COOL/DEVICE:output)handlers
: container for various handlers (handlers can be empty function pointers):- dataHandler std::function<void (const karabo::data::Hash&, const MetaData&)> to be called whenever data arrives
- inputHandler std::function<void (const InputChannel::Pointer&)> to be called whenever data arrives
- eosHandler std::function<void (const InputChannel::Pointer&)> called for EOS
- statusTracker std::function<void(karabo::net::ConnectionStatus)> called whenever the connection status of the underlying InputChannel changes
inputChannelCfg
: configures via InputChanel::create(..) - use default except you know what your are doing. For the expert: “connectedOutputChannels” will be overwritten
-
bool
registerChannelMonitor
(const std::string &instanceId, const std::string &channel, const karabo::xms::SignalSlotable::DataHandler &dataHandler, const karabo::data::Hash &inputChannelCfg = karabo::data::Hash(), const karabo::xms::SignalSlotable::InputHandler &eosHandler = karabo::xms::SignalSlotable::InputHandler(), const karabo::xms::SignalSlotable::InputHandler &inputHandler = karabo::xms::SignalSlotable::InputHandler())¶ Register handlers to be called whenever the defined output channel receives data or end-of-stream (EOS).
DEPRECATED - use interface with ‘InputChannelHandlers’ argument!
- Return
- false if channel is already registered
- Parameters
instanceId
: of the device having the output channelchannel
: is name of the output channeldataHandler
: std::function<void (const karabo::data::Hash&, const MetaData&) to be called whenever data arrivesinputChannelCfg
: configures via InputChanel::create(..) - use default except you know what your are doing for the expert: “connectedOutputChannels” will be overwritteneosHandler
: std::function<void (const InputChannel::Pointer&)> called for EOS if giveninputHandler
: std::function<void (const InputChannel::Pointer&)> to be called whenever data arrives
-
bool
registerChannelMonitor
(const std::string &channelName, const karabo::xms::SignalSlotable::DataHandler &dataHandler, const karabo::data::Hash &inputChannelCfg = karabo::data::Hash(), const karabo::xms::SignalSlotable::InputHandler &eosHandler = karabo::xms::SignalSlotable::InputHandler(), const karabo::xms::SignalSlotable::InputHandler &inputHandler = karabo::xms::SignalSlotable::InputHandler())¶ Register handlers to be called whenever the defined output channel receives data or end-of-stream (EOS).
DEPRECATED - use interface with ‘InputChannelHandlers’ argument!
- Return
- false if channel is already registered
- Parameters
channelName
: identifies the channel as a concatenation of the id of its devices, a colon (:) and the name of the output channel (e.g. A/COOL/DEVICE:output)dataHandler
: std::function<void (const karabo::data::Hash&, const MetaData&) to be called whenever data arrivesinputChannelCfg
: configures via InputChanel::create(..) - use default except you know what your are doing for the expert: “connectedOutputChannels” will be overwritteneosHandler
: std::function<void (const InputChannel::Pointer&)> called for EOS if giveninputHandler
: std::function<void (const InputChannel::Pointer&)> to be called whenever data arrives
-
bool
unregisterChannelMonitor
(const std::string &instanceId, const std::string &channel)¶ Unregister monitoring of output channel
- Return
- false if channel was not registered
- Parameters
instanceId
: of the device having the output channelchannel
: is name of the output channel
-
bool
unregisterChannelMonitor
(const std::string &channelName)¶ Unregister monitoring of output channel
- Return
- false if channel was not registered
- Parameters
channelName
: identifies the channel as a concatenation of the id of its devices, a colon (:) and the name of the output channel (e.g. A/COOL/DEVICE:output)
- template <class T>
-
void
set
(const std::string &instanceId, const std::string &key, const T &value, int timeoutInSeconds = -1, const char keySep = data::Hash::k_defaultSep)¶ Set a remote property in the distributed system
- Parameters
instanceId
: of the device to set the property onkey
: path to the property to setvalue
: to settimeoutInSeconds
: maximum timeout until set operation fails, set to -1 to wait foreverkeySep
: path separator
- template <class T>
-
void
setNoWait
(const std::string &instanceId, const std::string &key, const T &value, const char keySep = data::Hash::k_defaultSep)¶ Set a remote property in the distributed system as a fire-and-forget operation. Warning: there is no guarantee and indication if the set succeeded!
- Parameters
instanceId
: of the device to set the property onkey
: path to the property to setvalue
: to setkeySep
: path separator
-
void
set
(const std::string &instanceId, const karabo::data::Hash &values, int timeoutInSeconds = -1)¶ Bulk-set remote properties in the distributed system
- Parameters
instanceId
: of the device to set the property onvalues
: a Hash containing the to be set value in a path structure indicating which properties to settimeoutInSeconds
: maximum timeout until set operation fails, set to -1 to wait forever
-
void
setNoWait
(const std::string &instanceId, const karabo::data::Hash &values)¶ Bulk-set remote properties in the distributed system as a fire-and-forget operation. Warning: there is no guarantee and indication if the set succeeded!
- Parameters
instanceId
: of the device to set the property onvalues
: a Hash containing the to be set value in a path structure indicating which properties to set
-
void
executeNoWait
(const std::string &deviceId, const std::string &command)¶ Executes a function on a device (an exposed via its Schema) and immediately returns (fire & forget)
- Parameters
deviceId
: The deviceIdcommand
: Name of the command
- template <typename… Args>
-
void
execute
(const std::string &deviceId, const std::string &command, int timeoutInSeconds = 3, const Args&... slotArgs)¶ Executes a function on a device synchronously (waits until the function finished)
- Template Parameters
Args
: Variadic template for the slot args (no arg is a particular case).
- Parameters
deviceId
: The devideIdcommand
: The commandtimeoutInSeconds
: Timeout
- template <typename R1, typename… Args>
-
R1
execute1
(const std::string &deviceId, const std::string &slotName, int timeoutInSeconds = 3, const Args&... slotArgs)¶ Synchronously executes a slot that returns a single element response.
- Return
- A value of R1 type
- Template Parameters
R1
: Type of the response.Args
: Variadic template for the slot arguments.
- Parameters
deviceId
: Id of the device whose slot should be executed.slotName
: Name of the slot to execute.timeoutInSeconds
: Timeout for the slot execution.slotArgs
: Slot arguments.
- template <typename R1, typename R2, typename… Args>
-
std::tuple<R1, R2>
execute2
(const std::string &deviceId, const std::string &slotName, int timeoutInSeconds = 3, const Args&... slotArgs)¶ Synchronously executes a slot that returns a two element tuple as a response.
- Note
- a tuple, instead of a pair, is used as the return value for uniformity with the other executeN methods.
- Return
- std::tuple<R1, R2> with the results of the slot execution.
- Template Parameters
R1
: Type of first element of the resulting pair.R2
: Type of second element of the resulting pair.Args
: Variadic template for the slot arguments.
- Parameters
deviceId
: Id of the device whose slot should be executed.slotName
: Name of the slot to execute.timeoutInSeconds
: Timeout for the slot execution.slotArgs
: Slot arguments.
- template <typename R1, typename R2, typename R3, typename… Args>
-
std::tuple<R1, R2, R3>
execute3
(const std::string &deviceId, const std::string &slotName, int timeoutInSeconds = 3, const Args&... slotArgs)¶ Synchronously executes a slot that returns a three element tuple as a response.
- Return
- std::tuple<R1, R2, R3> Tuple with the results of the slot execution.
- Template Parameters
R1
: Type of first element of the resulting tuple.R2
: Type of second element of the resulting tuple.R3
: Type of third element of the resulting tuple.Args
: Variadic template for the slot arguments.
- Parameters
deviceId
: Id of the device whose slot should be executed.slotName
: Name of the slot to execute.timeoutInSeconds
: Timeout for the slot execution.slotArgs
: Slot arguments.
- template <typename R1, typename R2, typename R3, typename R4, typename… Args>
-
std::tuple<R1, R2, R3, R4>
execute4
(const std::string &deviceId, const std::string &slotName, int timeoutInSeconds = 3, const Args&... slotArgs)¶ Synchronously executes a slot that returns a four element tuple as a response.
- Return
- std::tuple<R1, R2, R3, R4> Tuple with the results of the slot execution.
- Template Parameters
R1
: Type of first element of the resulting tuple.R2
: Type of second element of the resulting tuple.R3
: Type of third element of the resulting tuple.R4
: Type of fourth element of the resulting tuple.Args
: Variadic template for the slot arguments.
- Parameters
deviceId
: Id of the device whose slot should be executed.slotName
: Name of the slot to execute.timeoutInSeconds
: Timeout for the slot execution.slotArgs
: Slot arguments.
-
karabo::data::Hash
getOutputChannelSchema
(const std::string &deviceId, const std::string &outputChannelName)¶ Request the data schema for an output channel as a Hash containing relevant information
- Return
- a Hash containing the output channel’s data schema
- Parameters
deviceId
:outputChannelName
:
-
std::vector<std::string>
getOutputChannelNames
(const std::string &deviceId)¶ Get the list of all output channel names of the remote device.
- Return
- vector containing output channel names
- Parameters
deviceId
:
-
karabo::core::Lock
lock
(const std::string &deviceId, bool recursive = false, int timeout = -1)¶ Request locking of device at deviceId. Throws a karabo::util::LockException in case the lock cannot be acquired in the given timeout
- Return
- a Lock object, holding the lock to deviceId.
- Parameters
deviceId
: the device to be lockedrecursive
: if true, recursive locks on this device are allowedtimeout
: timeout during which we try to acquire the lock. Set to -1 to wait try indefinitely, 0 to only try once, otherwise give integer seconds to wait.
Protected Types
-
typedef std::map<std::string, int>
InstanceUsage
¶ Map of devices that we are connected to with timer stating their age since last access. Before C++14 not an unordered_map since we want to erase while looping over it
Protected Functions
-
karabo::data::Hash
prepareTopologyEntry
(const std::string &path, const karabo::data::Hash &instanceInfo) const¶ Prepare a topology entry for the runtime system description
- Parameters
path
: the path created with prepareTopologyPath using instanceId and instanceInfoinstanceInfo
: The instanceInfo Hash received from the broadcast
-
void
mortalize
(const std::string &deviceId)¶ Unmark deviceId from staying connected all the time without ageing.
Also clears a zombie (marked by negative age) from m_instanceUsage and thus locks m_instanceUsageMutex. That means, unlike immortalize(..) and isImortal(..), mortalize(..) must not be called under protection of m_instanceUsageMutex.
-
bool
eraseFromRuntimeSystemDescription
(const std::string &path)¶ returns true if path could be removed
-
data::Hash
getSectionFromRuntimeDescription
(const std::string §ion) const¶ Get section (e.g. “device”) from runtime description. Returns empty Hash if section does not exist.
-
std::string
findInstanceSafe
(const std::string &instanceId) const¶ Find full path of ‘instanceId’ in m_runtimeSystemDescription, empty if path does not exist.
Protected Attributes
-
karabo::data::Hash
m_runtimeSystemDescription
¶ server + <serverId> type host version status deviceClasses + classes + <classId> + description SCHEMA configuration HASH description SCHEMA configuration HASH
device + <deviceId> type host version status classId serverId + fullSchema => SCHEMA configuration => HASH activeSchema + <stateName> => SCHEMA
-
DevicesChangedHandler
m_devicesChangesHandler
¶ Handler for all monitored devices configuration updates during last interval.
-
boost::asio::steady_timer
m_signalsChangedTimer
¶ defines whether aging is running or not
-
std::mutex
m_loggerMapMutex
¶ map of collected signalChanged
Private Types
-
typedef std::map<std::string, std::set<std::string>>
SignalChangedMap
¶ keys are instance IDs, values are a sets of properties that changed
Private Functions
-
std::string
findInstance
(const std::string &instanceId) const¶ As findInstanceSafe, but to be called under protection of m_runtimeSystemDescriptionMutex.
-
void
doSendSignalsChanged
(const SignalChangedMap &signalChangedMap)¶ Actually process data in ‘signalChangedMap’ - try/catch should be outside.
-
bool
connectNeeded
(const std::string &instanceId)¶ Marks ‘instanceId’ as used. Returns true if explicit “connect” call should still be done for it.
-
void
initServiceDeviceIds
(const karabo::data::Hash &serviceDeviceIds)¶ Internal helper method to initialize the service device ids members of the DeviceClient instance.
- Parameters
serviceDeviceIds
: A hash with ids of core service devices; e.g, “dataLoggerManagerId” key and the value is the name of the DataLoggerManager the device client instance should use for data logging operations. Currently keys “dataLoggerManagerId” and “configurationManagerId” are supported. If a supported key is missing, the default ID for the service device type is used.
-
std::vector<std::pair<std::string, karabo::data::Hash>>
findAndEraseDevicesAsGone
(const std::string &serverId)¶ Helper for _slotInstanceGone for servers
Finds all devices that belong to given server, removes them from m_runtimeSystemDescription and returns pairs of their deviceIds and instanceInfo. Requires protection of m_runtimeSystemDescriptionMutex.
-
void
treatInstanceAsGone
(const std::string &instanceId, const karabo::data::Hash &instanceInfo)¶ Helper for _slotInstanceGone
Does all needed action
- despite of removal from m_runtimeSystemDescription
- and despite of special treatment of devices on the server if instance is a server
-
struct
InputChannelHandlers
¶ - #include <DeviceClient.hh>
Container of handlers for InputChannel, to be passed to
bool registerChannelMonitor(const std::string& channelName, const InputChannelHandlers& handlers, const karabo::data::Hash& inputChannelCfg = karabo::data::Hash());
See documentation of that method for meaning of various handlers.
Public Functions
-
InputChannelHandlers
(const karabo::xms::SignalSlotable::DataHandler &data, const karabo::xms::SignalSlotable::InputHandler &eos = karabo::xms::SignalSlotable::InputHandler(), const std::function<void(karabo::net::ConnectionStatus)> &status = std::function<void(karabo::net::ConnectionStatus)>())¶ Construct with all handlers except input handler (could be specified afterwards)
-
InputChannelHandlers
(const karabo::xms::SignalSlotable::InputHandler &input, const karabo::xms::SignalSlotable::InputHandler &eos = karabo::xms::SignalSlotable::InputHandler(), const std::function<void(karabo::net::ConnectionStatus)> &status = std::function<void(karabo::net::ConnectionStatus)>())¶ Construct with all handlers except data handler (could be specified afterwards)
-
-
-
class
DeviceServer
¶ - #include <DeviceServer.hh>
The DeviceServer class hosts device instances.
The DeviceServer class hosts device instances. It monitors the system for new class plugins appearing and notifies the distributed system of these and their static information.
Inherits from karabo::xms::SignalSlotable
Public Functions
-
DeviceServer
(const karabo::data::Hash &config)¶ The constructor expects a configuration Hash. The following configuration options are supported:
- serverId: a string giving the server’s id
- init: a json string containing configurations for device that are to be automatically started by the server
- connection: a Hash containing the connection information for a karabo::net::Broker
- pluginDirectory: a path to the plugin directory for this device server
- heartbeatInterval: interval in seconds at which this server sends heartbeats to the distributed system
- Parameters
-
bool
isRunning
() const¶ Check if the device server is running
- Return
-
void
autostartDevices
()¶ It just launches devices that marked to be started by server automatically.
Public Static Functions
-
void
expectedParameters
(karabo::data::Schema&)¶ Static properties of the device server
- Parameters
to
: inject these properties to
Private Functions
-
std::tuple<std::string, std::string, data::Hash>
prepareInstantiate
(const data::Hash &configuration)¶ Helper to create input passed to instantiate. Returns a tuple of the deviceId, the classId and the configuration.
-
void
instantiate
(const std::string &deviceId, const std::string &classId, const data::Hash &config, const SignalSlotable::AsyncReply &asyncReply)¶ Helper for instantiateDevices - e.g. provides the (async) reply for slotStartDevice.
-
void
slotTimeTick
(unsigned long long id, unsigned long long sec, unsigned long long frac, unsigned long long period)¶ A slot called by the time-server to synchronize this device with the timing system.
- Parameters
id
: current train idsec
: current system secondsfrac
: current fractional secondsperiod
: interval between subsequent ids in microseconds
-
void
timeTick
(const boost::system::error_code ec, unsigned long long newId)¶ Helper function for internal time ticker deadline timer to provide internal clock that calls ‘onTimeUpdate’ for every id even if slotTimeTick is called less often.
- Parameters
ec
: error code indicating whether deadline timer was cancelledid
: current train id
-
-
class
InstanceChangeThrottler
¶ - #include <InstanceChangeThrottler.hh>
Receives instance new, update and gone messages and dispatches them to an interested part in “cycles” spaced by a given interval. Also takes care of removing redundant message sequences.
The second level hashes are the values of the root hash. Their keys are the types of the instances whose changes are on the third level hashes. The keys are the contents of the InstanceInfo.Type field in the instance change data. Typical values for second level keys would be “device”, “server” and “macro”.
- Note
- The hash is composed of three level of hashes. The root hash has the types of the instances changes as its keys . The possible values for this first level keys are “new”, “gone” and “update”. The three keys will always be present in the root hash, even when a particular cycle has no change of the given type to dispatch.
The third level hashes are the values of the second level hashes. Their keys will be the instanceIds in the instance change data. Those keys can be either a deviceId, a serverId or any other kind of Id, depending on the type of the instance. The the third level hashes will be the ones resulting from calling the InstanceChangeEntryEncoder function passed as an argument to the Throttler with the InstanceId and InstanceInfo in the instance change data. For “new” and “update” changes the third level hash will be an empty hash with the input InstanceInfo fields as attributes. For “gone” changes the third level hash will not be empty and will have the same lay-out as the input InstanceInfo hash.
Inherits from std::enable_shared_from_this< InstanceChangeThrottler >
Public Functions
-
void
submitInstanceNew
(const std::string &instanceId, const karabo::data::Hash &instanceInfo)¶ Submits an instance new change for dispatching by the throttler.
- Parameters
instanceId
: The id of the instance the new change refers to.instanceInfo
: Information about the instance the new change refers to.
-
void
submitInstanceUpdate
(const std::string &instanceId, const karabo::data::Hash &instanceInfo)¶ Submits an instance update change for dispatching by the throttler.
- Parameters
instanceId
: The id of the instance the update change refers to.instanceInfo
: Information about the instance the update change refers to.
-
void
submitInstanceGone
(const std::string &instanceId, const karabo::data::Hash &instanceInfo)¶ Submits an instance gone change for dispatching by the throttler.
- Parameters
instanceId
: The id of the instance the gone change refers to.instanceInfo
: Information about the instance the gone change refers to.
-
unsigned int
cycleIntervalMs
() const¶ The interval, in milliseconds, between cycles of the throttler.
-
unsigned int
maxChangesPerCycle
() const¶ The maximum number of instance changes entries to be dispatched per throttler cycle. If this limit is reached before the throttler interval elapses, a cycle is started immediately to dispatch the changes to the registered handler.
-
std::string
getInstChangeTypeStr
(InstChangeType changeType) const¶ Returns the string representation of a given InstChangeType value.
-
void
flush
()¶ Flushes the throttler by making it dispatch the instance changes it has stored asap.
- Note
- this is a wrapper for the private flushThrottler(bool) method.
Public Static Functions
-
std::shared_ptr<InstanceChangeThrottler>
createThrottler
(const InstanceChangeHandler &instChangeHandler, unsigned int cycleIntervalMs = 500u, unsigned int maxChangesPerCycle = 100)¶ InstanceChangeThrottler factory.
- Return
- A shared pointer to an InstanceChangeThrottler.
- Note
- The Throttler only has a private constructor; every instantiation of a Throttler must come from this factory method. It takes care of initializing the newly instantiated Throttler.
- Note
- instChangeEntryEncoder has been added to allow the Throttler to call DeviceClient::prepareTopologyEntry without directly knowing DeviceClient.
- Parameters
instChangeHandler
: The handler for instance change events dispatched by the Throttler.cycleIntervalMs
: The interval in milliseconds between throttler cycles.maxChangesPerCycle
: The maximum number of instance changes entries to be dispatched per throttler cycle. If this limit is reached before the throttler interval elapses, a cycle is started immediately to dispatch the changes to the handler.
Private Functions
-
Hash
instNewUpdateEncoder
(const std::string &instanceId, const karabo::data::Hash &instanceInfo) const¶ Encodes the instanceInfo hash into the format that the Throttler uses internally for changes of type NEW and UPDATE.
- Return
- a hash whose only key is the instanceId, with the keys/values in instanceInfo as attributes and an empty hash as the only value.
- Parameters
instanceId
: the id of the instance the change is about.instanceInfo
: the instanceInfo hash as used by the karabo GUI.
-
karabo::data::Hash
instGoneEncoder
(const std::string &instanceId, const karabo::data::Hash &instanceInfo) const¶ Encodes the instanceInfo hash into the format that the Throttler uses internally for changes of type GONE.
- Return
- a hash whose only key is the instanceId and whose only value is the instanceInfo hash.
- Parameters
instanceId
: the id of the instance the change is about.instanceInfo
: the instanceInfo hash as used by the karabo GUI.
-
void
addChange
(InstChangeType changeType, const std::string &instanceId, const karabo::data::Hash &instanceInfo)¶ Adds an instance change to m_instChanges.
As part of the addition, performs some optimizations to the set of events already in the hash. It can happen that the new change actually “cancels” a set of changes that had been previously added. An example: an ‘instance gone’ event can “cancel” all the ‘instance new’ and ‘instance update’ events related to the same instance; in this scenario, the ‘addition’ of the gone event would actually consist of the removal of the other events related to the same instance.
- Parameters
changeType
:instanceId
:instanceInfo
:
-
void
runThrottlerCycleAsync
(const boost::system::error_code &e)¶ Throttler cycle execution. For each cycle, the throttler dispatches the instances changes hash.
- Parameters
e
: Error code passed by the boost::asio::deadline_timer ticking mechanism.
-
void
kickNextThrottlerCycleAsync
()¶ Schedules the next throttler event dispatching cycle.
-
void
flushThrottler
(bool kickNextCycle = true)¶ Flushes the throttler by running its dispatching loop immediately.
- Note
- Assumes that the mutex for acessing instanceChange data is acquired by a caller (either the direct caller or another caller down the activation stack).
- Parameters
if
: true, the next throttler cycle is scheduled after the flush completes.
Private Members
-
karabo::data::Hash
m_instChanges
¶ A Hash with all the instances changes to be dispatched by the Throttler in its next cycle.
Description for the Hash format can be found in the class documentation.
-
class
Lock
¶ Public Functions
-
Lock
(std::weak_ptr<karabo::xms::SignalSlotable> sigSlot, const std::string &deviceId, bool recursive = false)¶ Create a lock on a device. Throws a karabo::util::LockException if the lock cannot be acquired
- Parameters
sigSlot
: a SignalSlotable instance to use for locking the remote devicedeviceId
: the deviceId of the device to lockrecursive
: allow recursive locking if true
-
~Lock
()¶ The destructor unlocks the device the lock is held on if the lock is valid. It is called explictly if the lock was stolen and will then throw a karabo::util::LockException
-
void
lock
(bool recursive = false) const¶ Reacquire a lock if this lock was previously unlocked
- Parameters
recursive
: allow recursive locking if true
-
void
unlock
() const¶ Unlock this lock
-
bool
valid
() const¶ Returns if this lock is currently valid. Note that the locked device will be queried through the distributed system when asking for lock validity.
Private Functions
-
void
lock_impl
(bool recursive) const¶ Perform locking. Calling this function leads to the following remote calls:
1) check if we are allowed to lock: the lockedBy field on the remote device is either empty, or if recursive == true contains the lock requestor’s device id
2) request locking, e.g. set the lockedBy field. This can still fail if another device locked in between
3) check if we are the lock holder: lockedBy should now contain our device id
- Parameters
recursive
: allow recursive locking if true
-
void
unlock_impl
() const¶ Simply calls the clearLock slot on the locked device if we are the lock-holder
-
-
struct
QueueWorker
¶ Inherits from karabo::core::BaseWorker< karabo::data::Hash::Pointer >
-
class
Runner
¶ - #include <Runner.hh>
The Runner class starts device-servers in the distributed system.
The Runner class instantiates device-servers in the distributed system. It parses command line arguments to deduce configuration.
Public Static Functions
-
DeviceServer::Pointer
instantiate
(int argc, const char **argv)¶ Instantiates a device server taking command line arguments into account
Users of this function must check the returned pointer for validity. The pointer may be empty in case the help option is given.
- Return
- Pointer to device server instance (may be empty)
- Parameters
argc
: Number of commandline argumentsargv
: String array of commandline options
-
DeviceServer::Pointer
-
struct
Worker
¶ - #include <Worker.hh>
A worker that passes any data received in its queue to a callback function working asynchronously working in a separate thread.
Inherits from karabo::core::BaseWorker< bool >
Public Functions
-
Worker
(const std::function<void()> &callbackint delay = -1, int repetitions = -1, )¶ Instantiate a worker with a callback function to work on data. See Worker::WorkerBase for options
- Parameters
callback
:delay
:repetitions
:
-
-
enum
The karabo::devices Namespace¶
-
namespace
karabo
::
devices
¶ Namespace for package core
Typedefs
-
typedef nlohmann::json
json
¶
-
using
karabo::devices::BeginTemporarySessionHandler = typedef std::function<void(const BeginTemporarySessionResult&)>
-
using
karabo::devices::ExpirationHandler = typedef std::function<void(const ExpiredTemporarySessionInfo&)>
Handler for expired temporary session events.
-
using
karabo::devices::EminentExpirationHandler = typedef std::function<void(const EminentExpirationInfo&)>
Handler for “temporary session about to expire” events.
-
using
karabo::devices::AsyncHandler = typedef std::function<void()>
-
using
karabo::devices::InfluxResponseHandler = typedef std::function<void(const karabo::net::HttpResponse&)>
Enums
Functions
- template <class T>
-
void
addToSetOrCreate
(Hash &h, const std::string &key, const T &object)¶ Helper function used below: Add object to set<T> at key’s position in h - if no such key exists, create one
-
static void
trim_vector_elements
(std::vector<std::string> &v)¶
Variables
-
constexpr karabo::data::Schema::AccessLevel
MAX_LOGIN_ACCESS_LEVEL
= karabo::data::Schema::AccessLevel::EXPERT¶
-
constexpr karabo::data::Schema::AccessLevel
MAX_TEMPORARY_SESSION_ACCESS_LEVEL
= karabo::data::Schema::AccessLevel::EXPERT¶
-
constexpr unsigned int
CHECK_TEMPSESSION_EXPIRATION_INTERVAL_SECS
= 5U¶
-
const unsigned int
defVectorMaxSize
= 100¶
-
struct
BeginTemporarySessionResult
¶ Inherits from karabo::net::OneTimeTokenAuthorizeResult
-
class
DataLogger
¶ - #include <DataLogger.hh>
A DataLogger device is assigned devices in the distributed system and logs their slow control data.
DataLoggers are managed by the karabo::devices::DataLoggerManager.
Each is able to log any number of devices. This list can be specified at instantiation, but can also dynamically changed by the slots slotTagDeviceToBeDiscontinued and slotAddDevicesToBeLogged. When the logger is ready to log data, its state changes from INIT to NORMAL.
Inherits from karabo::core::Device
Subclassed by karabo::devices::FileDataLogger, karabo::devices::InfluxDataLogger
Protected Functions
-
void
initializeLoggerSpecific
()¶ Do some actions here that may require asynchronous logic … and, finally, startConnection() should be called This function may be overridden by derived classes but at the end the ‘startConnection’ function should be called as a last step of initialization
-
bool
removeFrom
(const std::string &str, const std::string &vectorProp)¶ Helper to remove an element from a vector<string> element - needs protection by m_perDeviceDataMutex. Note that if the same element is in the vector more than once, only the first one is removed.
- Return
- whether could be removed
- Parameters
str
: the element to removevectorProp
: the key of the vector<string> element
-
bool
appendTo
(const std::string &str, const std::string &vectorProp)¶ Helper to add an element to a vector<string> element - needs protection by m_perDeviceDataMutex. Note that if the same element is already in, it will not be added again.
- Return
- whether it was added (i.e. false if ‘str’ was already in the vectorProperty
- Parameters
str
: the element to addvectorProp
: the key of the vector<string> element
-
void
preDestruction
()¶ Override preDestruction from Device class
Private Functions
-
void
slotTagDeviceToBeDiscontinued
(const std::string &reason, const std::string &deviceId)¶ FIXME: Update text This tags a device to be discontinued, three cases have to be distinguished
(a) Regular shut-down of the device (wasValidUpToNow = true, reason = ‘D’) (b) Silent death of the device (wasValidUpToNow = true, reason = ‘D’) (c) Start-up of this (DataLogger) device whilst the device was alive (wasValidUpToNow = false, reason = ‘L’)
This slot will be called by the DataLoggerManager
Helper for connecting to signalChanged.
Flush data in file hierarchy or to the database tables
- Parameters
aReplyPtr
: if pointer to an AsyncReply that (if non-empty) has to be called without argument when done
“Flush” data accumulated in the internal cache to the external storage (file, database,…)
-
bool
allowLock
() const¶ This device may not be locked
- Return
- false
-
void
-
class
DataLoggerManager
¶ - #include <DataLoggerManager.hh>
The DataLoggerManager manages device loggers in the distributed system.
In the Karabo distributed system two types of data archiving exist:
- run associated archival of scientific data through the DAQ system
- constant device state and slow control logging using the data logger services
This device manages the data loggers used in the second logging scenario. It is the central configuration point to set via its expected parameters the
- flushInterval: at which loggers flush their data to disk
- maximumFileSize: of log files after which a new log file chunk is created
- directory: the directory into which loggers should write their data
- serverList: a list of device servers which each runs one logger. Each device in the distributed system is assigned to one logger. They are added to the loggers on these servers in a round robin fashion, allowing for load balancing. Assignment is made permanent in a loggermap.xml file that is regularly written to disk. This allows to distribute the servers in the serverList to be distributed among several hosts and still have fixed places for reading the data back.
Inherits from karabo::core::Device
Private Functions
-
std::pair<bool, std::string>
checkSummary
()¶ Assemble summary from m_checkStatus and clear for next use. The boolean returned tells whether the status requires operator attention.
-
void
printLoggerData
() const¶ Print internal cash of logger status. Needs to be protected by m_strand.
-
void
forceDeviceToBeLogged
(const std::string &deviceId)¶ If deviceId’s logging status is fishy, re-add to its logger. Needs to be protected by m_strand.
- Parameters
deviceId
: the fishy device
-
void
slotGetLoggerMap
()¶ Request the current mapping of loggers to servers. The reply contains a Hash where the the logger id is the key and the server id the value.
-
std::string
loggerServerId
(const std::string &deviceId, bool addIfNotYetInMap)¶ Get id of server that should run logger for given device that should be logged
- Return
- the server id - can be empty if addIfNotYetInMap == false
- Parameters
deviceId
: the device that should be loggedaddIfNotYetInMap
: whether to create a server/logger relation in the logger map in case it does not yet exist for deviceId
-
std::string
serverIdToLoggerId
(const std::string &serverId) const¶ Get id of DataLogger running on server with id ‘serverId’
-
std::string
loggerIdToServerId
(const std::string &loggerId) const¶ Get id of server that should run logger with id ‘loggerId’
-
bool
allowLock
() const¶ This device may not be locked
- Return
- false
-
void
evaluateBlockedOnStrand
(const karabo::data::Hash &oldList, const karabo::data::Hash &newList)¶ Evaluate old and new configuration in order to possibly start/stop archiving for some devices/device classes.
- Parameters
oldList
: old configuration for blocked devices/classesnewList
: new configuration for blocked devices/classes
-
std::vector<karabo::data::Hash>
makeLoggersTable
()¶ create a vector of hashes that can be loaded in a table from the Hash with the mapping device -> logger
Private Members
-
karabo::data::Hash
m_checkStatus
¶ 1st level keys: entries in m_serverList, 2nd level: “state”, “backlog”, “beingAdded” and “devices”
-
std::unordered_map<std::string, std::set<std::string>>
m_knownClasses
¶ Keep track of all important stuff during check.
-
const std::string
m_blockListFile
¶ Hash with ‘deviceIds’ and ‘classIds’ entries.
-
class
DataLogReader
¶ - #include <DataLogReader.hh>
DataLogReader devices read archived information from the Karabo data loggers.
DataLogReader devices read archived information from the Karabo data loggers. They are managed by karabo::devices::DataLoggerManager devices. Calls to them should usually not happen directly, but rather through a karabo::core::DeviceClient and it’s karabo::core::DeviceClient::getPropertyHistory and karabo::core::DeviceClient::getConfigurationFromPast methods.
Inherits from karabo::core::Device
Subclassed by karabo::devices::FileLogReader, karabo::devices::InfluxLogReader
Protected Functions
-
void
slotGetPropertyHistory
(const std::string &deviceId, const std::string &property, const karabo::data::Hash ¶ms)¶ Use this slot to get the history of a given property The slot replies a vector of Hashes where each entry consists of a Hash with a key “v” holding the value of the property at timepoint signified in “v“‘s attributes using a format compatible with karabo::data::Timestamp::fromHashAttributes
- Parameters
deviceId
: for which to get the history forproperty
: path to the property for which to get the history fromparams
: Hash containing optional limitations for the query:- from: iso8601 timestamp indicating the start of the interval to get history from
- to: iso8601 timestamp indicating the end of the interval to get history from
- maxNumData: maximum number of data points to retrieve starting from start
-
void
slotGetConfigurationFromPast
(const std::string &deviceId, const std::string &timepoint)¶ Request the configuration Hash and schema of a device at a given point at time. Depending on the device status and on the availability of logged data, the configuration and schema returned will be:
- If the device was online and logging data at the given timepoint, the configuration and the schema will be the ones that were active at the timepoint;
- If the device was offline at the given timepoint, but there is data logged for it before the timepoint, the last active configuration and schema before that timepoint will be returned;
- If the device was offline at the given timepoint and there’s no data logged before the timepoint, an empty configuration and an empty schema will be returned.
The slot replies with a tuple of 4 values. The first two are the Hash and Schema objects containing the configuration Hash and the corresponding device schema for timepoint. The third is a boolean whose true value indicates the device was online and actively logging data at the timepoint. The fourth value is the string form of the timepoint for the configuration returned and will be the latest timestamp among the timestamps of the properties in the configuration returned.
- Parameters
deviceId
: of the device to get the configuration fromtimepoint
: in iso8601 format for which to get the information
An important note: if no configuration is found for the device at the timepoint (or before the timepoint), the third value in the reply will be false and the fourth will be the string form of the Epoch (01/01/1970 at 00:00:00).
-
void
onOk
()¶ helper functions to handle state transition in derived classes: onOk: sets the State to ON
-
const string
onException
(const std::string &message)¶ helper functions to handle state transition in derived classes: onException: sets the State to ERROR, logs the exception trace to status and to the Karabo Logs
- Return
- the exception trace
- Parameters
message
: a string to be prepended to the trace
-
void
-
struct
DeviceData
¶ Inherits from std::enable_shared_from_this< DeviceData >
Subclassed by karabo::devices::FileDeviceData, karabo::devices::InfluxDeviceData
Public Functions
-
virtual void
handleChanged
(const karabo::data::Hash &config, const std::string &user) = 0¶ Called when configuration updates arrive for logging
- Parameters
config
: a Hash with the updates and their timestampsthe
: user responsible for this update - if any
-
virtual void
handleSchemaUpdated
(const karabo::data::Schema &schema, const karabo::data::Timestamp &stamp) = 0¶ Called when a Schema update arrive for logging
- Parameters
schema
: - the new onestamp
: - the timestamp to be assigned for that update
-
void
getPathsForConfiguration
(const karabo::data::Hash &configuration, const karabo::data::Schema &schema, std::vector<std::string> &paths) const¶ Retrieves the paths of the leaf nodes in a given configuration. The paths are returned in ascending order of their corresponding nodes timestamps.
- Note
- karabo::devices::DataLogReader depends on the configuration items being properly sorted in time to retrieve configuration changes.
- Parameters
configuration
: A configuration with the nodes corresponding to the paths.schema
: The schema for the configuration hash.paths
: The paths of the leaf nodes in the configuration, sorted by nodes timestamps.
-
virtual void
-
class
FileDataLogger
¶ Inherits from karabo::devices::DataLogger
Private Functions
“Flush” data accumulated in the internal cache to the external storage (file, database,…)
-
struct
FileDeviceData
¶ Inherits from karabo::devices::DeviceData
Public Functions
-
void
handleChanged
(const karabo::data::Hash &config, const std::string &user)¶ Called when configuration updates arrive for logging
- Parameters
config
: a Hash with the updates and their timestampsthe
: user responsible for this update - if any
-
bool
updatePropsToIndex
()¶ Helper function to update data.m_idxprops, returns whether data.m_idxprops changed.
-
void
ensureFileClosed
()¶ Helper to ensure archive file is closed. Must only be called from functions posted on ‘data.m_strand’.
-
std::pair<bool, size_t>
ensureFileOpen
()¶ Helper to ensure archive file (m_configStream) is open. Must only be called from functions posted on ‘data.m_strand’.
- Return
- pair of * whether it is a new file (in contrast to a re-opened existing one)
- current file position, size_t(-1) tells that file could not be opened (permissions?)
-
void
handleSchemaUpdated
(const karabo::data::Schema &schema, const karabo::data::Timestamp &stamp)¶ Called when a Schema update arrive for logging
- Parameters
schema
: - the new onestamp
: - the timestamp to be assigned for that update
-
void
-
struct
FileLoggerIndex
¶ - #include <FileLogReader.hh>
A compound for representing indexes in text file logged data.
-
class
FileLogReader
¶ - #include <FileLogReader.hh>
A reader for data logs stored in text files by the class karabo::devices::FileDataLogger.
Inherits from karabo::devices::DataLogReader
Private Functions
-
void
readToHash
(karabo::data::Hash &hashOut, const std::string &path, const karabo::data::Timestamp ×tamp, const std::string &type, const std::string &value) const¶ Internal helper: Place ‘value’ interpreted as ‘type’ (and with given ‘timestamp’) into ‘hashOut’ at ‘path’.
-
std::pair<bool, FileLoggerIndex>
findLoggerIndexTimepoint
(const std::string &deviceId, const std::string &timepoint)¶ Retrieves, from the logger index, the event of type “device became online” that is closest, but not after a given timepoint. The retrieved logger index event can be used as a starting point for sweeping the device log for the last known given configuration at that timepoint.
- Return
- a pair whose ‘first’ is a boolean that indicates whether configuration was active at the timepoint (true) or whether it is a configuration from the most recent activation of the device prior to the timepoint because the device was not active logging at the timepoint. The pair’s ‘second’ value is the logger index of the given device that is the closest “device became online” event that is not after the given timepoint.
- Parameters
deviceId
: the device whose logger index event should be retrieved.timepoint
: the timepoint that will be used as the reference to find the logger index event.
-
FileLoggerIndex
findNearestLoggerIndex
(const std::string &deviceId, const karabo::data::Epochstamp &timepoint, const bool before)¶ Find logger closest index from archive_index.txt file that is before/after (according to ‘before’) ‘timepoint’. If there is none before (after) but that is asked for, take the one just after (before).
-
size_t
findPositionOfEpochstamp
(std::ifstream &f, double stamp, size_t left, size_t right, bool preferBefore)¶ Find index of that MetaData::Record in ‘f’ (between indices ‘left’ and ‘right’) that matches the Epochstamp ‘stamp’. In case no exact match (within 1 ms) is found, ‘preferBefore’ decides whether the index with a smaller or larger time stamp is returned.
-
void
extractTailOfArchiveIndex
(const std::string &tail, FileLoggerIndex &entry) const¶ Helper to extract DataLoggerIndex values out of the tail of a line in archive_index.txt. The tail is everything after event, timestampAsIso8061 and timestampAsDouble. The entry has to be partly filled (m_event and m_epoch) and partly serves as output (m_train, m_position, m_user and m_fileindex). Works for lines written to archive_index.txt by >= 1.5
-
void
-
class
GuiServerDevice
¶ - #include <GuiServerDevice.hh>
The GuiServerDevice mediates between GUI clients and the distributed system.
The GuiServerDevice acts as a mediator between the distributed system and GUI clients, which connect to it through (tcp) channels. The device centrally manages updates from the distributed system and pushes them to the clients. Conversely, it handles requests by clients and passes them on to devices in the distributed system.
Inherits from karabo::core::Device
Private Functions
-
void
initUsersActionsLog
()¶ Initializes the user actions log.
The log contains entries describing the writing actions the GUI Server performed upon request of a user. It is separated from the remaining device server logs.
-
void
logUserAction
(const WeakChannelPointer &channel, const std::string &entryText)¶ Adds an entry with a given text to the user actions log.
- Parameters
channel
: The TCP Channel connecting the GUI Server to the GUI Client that originated the action execution request.entryText
: A description of the action (and possibly its parameters)
-
void
loggerMapConnectedHandler
()¶ Wrapping requestNoWait
-
void
postReconfigure
()¶ Called if configuration changed from outside.
-
void
startDeviceInstantiation
()¶ Starts the deadline timer which throttles device instantiation.
-
void
startNetworkMonitor
()¶ Starts the deadline timer which triggers network stats collection
-
void
startMonitorConnectionQueues
(const karabo::data::Hash ¤tSuspects)¶ Starts the deadline timer which monitors connection queues
- Parameters
currentSuspects
: Hash with pending message counts - keys are bad client addresses
-
void
collectNetworkStats
(const boost::system::error_code &error)¶ Perform network stats collection
-
void
deferredDisconnect
(WeakChannelPointer channel)¶ writes a message to the specified channel with the given priority
- Parameters
channel
:message
:prio
: Deferred disconnect handler.
-
void
safeAllClientsWrite
(const karabo::data::Hash &message, int prio = LOSSLESS)¶ writes message to all channels connected to the gui-server device
- Parameters
message
:prio
:
-
void
sendLoginErrorAndDisconnect
(const karabo::net::Channel::Pointer &channel, const std::string &userId, const std::string &cliVersion, const std::string &errorMsg)¶ Sends a login error message to the client currently connected and closes the connection after a time interval elapses.
- Parameters
channel
: the channel the client to be notified and disconnected is connected.userId
: the id of the user whose login attempt failed.cliVersion
: the version of the GUI client attempting to login.errorMsg
: the error message to be sent to the client.
-
void
onError
(const karabo::net::ErrorCode &e, WeakChannelPointer channel)¶ an error specified by ErrorCode e occurred on the given channel. After an error the GUI-server will attempt to disconnect this channel.
- Parameters
e
:channel
:
-
bool
violatesReadOnly
(const std::string &type, const karabo::data::Hash &info)¶ validate the incoming type and info hash if a readOnly command is requested to be executed
- Return
- bool whether the request violates read-only restrictions
- Parameters
type
:info
:
-
bool
isProjectLoadingReplyType
(const std::string &replyType)¶ Checks whether a given reply type requested by a GUI Client is for a request involved in the Load Project operation.
- Return
- bool is the reply type involved in the Load Project operation?
- Parameters
replyType
: the string that specifies the reply type.
-
bool
violatesClientConfiguration
(const std::string &type, WeakChannelPointer channel)¶ validates the client configuration
currently only validating the type versus the client version.
- Return
- bool whether the request violates client validation
- Parameters
type
:channel
:
-
void
onGuiInfo
(const karabo::data::Hash &hash)¶ an info further specified by hash occurred on a connection to a GUI client. The GUI-server will attempt to forward the info to the debug channel of the GUI client.
- Parameters
hash
:
-
void
onConnect
(const karabo::net::ErrorCode &e, karabo::net::Channel::Pointer channel)¶ connects a client on to the GUI server on channel. The channel is registered with two priority handlers: remove oldest and loss-less. The onRead and onError handlers are registered to handle incoming data and faults on the channel. Both upon successful completion and exceptions in the process the acceptor socket of the GUI-server is re-registered so that new client connections may be established.
- Parameters
e
: holds an error code if any error occurs when calling this slotchannel
:
-
void
registerConnect
(const karabo::util::Version &version, const karabo::net::Channel::Pointer &channel, const std::string &userId = "", const std::string &oneTimeToken = "")¶ Creates an internal ChannelData structure mapped to the TCP Channel in charge of the connection between the GUI Client and the GUI Server. Also updates the number of connected clients property of the GUI Server.
Called after a successful user authorization based on a one-time token (when the GUI Server requires user authentication).
For GUI Servers that don’t require user authentication / authorization, it’s called right after the message of “type” “login” is received by the server and the client’s version is verified as one being supported by the server. The client’s version verification is also performed by a GUI Server that requires authentication (right before the token validation).
- Note
- The access level is not being saved in the internal ChannelData structure because all the access level enforcement is currently client side only. If any enforcement is required on the server side, the access level information must be stored in ChannelData and updated whenever the user downgrades his/her access level on the GUI client.
- Parameters
version
: the version of the connected GUI clientchannel
: the TCP channel for the connection being registereduserId
: the ID of the user logged in the connected GUI ClientoneTimeToken
: the one-time token resulting from the user authentication previously triggered by GUI client - the token is used by the GUI Server to validate the authentication and to authorize the user (send the user’s login access level back to the GUI Client).
-
void
onWaitForLogin
(const karabo::net::ErrorCode &e, const karabo::net::Channel::Pointer &channel, karabo::data::Hash &info)¶ Handler for login messages expected to be sent by a GUI Client right after it establishes a TCP connection to the GUI Server.
Keeps discarding and logging warnings for any message whose “type” is not “login”. When such an unexpected message is received, the GUI server binds this handler again to the TCP channel.
When a message of “type” of “login” is received, its handling is delegated to onLogin(const karabo::net::ErrorCode&, const karabo::net::Channel::Pointer&, karabo::data::Hash&)
- Parameters
e
: holds an error code if the eventloop cancels this task or the channel is closedchannel
: the TCP channel for the recently established connection with a GUI clientinfo
: a Hash containing the message “type” and, for “login” messages, some info related to the login process, like the version of the connected GUI client and some user related info, like the userID or the oneTimeToken the GUI Client received upon successfully authenticating the user logging in.
-
void
onLogin
(const karabo::net::Channel::Pointer &channel, const karabo::data::Hash &info)¶ Handles a login request of a user on a GUI client. If the login credentials are valid, the current system topology is sent to the client.
- Note
- for clients >= 2.20 a login message with no OneTimeToken is interpreted by an authenticated GUI Server as a request for a read-only session. The GUI Server will respond to such messages with Access Level OBSERVER and the read-only flag set to true. For login messages with OneTimeToken the read-only flag will be always set to false and the Access Level will be the one returned by the Karabo Authentication Server.
- Parameters
channel
: the TCP channel between the GUI client and the GUI serverinfo
: Hash with information needed to validate the login request.
-
void
onTokenAuthorizeResult
(const WeakChannelPointer &channel, const std::string &userId, const karabo::util::Version &cliVersion, const std::string &oneTimeToken, const bool isLoginOverLogin, const karabo::net::OneTimeTokenAuthorizeResult &authResult)¶ Handles the result of the authorize one-time token operation performed as part of a GUI client login on behalf of an authenticated user.
- Note
- a login over an existing user session has the potentially “desired side-effect” of just “refreshing” an existing user session, by updating its start time. This is useful when the maximum retention time for a token session is about to expire - the user can be instructed to refresh his/her login to keep going on the same GUI Client session. This is the primary reason for the GUI Server not caring if the login over login corresponds to a user change or not.
- Parameters
channel
: the communication channel established with the GUI client logging in.userId
: the ID of the user on whose behalf the login is being made.cliVersion
: the version of the GUI client logging in.oneTimeToken
: the one-time token sent by the GUI client logging in.isLoginOverLogin
: is the token being authorized in the context of a login over an existing user session (true) or of a login that is starting a completely new user session?authResult
: the result of the one-time token authorization operation to be handled.
-
void
onTemporarySessionExpiration
(const ExpiredTemporarySessionInfo &info)¶ Handles a temporary session expired event communicated by the internal instance of the GuiServerTemporarySessionManager.
The expiration is handled by sending a message of type “onTemporarySessionExpired” to the client associated with the expired token. The message carries a Hash with paths “expiredToken” and “expirationTime”.
- Parameters
info
: data about the expired temporary session.
-
void
onEndTemporarySessionNotice
(const EminentExpirationInfo &info)¶ Handles a “temporary session about to expire” event.
The eminent temporary session end is handled by sending a message of type “onEndTemporarySessionNotice” to the client associated with the about to expire token. The message carries a Hash with paths “toExpireToken” and “secondsToExpiration”.
- Parameters
info
: data about the temporary session about to expire.
-
void
onRead
(const karabo::net::ErrorCode &e, WeakChannelPointer channel, karabo::data::Hash &info, const bool readOnly)¶ handles incoming data in the Hash
info
fromchannel
. The further actions are determined by the contents of thetype
property ininfo
. Valid types and there mapping to methods are given in the following table:onRead
allowed types¶type resulting method call reconfigure onReconfigure execute onExecute getDeviceConfiguration onGetDeviceConfiguration getDeviceSchema onGetDeviceSchema getClassSchema onGetClassSchema initDevice onInitDevice killServer onKillServer killDevice onKillDevice startMonitoringDevice onStartMonitoringDevice stopMonitoringDevice onStopMonitoringDevice getPropertyHistory onGetPropertyHistory getConfigurationFromPast onGetConfigurationFromPast subscribeNetwork onSubscribeNetwork requestNetwork onRequestNetwork info onGuiInfo requestGeneric onRequestGeneric subscribeLogs <no action anymore> setLogPriority onSetLogPriority beginTemporarySession onBeginTemporarySession endTemporarySession onEndTemporarySession Both upon successful completion of the request or in case of an exception the
onRead
function is bound to the channel again, maintaining the connection of the client to the gui-server.- Parameters
e
: holds an error code if the eventloop cancel this task or the channel is closedchannel
:info
:readOnly
:
-
void
setTimeout
(karabo::xms::SignalSlotable::Requestor &requestor, const karabo::data::Hash &input, const std::string &instanceKey)¶ Sets the appropriate timeout to a Requestor
If input has a “timeout” key, set the maximum value of that and the gui server timeout on the requestor, except if input.get<std::string>(instanceKey) is one instance of the classes in “ignoreTimeoutClasses”.
-
void
forwardReconfigureReply
(bool success, WeakChannelPointer channel, const karabo::data::Hash &input)¶ Callback helper for
onReconfigure
- Parameters
success
: whether call succeededchannel
: who requested the callinput
: will be copied to the keyinput
of the reply message
-
void
forwardHashReply
(bool success, WeakChannelPointer channel, const karabo::data::Hash &info, const karabo::data::Hash &reply)¶ Callback helper for generic actions called by the gui server.
- Parameters
success
: whether call succeededchannel
: who requested the callinfo
: the input info Hashreply
: the reply from the remote device or an empty Hash on failure
-
void
onRequestGeneric
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ Request a generic action internally. Generic interface to call slots that take a single Hash as argument and reply with a single Hash.
- Parameters
channel
: from which the request originatesinfo
: is a Hash that should contain the slot information.- type: requestGeneric
- instanceId: the instanceId to be called
- slot: the slot name of the instance
- empty: if this property is provided, the input Hash is not bounced back
- replyType (optional): the value of the key
type
in the reply to the client - timeout (optional) [s]: account for the slot call a specified timeout in seconds!
- args: The Hash containing the parameters for the slot call
The
forwardHashReply
method is used to relay information to the gui client.Returns:
In the default case, the return Hash is composed as follows::
- success: boolean to indicate if the generic request was successful
- reason: information on the error if not successful otherwise empty
- type: if specified in the input Hash, the
replyType
is used otherwiserequestGeneric
- request: the full input Hash information, including
args
- reply: The reply Hash of the instanceId
.. note: If the info Hash from the client provides an
empty
property, an empty Hash is sent back to the client instead of the input Hash.
-
void
onReconfigure
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ Calls the Device::onReconfigure slot on the device specified in
info
.The info should have the following entries:
- string at
deviceId
defines the target device - Hash at
configuration
is the configuration update to apply - bool at
reply
: if given and true, success or failure will be reported back to channel by a message of typereconfigureReply
that containsinput
: the Hash given here asinfo
success
: bool whether reconfiguration succeededfailureReason
: string with failure reason
- optional int at
timeout
: if a reply should be reported back, defines seconds of timeout. In casetimeout
is missing, timeout errors will reportsuccess
as true but provides afailureReason
mentioning the timeout
- Parameters
channel
: to potentially send “reconfigureReply”info
:
- string at
-
void
onBeginTemporarySession
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ Handles a message of type “beginTemporarySession” by starting a temporary session on top of the current user-authenticated session (if there’s one). The session begining is an asynchronous operation whose completion (either successful or not) will be handled by the onBeginTemporarySessionResult method.
- Parameters
channel
: the TCP channel connecting to the client that requested the begining of the temporary session.info
: a Hash which is supposed to contain an “temporarySessionToken” whose value is a one-time token that must be successfully authorized for the temporary session to be started.
-
void
onBeginTemporarySessionResult
(WeakChannelPointer channel, karabo::data::Schema::AccessLevel levelBeforeTemporarySession, const BeginTemporarySessionResult &result)¶ Handles the result of an “beginTemporarySession” request sent by a connected client.
- Parameters
channel
: the TCP channel connecting to the client that requested the temporary session that will be used to send a message of type “onBeginTemporarySession” with the begin operation results back to the client.levelBeforeTemporarySession
: sent by the client as part of the begin temporary session request to be sent back when the temporary session ends.result
: the results of the begin temporary session operation that will be sent back to the client.
-
void
onEndTemporarySession
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ Handles a message of type “endTemporarySession” by ending the current temporary session (if there’s one). The end of the session is performed synchronously (there’s no I/O involved) and its results are transmitted back to the client through a message of type “onEndTemporarySession”.
- Note
- the hash with the results of the ending operation sent back to the requesting client has the fields “success”, “reason” and “temporarySessionToken” (an echo of the token provided in the request).
- Parameters
channel
: the TCP channel connecting to the client that requested the end of the temporary session. Will be used to send the response back to the client.info
: a Hash which is supposed to contain an “temporarySessionToken” whose value is a one-time token that must match the one associated to the temporary session being terminated.
-
void
forwardExecuteReply
(bool success, WeakChannelPointer channel, const karabo::data::Hash &input)¶ Callback helper for
onExecute
- Parameters
success
: whether call succeededchannel
: who requested the callinput
: will be copied to the keyinput
of the reply message
-
void
onExecute
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ Calls a
command
slot on a specified device.The info should have the following entries:
- string at
deviceId
defines the target device - string at
command
is the slot to call - bool at
reply
: if given and true, success or failure will be reported back to channel by a message of typeexecuteReply
that containsinput
: the Hash given here asinfo
success
: bool whether execution succeededfailureReason
: string with failure reason
- optional int at
timeout
: if a reply should be reported back, defines seconds of timeout. In casetimeout
is missing, timeout errors will reportsuccess
as true but provides afailureReason
mentioning the timeout- Parameters
channel
:info
:
- string at
-
void
onInitDevice
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ Enqueues a future device instantiation. The relevant information will be stored in
m_pendingDeviceInstantiations
andinitSingleDevice
will take care of the actual instantiation when it is called by the instantiation timer.- Parameters
channel
:info
:
-
void
initSingleDevice
(const boost::system::error_code &error)¶ Instructs the server at
serverId
to try initializing the device atdeviceId
as given ininfo
. The reply from the device server is registered to theinitReply
callback.NOTE: This should only be called by
m_deviceInitTimer
- Parameters
error
:
-
void
initReply
(WeakChannelPointer channel, const std::string &givenDeviceId, const karabo::data::Hash &givenConfig, bool success, const std::string &message, bool isFailureHandler)¶ is the callback for the
onInitDevice
method. It is called upon reply from the device server handling the initialization request. The reply is passed to the callingchannel
in form of a hash message withtype=initReply
,deviceId
,success
andmessage
fields.- Parameters
channel
:givenDeviceId
:givenConfig
:success
:message
:
-
void
onGetDeviceConfiguration
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ requests the current device configuration for
deviceId
specified ininfo
and sends it back in a hash message onchannel
. The message contains the following fields:type=deviceConfiguration
,deviceId
andconfiguration
. The configuration is retrieved using the device client interface.- Parameters
channel
:info
:
-
void
onKillServer
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ instructs the server specified by
serverId
ininfo
to shutdown.- Parameters
info
:
-
void
onKillDevice
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ instructs the device specified by
deviceId
ininfo
to shutdown.- Parameters
info
:
-
void
onStartMonitoringDevice
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ Registers a monitor on the device specified by
deviceId
ininfo
Upon changes of device properties they will be forwarded tochannel
from a handler for changes in configurations of monitored devices that is kept internally by the gui-server.Only one channel per client is maintained for passing monitoring information and only one monitor is registered by the gui-server for any number of clients monitoring
deviceId
.After successful registration the current device configuration is returned by calling
onGetDeviceConfiguration
forchannel
.- Parameters
channel
:info
:
-
void
onStopMonitoringDevice
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ De-registers the client connected by
channel
from the device specified bydeviceId
ininfo
. If this is the last channel monitoringdeviceId
the device is removed from the set of devices monitored by the device-client.- Parameters
channel
:info
:
-
void
onGetClassSchema
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ requests a class schema for the
classId
on the server specified byserverId
ininfo
. This is done through the device client. A hash reply is sent out overchannel
containingtype=classSchema
,serverId
,classId
andschema
.- Parameters
channel
:info
:
-
void
onGetDeviceSchema
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ requests a device schema for the device specified by
deviceId
ininfo
. This is done through the device client. A hash reply is sent out overchannel
containingtype=deviceSchema
,deviceId
, andschema
.- Parameters
channel
:info
:
-
void
onGetPropertyHistory
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ requests the history for a
property
ondeviceId
in the time ranget0
andt1
as specified ininfo
. Additional the maximum number of data points may be specified inmaxNumData
. The request is asynchronously sent to the device logger logging information fordeviceId
. The reply from the logger is then forwarded to the client onchannel
using thepropertyHistory
history callback.- Parameters
channel
:info
:
-
void
propertyHistory
(WeakChannelPointer channel, bool success, const std::string &deviceId, const std::string &property, const std::vector<karabo::data::Hash> &data)¶ Callback for
onGetPropertyHistory
. It forwards the history reply indata
for theproperty
ondeviceId
to the client connected onchannel
. The hash reply is of the formattype=propertyHistory
,deviceId
,property
,success
,data
andfailureReason
which states the failure reason if any.- Parameters
channel
:success
: whether the request succeededdeviceId
:property
:data
:
-
void
onGetConfigurationFromPast
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ Request configuration for a
device
at point in timetime
as specified ininfo
. Theinfo
hash can as well have apreview
boolean which is send back to the client. The request is asynchronously sent to the device logger logging information fordeviceId
. The reply from the logger is then forwarded to the client onchannel
using theconfigurationFromPast
history callback in case of success orconfigurationFromPastError
for failures.
-
void
configurationFromPast
(WeakChannelPointer channel, const std::string &deviceId, const std::string &time, const bool &preview, const karabo::data::Hash &config, const karabo::data::Schema&, const bool configAtTimepoint, const std::string &configTimepoint)¶ Success callback for
onGetDeviceConfiguration
-
void
configurationFromPastError
(WeakChannelPointer channel, const std::string &deviceId, const std::string &time)¶ Failure callback for
onGetDeviceConfiguration
-
std::string
getDataReaderId
(const std::string &deviceId) const¶ Helper for history retrieval functions
- Return
- id of DataLogReader device to ask for history
- Parameters
deviceId
: of the device whose history is searched for
-
void
onSubscribeNetwork
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ registers the client connected on
channel
to a pipe-lined processing channel identified bychannelName
ininfo
in casesubscribe
is true. In case the pipe-lined processing channel is already connected to the gui-server no further action is taken. Otherwise, a new connection is opened, set to copy and dropping behaviour in case the gui-server is busy, and with a maximum update frequency as defined by thedelayOnInput
property of the gui server. Network data from the pipe-lined processing connection is handled by theonNetworkData
callback.In this way only one connection to a given pipe-lined processing channel is maintained, even if multiple gui-clients listen to it. The gui-server thus acts as a kind of hub for pipe-lined processing onto gui-clients.
If
subscribe
is set to false, the connection is removed from the list of registered connections, but is kept open.- Parameters
channel
:info
:
-
void
onSubscribeLogs
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ Kept to reply back that log subscription not supported anymore after 2.16.X
- Parameters
channel
:info
:
-
void
onSetLogPriority
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ sets the Log priority on a server. The
info
hash should contain apriority
string and ainstanceId
string.- Parameters
channel
:info
:
-
void
forwardSetLogReply
(bool success, WeakChannelPointer channel, const karabo::data::Hash &input)¶ Callback helper for
onSetLogPriority
- Parameters
success
: whether call succeededchannel
: who requested the callinput
: will be copied to the keyinput
of the reply message
-
void
onRequestNetwork
(WeakChannelPointer channel, const karabo::data::Hash &info)¶ Receives a message from the GUI client that it processed network data from an output channel with name
channelName
in the info Hash.- Parameters
channel
:info
:
-
void
onNetworkData
(const std::string &channelName, const karabo::data::Hash &data, const karabo::xms::InputChannel::MetaData &meta)¶ handles data from the pipe-lined processing channels the gui-server is subscribed to and forwards it to the relevant client channels, which have connected via
onSubscribeNetwork
. The incoming data is forwarded to all channels connected to this pipe-lined processing channel using the following hash message format:type=networkData
,name
is the channel name anddata
holding the data.- Parameters
channelName
: name of the InputChannel that provides these datadata
: the data coming from channelNamemeta
: corresponding meta data
-
void
sendSystemTopology
(WeakChannelPointer channel)¶ sends the current system topology to the client connected on
channel
. The hash reply containstype=systemTopology
and thesystemTopology
.- Parameters
channel
:
-
void
instanceNewHandler
(const karabo::data::Hash &topologyEntry)¶ sends the current system topology to the client connected on
channel
. The hash reply containstype=systemVersion
and thesystemVersion
.- Parameters
channel
:
-
void
instanceChangeHandler
(const karabo::data::Hash &instChangeData)¶ Handles events related to instances: new instance, instance updated, instance gone.
: Its signature matches karabo::core::InstanceChangeThrottler::InstanceChangeHandler).
-
void
devicesChangedHandler
(const karabo::data::Hash &what)¶ Acts upon incoming configuration updates from one or more devices. It is called back by a monitor registered on the device client. The reconfiguration contained in the
what
hash is forwarded to any channels connected to the monitor byonStartMonitoringDevice
.The message type of the hash sent out is type=”deviceConfigurations”. The hash has a second first level key, named “configurations”, whose value is a hash with the deviceIds as keys and the configuration changes for the corresponding deviceId as values.
- Parameters
what
: A hash containing all the configuration changes that happened to one or more monitored devices since the last update. Each node under the key “configurations” has the ‘deviceId’ as key and the changed configurations as a value of type Hash.
-
void
slotGetClientSessions
(const karabo::data::Hash &options)¶ Retrieve information about the current client sessions of the GUI server.
The reply is a hash with a single key, “clientSessions”, whose value is a vector of hashes with one hash per existing client connection. The hash for each connection has the following keys:
. "clientVersion": string with the version of the connected client; . "sessionStartTime": UTC timestamp of session start time as an ISO8601 string; . "sessionToken": one-time token for the client session. Will be empty if the GUI Server is not in authenticated mode; . "temporarySessionStartTime": UTC timestamps of temporary session start time as an ISO8601 string. If there's no active temporary session on top of the client session, an empty string is returned; . "temporarySessionToken": one-time token for the temporary session (an empty string if there's no active temporary session).
- Parameters
options
: Hash with a single key “onlyWithTempSession” and a boolean value that when “true” makes the slot include only client sessions with active temporary sessions in the reply.
-
void
slotProjectUpdate
(const karabo::data::Hash &info, const std::string &instanceId)¶ Called from projectManagers to notify about updated Projects
- Parameters
info
: the info hash containing the information about the updated projectsinstanceId
: the instance id of the project manager device
-
void
slotDumpToLog
()¶ Slot to dump complete debug info to log file
Same info as received from ‘slotDumpDebugInfo’ with empty input Hash
-
void
slotNotify
(const karabo::data::Hash &info)¶ Slot to send a notification message to all clients connected - replies empty Hash
- Parameters
info
: a Hash with following keys- ”message”: a string containing the notification type
- ”contentType”: a string defining the type of notification as the GUI client understands it
- ”banner” means message will go to the GUI banner. Therefore it will be stored in the “bannerMessage” property of the GuiServerDevice and sent to any client that connects.
- other types will likely just be shown in a pop-up window of the client
-
void
slotBroadcast
(const karabo::data::Hash &info)¶ Slot to send a Hash to the GUI clients connected - replies empty Hash
WARNING: No checks are performed on this slot. This slot can possibly disconnect all clients. Do not use it unless you understand the risks.
- Parameters
info
: a Hash with at least the following keys.- ”message”: a Hash that will be sent to the client(s). It should contain a “type” string.
- ”clientAddress”: a string containing the GUI client address as coded in the
slotDumpDebugInfo
results. If the value for this key is an empty string, all clients will be notified.
-
void
requestScene
(const karabo::data::Hash &info)¶ Slot to provide scene
- Parameters
info
: Hash with key “name” that provides string identifying which scene
-
karabo::data::Hash
getDebugInfo
(const karabo::data::Hash &info)¶ Helper for ‘slotDumpToLog’ and ‘slotDumpDebugInfo’
-
void
slotDisconnectClient
(const std::string &client)¶ Slot to force disconnection of client. Reply is whether specified client found.
- Parameters
client
: string to identify client, as can be received via slotDumpDebugInfo(Hash(“clients”, 0))
-
void
typeAndInstanceFromTopology
(const karabo::data::Hash &topologyEntry, std::string &type, std::string &instanceId)¶ Returns the instance type and instance id from a topology entry
- Parameters
topologyEntry
: a Hash of the topology formattype
: string which will afterwards contain typeinstanceId
: string which will be filled with the instance id
-
bool
allowLock
() const¶ This device may not be locked
- Return
- false
-
void
registerPotentialProjectManager
(const karabo::data::Hash &topologyEntry)¶ Checks if an instance at instanceId is a ProjectManager. If so, register it to the list of known project services
- Parameters
topologyEntry
: the topology Hash, from which the class of instanceId will be deduced
-
std::vector<std::string>
getKnownProjectManagers
() const¶ Return a list of project services known to this GUI server
- Return
-
bool
checkProjectManagerId
(WeakChannelPointer channel, const std::string &deviceId, const std::string &type, const std::string &reason)¶ Check if a given project manager identified by id is known in the distributed system
- Return
- true if the project manager id exists in the distributed system
- Parameters
channel
: to forward a failure message to if notdeviceId
: of the project manager devicetype
: of the request
-
std::string
getChannelAddress
(const karabo::net::Channel::Pointer &channel) const¶ Utility for getting a “name” from client connections.
-
bool
skipExecutionTimeout
(const std::string &deviceId)¶ Helper Function to identify whether a device belongs to the timeout violation list TODO: remove this once “fast slot reply policy” is enforced
returns true if a
.timeout()
should be skipped on execution requestor
-
void
recalculateTimingOutDevices
(const karabo::data::Hash &topologyEntry, const std::vector<std::string> &timingOutClasses, bool clearSet)¶ Helper Function to recalculate the list of timeout violating devices from the list of offending classes TODO: remove this once “fast slot reply policy” is enforced
Private Static Attributes
-
const std::string
m_errorDetailsDelim
¶ In reported failure reasons, this delimiter comes between short message and details like a trace.
-
void
-
class
GuiServerTemporarySessionManager
¶ - #include <GuiServerTemporarySessionManager.hh>
Manages temporary sessions on top of user-authenticated GUI Sessions.
Takes care of authorizing one-time temporary session tokens to start temporary sessions and of communicating temporary sessions about to expire or already expired.
Inherits from std::enable_shared_from_this< GuiServerTemporarySessionManager >
Public Functions
-
GuiServerTemporarySessionManager
(const std::string &topic, const std::string &authServerUrl, unsigned int temporarySessionDurationSeconds, unsigned int temporarySessionEndNoticeSeconds, EminentExpirationHandler onEminentExpiration, ExpirationHandler onExpiration)¶ Construct a new Gui Server Temporary Session Manager object.
- Parameters
topic
: the Karabo topic against which temporary session tokens will be authorized.authServerUrl
: the URL of the authentication server to use for authorizing one-time temporary session tokens.temporarySessionDurationSeconds
: the duration, in seconds, to be enforced for temporary sessions.temporarySessionEndNoticeSeconds
: the time in advance, in seconds, to communicate about an eminent end of temporary session event.onEminentExpiration
: handler for temporary sessions about to expire.onExpiration
: handler for expired temporary sessions.
-
void
beginTemporarySession
(const std::string &temporarySessionToken, const BeginTemporarySessionHandler &onBeginTemporarySession)¶ Asynchronously starts a new temporary session for a given one-time temporary session token.
- Note
- Calls the registered BeginTemporarySessionHandler with the results of the beginTemporarySession operation.
- Parameters
temporarySessionToken
: the one-time temporary session token to be authorized and bound to the started temporary session.onBeginTemporarySession
: handler for begin temporary session events (either successful or failed).
-
EndTemporarySessionResult
endTemporarySession
(const std::string &temporarySessionToken)¶ Synchronously terminates a temporary session referenced by a given temporary session token.
- Return
- a structure with the endTemporarySession operation results.
- Note
- an error due to a beginTemporarySession token not found isn’t necessarily an error from the GUI client point of view. In the unlikely scenario of an endTemporarySession request that reaches the GUI server while the expiration check that will detect the expiration of the same token is already running, the end temporary session request will “fail” with a “token not found” message. It is up to the GUI client to decide what to do in such cases - maybe keep track of an “over the wire” end temporary session request token and ignore any error related to it if an expiration notification is received for that token between the request dispatch and the arrival of its response.
- Parameters
temporarySessionToken
: the one-time temporary session token bound to the session to be terminated.
Private Functions
-
void
checkTemporarySessionsExpirations
(const boost::system::error_code &error)¶ Checks the currently active temporary sessions removing the expired ones after invoking the registered expiration handlers for each of them.
- Parameters
error
: an error code sent by boost::asio that if different from 0 indicates that the timer pulse that should invoke this check at some future point has been cancelled.
-
void
onTokenAuthorizeResult
(const std::string &temporarySessionToken, const BeginTemporarySessionHandler &onBeginTemporarySession, const karabo::net::OneTimeTokenAuthorizeResult &authResult)¶ Handles the result of a temporary session token authorization request, updating the internal state of the manager and communicating the outcome of the begin temporary session request to the external requestor.
- Parameters
temporarySessionToken
: the one-time temporary session token whose authorization was requested.onBeginTemporarySession
: handler for begin temporary session events (either successful or failed).authResult
: the result of the authorization of the temporary session token provided by the external caller of the begin temporary session operation.
-
void
scheduleNextExpirationsCheck
()¶ Schedules the next expiration check if there’s any escalation to be checked.
- Note
- this method must be called with the m_tempSessionsMutex locked.
-
-
class
IndexBuilderService
¶ - #include <FileLogReader.hh>
A singleton class for building logger indices from logger files. It calls karabo-idxbuild with a list of command line arguments.
Inherits from std::enable_shared_from_this< IndexBuilderService >
Public Functions
-
void
buildIndexFor
(const std::string &commandLineArguments)¶ Build an index by calling karabo-idxbuild with the supplied command line arguments
- Parameters
commandLineArguments
:
Public Static Functions
-
IndexBuilderService::Pointer
getInstance
()¶ Return a pointer to a singleton instance of IndexBuilderService. If no instance exists one is created.
- Return
Private Functions
-
bool
allowLock
() const¶ This device may not be locked
- Return
- false
-
void
-
class
InfluxDataLogger
¶ Inherits from karabo::devices::DataLogger
Public Functions
-
void
preDestruction
()¶ Override preDestruction from Device class
Protected Functions
-
void
initializeLoggerSpecific
()¶ Do some actions here that may require asynchronous logic … and, finally, startConnection() should be called This function may be overridden by derived classes but at the end the ‘startConnection’ function should be called as a last step of initialization
“Flush” data accumulated in the internal cache to the external storage (file, database,…)
-
void
-
struct
InfluxDeviceData
¶ Inherits from karabo::devices::DeviceData
Public Functions
-
void
handleChanged
(const karabo::data::Hash &config, const std::string &user)¶ Called when configuration updates arrive for logging
- Parameters
config
: a Hash with the updates and their timestampsthe
: user responsible for this update - if any
-
void
login
(const karabo::data::Hash &configuration, const std::vector<std::string> &sortedPaths)¶ Helper to store logging start event
- Parameters
configuration
: full device configuration received when logging startssortedPaths
: full paths of configuration, sorted by increasing timestamp
-
void
handleSchemaUpdated
(const karabo::data::Schema &schema, const karabo::data::Timestamp &stamp)¶ Called when a Schema update arrive for logging
- Parameters
schema
: - the new onestamp
: - the timestamp to be assigned for that update
-
unsigned int
newPropLogRate
(const std::string &propPath, karabo::data::Epochstamp currentStamp, std::size_t currentSize)¶ Calculates what the value of the property logging rate of the device will be when the logging of a value with a given size and a given timestamp is taken into account.
- Return
- The updated value of the property logging rate, in bytes/sec, taking the logging of the value into account.
- Parameters
prop
: The path of the property whose current logging rate will be evaluated.currentStamp
: The current property update timestamp.currentSize
: The size for the new data to be logged - this is used along with the other records in the current log rating window to calculate the new value for the property logging rate.
-
unsigned int
newSchemaLogRate
(std::size_t schemaSize)¶ Calculates what the value of the schema logging rate of the device will be when the logging of a schema with a given size is taken into account. As schemas currently don’t have associated time information, the current system time is used for all the timing references.
- Return
- The updated value of the schema logging rate, in bytes/sec, taking the logging of the schema into account.
- Parameters
schemaSize
: The size for the new schema to be logged - this is used along with the other records in the current log rating window to calculate the new value for the schema logging rate.
-
bool
logNewSchema
(const std::string &schemaDigest, const std::vector<char> &schemaArchive)¶ Logs a new schema into the corresponding device’s __SCHEMA measurement. It is assumed that the verification of the uniquiness of the device schema has already been verified based on its digest.
- Return
- true If the new schema has been successfuly submitted for logging.
- Return
- false If the logging of the new schema has not been submitted for logging. Currently, this happens if logging the new schema would be above the allowed schema logging rate threshold for a device.
- Parameters
schemaDigest
: The digest (assumed unique) of the new schema to be saved.schemaArchive
: The serialised schema to be saved
-
void
logRejectedData
(const std::vector<RejectedData> &rejects, unsigned long long ts)¶ Logs the given set of rejected data in the BAD__DATA measurement and to the Karabo log. To avoid spanning of the Karabo log, log is emmitted for each device only once in a period of 30 secs.
- Parameters
rejects
: The rejected data to be logged.ts
: An epoch with the precision expected in the InfluxDb.
-
void
logRejectedDatum
(const RejectedData &rejects)¶ Logs the given rejected data record in the BAD__DATA measurement and to the Karabo log. To avoid spanning of the Karabo log, log is emmitted for each device only once in a period of 30 secs.
- Parameters
rejects
: The rejected data to be logged.
-
struct
LoggingRecord
¶ - #include <InfluxDataLogger.hh>
The size, in characters, and the epoch seconds of a device log entry saved to Influx.
Used for calculating the logging rates associated to a device.
-
void
-
class
InfluxLogReader
¶ Inherits from karabo::devices::DataLogReader
Private Functions
Triggers the retrieval of the number of data points for a given device property during a time interval.
- Parameters
context
:
Handles the retrieval of the number of data points for an ongoing GetPropertyHistory process. Responsible for invoking the appropriate async method for retrieving the property values depending on the number of data points received.
- Parameters
dataCountResponse
:context
:
Triggers the retrieval of the property values in an ongoing GetPropertyHistory process.
- Parameters
context
:
Triggers the retrieval of the property values mean in an ongoing GetPropertyHistory process. This is used when the number of available data points for the property is larger than the maximum requested by the slot caller and all values are scalar numbers. The UINT64 properties are included in this despite being reinterpreted as INT64 on the backend and possibly returning incorrect data.
- Parameters
context
:
Triggers the retrieval of the property values samples in an ongoing GetPropertyHistory process. This is used when the number of available data points for the property is larger than the maximum requested by the slot caller.
- Parameters
context
:
Handles the retrieval of the values of a property in an ongoing GetPropertyHistory process. Responsible for transforming the json formatted values received from InfluxDbClient into a vector of hashes suitable to be returned to the slot caller. Also responsible for replying to the slot caller.
- Parameters
valuesResp
:columnPrefixToRemove
:context
:
Handles the retrieval of the values of a property in an ongoing GetPropertyHistory process. Responsible for transforming the json formatted values received from InfluxDbClient into a vector of hashes suitable to be returned to the slot caller. This function extends the functionality of
onPropertyValues
while keeping the property history protocol.Also responsible for replying to the slot caller.
- Parameters
valuesResp
:context
:
-
std::string
unescapeLoggedString
(const std::string &loggedStr)¶ Unescapes a logged string. A logged string has its new lines mangled, then its double slashes escaped and then its double quotes escaped. This functions applies those transformations in the reverse order.
- Return
- The unescaped original string.
- Parameters
loggedStr
: The string as it has been escaped by the Influx Logger.
-
bool
preHandleHttpResponse
(const karabo::net::HttpResponse &httpResponse, const karabo::xms::SignalSlotable::AsyncReply &asyncReply)¶ Performs an initial common handling of an HTTP response received by the Log Reader.
In the InfluxDb client <-> server communication context, any response with a status code greater or equal to 300 is considered an error and will be completely handled by this method. A specific status code, 503, indicates that the InfluxDb server was not available and puts the Log Reader in ERROR state. Any other error puts the Log Reader in ON state.
The error handling consists of sending the appropriate error reply to the caller of the InfluxLogReader slot affected by the error and of optionally disconnecting the InfluxDbClient used by the slot.
- Return
- true if the preHandle method completely processed the HttpResponse and no further action from the Log Reader is needed. This is the case for responses with status codes indicating errors. false if the response should still be processed by the response handler that called preHandleHttpResponse.
- Parameters
httpResponse
: the response that potentially indicates an error.asyncReply
: the reply to be sent to the caller of the slot where the error happened.
-
karabo::data::Epochstamp
toEpoch
(unsigned long long timeFromInflux) const¶ Convert a time point from influx to karabo Epochstamp
-
karabo::data::Hash
buildInfluxClientConfig
(const std::string &dbUrlForSlot) const¶ Builds and returns the configuration Hash for instantiating an InfluxDbClient to be used in the execution of one of the slots supported by the reader.
- Return
- the configuration Hash for the InfluxDbClient.
- Parameters
dbUrlForSlot
: the URL to be used in the configuration - each slot can use a different database URL.
-
class
PropertyTest
¶ - #include <PropertyTest.hh>
The PropertyTest device includes all types Karabo knows about in it’s expected parameter section. It is a test device to assure changes to the framework do not result in broken types.
Inherits from karabo::core::Device
Private Functions
-
void
orderTest_slotStart
()¶ The order test started with this slot works as follows:
- ’stringProperty’ defines the ‘other’ PropertyTest device supposed to send messages to us
- ’int32Property’ defines how many messages it should send
- the number of messages and our own id are transferred to the other device
- we connect our ‘slotCount’ to the other’s ‘signalCount’
- we call the other’s ‘slotStartCount’ which will trigger sending messages to us, alternating between direct calls to our ‘slotCount’ and emitting ‘signalCount’ with count arguments starting from 0
- we keep track of all counts received and their order (so do not run with billions of counts!)
- end of messaging is signaled to us via a call with count = -1
- we publish the number of received messages and those counts (well, up to 1000 only) that are not in order as “orderTest.receivedCounts” and “orderTest.nonConsecutiveCounts”, respectively.
-
void
-
struct
PropFileInfo
¶ - #include <FileLogReader.hh>
A compound structure holding data on an logger archive file.
-
typedef nlohmann::json
The karabo::io Namespace¶
Warning
doxygennamespace: Cannot find namespace “karabo::io” in doxygen xml output for project “KARABO” from directory: /usr/src/app/checkouts/readthedocs.org/user_builds/karabo/checkouts/latest/doc/.build/html/reference/xml
The karabo::net Namespace¶
-
namespace
karabo
::
net
¶ Namespace for package net
Typedefs
-
typedef boost::system::error_code
ErrorCode
¶
-
using
karabo::net::tcp = typedef boost::asio::ip::tcp
-
using
karabo::net::errorCode = typedef boost::beast::error_code
-
using
karabo::net::flatBuffer = typedef boost::beast::flat_buffer
-
using
karabo::net::getRequest = typedef boost::beast::http::request<boost::beast::http::empty_body>
-
using
karabo::net::HttpHeader = typedef boost::beast::http::field
-
using
karabo::net::HttpHeaders = typedef boost::beast::http::fields
-
using
karabo::net::HttpResponseHandler = typedef std::function<void(const boost::beast::http::response<boost::beast::http::string_body>&)>
-
using
karabo::net::postRequest = typedef boost::beast::http::request<boost::beast::http::string_body>
-
using
karabo::net::response = typedef boost::beast::http::response<boost::beast::http::string_body>
-
using
karabo::net::resolver = typedef boost::asio::ip::tcp::resolver
-
using
karabo::net::results_type = typedef boost::asio::ip::tcp::resolver::results_type
-
using
karabo::net::tcp_stream = typedef boost::beast::tcp_stream
-
using
karabo::net::ssl_stream = typedef boost::beast::ssl_stream<boost::beast::tcp_stream>
-
using
karabo::net::ssl_context = typedef boost::asio::ssl::context
-
using
karabo::net::verb = typedef boost::beast::http::verb
-
using
karabo::net::InfluxResponseHandler = typedef std::function<void(const HttpResponse&)>
-
using
karabo::net::InfluxConnectedHandler = typedef std::function<void(bool)>
-
typedef std::shared_ptr<std::vector<char>>
VectorCharPointer
¶
-
using
karabo::net::AuthOneTimeTokenHandler = typedef std::function<void(const OneTimeTokenAuthorizeResult&)>
-
using
karabo::net::AsyncHandler = typedef std::function<void(const boost::system::error_code)>
Enums
Functions
-
boost::system::error_code
make_error_code
(AmqpCppErrc e)¶
-
std::ostream &
operator<<
(std::ostream &os, const HttpResponse &o)¶
-
InfluxDbClient::Pointer
buildInfluxReadClient
()¶ Instantiates an InfluxDbClient that connects to an InfluxDb reading node.
The connection parameters for the InfluxDb reading node are obtained via the following environment variables:
KARABO_INFLUXDB_QUERY_URL KARABO_INFLUXDB_DBNAME (with fallback to the Broker domain) KARABO_INFLUXDB_QUERY_USER KARABO_INFLUXDB_QUERY_PASSWORD
- Return
- A std::shared_ptr to the built InfluxDbClient instance.
-
std::ostream &
operator<<
(std::ostream &os, const NetworkInterface &ni)¶ Send a string representation of the NetworkInterface object to a stream. (Mostly for debug purposes)
-
static karabo::data::Hash
extendHeaderForBufferSets
(const karabo::data::Hash &hdr, const std::vector<karabo::data::BufferSet::Pointer> &body)¶ Helper to extend header for writing header and BufferSet pointers.
- Return
- extended header
-
std::string
bareHostName
()¶ Return the bare host name after stripping domain (exflxxx12345.desy.de => exflxxx12345)
- Return
Wrapper around boost::asio::io_service::run that catches exceptions, logs them as errors and continues after some delay.
- Parameters
service
: shared pointer to the io_servicecategory
: the category used for loggingerrorMessage
: will be part of the logged errordelayInMilliSec
: is the delay after each catch
-
std::tuple<std::string, std::string>
parseGenericUrl
(const std::string &url)¶ Parses a URL and returns a tuple.
The URL must of format: <scheme>:<scheme-dependent-part>
- Return
- tuple containing scheme and scheme dependent part
- Parameters
url
: A well formed URL
-
std::tuple<std::string, std::string, std::string, std::string, std::string>
parseUrl
(const std::string &url)¶ Parses a HTTP-like URL and returns a tuple.
The URL must of format: <scheme>://<domain>:<port>/<path>?<query>
- Return
- tuple containing scheme, domain, port, path and query
- Parameters
url
: A well formed URL
-
string
urlencode
(const std::string &value)¶
-
std::string
getIpFromCIDRNotation
(const std::string &addressRange)¶ Returns an IP string from a Classless Inter-Domain Routing specification
e.g. the string 192.168.0.0/24 represents the IP range between 192.168.0.0 and 192.168.0.255.
The function will ignore loopback interface and interfaces that are down. Only IP4 specifications are implemented.
- Return
- an IP address matching the input range or the input string if the input string does not specify a network range or if it does not match any external active interface
Variables
-
const size_t
kDefaultQueueCapacity
= 5000¶
-
class
AmqpBroker
¶ Inherits from karabo::net::Broker
Public Functions
-
Broker::Pointer
clone
(const std::string &instanceId)¶ The function creates broker communication object with the new identity by cloning the parent object. Concrete meaning of cloning strategy is an implementation detail.
- Return
- new broker communication object
- Parameters
instanceId
: - unique ID
-
void
connect
()¶ This function establishes connection to the broker otherwise throws exception
-
void
disconnect
()¶ Close broker connection
-
bool
isConnected
() const¶ Predicate to check broker connection status
- Return
- true if connection is open
-
std::string
getBrokerUrl
() const¶ Get active URI used for establishing connection to broker
- Return
- uri like “mqtt://localhost:1883”
-
std::string
getBrokerType
() const¶ Get type string identifying broker. Example: “AmqpBroker”
- Return
- the type defined by active uri
-
boost::system::error_code
subscribeToRemoteSignal
(const std::string &signalInstanceId, const std::string &signalFunction)¶ Establish logical signal-slot connection between 2 devices that is required by used protocol for registration
- Parameters
signalInstanceId
: device instance ID of a signalsignalFunction
: signal name
-
boost::system::error_code
unsubscribeFromRemoteSignal
(const std::string &signalInstanceId, const std::string &signalFunction)¶ Close logical signal-slot connection. De-registration in broker specific API.
- Parameters
signalInstanceId
:signalFunction
:
-
void
subscribeToRemoteSignalAsync
(const std::string &signalInstanceId, const std::string &signalFunction, const AsyncHandler &completionHandler)¶ Establish signal-slot connection asynchronously
- Parameters
signalInstanceId
:signalFunction
:completionHandler
: this callback is called when complete
-
void
unsubscribeFromRemoteSignalAsync
(const std::string &signalInstanceId, const std::string &signalFunction, const AsyncHandler &completionHandler)¶ Unsubscribe from (remote) signal asynchronously
- Parameters
signalInstanceId
:signalFunction
:completionHandler
:
-
void
startReading
(const consumer::MessageHandler &handler, const consumer::ErrorNotifier &errorNotifier = consumer::ErrorNotifier())¶ AMQP subscription: subscribe to the following exchanges… “m_domain.slots” with routingKey m_instanceId “m_domain.global_slots”
- Parameters
handler
: - success handlererrorNotifier
: - error handler
-
void
stopReading
()¶ Stop processing messages coming via main path
-
void
startReadingHeartbeats
(const consumer::MessageHandler &handler, const consumer::ErrorNotifier &errorNotifier = consumer::ErrorNotifier())¶ Heartbeat is used for tracking instances (tracking all instances or no tracking at all)
AMQP subscription Subscribe to the exchange “m_domain.signals” with the routing key: “*.signalHeartbeat” heartbeats of all known connections
- Parameters
handler
: - success handlererrorNotifier
: - error handler
-
void
write
(const std::string &topic, const karabo::data::Hash::Pointer &header, const karabo::data::Hash::Pointer &body)¶ Write message to broker, blocks until written
- Parameters
topic
: Either the “domain” as passed to the Broker base class, the “domain” with the suffix “_beats”, or “karaboGuiDebug”header
: of the message - must containbody
: of the message
Public Static Functions
-
void
expectedParameters
(karabo::data::Schema &s)¶ AmqpBroker operates currently with the following set of …
Signals are sent to the exchange …
exchange = <domain>.signals routing_key = <signalInstanceId>.<signalName> < selector queue = <m_instanceId> < common queue
the signals are emitted to the exchange bound via routing_key to the queue. The slotInstanceIds should subscribe to the AMQP::topic type exchange with the ‘routing_key’ and queue = <slotInstanceId>
Special case of above signals… signalHeartbeat … exchange = <domain>.signals routing_key = <signalInstanceId>.signalHeartbeat queue = <m_instanceId>
Calls, commands, requests, replies are sent to
exchange = <domain>.slots routing_key = <slotInstanceId> queue = <m_instanceId> < common queue
all requests/calls/replies to the device send to this exchange The further message dispatching to slots is provided by using info in message header.
Broadcast messages should be sent to …
exchange = <domain>.global_slots routing_key = “” queue = <m_instanceId>
there is a way of implementing “broadcast” messages like in JmsBroker. In JMS it was enough to use “|*|” in header’s slotInstanceIds. In AMQP we have to be subscribed to such exchange (to receive broadcast messages). Known global slots: slotInstanceNew to announce the new device in Karabo network slotInstanceUpdated to announce the device info to be updated slotInstanceGone to announce device death, slotPing to trigger sending their status by all devices received such message
GUI debug
exchange = <domain>.karaboGuiDebug routing_key = “” queue = <as gui=”” debug=”” listener=”” defines>=”“>
GUI debugging channel
-
void
defaultQueueArgs
(AMQP::Table &args)¶ Fill argument with default AMQP message queue creation arguments
-
Broker::Pointer
-
class
AmqpClient
¶ - #include <AmqpClient.hh>
Class that exposes an AMQP client.
It will create a unique queue and consume from it exclusively and with automatic acknowledgment. Its queue name will start with the given instanceId and will potentially be suffixed by some characters to ensure uniqueness.
To actually receive messages via the handlers specified in the constructor, the client has to subscribe to exchanges, potentially with routing keys to select messages on the broker side.
- Note
- : This client does not know about Karabo “domains” (a.k.a. “topics”), i.e. exchanges and queues created are “shared” among all clients connected to the same broker.
Inherits from std::enable_shared_from_this< AmqpClient >
Public Types
-
enum
ChannelStatus
¶ Channel status tells what should be the next step to do in channel preparation
Values:
-
REQUEST
¶
-
CREATE
¶
-
CREATE_QUEUE
¶
-
CREATE_CONSUMER
¶
-
READY
¶
-
Public Functions
-
AmqpClient
(AmqpConnection::Pointer connection, std::string instanceId, AMQP::Table queueArgs, ReadHandler readHandler)¶ Create client with raw data interface from connection
- Parameters
connection
: the connection, all internal data access will run in its io contextinstanceId
: the client id - will usually be the name of the queue that will be subscribedqueueArgs
: the arguments passed to queue creationreadHandler
: a read handler for all received messages (if an invalid function, must call setReadHandler before the first subscription)
-
void
setReadHandler
(ReadHandler readHandler)¶ (Re-)set the read handler that will be called for all received messages
- Parameters
readHandler
: A valid read function (karabo::data::ParameterException if not valid)
-
void
asyncSubscribe
(const std::string &exchange, const std::string &routingKey, AsyncHandler onSubscriptionDone)¶ Asynchronously subscribes client
If subscription is reported to have failed, it will be tried again
- at next subscription or
- if reviveIfReconnected() is called.
- Parameters
exchange
: name of AMQP exchange that will be created if not yet existingroutingKey
: the AMQP routing keyonSubscriptionDone
: a valid handler called in AMQP io context (so please no mutex inside, please) when subscription established or failed
-
void
asyncUnsubscribe
(const std::string &exchange, const std::string &routingKey, AsyncHandler onUnsubscriptionDone)¶ Asynchronously unsubscribes client
Note: Success will be reported for an unsubscription from exchange/routing key that it was not subscribed before
- Parameters
exchange
: name of AMQP exchange that will be unsubscribed fromroutingKey
: the AMQP routing key to unsubscribe fromonUnsubscriptionDone
: a valid handler called in AMQP io context (so please no mutex inside, please) when unsubscription succeeded or failed
-
void
asyncUnsubscribeAll
(AsyncHandler onUnsubscriptionsDone)¶ Asynchronously unsubscribes client from all subscriptions
- Parameters
onUnsubscriptionDone
: a valid handler called in AMQP io context (so please no mutex inside, please) when all unsubscription requests are done. If any of them failed, the error code passed is the one of the last failure
Asynchronously publish data
- Parameters
exchange
: the exchange…routingKey
: …and the routingkey for the datadata
: a raw data container fo the message to be published (must be non-zero pointer)onPublishDone
: handler called in AMQP io context (so please no mutex inside, please) when data published
-
void
reviveIfReconnected
()¶ Revice after connection was lost and re-established
Means to recreate channel, redo all subscriptions and publish postponed messages
To be called if AmqpConnection is connected again after connection loss Must be called within io context of AmqpConnection
Private Functions
-
void
asyncPrepareChannel
(AsyncHandler onChannelPrepared)¶ Prepare m_channel until it is ChannelStatus::READY
Must be called in the io context of the AmqpConnection
- Parameters
onChannelPrepared
: handler called when m_channel READY or if failure on the way
-
void
moveChannelState
()¶ Helper to move the created channel through its states, asynchronously calling itself. If READY (or failure), call and erase the m_channelPreparationCallback
Helper to publish, must run in io context and only when channel is READY and exchange declared
-
void
queueMessage
(PostponedMessage &&message)¶ Queue message (or drop if queueu too long), must run in io context
-
void
publishPostponed
()¶ Helper to publish postponed messages until first found with an exchange that is not yet declared
Private Members
-
std::queue<PostponedMessage>
m_postponedPubMessages
¶ Messages postponed since channel not yet ready or exchange not yet declared.
-
class
AmqpConnection
¶ - #include <AmqpConnection.hh>
Wraps the AMQP::TcpConnection and the single threaded io context where all calls to the amqp library must run.
Inherits from std::enable_shared_from_this< AmqpConnection >
Public Types
-
enum
State
¶ Connection states
Values:
-
eUnknown
= 2000¶
-
eStarted
¶
-
eNotConnected
¶
-
eConnectionDone
¶
-
eConnectionReady
¶
-
eConnectionClosed
¶
-
eConnectionError
¶
-
eConnectionLost
¶
-
-
using
ChannelCreationHandler
= std::function<void(const std::shared_ptr<AMQP::Channel>&, const std::string &errMsg)>¶ Handler for asyncCreateChannel
Either returns the channel or (if returned channel pointer is empty) state the failure reason.
Public Functions
-
AmqpConnection
(std::vector<std::string> urls)¶ Constructing a connection and starting the thread of the io context
- Parameters
urls
: vector of broker urls to try to connect to in asyncConnect (throws karabo::util::NetworkException if vector is empty)
-
std::string
getCurrentUrl
() const¶ Return currently used broker URL (either already connected to it or the currently/next tried one)
-
bool
isConnected
() const¶ Whether connection established
-
std::string
connectionInfo
() const¶ Various info about internal connection (for debug logs)
-
void
asyncConnect
(AsyncHandler onComplete)¶ Asynchronously connect to any of the broker addresses passed to the constructor.
Addresses will be tried in the order they have been passed. Can be called from any thread.
- Parameters
onComplete
: AsyncHAndler called (not from within asyncConnect) about success or failure of connection attempt. If all addresses failed, the error code passed is the one of the last address passed to the constructor. The handler (if valid) will be called from within the internal io context, but not within the scope of asyncConnect.
-
void
asyncCreateChannel
(ChannelCreationHandler onComplete)¶ Trigger creation of an amqp channel and return it via the handler.
If not connected yet, try to connect first. Note that if connection lost, channel creation will not be tried again, but failure is reported.
Can be called from any thread.
- Parameters
onComplete
: A valid (!) ChannelCreationHandler that will be called from within the internal io context, but not within the scope of asyncCreateChannel.
-
void
registerForReconnectInfo
(std::weak_ptr<AmqpClient> client)¶ Register client to be informed about re-established connection after connection loss
-
void
cleanReconnectRegistrations
()¶ Clean clients registered to receive reconnect info, i.e. remove all dangling weak pointers
Can e.g. be called in the destructor of a client that registered before.
- template <typename CompletionToken>
-
void
post
(CompletionToken &&task) const¶ Post a task to the io context
The task must not contain blocking code since otherwise the thread running the AMQP communication is blocked.
- template <typename CompletionToken>
-
void
dispatch
(CompletionToken &&task) const¶ Detach a task to the io context, i.e. run it now or later depending on which thread we are in
The task must not contain blocking code since otherwise the thread running the AMQP communication is blocked.
Private Functions
-
void
doAsyncConnect
()¶ Helper to asyncConnect iterating over urls until success or all urls tried. Then calls m_onConnectionComplete.
Prerequisite: m_urlIndex < m_urls.size()
-
void
informReconnection
(const boost::system::error_code &ec)¶ Must run in io context
-
void
triggerReconnection
()¶ Must run in io context
-
const char *
stateString
(AmqpConnection::State state)¶ Convert State to a string (or rather const char*)
Private Members
-
std::set<std::weak_ptr<AmqpClient>, std::owner_less<std::weak_ptr<AmqpClient>>>
m_registeredClients
¶ Track clients to inform about reconnections.
-
enum
-
class
AmqpHashClient
¶ - #include <AmqpHashClient.hh>
Class that wraps around AmqpClient to provide a message interface with Hash header and body.
Deserialisation of incoming messages is done via a karabo::net::Strand, i.e. a running karabo::net::EventLoop is needed.
Inherits from std::enable_shared_from_this< AmqpHashClient >
Public Functions
-
void
asyncSubscribe
(const std::string &exchange, const std::string &routingKey, AsyncHandler onSubscriptionDone)¶ Asynchronously subscribes client by just forwarding to AmqpClient::asyncSubscribe
==> See docs of that.
-
void
asyncUnsubscribe
(const std::string &exchange, const std::string &routingKey, AsyncHandler onUnsubscriptionDone)¶ Asynchronously unsubscribes client by just forwarding to AmqpClient::asyncUnsubscribe
==> See docs of that.
-
void
asyncUnsubscribeAll
(AsyncHandler onUnsubscriptionDone)¶ Asynchronously unsubscribes client from all subscriptions by just forwarding to AmqpClient::asyncUnsubscribeAll
==> See docs of that.
-
void
asyncPublish
(const std::string &exchange, const std::string &routingKey, const data::Hash::Pointer &header, const data::Hash::Pointer &body, AsyncHandler onPublishDone)¶ Asynchronously publish data from header and body
Hashes are serialised such that AmqpClient::asyncPublish can be use internally. ==> See docs of that.
Public Static Functions
-
AmqpHashClient::Pointer
create
(AmqpConnection::Pointer connection, std::string instanceId, AMQP::Table queueArgs, HashReadHandler readHandler, ErrorReadHandler errorReadHandler)¶ Create client with message interface based on two Hashes (header and body).
- Parameters
connection
: the connection, all internal data access will run in its io contextinstanceId
: the client id - will usually be the name of the queue that will be subscribedqueueArgs
: the arguments passed to queue creationreadHandler
: a valid read handler for all received messageserrorReadHandler
: a valid handler called when a received message could not be processed, e.g. due to serialisation problems
Private Functions
-
AmqpHashClient
(AmqpConnection::Pointer connection, std::string instanceId, AMQP::Table queueArgs, HashReadHandler readHandler, ErrorReadHandler errorReadHandler)¶ Internal constructor, use static create instead: raw clients read handler has to be set after construction
Handler passed to raw client (i.e. runs in io context of connection).
Post arguments for deserialzation on the respective strand that runs in Karabo event loop
Deserializes ‘data’ input into Hash for header and body, adds exchange and key to the header and calls handler passed to constructor
-
void
-
class
Broker
¶ Inherits from std::enable_shared_from_this< Broker >
Subclassed by karabo::net::AmqpBroker
Public Functions
-
virtual Broker::Pointer
clone
(const std::string &instanceId) = 0¶ The function creates broker communication object with the new identity by cloning the parent object. Concrete meaning of cloning strategy is an implementation detail.
- Return
- new broker communication object
- Parameters
instanceId
: - unique ID
-
virtual void
connect
() = 0¶ This function establishes connection to the broker otherwise throws exception
-
virtual void
disconnect
() = 0¶ Close broker connection
-
virtual bool
isConnected
() const = 0¶ Predicate to check broker connection status
- Return
- true if connection is open
-
virtual std::string
getBrokerUrl
() const = 0¶ Get active URI used for establishing connection to broker
- Return
- uri like “mqtt://localhost:1883”
-
virtual std::string
getBrokerType
() const = 0¶ Get type string identifying broker. Example: “AmqpBroker”
- Return
- the type defined by active uri
-
const std::string &
getInstanceId
() const¶ Get current instance ID associated with this broker object
- Return
- instanceId
-
const std::string &
getDomain
() const¶ Get the domain this broker is communicating to
- Return
- domain
-
void
setConsumeBroadcasts
(bool consumeBroadcasts)¶ Set flag defining the way how to handle broadcast messages. It influences on subscription to such messages, i.e. has to be called before startReading(..)
- Parameters
consumeBroadcasts
: true means subscription
-
virtual boost::system::error_code
subscribeToRemoteSignal
(const std::string &signalInstanceId, const std::string &signalFunction) = 0¶ Establish logical signal-slot connection between 2 devices that is required by used protocol for registration
- Parameters
signalInstanceId
: device instance ID of a signalsignalFunction
: signal name
-
virtual boost::system::error_code
unsubscribeFromRemoteSignal
(const std::string &signalInstanceId, const std::string &signalFunction) = 0¶ Close logical signal-slot connection. De-registration in broker specific API.
- Parameters
signalInstanceId
:signalFunction
:
-
virtual void
subscribeToRemoteSignalAsync
(const std::string &signalInstanceId, const std::string &signalFunction, const AsyncHandler &completionHandler) = 0¶ Establish signal-slot connection asynchronously
- Parameters
signalInstanceId
:signalFunction
:completionHandler
: this callback is called when complete
-
virtual void
unsubscribeFromRemoteSignalAsync
(const std::string &signalInstanceId, const std::string &signalFunction, const AsyncHandler &completionHandler) = 0¶ Unsubscribe from (remote) signal asynchronously
- Parameters
signalInstanceId
:signalFunction
:completionHandler
:
-
virtual void
startReading
(const consumer::MessageHandler &handler, const consumer::ErrorNotifier &errorNotifier = consumer::ErrorNotifier()) = 0¶ Set up handlers for processing messages arriving via main communication path
- Parameters
handler
: - read handlererrorNotifier
: - error handler
-
virtual void
stopReading
() = 0¶ Stop processing messages coming via main path
-
virtual void
startReadingHeartbeats
(const consumer::MessageHandler &handler, const consumer::ErrorNotifier &errorNotifier) = 0¶ Set up handlers for processing heartbeat messages arriving via special path.
Heartbeat is used for tracking instances (tracking all instances or no tracking at all)
- Parameters
handler
: - read message handlererrorNotifier
: - error handler
-
virtual void
write
(const std::string &topic, const karabo::data::Hash::Pointer &header, const karabo::data::Hash::Pointer &body) = 0¶ Send message to broker
- Parameters
topic
:header
:body
:priority
:timeToLive
:
Public Static Functions
-
std::vector<std::string>
brokersFromEnv
()¶ Specifies the string of broker URLs from the environment variable KARABO_BROKER. If KARABO_BROKER is not defined, uses a hard coded fallback.
-
std::string
brokerTypeFromEnv
()¶ Specifies the broker type as the protocol of the broker URLs defined by brokersFromEnv(). Throws LogicException if broker addresses specified with different types or without protocol.
-
std::string
brokerTypeFrom
(const std::vector<std::string> &urls)¶ Specifies the broker type as the protocol of the given broker URLs. Throws LogicException if broker addresses specified with different types or without protocol.
-
std::string
brokerDomainFromEnv
()¶ Specify broker domain (i.e. topic for JmsBroker) from environment variables.
First source is KARABO_BROKER_TOPIC, as a fall back the environment variables LOGNAME, USER, LNAME and USERNAME are checked in that order.
-
virtual Broker::Pointer
-
class
Channel
¶ - #include <Channel.hh>
Represents a communication channel used for p2p messaging on a connection to a remote instance. This is only an interface, see TcpChannel for a concrete implementation using the tcp protocol.
Inherits from std::enable_shared_from_this< Channel >
Subclassed by karabo::net::TcpChannel
Public Functions
-
virtual Connection::Pointer
getConnection
() const = 0¶ Return a pointer to the connection this channels belongs to
- Return
-
virtual size_t
readSizeInBytes
()¶ Synchronously reads the message’s size. Will block until a message arrives on the socket.
- Return
- Size in bytes of incoming TCP message
-
virtual std::string
consumeBytesAfterReadUntil
(const size_t nBytes)¶ Synchronously reads size bytes and return them as a string. The reading will block until the bytes are read.
- Note
- reads up nBytes expecting no header. To be used ONLY after a readAsyncStringUntil operation in case some bytes must be read after readAsyncStringUntil has been used.
- Parameters
size
: This number of bytes will be copied into data
-
virtual void
read
(char *data, const size_t &size)¶ Synchronously reads size bytes into data. The reading will block until the data record is read.
- Parameters
data
: Pre-allocated contiguous block of memorysize
: This number of bytes will be copied into data
-
virtual void
read
(std::vector<char> &data)¶ This function reads into a vector of chars The reading will block until the data record is read.
- Parameters
data
: A vector which will be updated accordingly
-
virtual void
read
(std::string &data)¶ This function reads into a string The reading will block until the data record is read. CAVEAT: As string is not guaranteed to be represented by a contiguous block of memory this function will always introduce a copy under the hood.
- Parameters
data
: A string which will be updated accordingly
-
virtual void
read
(karabo::data::Hash &data)¶ This function reads into a hash. The reading will block until the data record is read. The reading will block until the data record is read.
- Parameters
data
: Hash object which will be updated
-
virtual void
read
(karabo::data::Hash &header, char *data, const size_t &size)¶ Synchronously reads size bytes from socket into data and provides a header. The reading will block until the data record is read.
- Parameters
header
: Hash object which will be updated to contain header informationdata
: Pre-allocated contiguous block of memorysize
: This number of bytes will be copied into data
-
virtual void
read
(karabo::data::Hash &header, std::vector<char> &data)¶ This function reads into a header and a vector of chars. The reading will block until the data record is read.
- Parameters
header
: Hash object which will be updated to contain header informationdata
: A vector which will be updated accordingly
-
virtual void
read
(karabo::data::Hash &header, std::string &data)¶ This function reads into header and a string The reading will block until the data record is read. CAVEAT: As string is not guaranteed to be represented by a contiguous block of memory this function will always introduce a copy under the hood.
- Parameters
data
: A string which will be updated accordingly
-
virtual void
read
(karabo::data::Hash &header, karabo::data::Hash &data)¶ This function reads into a header hash and a data hash. The reading will block until the data record is read. The reading will block until the data record is read.
- Parameters
header
: Hash object which will be updated to contain header informationdata
: Hash object which will be updated to contain data information
-
virtual void
readAsyncSizeInBytes
(const ReadSizeInBytesHandler &handler)¶ In case a message arrived, handler will be called back The handler will inform about the number of bytes going to come in
- Parameters
handler
: Call-back function of signature: void (Channel::Pointer, const size_t&)
-
virtual void
readAsyncRaw
(char *data, const size_t &size, const ReadRawHandler &handler)¶ Asynchronously reads size number of bytes into pre-allocated data buffer A handler can be registered to inform about completion of writing NOTE: This function only makes sense calling after having used “readAsyncSizeInBytes”, which gives a chance to correctly pre-allocated memory in user-space.
- Parameters
data
: Pre-allocated contiguous block of memorysize
: This number of bytes will be copied into datahandler
: Call-back function of signature: void (Channel::Pointer)
-
virtual void
readAsyncStringUntil
(const std::string &terminator, const ReadStringHandler &handler)¶ Read a string until terminator string is found. (No header is expected).
- Parameters
terminator
: when this string found, read is donehandler
: handler with signature ReadStringHandler, i.e. void (const boost::system::error_code&, std::string&) is called. second handler parameter is the read string with terminator stripped away
-
virtual void
readAsyncVector
(const ReadVectorHandler &handler)¶ Asynchronously reads data into a vector<char>. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const std::vector<char>&)
-
virtual void
readAsyncString
(const ReadStringHandler &handler)¶ Asynchronously reads data into a string. All memory management is done by the API. NOTE: A string in general is not storing data contiguously. Thus, an additional copy under the hood is needed which makes this interface slightly slower.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const std::string&)
-
virtual void
readAsyncHash
(const ReadHashHandler &handler)¶ Asynchronously reads data into a hash. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&)
-
virtual void
readAsyncHashPointer
(const ReadHashPointerHandler &handler)¶ Asynchronously reads data into a hash. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&)
-
virtual void
readAsyncVectorPointer
(const ReadVectorPointerHandler &handler)¶ Asynchronously reads data into a vector<char>. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const std::vector<char>&)
-
virtual void
readAsyncHashVector
(const ReadHashVectorHandler &handler)¶ Asynchronously reads data into a hash header and a vector<char>. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&, const std::vector<char>&)
-
virtual void
readAsyncHashString
(const ReadHashStringHandler &handler)¶ Asynchronously reads data into a hash header and a string. All memory management is done by the API. NOTE: A string in general is not storing data contiguously. Thus, an additional copy under the hood is needed which makes this interface slightly slower.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&, const std::string&)
-
virtual void
readAsyncHashHash
(const ReadHashHashHandler &handler)¶ Asynchronously reads data into a hash header and a hash body. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const karabo::data::Hash&)
-
virtual void
readAsyncHashPointerHashPointer
(const ReadHashPointerHashPointerHandler &handler)¶ Asynchronously reads data into a hash header and a hash body. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const karabo::data::Hash&)
-
virtual void
readAsyncHashVectorPointer
(const ReadHashVectorPointerHandler &handler)¶ Asynchronously reads data into a hash header and a vector<char>. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const std::vector<char>&)
-
virtual void
readAsyncHashVectorBufferSetPointer
(const ReadHashVectorBufferSetPointerHandler &handler)¶ Asynchronously reads data into a hash header and into a vector of BufferSet pointers. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const std::vector<karabo::data::BufferSet::Pointer>&)
-
virtual void
write
(const char *data, const size_t &size)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
data
: Pointer to a contiguous block of memory that should be writtensize
: This number of bytes will be written
-
virtual void
write
(const std::vector<char> &data)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
data
: vector of chars containing the data to be written
-
virtual void
write
(const std::string &data)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
data
: string, where each character represents on byte of data to be written
-
virtual void
write
(const karabo::data::Hash &data)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
data
: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types
-
virtual void
write
(const karabo::data::Hash &header, const char *data, const size_t &size)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
header
: containing metadata for the data being writtendata
: Pointer to a contiguous block of memory that should be writtensize
: This number of bytes will be written
-
virtual void
write
(const karabo::data::Hash &header, const std::vector<char> &data)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
header
: containing metadata for the data being writtendata
: vector of chars containing the data to be written
-
virtual void
write
(const karabo::data::Hash &header, const std::vector<karabo::data::BufferSet::Pointer> &body)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
header
: containing metadata for the data being written and BufferSet’s layoutbody
: vector of BufferSet pointers
Synchronous write. The function blocks until all bytes are written.
- Parameters
header
: containing metadata for the data being writtendata
: vector of chars containing the data to be written, passed as a shared pointer
-
virtual void
write
(const karabo::data::Hash &header, const std::string &data)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
header
: containing metadata for the data being writtendata
: string, where each character represents on byte of data to be written
-
virtual void
write
(const karabo::data::Hash &header, const karabo::data::Hash &body)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
header
: containing metadata for the data being writtendata
: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types
-
virtual void
writeAsyncRaw
(const char *data, const size_t &size, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
data
: Pointer to a contiguous block of memory that should be writtensize
: This number of bytes will be writtenhandler
: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
virtual void
writeAsyncVector
(const std::vector<char> &data, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
data
: vector of chars containing the data to be writtenhandler
: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
data
: vector of chars containing the data to be written, passed as a shared pointerhandler
: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
virtual void
writeAsyncHash
(const karabo::data::Hash &data, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
data
: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived typeshandler
: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
virtual void
writeAsyncHashRaw
(const karabo::data::Hash &header, const char *data, const size_t &size, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
header
: containing metadata for the data being writtendata
: Pointer to a contiguous block of memory that should be writtensize
: This number of bytes will be writtenhandler
: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
virtual void
writeAsyncHashVector
(const karabo::data::Hash &header, const std::vector<char> &data, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
header
: containing metadata for the data being writtendata
: vector of chars containing the data to be writtenhandler
: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
header
: containing metadata for the data being writtendata
: vector of chars containing the data to be written, passed as a shared pointerhandler
: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
virtual void
writeAsyncHashHash
(const karabo::data::Hash &header, const karabo::data::Hash &data, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
header
: containing metadata for the data being writtenbody
: data contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived typeshandler
: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
virtual void
writeAsyncHashVectorBufferSetPointer
(const karabo::data::Hash &header, const std::vector<karabo::data::BufferSet::Pointer> &body, const WriteCompleteHandler &handler)¶ Write header and vector<BufferSet::Pointer> asynchronously.
Upon write completion a handler function is called. Data inside the buffers must not be changed or deleted before this handler is called. Special care is needed if any Hash that had been serialised into the buffers contained an NDArray: The raw data of the array will be shared between the BufferSet and the Hash. Deletion of the Hash is safe, though.
- Parameters
header
: containing metadata for the data being writtenbody
: data as a vector of BufferSet::Pointerhandler
: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
virtual size_t
dataQuantityRead
()¶ Returns the number of bytes read since the last call of this method
-
virtual size_t
dataQuantityWritten
()¶ Returns the number of bytes written since the last call of this method
-
virtual void
setTimeoutSyncRead
(int milliseconds)¶ Set a timeout in when synchronous reads timeout if the haven’t been handled
- Parameters
milliseconds
:
-
virtual void
close
() = 0¶ Close this channel
-
virtual bool
isOpen
() = 0¶ Check if this channel is open
- Return
-
virtual void
writeAsync
(const char *data, const size_t &size, int prio = 4)¶ Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion
- Parameters
data
: Pointer to a contiguous block of memory that should be writtensize
: This number of bytes will be writtenprio
: the priority of this write operation
-
virtual void
writeAsync
(const std::vector<char> &data, int prio = 4)¶ Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion
- Parameters
data
: vector of chars containing the data to be writtenprio
: the priority of this write operation
Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion
- Parameters
data
: vector of chars containing the data to be written, passed as a shared pointerprio
: the priority of this write operation
-
virtual void
writeAsync
(const std::string &data, int prio = 4)¶ Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion
- Parameters
data
: passed as a string were each character represents one byte of the messageprio
: the priority of this write operation
-
virtual void
writeAsync
(const karabo::data::Hash &data, int prio = 4, bool copyAllData = true)¶ Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion
- Parameters
data
: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived typesprio
: the priority of this write operationcopyAllData
: When false, raw data (ByteArray) inside an NDArray won’t be copied. For other kind of data, the value of this flag is ignored and a copy will take place.
-
virtual void
writeAsync
(const karabo::data::Hash &header, const char *data, const size_t &size, int prio = 4)¶ Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion
- Parameters
data
: Pointer to a contiguous block of memory that should be writtenheader
: containing metadata for the data being writtensize
: This number of bytes will be writtenprio
: the priority of this write operation
-
virtual void
writeAsync
(const karabo::data::Hash &header, const std::vector<char> &data, int prio = 4)¶ Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion
- Parameters
header
: containing metadata for the data being writtendata
: vector of chars containing the data to be writtenprio
: the priority of this write operation
Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion
- Parameters
header
: containing metadata for the data being writtendata
: vector of chars containing the data to be written, passed as a shared pointerprio
: the priority of this write operation
-
virtual void
writeAsync
(const karabo::data::Hash &header, const std::string &data, int prio = 4)¶ Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion
- Parameters
header
: containing metadata for the data being writtendata
: passed as a string were each character represents one byte of the messageprio
: the priority of this write operation
-
virtual void
writeAsync
(const karabo::data::Hash &header, const karabo::data::Hash &data, int prio = 4, bool copyAllData = true)¶ Write data asynchronously, i.e. do not block upon call. Fire and forget, no callback called upon completion
- Parameters
header
: containing metadata for the data being writtendata
: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived typesprio
: the priority of this write operation.copyAllData
: When false, raw data (ByteArray) inside an NDArray won’t be copied. For other kind of data, the value of this flag is ignored and a copy will take place.
-
virtual void
setAsyncChannelPolicy
(int priority, const std::string &new_policy, const size_t capacity = 0)¶ Set the policy of how data is queue on this channel for the queue of the given priority. Policies are:
- “LOSSLESS”: all data is queue and the queue increases in size within incoming data
- ”REJECT_NEWEST”: if the queue’s fixed capacity is reached new data is rejected
- ”REMOVE_OLDEST”: if the queue’s fixed capacity is the oldest data is rejected
NOTE: This method can potentially modify the capacity of a queue which is in use! This is undefined behavior. Users are encouraged to only call this method when intializing a Channel object instance.
- Parameters
priority
: of the queue to set the policy forpolicy
: to set for this queuecapacity
: is an optional capacity for the queue
-
virtual Connection::Pointer
-
class
Connection
¶ - #include <Connection.hh>
This class serves as the interface for all connections. A connection is only established upon call of the start() function.
Inherits from std::enable_shared_from_this< Connection >
Subclassed by karabo::net::TcpConnection
Public Functions
-
virtual ChannelPointer
start
() = 0¶ Starts the connection
-
virtual int
startAsync
(const ConnectionHandler &handler)¶ Starts the connection asynchronously
- Parameters
handler
: A callback with the following signature: void myHandler(ErrorCode, ChannelPointer)
-
virtual void
stop
() = 0¶ Stops the connection
-
virtual ChannelPointer
createChannel
() = 0¶ This function creates a “channel” for the given connection.
- Return
- Pointer to Channel
-
virtual ChannelPointer
-
class
ConnectionHandler
¶ - #include <AmqpConnection.hh>
Declare our custom ConnectionHandler to implement their callbacks. Every callback delegates processing to externally assigned callback if it was set. ConnectionHandler object calls its callbacks as follows… while connecting… onAttached, onConnected, onReady in case of error … onError, (onLost), onDetached in case of normal closure … onClosed, onDetached.
onAttached is called if new connection is associated with the handler onConnected is called if physical (TCP) connection is successfully established. onReady is called if login is successful onClosed is called in case of normal connection closure onError is called if errors encountered on connection onLost is called if onError was called and earlier onConnected was called. onDetached id called as a last step of connection loss
Inherits from LibBoostAsioHandler
-
class
EventLoop
¶ - #include <EventLoop.hh>
Karabo’s central event loop. Asynchronous events are passed throughout the distributed system by posting to the loop.
Public Static Functions
-
void
addThread
(const int nThreads = 1)¶ Add a number of threads to the event loop, increasing the number of thread available to handle events posted to the loop
- Parameters
nThreads
:
-
void
removeThread
(const int nThreads = 1)¶ Remove a number of threads from the event loop, reducing the number of threads available to handle events posted to the loop
- Parameters
nThreads
:
- template <class Function>
-
void
post
(Function &&func, unsigned int delayMs = 0)¶ Post a task on the underlying io event loop for later execution
- Parameters
func
: a functor not taking any argument, but with any return typedelayMs
: execution will be delayed by given time (in milliseconds)
-
boost::asio::io_context &
getIOService
()¶ Return the Eventloop’s underlying boost::asio::io_service
- Return
-
void
work
()¶ Start the event loop and block until EventLoop::stop() is called.
The system signals SIGINT and SIGTERM will be caught and trigger the following actions:
- a signal handler set via setSignalHandler is called,
- and EventLoop::stop() is called.
Must not be called in parallel to itself or to run().
If one or more tasks are in deadlock and thus their threads cannot be joined at the end, a karabo::data::TimeoutException is thrown.
-
void
run
()¶ Start the event loop and block until all work posted to its io service is completed or until EventLoop::stop() is called.
Must not be called in parallel to itself or to work().
If one or more tasks are in deadlock and thus their threads cannot be joined at the end, a karabo::data::TimeoutException is thrown.
-
size_t
getNumberOfThreads
()¶ Return the number of threads currently available to the event loop for distributing work
- Return
-
void
setSignalHandler
(const SignalHandler &handler)¶ Set the handler to be called if a system signal is caught.
See work() about which signals are caught.
- Parameters
handler
: function with signature ‘void (int signal)’
-
static bool
setCatchExceptions
(bool flag)¶ Public flag to change behaviour of catching exceptions in threads
By default, flag is true and the event loop runs its threads such that any exception is caught and logged as error. If flag is false, exceptions are rethrown after logging them.
- Return
- previous value of flag
- Parameters
flag
: that should be valid now
Private Functions
-
void
clearThreadPool
()¶ Clears the thread pool and joins the threads
If joining fails repeatedly, throws karabo::data::TimeoutException.
-
void
-
class
HttpClient
¶ Public Functions
-
HttpClient
(const std::string &baseURL, bool verifyCerts = false)¶ Creates a web client capable of submitting GET and POST requests to a given URL, over a secure or a plain connection.
- Parameters
baseURL
: the base URL to be prepended to each request path.verifyCerts
: when set to false, allows self generated server certificates when connecting securely (by bypassing certificate origin verification).
-
class
Impl
¶ Implementation of the WebClient class.
-
-
class
HttpRequestRunner
¶ Inherits from std::enable_shared_from_this< HttpRequestRunner >
-
class
HttpsRequestRunner
¶ Inherits from std::enable_shared_from_this< HttpsRequestRunner >
-
class
InfluxDbClient
¶ - #include <InfluxDbClient.hh>
This class uses HTTP protocol for communications with InfluxDB server. The protocol follows request/response pattern and before sending next request the response from the current one should be received. Only one request/response session per connection is allowed. To follow this rule the internal queue is used. Any request (functor) first is pushed into internal queue and then checked the internal state if some current request/response session is ongoing. If not, the next request is popped from front side of internal queue and executed. The internal flag (state) is raised. When response callback is called it checks if the internal queue has next entry and if so this entry is popped and executed. If not, the internal flag is lowered. For the time being the internal queue has no limits defined so it is possible that if the client cannot cope with input load rate some overflow condition can be encountered. The practice should show how we can handle these problems.
Inherits from std::enable_shared_from_this< InfluxDbClient >
Public Functions
-
void
startDbConnectIfDisconnected
(const InfluxConnectedHandler &hook = InfluxConnectedHandler())¶ Check if connection is lost and try to re-establish connection to InfluxDB server
- Parameters
hook
: function that will be called when connection is established
-
bool
isConnected
()¶ Returns true if connection is established to InfluxDB server
-
std::string
influxVersion
()¶ The version of the InfluxDb server the client is connected to.
- Return
- std::string the connected InfluxDb server version (empty if no server is currently connected).
-
std::string
serverUrl
()¶ The url of the InfluxDb server the client is connected to (or supposed to connect to).
- Return
- std::string the InfluxDb server url.
-
void
queryDb
(const std::string &statement, const InfluxResponseHandler &action)¶ HTTP request “GET /query …” to InfluxDB server is registered in internal queue. Can be called with connection to InfluxDB or without. Blocking if no connection exists. Otherwise non-blocking.
- Parameters
statement
: is SELECT expression.action
: callback: void(const HttpResponse&) is called when response comes from InfluxDB server
-
void
postQueryDb
(const std::string &statement, const InfluxResponseHandler &action)¶ HTTP request “POST /query …” to InfluxDB server is registered in internal queue.
- Parameters
statement
: SELECT, SHOW, DROP and others QL commandsaction
: callback: void(const HttpResponse&) is called when response comes from InfluxDB server
-
void
flushBatch
(const InfluxResponseHandler &respHandler = InfluxResponseHandler())¶ Flushes the contents of the write buffer to the InfluxDb.
- Parameters
respHandler
: If defined, the handler function will be called with the response sent by Influx after it accepted and processed the current batch of updates. If not defined, the flushBatch will work in a call-and-forget mode.
-
void
getPingDb
(const InfluxResponseHandler &action)¶ HTTP request “GET /ping …” to InfluxDB server is registered in internal queue.
- Parameters
action
: callback: void(const HttpResponse&) is called when response comes from InfluxDB server
-
bool
connectWait
(std::size_t millis)¶ Returns true if connection is established in “millis” time range, otherwise timeout condition comes up and returns false.
- Return
- true if connection established, or false in case of timeout
- Parameters
millis
: time in milliseconds to wait for connection to be established
Public Static Functions
-
std::string
generateUUID
()¶ Returns UUID used as Request-ID for HTTP requests
Private Functions
-
void
writeDb
(const std::string &message, const std::string &requestId)¶ Writing HTTP request
- Parameters
message
: formed in compliance of HTTP protocol Malformed requests resulting in response code 4xxrequestId
: unique id of the HTTP request to be sent to Influx.
-
void
onDbConnect
(const karabo::net::ErrorCode &ec, const karabo::net::Channel::Pointer &channel, const InfluxConnectedHandler &hook)¶ Low-level callback called when connection to InfluxDB is established
-
void
onDbRead
(const karabo::net::ErrorCode &ec, const std::string &data)¶ Low-level callback called when reading is done
Low-level callback called when writing into networks interface is done
-
void
postWriteDb
(const std::string &batch, const InfluxResponseHandler &action)¶ HTTP request “POST /write …” to InfluxDB server is registered in internal queue.
- Parameters
batch
: is a bunch of lines following InfluxDB “line protocol” separated by newline (‘ ‘) the “line protocol” is detailed at https://influxdbcom.readthedocs.io/en/latest/content/docs/v0.9/write_protocols/write_syntax/action
: callback is called when acknowledgment (response) comes from InfluxDB server. The callback signature is void(const HttpResponse&). The success error code in HttpResponse structure is 204.
-
void
postQueryDbTask
(const std::string &statement, const InfluxResponseHandler &action)¶ Actual “POST /query …” is accomplished. Non-blocking call. The connection to InfluxDB has to be established before this call
-
void
getPingDbTask
(const InfluxResponseHandler &action)¶ Actual “GET /ping …” is accomplished. Non-blocking call. The connection to InfluxDB has to be established before this call
-
void
postWriteDbTask
(const std::string &batch, const InfluxResponseHandler &action)¶ Actual “POST /write …” is accomplished. Non-blocking call, Connection should be established before this call
-
void
queryDbTask
(const std::string &statement, const InfluxResponseHandler &action)¶ Actual “GET /query …” is accomplished. If no connection to DB, this call is blocked until the connection is established. Otherwise the call is non-blocking.
-
void
onResponse
(const HttpResponse &o, const InfluxResponseHandler &action)¶ Generic wrap callback is called and call in turn the user “action”.
-
void
tryNextRequest
(std::unique_lock<std::mutex> &requestQueueLock)¶ Try to take the next request from internal queue and execute it. Set internal state to be “active” if it was not. Helper function.
- Parameters
requestQueueLock
: must be locked scoped_lock of m_requestQueueMutex, will be unlocked afterwards
-
void
sendToInfluxDb
(const std::string &msg, const InfluxResponseHandler &action, const std::string &requestId)¶ Send HTTP request to InfluxDb. Helper function.
Wraps the given InfluxResponseHandler within a callback to onResponse. It will be up to onResponse to call the action InfluxResponseHandler and keep the consumption of requests submitted to the InfluxDbClient going.
-
std::string
getRawBasicAuthHeader
()¶ Gets the raw form of the http Authorization header with values of dbUser and dbPassword separated by a colon and base64 encoded.
- Return
- The raw form of the Authorization header.
-
void
handleHttpReadError
(const std::string &errMsg, const std::string &requestId, bool logAsError = true)¶ Handle unrecoverable read and parsing errors while processing HTTP responses from Influx.
The recovery involves recycling the network connection, as there is no way to recover synchronism in the read operation within the current connection after those kind of errors happen. Also generates an HTTP response with status code 700 and an error message to communicate to users of the InfluxDbClient instance.
- Parameters
errMsg
: the error message to be put in the generated 700 coded http response.requestId
: the unique identifier of the HTTP request whose response could not be processed (needed to update the internal bookeeping of the InfluxDb client).logEsError
: if true (default), log as info, else as error
-
void
-
class
LosslessQueue
¶ - #include <Queues.hh>
The LosslessQueue implements a queue that guarantees to preserve messages.
Inherits from karabo::net::Queue
Subclassed by karabo::net::RejectNewestQueue
Public Functions
-
size_t
size
()¶ Return the size of the queue, i.e. the number of messages it holds
- Return
-
size_t
max_size
()¶ Return the maximum allowed size of this queue
- Return
-
void
set_capacity
(size_t capacity)¶ Set the capacity in terms of messages this queue can hold
- Parameters
capacity
:
-
size_t
capacity
()¶ Return this queues message capacity
- Return
-
void
clear
()¶ Clear this queue
-
bool
empty
()¶ Check if this queue is empty, i.e. size is 0
- Return
-
bool
full
()¶ Check if this queue is full, i.e. if it has reached its maximum capacity
- Return
-
void
resize
(size_t new_size)¶ Resize the queue to a new size
- Parameters
new_size
:
-
void
push_back
(const Message::Pointer &entry)¶ Add an element to the end of the queue, increases the size by one
- Parameters
entry
:
-
void
pop_front
()¶ Pop the first element from the queue, decreases the size by one
-
size_t
-
class
Message
¶ - #include <Queues.hh>
This class represents a message in the distributed Karabo system.
Public Functions
-
const karabo::data::BufferSet::Pointer &
body
() const¶ Return the body of the message
- Return
-
const VectorCharPointer &
header
() const¶ Return the header of the message
- Return
-
const karabo::data::BufferSet::Pointer &
-
class
NetworkInterface
¶ Public Functions
-
NetworkInterface
(const std::string &name_or_ip, bool exclude_loopback = true)¶ Construct a NetworkInterface object from an interface or IP address.
-
const std::string &
name
() const¶ Return the interface name for the object (for instance, ‘lo’ or ‘enp4s0’)
-
const std::string &
presentationIP
() const¶ Return the presentation address for the object.
The presentation address is the IP address (four numbers between 0 and 255, separated with ‘.’)
Private Functions
-
void
constructFromInterface
(const struct ifaddrs *ifa)¶ Fill the object fields with the information specified in the ifaddrs object passed as parameter.
-
-
struct
OneTimeTokenAuthorizeResult
¶ - #include <UserAuthClient.hh>
The results of a one-time token validation / authorization.
Subclassed by karabo::devices::BeginTemporarySessionResult
-
class
Queue
¶ - #include <Queues.hh>
This class defines the interface for message queues in the Karabo distributed system.
Subclassed by karabo::net::LosslessQueue, karabo::net::RemoveOldestQueue
Public Functions
-
virtual size_t
size
() = 0¶ Return the size of the queue, i.e. the number of messages it holds
- Return
-
virtual size_t
max_size
() = 0¶ Return the maximum allowed size of this queue
- Return
-
virtual void
set_capacity
(size_t capacity) = 0¶ Set the capacity in terms of messages this queue can hold
- Parameters
capacity
:
-
virtual size_t
capacity
() = 0¶ Return this queues message capacity
- Return
-
virtual void
clear
() = 0¶ Clear this queue
-
virtual bool
empty
() = 0¶ Check if this queue is empty, i.e. size is 0
- Return
-
virtual bool
full
() = 0¶ Check if this queue is full, i.e. if it has reached its maximum capacity
- Return
-
virtual void
resize
(size_t new_size) = 0¶ Resize the queue to a new size
- Parameters
new_size
:
-
virtual void
push_back
(const Message::Pointer &entry) = 0¶ Add an element to the end of the queue, increases the size by one
- Parameters
entry
:
-
virtual void
pop_front
() = 0¶ Pop the first element from the queue, decreases the size by one
-
virtual size_t
-
class
RejectNewestQueue
¶ - #include <Queues.hh>
The RejectNewestQueue implements a queue that will reject new entries when it has reached its maximum capacity.
Inherits from karabo::net::LosslessQueue
-
class
RemoveOldestQueue
¶ - #include <Queues.hh>
The RemoveOldestQueue implements a queue that removes the oldest element in the queue when it has reached is maximum capacity and a new element is pushed to it.
Inherits from karabo::net::Queue
Public Functions
-
size_t
size
()¶ Return the size of the queue, i.e. the number of messages it holds
- Return
-
size_t
max_size
()¶ Return the maximum allowed size of this queue
- Return
-
void
set_capacity
(size_t capacity)¶ Set the capacity in terms of messages this queue can hold
- Parameters
capacity
:
-
size_t
capacity
()¶ Return this queues message capacity
- Return
-
void
clear
()¶ Clear this queue
-
bool
empty
()¶ Check if this queue is empty, i.e. size is 0
- Return
-
bool
full
()¶ Check if this queue is full, i.e. if it has reached its maximum capacity
- Return
-
void
resize
(size_t new_size)¶ Resize the queue to a new size
- Parameters
new_size
:
-
void
push_back
(const Message::Pointer &entry)¶ Add an element to the end of the queue, increases the size by one
- Parameters
entry
:
-
void
pop_front
()¶ Pop the first element from the queue, decreases the size by one
-
size_t
-
struct
RemoveThreadException
¶ - #include <EventLoop.hh>
An exception that is thrown if a thread cannot be removed from the EventLoop
Inherits from std::exception
-
class
Strand
¶ - #include <Strand.hh>
A poor man’s substitute for boost::asio::strand because that does not guarantee that handlers posted on different strands can run in parallel (“strand collision”). Compared to boost::asio::strand, this
- lacks dispatch: we usually do not want that in Karabo since it allows the handler to be called now in this scope
- lacks running_in_this_thread: probably not too important
- has a more restrictive wrap: would be useful to support more, but a proper implementation would also need dispatch
Every handler posted will be put into a FIFO queue and the FIFO will be emptied in the background by posting the handlers to the given boost::asio::io_context (either from net::EventLoop, passed in constructor, or defined by setContext(..)).
NOTE: Do not create a Strand on the stack, but do it on the heap using the Configurator:
auto stack = karabo::data::Configurator<Strand>::create(Hash(…));
Otherwise ‘enable_shared_from_this’ does not work which is needed to guarantee (via usage of karab::util::bind_weak) that an internal Strand method is executed on the event loop when the Strand is already destructed.
Inherits from std::enable_shared_from_this< Strand >
Public Functions
-
Strand
(boost::asio::io_context &ioContext)¶ Contructor only kept for backward compatibility.
Better use karabo::data::Configurator<Strand>::create(“Strand”, Hash())
-
Strand
(const karabo::data::Hash &config)¶ Construct the Strand.
The boost::asio::io_context of the karabo::net::EventLoop will be used.
Best use this constructor indirectly via karabo::data::Configurator<Strand>::create(“Strand”, cfg) which will validate cfg and create the Strand properly on the heap.
Keys of cfg are “maxInARow” (unsigned int) and “guaranteeToRun” (bool), see expectedParameters.
-
void
setContext
(boost::asio::io_context &ioContext)¶ Set the context to which the handlers are to be posted.
No concurrency protection: Must be called directly after Strand creation, before it is used.
-
void
post
(const std::function<void()> &handler)¶ Post a handler to the io_context with the guarantee that it is not executed before any other handler posted before has finished. Handlers posted on different Strands can always be run in parallel.
Note that “guaranteeToRun” flag of the constructor determines what happens with yet unhandled handlers when the Strand is destructed.
- Parameters
handler
: function without arguments and return value - will be copied
-
void
post
(std::function<void()> &&handler)¶ Post a handler to the io_context with the guarantee that it is not executed before any other handler posted before has finished. Handlers posted on different Strands can always be run in parallel.
Note that “guaranteeToRun” flag of the constructor determines what happens with yet unhandled handlers when the Strand is destructed.
- Parameters
handler
: function without arguments and return value as r-value reference - will be moved to avoid a copy
-
std::function<void()>
wrap
(std::function<void()> handler)¶ This function is used to create a new handler function object that, when invoked, will pass the wrapped handler to the Strand’s post function (instead of using dispatch as boost::io_service::strand::wrap does).
- Return
- A function object that, when invoked, passes the wrapped handler to the Strand’s post function.
- Parameters
handler
: The handler to be wrapped. The strand will make a copy of the handler object. Compared to boost::io_service::strand::wrap, the handler signature is much more restricted, i.e. must be void().
-
boost::asio::io_context &
getContext
() const¶ This function may be used to obtain the io_context object that the strand uses to post handlers.
- Return
- A reference to the io_context of the Strand. Ownership is not transferred to the caller.
-
boost::asio::io_context &
get_io_service
() const¶ Deprecated.
Use getContext() instead.
-
class
TcpChannel
¶ Inherits from karabo::net::Channel
Public Functions
-
Connection::Pointer
getConnection
() const¶ Pointer to connection of this channel - empty if that is not alive anymore
-
size_t
readSizeInBytes
()¶ Synchronously reads the TCP message’s size. Will block until a message arrives on the socket.
- Return
- Size in bytes of incoming TCP message
-
void
readAsyncSizeInBytes
(const ReadSizeInBytesHandler &handler)¶ In case a TCP message arrived, handler will be called back The handler will inform about the number of bytes going to come in The handler must have the following signature: void handler(Channel::Pointer, const size_t&)
- Parameters
handler
: Callback with signature: void (Channel::Pointer, const size_t&)
-
std::string
consumeBytesAfterReadUntil
(const size_t nBytes)¶ Synchronously reads size bytes and return them as a string. The reading will block until the bytes are read.
- Note
- reads up nBytes expecting no header. This method should ONLY be used in association with readAsyncStringUntil - they consume from the same boost::asio::streamBuffer and should not be used in the same context of the other read* methods.
- Parameters
size
: This number of bytes will be copied into data
-
void
read
(char *data, const size_t &size)¶ Synchronously reads size bytes from TCP socket into data.
- Parameters
data
: Pre-allocated contiguous block of memorysize
: This number of bytes will be copied into data
-
void
read
(std::vector<char> &data)¶ This function reads from a channel into vector of chars The reading will block until the data record is read. The vector will be updated accordingly (must not be pre-allocated before)
- Return
- void
This function reads from a channel into shared pointer of vector of chars The reading will block until the data record is read. The shared pointer of vector will be updated accordingly (must not be pre-allocated before)
- Return
- void
-
void
read
(karabo::data::Hash &data)¶ This function reads from a channel into vector of chars The reading will block until the data record is read. The size of data record is the first 4 bytes in a channel stream. The hash will be updated accordingly.
- Return
- void
-
void
read
(karabo::data::Hash &header, char *data, const size_t &size)¶ Synchronously reads size bytes from socket into data and provides a header.
- Parameters
header
: Hash object which will be updated to contain header informationdata
: Pre-allocated contiguous block of memorysize
: This number of bytes will be copied into data
-
void
read
(karabo::data::Hash &header, std::vector<char> &data)¶ This function reads into a header and a vector of chars. The reading will block until the data record is read.
- Parameters
header
: Hash object which will be updated to contain header informationdata
: A vector which will be updated accordingly
This function reads into a header and shared pointer of vector of chars. The reading will block until the data record is read.
- Parameters
header
: Hash object which will be updated to contain header informationdata
: A shared pointer of a vector which will be updated accordingly
-
void
read
(karabo::data::Hash &header, karabo::data::Hash &data)¶ This function reads into a header hash and a data hash. The reading will block until the data record is read. The reading will block until the data record is read.
- Parameters
header
: Hash object which will be updated to contain header informationdata
: Hash object which will be updated to contain data information
-
void
readAsyncRaw
(char *data, const size_t &size, const ReadRawHandler &handler)¶ Asynchronously reads size number of bytes into pre-allocated data buffer A handler can be registered to inform about completion of writing
- Parameters
data
: Pre-allocated contiguous block of memorysize
: This number of bytes will be copied into datahandler
: Function of signature: <void (Channel::Pointer)> which will be call-backed upon read completion
-
void
readAsyncStringUntil
(const std::string &terminator, const ReadStringHandler &handler)¶ wrapper around boost::asio::async_read_until
Read a string until terminator string is found. (No header is expected).
- Note
- This method should ONLY be used in association with consumeBytesAfterReadUntil - they consume from the same boost::asio::streambuf and should not be used in the same context of the other read* methods.
- Parameters
terminator
: when this string found, read is donehandler
: handler with signature ReadStringHandler, i.e. void (const boost::system::error_code&, std::string&) is called. second handler parameter is the read string including the terminator
-
void
readAsyncString
(const ReadStringHandler &handler)¶ Asynchronously reads data into a string. All memory management is done by the API. NOTE: A string in general is not storing data contiguously. Thus, an additional copy under the hood is needed which makes this interface slightly slower.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const std::string&)
-
void
readAsyncVector
(const ReadVectorHandler &handler)¶ Asynchronously reads data into a vector<char>. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const std::vector<char>&)
-
void
readAsyncHash
(const ReadHashHandler &handler)¶ Asynchronously reads data into a hash. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&)
-
void
readAsyncHashPointer
(const ReadHashPointerHandler &handler)¶ Asynchronously reads data into a hash. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&)
-
void
readAsyncVectorPointer
(const ReadVectorPointerHandler &handler)¶ Asynchronously reads data into a vector<char>. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const std::vector<char>&)
-
void
readAsyncHashVector
(const ReadHashVectorHandler &handler)¶ Asynchronously reads data into a hash header and a vector<char>. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&, const std::vector<char>&)
-
void
readAsyncHashString
(const ReadHashStringHandler &handler)¶ Asynchronously reads data into a hash header and a string. All memory management is done by the API. NOTE: A string in general is not storing data contiguously. Thus, an additional copy under the hood is needed which makes this interface slightly slower.
- Parameters
handler
: Call-function of signature: void (Channel::Pointer, const karabo::data::Hash&, const std::string&)
-
void
readAsyncHashHash
(const ReadHashHashHandler &handler)¶ Asynchronously reads data into a hash header and a hash body. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const karabo::data::Hash&)
-
void
readAsyncHashPointerHashPointer
(const ReadHashPointerHashPointerHandler &handler)¶ Asynchronously reads data into a hash header and a hash body. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const karabo::data::Hash&)
-
void
readAsyncHashVectorPointer
(const ReadHashVectorPointerHandler &handler)¶ Asynchronously reads data into a hash header and a vector<char>. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const std::vector<char>&)
-
void
readAsyncHashVectorBufferSetPointer
(const ReadHashVectorBufferSetPointerHandler &handler)¶ Asynchronously reads data into a hash header and into a vector of BufferSet pointers. All memory management is done by the API.
- Parameters
handler
: Call-function of signature: void (const ErrorCode&, const karabo::data::Hash&, const std::vector<karabo::data::BufferSet::Pointer>&)
-
void
write
(const char *data, const size_t &size)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
data
: Pointer to a contiguous block of memory that should be writtensize
: This number of bytes will be written
-
void
write
(const karabo::data::Hash &header, const char *data, const size_t &size)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
header
: containing metadata for the data being writtendata
: Pointer to a contiguous block of memory that should be writtensize
: This number of bytes will be written
-
void
write
(const karabo::data::Hash &data)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
data
: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types
-
void
write
(const karabo::data::Hash &header, const std::vector<karabo::data::BufferSet::Pointer> &body)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
header
: containing metadata for the data being written and BufferSet’s layoutbody
: vector of BufferSet pointers
-
void
write
(const karabo::data::Hash &header, const karabo::data::Hash &body)¶ Synchronous write. The function blocks until all bytes are written.
- Parameters
header
: containing metadata for the data being writtendata
: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived types
-
void
writeAsyncRaw
(const char *data, const size_t &size, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
data
: Pointer to a contiguous block of memory that should be writtensize
: This number of bytes will be writtenhandler
: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
void
writeAsyncVector
(const std::vector<char> &data, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
data
: vector of chars containing the data to be writtenhandler
: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
void
writeAsyncHash
(const karabo::data::Hash &data, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
data
: is contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived typeshandler
: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
void
writeAsyncHashRaw
(const karabo::data::Hash &header, const char *data, const size_t &size, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
header
: containing metadata for the data being writtendata
: Pointer to a contiguous block of memory that should be writtensize
: This number of bytes will be writtenhandler
: to be called upon write completion handler. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
void
writeAsyncHashVector
(const karabo::data::Hash &header, const std::vector<char> &data, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
header
: containing metadata for the data being writtendata
: vector of chars containing the data to be writtenhandler
: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
void
writeAsyncHashVectorBufferSetPointer
(const karabo::data::Hash &header, const std::vector<karabo::data::BufferSet::Pointer> &body, const WriteCompleteHandler &handler)¶ Write header and vector<BufferSet::Pointer> asynchronously.
Upon write completion a handler function is called. Data inside the buffers must not be changed or deleted before this handler is called. Special care is needed if any Hash that had been serialised into the buffers contained an NDArray: The raw data of the array will be shared between the BufferSet and the Hash. Deletion of the Hash is safe, though.
- Parameters
header
: containing metadata for the data being writtenbody
: data as a vector of BufferSet::Pointerhandler
: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
void
writeAsyncHashHash
(const karabo::data::Hash &header, const karabo::data::Hash &data, const WriteCompleteHandler &handler)¶ Write data asynchronously, i.e. do not block upon call. Upon write completion a handler function is called
- Parameters
header
: containing metadata for the data being writtenbody
: data contained in a Hash with no particular structure, but serializable, i.e. containing no non-karabo data types or Hash derived typeshandler
: to be called upon write completion. Needs to be a function wrapped into a std::function which takes const boost::system::error_code& as its only argument.
-
size_t
dataQuantityRead
()¶ Returns the number of bytes read since the last call of this method
-
size_t
dataQuantityWritten
()¶ Returns the number of bytes written since the last call of this method
-
void
close
()¶ Close this channel
-
bool
isOpen
()¶ Check if this channel is open
- Return
-
karabo::data::Hash
queueInfo
()¶ Records the sizes of the write queues in a Hash. Useful for debugging devices with multiple channels open (like the GuiServerDevice…)
-
std::string
remoteAddress
() const¶ Address of the remote endpoint
-
void
writeAsync
(const char *data, const size_t &size, int prio)¶ Writes a copy from the data array.
-
void
writeAsync
(const std::vector<char> &data, int prio)¶ Writes a copy of the data vector.
Sends the vector pointed by data. The data vector must not be changed after the call to writeAsync.
-
void
writeAsync
(const std::string &data, int prio)¶ Writes a copy of the data string.
-
void
writeAsync
(const karabo::data::Hash &data, int prio, bool copyAllData)¶ When copyAllData is false, elements of type NDArray in the hash won’t be copied before being sent.
-
void
writeAsync
(const karabo::data::Hash &header, const char *data, const size_t &size, int prio)¶ Writes copies of the header hash and the data array.
-
void
writeAsync
(const karabo::data::Hash &header, const std::vector<char> &data, int prio)¶ Writes copies of the header hash and of the data vector.
Writes a copy of the header hash. Sends the vector pointed by data, not a copy of it. The data vector must not be changed after the call to writeAsync.
-
void
writeAsync
(const karabo::data::Hash &header, const std::string &data, int prio)¶ Writes copies of the header hash and of the data string.
-
void
writeAsync
(const karabo::data::Hash &header, const karabo::data::Hash &data, int prio, bool copyAllData)¶ When copyAllData is false, elements of type NDArray in the body hash won’t be copied before being sent. copyAllData doesn’t influence the handling of the header hash.
-
void
setAsyncChannelPolicy
(int priority, const std::string &new_policy, const size_t capacity = 0)¶ Set the policy of how data is queue on this channel for the queue of the given priority. Policies are:
- “LOSSLESS”: all data is queue and the queue increases in size within incoming data
- ”REJECT_NEWEST”: if the queue’s fixed capacity is reached new data is rejected
- ”REMOVE_OLDEST”: if the queue’s fixed capacity is the oldest data is rejected
NOTE: This method can potentially modify the capacity of a queue which is in use! This is undefined behavior. Users are encouraged to only call this method when intializing a Channel object instance.
- Parameters
priority
: of the queue to set the policy forpolicy
: to set for this queuecapacity
: is an optional capacity for the queue
Public Static Functions
This function returns low level info about connection like … “localIp”, “localPort”, “remoteIp”, “remotePort” that constitute active connection. in form of Hash container
- Return
- Hash with 4 key/value pairs
- Parameters
ptr
: input TcpChannel boost shared pointer
Private Functions
-
void
bytesAvailableHandler
(const boost::system::error_code &e)¶ Internal default handler
- Parameters
channel
:
-
void
onSizeInBytesAvailable
(const ErrorCode &error, const ReadSizeInBytesHandler &handler)¶ This function calls the corresponding handler
- Parameters
handler
:error
:
-
void
onVectorBufferSetPointerAvailable
(const ErrorCode &error, size_t length, const std::vector<karabo::data::BufferSet::Pointer> &buffers, const ReadVectorBufferSetPointerHandler &handler)¶ Internal handler called after filling the buffer set
- Parameters
error
: error codelength
: number of bytes read == total size of buffer setbuffers
: vector of buffer set pointers with the datahandler
: to be called
-
void
byteSizeAvailableHandler
(const size_t byteSize)¶ Internal default handler
- Parameters
byteSize
:
-
karabo::data::BufferSet::Pointer
bufferSetFromString
(const std::string &str)¶ Creates a buffer set with the given string stored in its sole buffer.
- Return
- shared_ptr to the buffer set with the string stored.
- Note
- actually places a copy of the string into the buffer set.
- Parameters
str
: the string to be stored in the buffer set.
-
karabo::data::BufferSet::Pointer
bufferSetFromPointerToChar
(const char *data, size_t size)¶ Creates a buffer set with contents of a given buffer of chars stored in its sole buffer.
- Return
- shared_ptr to the buffer set with the input buffer contents stored.
- Note
- actually places a copy of the contents of the input buffer into the buffer set.
- Parameters
data
: a pointer to the first char in the input sequence.
-
karabo::data::BufferSet::Pointer
bufferSetFromVectorCharPointer
(const VectorCharPointer &dataVect)¶ Creates a buffer set with characters in a given vector of chars stored in its sole buffer.
- Return
- shared_ptr to the buffer set with the character in the vector stored.
- Parameters
data
: a pointer to the vector of chars to be stored in the buffer set.
-
karabo::data::BufferSet::Pointer
bufferSetFromHash
(const karabo::data::Hash &data, bool copyAllData)¶ Creates a buffer set with a given hash stored in its sole buffer.
- Return
- pBuffSet a shared pointer that will be pointed to the newly created buffer set with the hash.
- Parameters
data
: the hash to be stored in the buffer set.copyAllData
: if false no copy of any NDArray internal to the hash will be made upon storing the hash in the bufferset (the buffer set will actually become one of the “owners” of the NDArray).
-
unsigned int
storeCompleteHandler
(const WriteCompleteHandler &handler)¶ Helper to store async write completion handler
- Return
- index under which handler is stored in internal map
- Parameters
handler
: write completion handler to be stored
-
void
applySocketKeepAlive
()¶ Helper to apply the TCP keep-alive settings to the socket if configured to do so.
Requires that m_socketMutex is locked.
-
Connection::Pointer
-
class
TcpConnection
¶ - #include <TcpConnection.hh>
a class for handling tcp connections
This class serves as the interface for all connections. A connection is only established upon call of the start() function. It is a factory class and thus can be configured using its expected parameters
Inherits from karabo::net::Connection
Public Functions
-
int
startAsync
(const ConnectionHandler &slot)¶ Starts the connection asynchronously providing a slot
-
void
stop
()¶ Closes the connection
-
ChannelPointer
createChannel
()¶ This function creates a “channel” for the given connection.
- Return
- Pointer to Channel
-
int
-
class
UserAuthClient
¶ Public Functions
Validate and authorize, asynchronously, a given one-time token against a given topic.
- Parameters
token
: the token to be validated and authorized.topic
: the topic against which the user linked to a valid token will be authorized.handler
: the handler to be called when the token is processed.
-
namespace
consumer
¶ Typedefs
-
using
karabo::net::consumer::MessageHandler = typedef std::function<void(karabo::data::Hash::Pointer, karabo::data::Hash::Pointer)>
-
using
karabo::net::consumer::ErrorNotifier = typedef std::function<void(Error, const std::string& description)>
-
using
-
typedef boost::system::error_code
The karabo::xms Namespace¶
-
namespace
karabo
::
xms
¶ Namespace for package xms
Namespace for package io
Typedefs
-
typedef InputChannelElement
INPUT_CHANNEL_ELEMENT
¶
-
typedef InputChannelElement
INPUT_CHANNEL
¶
-
typedef std::function<void(const std::vector<karabo::data::Hash>&)>
ShowConnectionsHandler
¶
-
typedef std::function<void(const std::vector<unsigned long long>&, const std::vector<unsigned long long>&)>
ShowStatisticsHandler
¶
-
typedef OutputChannelElement
OUTPUT_CHANNEL_ELEMENT
¶
-
typedef OutputChannelElement
OUTPUT_CHANNEL
¶
Functions
-
Hash
getHeartbeatInfo
(const Hash &instanceInfo)¶
Variables
-
const int
kMaxServerInitializationAttempts
= 2000¶
-
const int
msPingTimeoutInIsValidInstanceId
= 1000¶ Milliseconds of timeout when asking for validity of my id at startup:
-
const char *
beatsTopicSuffix
= "_beats"¶
-
const int
channelReconnectIntervalSec
= 6¶
-
const int
getOutChannelInfoTimeoutMsec
= 3000¶
-
class
InputChannel
¶ - #include <InputChannel.hh>
The InputChannel class is used to receive data from pipelined processing OutputChannels.
The InputChannel class is used to receive data from pipelined processing OutputChannels. It additionally supports receiving meta data associated with each data token read. Specifically, the meta information contains source information, i.e. what produced the data token, and timing information, e.g. train ids.
void onInput(const InputChannel::Pointer& input) { for (unsigned int i = 0; i != input->size(); ++i) { Hash h; const InputChannel::MetaData& meta = input->read(i); std::cout<<"Source: <<meta.getSource()<<std::endl; std::cout<<"TrainId: <<meta.getTimestamp().getTrainId()<<std::endl; } }
Inherits from std::enable_shared_from_this< InputChannel >
Public Functions
-
InputChannel
(const karabo::data::Hash &config)¶ If this object is constructed using the factory/configuration system this method is called
- See
- expectedParameters) and default-filled configuration
- Parameters
input
: Validated (
-
void
reconfigure
(const karabo::data::Hash &config, bool allowMissing = true)¶ Reconfigure InputChannel Disconnects any previous “connectedOutputChannels” if not in config[“connectedOutputChannels”].
- Parameters
config
: as needed by the constructorallowMissing
: if true, lack of keys “dataDistribution”, “minData”, “onSlowness”, “delayOnInput” and “respondToEndOfStream” is OK and their respective previous configuration is kept, if false, an exception is thrown when these keys are missing in config
-
void
registerInputHandler
(const InputHandler &ioInputHandler)¶ Register handler to be called when new data has arrived. For each index i from 0 to < size(), data and meta data can be received via read(i, dataHash) and readMetaData(i, metaData), respectively.
Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when data arrives, i.e. this registration must not be called if (being) connected to any output channel.
-
void
registerDataHandler
(const DataHandler &ioDataHandler)¶ Register handler to be called for each data item that arrives.
Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when data arrives, i.e. this registration must not be called if (being) connected to any output channel.
-
void
registerEndOfStreamEventHandler
(const InputHandler &endOfStreamEventHandler)¶ Register handler to be called when connected output channels inform about end-of-stream. If connected to more than one output channel, the handler is called if the last of them sends the end-of-stream signal.
Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when the end-of-stream signal arrives, i.e. this registration must not be called if (being) connected to any outpu channel.
-
InputChannel::Handlers
getRegisteredHandlers
() const¶ Get handlers registered for data, input and end-of-stream handling.
Do not call concurrrently with the corresponding register[Data|Input|EndOfStreamEvent]Handler() methods.
-
size_t
dataQuantityRead
()¶ Returns the number of bytes read since the last call of this method
See karabo::util::TcpChannel::dataQuantityRead()
-
size_t
dataQuantityWritten
()¶ Returns the number of bytes written since the last call of this method
See karabo::util::TcpChannel::dataQuantityWritten()
-
std::map<std::string, karabo::data::Hash>
getConnectedOutputChannels
()¶ Returns a map of between “output channel string” and “output channel info” Hash outputChannelString (STRING) represented like “instanceId:channelName” outputChannelInfo contains connection parameters or is empty, depending on connection state. This contains all output channels that the InputChannel is configured for, irrespective whether currently connected or not.
- Return
- map.
-
std::unordered_map<std::string, karabo::net::ConnectionStatus>
getConnectionStatus
()¶ Provide a map between the output channels that are configured and their connection status.
- Return
- map
-
const InputChannel::MetaData &
read
(karabo::data::Hash &data, size_t idx = 0)¶ Read data from the InputChannel - to be called inside an InputHandler callback
Kept for backward compatibility only since internally the data is copied! Use one of the other read methods instead.
- Return
- meta data associated to the data token. Lifetime of the object corresponds to live time of the InputHandler callback.
- Parameters
data
: reference that will hold the dataidx
: of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens
-
karabo::data::Hash::Pointer
read
(size_t idx = 0)¶ Read data from the InputChannel - to be called inside an InputHandler callback
- Return
- the data as a pointer
- Parameters
idx
: of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens
-
karabo::data::Hash::Pointer
read
(size_t idx, MetaData &source)¶ Read data and meta data from the InputChannel - to be called inside an InputHandler callback
- Return
- the data as a pointer
- Parameters
idx
: of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokenssource
: reference that will hold the meta data
-
size_t
size
()¶ Number of data tokens - to be called inside an InputHandler callback
-
void
connect
(const karabo::data::Hash &outputChannelInfo, const std::function<void(const karabo::net::ErrorCode&)> &handler = std::function<void(const karabo::net::ErrorCode&)>())¶ Asynchronously connect this input channel to the output channel described by the first argument
- Parameters
outputChannelInfo
: Hash with three keys- ”outputChannelString”: a string matching one of the configured “connectedOutputChannels”
- ”connectionType”: a string - currently only “tcp” supported
- ”hostname”: a string telling which host/ip address to connect to
- ”port”: an unsigned int telling the port
- ”memoryLocation: string “remote” or “local” to tell whether other end is in another process or can share memory
handler
: indicates asynchronously (like via EventLoop::post) the success of the connection request
-
void
disconnect
(const std::string &connectionString)¶ Disconnect and clean internals
- Parameters
connectionString
: One of the “connectedOutputChannels” given at construction
-
void
updateOutputChannelConfiguration
(const std::string &outputChannelString, const karabo::data::Hash &config = karabo::data::Hash())¶ Update list of output channels that can be connected
- Parameters
outputChannelString
: string that can later be used as key “outputChannelString” of Hash argument to connectconfig
: kept for backward compatibility
-
const std::vector<InputChannel::MetaData> &
getMetaData
() const¶ Get the current meta data for input data available on this input channel. Validity time of the object corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.
- Return
-
std::vector<unsigned int>
sourceToIndices
(const std::string &source) const¶ Return the list of indices of the data tokens (for read(index) ) for a given source identifier. Multiple indices may be returned if the same source was appended more than once in one batch write. Indices increase monotonically in insertion order of the write operations. Validity time of the indices corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.
- Return
- Parameters
source
:
-
std::vector<unsigned int>
trainIdToIndices
(unsigned long long trainId) const¶ Return the list of indices of the data tokens (for read(index) ) for a given train id. Multiple indices may be returned if the same source was appended more than once in one batch write. Indices increase monotonically in insertion order of the write operations. Validity time of the indices corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.
- Return
- Parameters
source
:
-
const InputChannel::MetaData &
indexToMetaData
(unsigned int index) const¶ Return the data source identifier pertinent to a data token at a given index. Validity time of the object corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.
- Return
- Parameters
index
:
- Exceptions
A
: KARABO_LOGIC_EXCEPTION of there is no meta data available for the given index.
Public Static Functions
-
void
expectedParameters
(karabo::data::Schema &expected)¶ Necessary method as part of the factory/configuration system
- Parameters
expected
: [out] Description of expected parameters for this object (Schema)
Private Functions
-
void
disconnectImpl
(const std::string &outputChannelString)¶ Disconnect internals - needs protection by m_outputChannelsMutex
- Parameters
outputChannelString
: One of the “connectedOutputChannels” given at construction
-
void
prepareData
()¶ Prepares data and metadata from the active chunk.
prepareData assumes that it has exclusive access to the m_activeChunk member variable when it’s called. Is up to the caller to guarantee that assumption.
Private Members
-
InputHandler
m_inputHandler
¶ Callback on available data (per InputChannel)
-
DataHandler
m_dataHandler
¶ Callback on available data (per item in InputChannel)
-
struct
Handlers
¶ - #include <InputChannel.hh>
Container for InputChannel handlers that concern data handling.
Public Functions
-
Handlers
(const DataHandler &data, const InputHandler &eos = InputHandler())¶ Construct with data and end-of-stream handlers - input handler could be specified afterwards
-
Handlers
(const InputHandler &input, const InputHandler &eos = InputHandler())¶ Construct with input and end-of-stream handlers - data handler could be specified afterwards
-
-
-
class
Memory
¶ - #include <Memory.hh>
The Memory class is an internal utility for InputChannel and OutputChannel to provide static shared memory.
Public Static Functions
-
void
read
(karabo::data::Hash &data, const size_t dataIdx, const size_t channelIdx, const size_t chunkIdx)¶ Read the contents of a single Hash out of the cache. The passed in Hash will be cleared first.
- Parameters
data
:dataIdx
:channelIdx
:chunkIdx
:
-
Memory::DataPointer
read
(const size_t dataIdx, const size_t channelIdx, const size_t chunkIdx)¶ Read the contents of a single Hash out of the cache. A pointer tag_of a newly created Hash will be returned.
- Parameters
dataIdx
:channelIdx
:chunkIdx
:
-
void
write
(const karabo::data::Hash &data, const size_t channelIdx, const size_t chunkIdx, const MetaData &metaData, bool copyAllData = true)¶ Write the contents of a single Hash into the cache. The Hash will be serialized before control is returned to the caller. Note that the data of an NDArray inside the Hash will not be copied, i.e. the Memory internal buffer will point to the same memory as the NDArray, except if copyAllData = true. it is safe to mutate the Hash after writing it.
- Parameters
data
: inputchannelIdx
: where to store the serialised datachunkIdx
: where to store the serialised datametaData
: of the datacopyAllData
: defines whether all data (incl. NDArray data) is copied into the internal buffer (default: true)
-
void
assureAllDataIsCopied
(const size_t channelIdx, const size_t chunkIdx)¶ Ensure that the data of given chunk is not shared with anyone else, i.e. copy data if needed.
- Parameters
channelIdx
:chunkIdx
:
-
class
MetaData
¶ Inherits from Hash
Public Functions
-
MetaData
(const std::string &source, const karabo::data::Timestamp ×tamp)¶ Constructor to directly set meta data entries
- Parameters
source
: an identifier of the data producertimestamp
: a timestamp relevant for this data token.
-
void
setSource
(const std::string &source)¶ Set data source, i.e. identifier of the data producer
- Parameters
source
:
-
const std::string &
getSource
() const¶ Get data source, i.e. identifier of the data producer
- Return
-
void
setTimestamp
(const karabo::data::Timestamp ×tamp)¶ Set the timestamp relevant to this data token
- Parameters
timestamp
:
-
const karabo::data::Timestamp
getTimestamp
() const¶ Get the timestamp relevant to this data token
- Return
-
-
void
-
class
OutputChannel
¶ - #include <OutputChannel.hh>
An OutputChannel for passing data to pipelined processing.
The OutputChannel class is used for writing data to pipelined processing inputs. It supports tracking of meta data for each data token written to it. Specifically, it e.g. allows for keeping track of data producers, here called sources, and timing and train information. Meta data information enables aggregation of multiple data source into one output channel interaction with a remote host, as well as aggregation of multiple train-related data of the same source. A mixture of both scenarios is possible.
An example of these use cases
OutputChannel::Pointer output = ... // Hash data1; .... OutputChannel::MetaData meta1("THIS/IS/SOURCE/A/channel1", karabo::data::Timestamp()); output->write(data1, meta1) Hash data2_10; .... OutputChannel::MetaData meta2_10("THIS/IS/SOURCE/B/channel2", timestampForTrain10); output->write(data2_10, meta2) OutputChannel::MetaData meta2_11("THIS/IS/SOURCE/B/channel2", timestampForTrain11); output->write(data2_11, meta2_11) Hash data_this_source; ... // not passing any meta data to write will default the source to [deviceId]/[channelName] // and the timestamp to the current timestamp output->write(data_this_source); // now actually send over the network output->update();
Inherits from std::enable_shared_from_this< OutputChannel >
Public Functions
-
OutputChannel
(const karabo::data::Hash &config)¶ If this object is constructed using the factory/configuration system this method is called.
The initialize() method must not be called if constructed this way.
Deprecated: Tcp server initialization is triggered, but there is no control when and whether it succeeded. So better use the constructor with additional int argument (and set it to zero).
- See
- expectedParameters) and default-filled configuration
- Parameters
config
: Validated (
-
OutputChannel
(const karabo::data::Hash &config, int autoInit)¶ Recommended constructor, allowing guaranteed-to-work initialization.
The recommended way to call it is via the Configurator and with autoInit == 0, followed by calling initialize():
Hash config(<here state=”” non-default=”” config=”” parameters>=”“>); OutputChannel::Pointer output = Configurator<OutputChannel>::create(“OutputChannel”, cfg, 0); output->initialize();
Caveat: Make sure you do not pass a ‘bool’ instead of an ‘int’ as argument to create(..) since then the other constructor is chosen and the value of the ‘bool’ determines whether to validate cfg or not.
- See
- expectedParameters) and default-filled configuration
- Parameters
config
: Validated (
- Parameters
autoInit
: If set to 0 (strongly recommended), the constructor does not yet try to initiate the TCP server initialization and the initialize() method has to be called as “second constructor”. The advantage is that the initialization cannot fail on busy systems and one has control when the server is available for remote connections. If autoInit != 0, this constructor behaves as the other constructor and initialize() must not be called.
-
void
initialize
()¶ “Second constructor”, to be called after construction with second argument autoInit == 0.
Initializes the underlying Tcp server connection and makes it available for others.
May throw a karabo::util::NetworkException, e.g. if a non-zero port was defined in the input configuration and that is not available since used by something else.
-
karabo::data::Hash
getInitialConfiguration
() const¶ returns the initial readonly configuration parameters
Returns a Hash containing the initial information that should not be updated via
ShowConnectionHandler
andShowStatisticsHandler
. Currently only theaddress
key is included.
-
std::string
getInstanceIdName
() const¶ Concatenation of instance id and name
-
bool
hasRegisteredCopyInputChannel
(const std::string &instanceId) const¶ Check whether an InputChannel with given id is registered to receive all data
i.e. an InputChannel with “dataDistribution == copy”
- Return
- bool whether InputChannel of specified type is connected
- Parameters
instanceId
: of InputChannel
Check whether an InputChannel with given id is registered to receive a share of the data
i.e. an InputChannel with “dataDistribution == shared”
- Return
- bool whether InputChannel of specified type is connected
- Parameters
instanceId
: of InputChannel
-
void
write
(const karabo::data::Hash &data, const Memory::MetaData &metaData, bool = false)¶ Writes a Hash containing data to the output channel. Sending to the network happens when update() is called.
Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.
- Parameters
data
: input Hash objectmetaData
: a MetaData object containing meta data for this data token.
Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.
-
void
write
(const karabo::data::Hash &data, bool = false)¶ Writes a Hash containing data to the output channel. Sending to the network happens when update() is called. Metadata is initialized to default values. Namely the sending devices device id and the output channel’s name are used as data source.
Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.
- Parameters
data
: input Hash object
Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.
-
void
write
(const karabo::data::Hash::Pointer &data, const Memory::MetaData &metaData)¶ Writes a Hash containing data to the output channel. Sending to the network happens when update() is called. Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.
- Parameters
data
: shared pointer to input Hash objectmetaData
: a MetaData object containing meta data for this data token.
Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.
-
void
write
(const karabo::data::Hash::Pointer &data)¶ Writes a Hash containing data to the output channel. Sending to the network happens asynchronously. Metadata is initialized to default values. Namely the sending devices device id and the output channel’s name are used as data source. Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.
- Parameters
data
: shared pointer to input Hash object
Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.
-
void
update
(bool safeNDArray = false)¶ Update the output channel, i.e. send all data over the wire that was previously written by calling write(…). This is a synchronous method, i.e. blocks until all data is actually sent (or dropped or queued).
Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.
- Parameters
safeNDArray
: boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are ‘safe’, i.e. their memory will not be referred to elsewhere after update is finished. Default is ‘false’, ‘true’ can avoid safety copies of NDArray content when data is queued or sent locally.
-
void
asyncUpdate
(bool safeNDArray = false, std::function<void()> &&writeDoneHandler = []() {})¶ Semi-asynchronously update the output channel, i.e. start asynchronous sending of data over the wire that was previously written to the output channel’s‘ buffer by calling write(…), but block as long as required to really start sending. The start of sending data is delayed
- for any connected input channel that is currently not ready to receive more data, but is configured with “dataDistribution” as “copy” and with “onSlowness” as “wait”,
- or if none of the connected input channels that are configured with “dataDistribution” as “shared” are currently ready to receive data and if this output channel is configured with “noInputShared” as “wait”.
Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.
- Parameters
safeNDArray
: boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are ‘safe’, i.e. their memory will not be referred to elsewhere before ‘readyHandler’ is called. Default is ‘false’, ‘true’ can avoid safety copies of NDArray content when data is queued or sent locally.writeDoneHandler
: callback when data (that is not queued) has been sent and thus even NDArray data inside it can be re-used again (except if safeNDArray was set to ‘true’ in which case its memory may still be used in a queue).
-
void
asyncUpdateNoWait
(std::function<void()> &&readyForNextHandlerstd::function<void()> &&writeDoneHandlerbool safeNDArray, )¶ Expert method
Asynchronously update the output channel, i.e. asynchronously send all data over the wire that was previously written by calling write(…) without any blocking.
This method must not be called again before either ‘readyForNextHandler’ or ‘writeDoneHandler’ have been called. If next data should be sent, but neither handler has been called yet, one has to block or skip the data. In the latter case, the wish of a connected input channel that is configured to make the output “wait” if not ready, is ignored (policy violation).
Both handlers have to be valid function pointers.
TODO: Provide a handler called when sending data is completed, including any queued data, and thus NDArray data can be re-used again even if safeNDArray=false (i.e. buffers could be re-used).
- Parameters
readyForNextHandler
: callback when asyncUpdateNoWait may be called again (this can only be delayed if any blocking input channel [“wait”] is connected)writeDoneHandler
: callback when sending is finished (as confirmed by Tcp) or stopped due to disconnection, or data is internally queued. So now all NDArray inside the Hash passed to write(..) before can be re-used again (except if safeNDArray was set to ‘true’ in which case its memory may still be used in a queue)safeNDArray
: boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are ‘safe’, i.e. their memory will not be referred to elsewhere after update is finished. False triggers a data copy if data needs to be queued.
Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.
-
void
signalEndOfStream
()¶ Synchronously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream.
Thread safety: All the ‘write(..)’ methods, ‘[async]UpdateNoWait’ and ‘[async]SignalEndOfStream(..)’ must not be called concurrently.
-
void
asyncSignalEndOfStream
(std::function<void()> &&readyHandler)¶ Asynchonously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream.
Thread safety: All the ‘write(..)’ methods, ‘update()’/’asyncUpdate(cb)’ and ‘signalEndOfStream()’/’asyncSignalEndOfStream(cb)’ must not be called concurrently.
- Parameters
readyHandler
: callback when notification has been sent or queued
Register handler that selects which of the connected input channels that have dataDistribution = “shared” is to be served.
The handler will be called during update(..)/asyncUpdateNoWait with the ids of the connected “shared” input channels (e.g. “deviceId:input”) as argument. The returned channel id will receive the data. If an empty string or an unknown id is returned, the data will be dropped.
- Parameters
selector
: takes vector<string> as argument and returns string
-
void
disable
()¶ Shut down all underlying connections, object will not be usable afterwards.
Needed if stray shared pointers may be kept somewhere.
Public Static Functions
-
void
expectedParameters
(karabo::data::Schema &expected)¶ Necessary method as part of the factory/configuration system
- Parameters
expected
: [out] Description of expected parameters for this object (Schema)
Private Functions
-
void
eraseOldChannel
(InputChannels &channelContainer, const std::string &instanceId, const karabo::net::Channel::Pointer &newChannel) const¶ Erase instance with ‘instanceId’ from ‘channelContainer’ if existing - if same as ‘newChannel’, do not close
Helper to indicate that given shared input is ready to receive more data
Requires m_registeredInputsMutex to be locked
Helper to provide id of shared input that is ready to receive more data
Requires m_registeredInputsMutex to be locked
Helper to tell whether none of the shared inputs is ready to receive more data
Requires m_registeredInputsMutex to be locked
Helper to query whether given shared input is ready to receive more data
Requires m_registeredInputsMutex to be locked
Helper to indicate that given shared input is currently not ready to receive more data
Requires m_registeredInputsMutex to be locked
-
void
pushCopyNext
(const std::string &instanceId)¶ Helper to indicate that given copy input is ready to receive more data
Requires m_registeredInputsMutex to be locked
-
bool
eraseCopyInput
(const std::string &instanceId)¶ Erase instance from container of copy channels that are ready to receive data
Requires m_registeredInputsMutex to be locked
- Return
- whether instanceId could be removed (i.e. was actually ready to receive)
- Parameters
instanceId
:
-
bool
updateChunkId
()¶ helper to set new m_chunkId
- Return
- true if new m_chunkId is valid (i.e. not equal m_invalidChunkId)
-
void
ensureValidChunkId
(std::unique_lock<std::mutex> &lockOfRegisteredInputsMutex)¶ helper for asyncUpdate() to ensure that at the end m_chunkId is valid - may block a while
- Parameters
lockOfRegisteredInputsMutex
: a scoped_lock locking m_registeredInputsMutex (may be unlocked and locked again during execution)
-
void
asyncPrepareCopy
(unsigned int chunkId, std::vector<karabo::data::Hash *> &toSendImmediately, std::vector<karabo::data::Hash *> &toQueue, std::vector<karabo::data::Hash *> &toBlock)¶ Figure out how to treat copy inputs, return via appending to reference arguments
Requires m_registeredInputsMutex to be locked
-
void
asyncPrepareDistribute
(unsigned int chunkId, std::vector<karabo::data::Hash *> &toSendImmediately, std::vector<karabo::data::Hash *> &toQueue, std::vector<karabo::data::Hash *> &toBlock, bool &queue, bool &block)¶ Figure out how to treat shared inputs, return via (appending to) reference arguments
Requires m_registeredInputsMutex to be locked
-
bool
asyncPrepareDistributeEos
(unsigned int chunkId, std::vector<karabo::data::Hash *> &toSendImmediately, std::vector<karabo::data::Hash *> &toQueue, std::vector<karabo::data::Hash *> &toBlock)¶ Figure out how to send EndOfStream for shared outputs, return via reference arguments
Requires m_registeredInputsMutex to be locked
- Return
- whether to queue for shared queue
-
void
asyncPrepareDistributeSelected
(unsigned int chunkId, std::vector<karabo::data::Hash *> &toSendImmediately, std::vector<karabo::data::Hash *> &toQueue, std::vector<karabo::data::Hash *> &toBlock)¶ Figure out how to treat shared inputs if sharedInputSelector is registered
Requires m_registeredInputsMutex to be locked
-
void
asyncPrepareDistributeLoadBal
(unsigned int chunkId, std::vector<karabo::data::Hash *> &toSendImmediately, std::vector<karabo::data::Hash *> &toQueue, std::vector<karabo::data::Hash *> &toBlock, bool &queue, bool &block)¶ Figure out how to treat shared inputs when load-balancing
Requires m_registeredInputsMutex to be locked
-
void
resetSendOngoing
(const std::string &instanceId)¶ Helper that sets the sendOngoing flag to false for given instanceId
-
void
asyncSendOne
(unsigned int chunkId, InputChannelInfo &channelInfo, std::function<void()> &&doneHandler)¶ Helper to asynchronously send chunk data to channel in given channelInfo
- Parameters
chunkId
: The chunk to sendchannelInfo
: Container with info about channel to send todoneHandler
: Callback when sending done or failed
-
void
awaitUpdateFuture
(std::future<void> &fut, const char *which)¶ Helper for waiting for future that in case of long delay adds a thread to unblock
Throws TimeoutException if not unblocked after two minutes.
-
std::string
debugId
() const¶ Provide a string identifying this output channel (useful in DEBUG logging)
-
-
class
Signal
¶ Public Functions
- template <typename… Args>
-
void
setSignature
()¶ Use like setSignature<int, data::Hash, std::string>() to ensure that any emitted signal has to take arguments of these three types in that order.
-
bool
registerSlot
(const std::string &slotInstanceId, const std::string &slotFunction)¶ Register a slot to receive an emitted signal
- Return
- bool whether freshly registered (false means: was already registered)
- Parameters
slotInstanceId
: id of the instance of the slotslotFunction
: name of the slot
-
bool
unregisterSlot
(const std::string &slotInstanceId, const std::string &slotFunction = "")¶ Undo registration of a slot
- Return
- bool whether slot registration could be undone, i.e. false if slot was not registered
- Parameters
slotInstanceId
: instance id of the slot to be removedslotFunction
: the slot - if empty string, remove all registered slots of slotInstanceId
-
void
setTopic
(const std::string &topic)¶ This function allows to use a specific topic to which all messages are emitted If the setter is not called, the topic of SignalSlotable will be used NOTE: The idea is to keep a door open for a later change where each emit will use a topic identical to the signal name. In that case the setter can just be removed.
- Parameters
topic
: The topic name
-
class
SignalSlotable
¶ - #include <SignalSlotable.hh>
The SignalSlotable class. This class implements the so-called “Signal-Slot” design pattern orginally termed by the Qt-Gui framework. However, signals and slots are not restricted to a local application but can be connected and triggered across the network. This allows for programming with network components in the same intuitive (event-driven) way as Qt allows to do with its local components (e.g. widgets).
Moreover does this implementation (unlike Qt) not require any pre-processing. Another additional feature is the ability to setup new signals and/or slots at runtime.
Furthermore, this class implements functions for the common request/response patterns.
For a full documentation of the signal-slot component see the documentation in the software-guide.
Inherits from std::enable_shared_from_this< SignalSlotable >
Subclassed by karabo::core::Device, karabo::core::DeviceServer
Public Types
-
typedef std::function<void()>
AsyncErrorHandler
¶ An AsyncErrorHandler takes no argument, but it will be called such that it can rethrow and then catch exceptions. The caught exception indicates the failure reason, e.g.:
void asyncErrorHandler () { try { throw; } catch (std::exception& e) { // or any other exception type - or several catch statements KARABO_LOG_FRAMEWORK_WARN << “Probem when trying to do something: ” << e.what(); } }
Public Functions
-
SignalSlotable
(const std::string &instanceId, const karabo::net::Broker::Pointer &connection, const int heartbeatInterval = 30, const karabo::data::Hash &instanceInfo = karabo::data::Hash())¶ Creates a functional SignalSlotable object using an existing connection.
Don’t call init() afterwards.
- Parameters
instanceId
: The future instanceId of this object in the distributed systemconnection
: An existing broker connectionheartbeatInterval
: The interval (in s) in which a heartbeat is emittedinstanceInfo
: A hash containing any important additional information
-
SignalSlotable
(const std::string &instanceId, const karabo::data::Hash &brokerConfiguration = karabo::data::Hash(), const int heartbeatInterval = 30, const karabo::data::Hash &instanceInfo = karabo::data::Hash())¶ Creates a function SignalSlotable object allowing to configure the broker connection.
Don’t call init() afterwards.
- Parameters
instanceId
: The future instanceId of this object in the distributed systembrokerConfiguration
: A single keyed Hash where the key is the broker type and the Hash at that key is the configuration for the respective broker type (a given instanceId it will be replaced by the first constructor argument). Can be empty or can contain an empty Hash() at the single key, i.e. will be expanded from defaults.heartbeatInterval
: The interval (in s) in which a heartbeat is emittedinstanceInfo
: A hash containing any important additional information
-
void
init
(const std::string &instanceId, const karabo::net::Broker::Pointer &connection, const int heartbeatInterval, const karabo::data::Hash &instanceInfo, bool consumeBroadcasts = true)¶ Initializes the SignalSlotable object (only use in conjunction with empty constructor).
- Parameters
instanceId
: The future instanceId of this object in the distributed systemconnection
: An existing broker connectionheartbeatInterval
: The interval (in s) in which a heartbeat is emittedinstanceInfo
: A hash containing any important additional informationconsumeBroadcasts
: if true (default), receive messages addressed to everybody (i.e. to ‘*’) on its own. If false, some other mechanism has to ensure to deliver these.
-
void
start
()¶ This function starts the communication.
After a call to this non-blocking function the object starts listening to messages. The uniqueness of the instanceId is validated (throws SignalSlotException if not unique) and if successful the object registers with a call to “slotInstanceNew” to the distributed system.
-
bool
eraseTrackedInstance
(const std::string &instanceId)¶ Erase instance from container of tracked instances
To be called if one is tracking instances and is sure that the given instance is not alive anymore (e.g. if another instance in the same process is dead as well). If erroneously called, the next arriving heartbeat of the instance will trigger an instanceNew event
- Return
- whether instanceId was tracked before
- Parameters
instanceId
: that shall be treated as not alive anymore
-
std::vector<string>
getAvailableSignals
(const std::string &instanceId, int timeout = 100)¶ This is a synchronous call with timeout in milliseconds return vector of device signals.
- Return
- vector of device’s signal names
- Parameters
instanceId
: of the devicetimeout
: in milliseconds
-
std::vector<string>
getAvailableSlots
(const std::string &instanceId, int timeout = 100)¶ This is a synchronous call with timeout in milliseconds return vector of device slots.
- Return
- vector of device’s slot names
- Parameters
instanceId
: of the devicetimeout
: in milliseconds
-
const std::string &
getUserName
() const¶ Retrieves currently logged in username (empty if not logged in)
- Return
- string username
-
const std::string &
getInstanceId
() const¶ Access to the identification of the current instance using signals and slots
- Return
- instanceId
-
void
updateInstanceInfo
(const karabo::data::Hash &update, bool remove = false)¶ Update and publish the instanceInfo
- Parameters
update
: a Hash containing new or updated keys - or keys to removeremove
: if false (default), merge ‘update’ to existing instance info, otherwise subtract it
-
const SignalSlotable::SlotInstancePointer &
getSenderInfo
(const std::string &slotFunction)¶ This function must only be called within a slotFunctions body. It returns the current object handling the callback which provides more information on the sender.
- Return
- instance of a Slot object (handler object for this callback)
- Parameters
slotFunction
: The string-ified name of the slotFunction you are currently in
-
bool
connect
(const std::string &signalInstanceId, const std::string &signalSignature, const std::string &slotInstanceId, const std::string &slotSignature)¶ This function tries to establish synchronously a connection between a signal and a slot, identified both by their respective instance IDs and signatures. Moreover, this SignalSlotable obeys (throughout its full lifetime or until “disconnect” is called with the same arguments) the responsibility to keep this connection alive, i.e. to reconnect if either signal or slot instance come back after they have shutdown or if they come up the first time.
- Return
- whether connection is already successfully established
- Parameters
signalInstanceId
: is the instance ID of the signal (if empty use this instance)signalSignature
: is the signature of the signalslotInstanceId
: is the instance ID of the slot (if empty use this instance)slotSignature
: is the signature of the slot
-
bool
connect
(const std::string &signal, const std::string &slot)¶ This function tries to establish a connection between a signal and a slot as “connect” with four arguments does, so see there for more details. If signal or slot instance IDs are not specified, they are interpreted as local and automatically assigned a “self” instanceId
- Return
- whether connection is already succesfully established
- Parameters
signal
: <signalInstanceId>:<signalSignature>slot
: <slotInstanceId>:<slotSignature>
-
void
asyncConnect
(const std::string &signalInstanceId, const std::string &signalSignature, const std::string &slotInstanceId, const std::string &slotSignature, const std::function<void()> &successHandler = std::function<void()>()const AsyncErrorHandler &failureHandler = AsyncErrorHandler(), int timeout = 0, )¶ This function tries to establish asynchronously a connection between a signal and a slot, identified both by their respective instance IDs and signatures. Moreover, this SignalSlotable obeys (throughout its full lifetime or until “disconnect” is called with the same arguments) the responsibility to keep this connection alive, i.e. to reconnect if either signal or slot instance come back after they have shutdown or if they come up the first time.
- Parameters
signalInstanceId
: is the instance ID of the signal (if empty use this instance)signalSignature
: is the signature of the signalslotInstanceId
: is the instance ID of the slot (if empty use this instance)slotSignature
: is the signature of the slotsuccessHandler
: is called when connection is established (maybe be empty [=default])failureHandler
: is called when connection could not be established, in the same way as an Requestor::AsyncErrorHandler - if Signal or Slot do not exist, the exception is a SignalSlotExceptiontimeout
: in milliseconds for internal async requests - non-positive (default) means the very long default timeout
-
void
asyncConnect
(const std::vector<SignalSlotConnection> &signalSlotConnections, const std::function<void()> &successHandler = std::function<void()>()const AsyncErrorHandler &failureHandler = AsyncErrorHandler(), int timeout = 0, )¶ This function tries to establish asynchronously a connection between several signals and slots.
One of the two handlers will be called exactly once. The failureHandler will be called if any signal slot connection failed, no matter whether other connections succeeded or not.
- Parameters
signalSlotConnections
: e.g. vector<SignalSlotConnection>{SignalSlotConnection(“sigInst”, “signal”, “slotInst”, “slot”), …}successHandler
: is called when all connections are established (maybe be empty [=default])failureHandler
: is called when any of the connections could not be established, no matter whether the others failed or not, in the same way as a Requestor::AsyncErrorHandler.timeout
: in milliseconds for internal async requests - non-positive (default) means the very long default timeout
-
bool
disconnect
(const std::string &signalInstanceId, const std::string &signalFunction, const std::string &slotInstanceId, const std::string &slotFunction)¶ Disconnects a slot from a signal, identified both by their respective instance IDs and signatures. In case the connection was established by this instance, also erase it from the list of connections that have to re-established in case signal or slot instances come back after a shutdown.
- Return
- whether connection is successfully stopped, e.g. false if there was no such connection or if remote signal instance ID did not confirm in time
- Parameters
signalInstanceId
: is the instance ID of the signal (if empty use this instance)signalSignature
: is the signature of the signalslotInstanceId
: is the instance ID of the slot (if empty use this instance)slotSignature
: is the signature of the slot
-
void
asyncDisconnect
(const std::string &signalInstanceId, const std::string &signalFunction, const std::string &slotInstanceId, const std::string &slotFunction, const std::function<void()> &successHandler = std::function<void()>()const AsyncErrorHandler &failureHandler = AsyncErrorHandler(), int timeout = 0, )¶ This function tries to disconnect a previously established connection between a signal and a slot. These two are identified both by their respective instance IDs and signatures. In case the connection was established by this instance, the function also erases it from the list of connections that have to be re-established in case signal or slot instances come back after a shutdown.
- Parameters
signalInstanceId
: is the instance ID of the signal (if empty use this instance)signalSignature
: is the signature of the signalslotInstanceId
: is the instance ID of the slot (if empty use this instance)slotSignature
: is the signature of the slotsuccessHandler
: is called when connection is successfully stopped (maybe be empty [=default])failureHandler
: is called when the disconnection failed (maybe be empty [=default])timeout
: in milliseconds for internal async requests - non-positive (default) means the very long default timeout
- template <typename… Args>
-
void
emit
(const std::string &signalFunction, const Args&... args) const¶ Emits a signal, i.e. publishes the given payload Emitting a signal is a fire-and-forget activity. The function returns immediately.
- Parameters
signalFunction
: The name of the previously registered signalargs...
: A variadic number of arguments to be published
- template <typename… Args>
-
void
call
(const std::string &instanceId, const std::string &functionName, const Args&... args) const¶ Calls a (remote) function. Calling a remote function is a fire-and-forget activity. The function returns immediately after sending the message.
- Parameters
instanceId
: Instance to be calledfunctionName
: Function on instance to be called (must be a registered slot)args
: Arguments with which to call the slot
- template <typename… Args>
-
void
reply
(const Args&... args)¶ Place the reply of a slot call
To be used inside a method registered as a slot. The reply is not directly sent, but it is registered to be sent once all methods registered to the slot (usually only one) have finished execution. So if called several times in a slot, the last call defines the actual reply.
If this method is not called inside a slot, an “empty” reply will be send without arguments. But note that Device::updateState(const State s, …) implicitly calls reply(s.name()).
See about AsyncReply to avoid blocking the thread in case reply values are known only later, e.g. after some IO operations.
- Parameters
args
: 0 to 4 objects of the types known to serialisation, e.g. float, vector<long long>, Hash,…
-
void
registerSlot
(const std::function<void()> &slotconst std::string &funcName, )¶ Register a new slot function for a slot. A new slot is generated if so necessary. It is checked that the signature of the new slot is the same as an already registered one.
-
InputChannel::Pointer
createInputChannel
(const std::string &channelName, const karabo::data::Hash &config, const DataHandler &onDataAvailableHandler = DataHandler(), const InputHandler &onInputAvailableHandler = InputHandler(), const InputHandler &onEndOfStreamEventHandler = InputHandler(), const InputChannel::ConnectionTracker &connectTracker = InputChannel::ConnectionTracker())¶ Create and register an InputChannel together with handlers
- Return
- the created InputChannel - better do not store it anywhere, it can be received via getInputChannel(channelName)
- Parameters
channelName
: name of the channel, e.g. its path in the schemaconfig
: is a Hash with a Hash at ‘channelName’ which will be passed to InputChannel::createonDataAvailableHandler
: is a DataHandler called for each data item coming through the pipelineonInputAvailableHandler
: is an InputHandler called when new data arrives - user has to loop over all itemsonEndOfStreamEventHandler
: is an InputHandler called when EOS is receivedconnectTracker
: will be called whenever the connection status of the created channel changes
-
bool
removeInputChannel
(const std::string &channelName)¶ Remove the InputChannel created via createInputChannel
- Return
- true if such a channel existed and could be removed
- Parameters
channelName
: identifies the channel (first argument that was given to createInputChannel)
-
OutputChannel::Pointer
createOutputChannel
(const std::string &channelName, const karabo::data::Hash &config, const OutputHandler &onOutputPossibleHandler = OutputHandler())¶ Create an OutputChannel under the given name
If there is already one for than name, that one (and thus all other copies of its shared_ptr) will be disabled to disconnect any connection.
- Return
- pointer to created channel - do not store anywhere! If needed, retrieve again via getOutputChannel(channelName).
- Parameters
channelName
: the name for the channelconfig
: must have a Hash at key channelName - that is passed (after removeal of the “schema” key) to Configurator<OutputChannel>::createonOutputPossibleHandler
: ?
-
bool
removeOutputChannel
(const std::string &channelName)¶ Remove the OutputChannel created via createOutputChannel
Before removal, it (and thus all other copies of its shared_ptr) will be disabled to disconnect any connection.
- Return
- true if such a channel existed and could be removed
- Parameters
channelName
: identifies the channel (first argument that was given to createOutputChannel)
-
OutputChannel::Pointer
getOutputChannel
(const std::string &name)¶ Access pointer to OutputChannel with given name. Throws ParameterException if no such output channel.
- Return
- OutpuChannel::Pointer
- Parameters
name
: of output channel (e.g. path in expectedParameters)
-
OutputChannel::Pointer
getOutputChannelNoThrow
(const std::string &name)¶ Access pointer to OutputChannel with given name.
- Return
- OutputChannel::Pointer - empty if no channel of that name
- Parameters
name
: of output channel (e.g. path in expectedParameters)
-
InputChannel::Pointer
getInputChannel
(const std::string &name)¶ Access pointer to InputChannel with given name. Throws ParameterException if no such input channel.
- Return
- InputChannel::Pointer
-
InputChannel::Pointer
getInputChannelNoThrow
(const std::string &name)¶ Access pointer to InputChannel with given name.
- Return
- InputChannel::Pointer - empty if no channel of that name
- Parameters
name
: of input channel (e.g. path in expectedParameters)
-
void
connectInputChannel
(const InputChannel::Pointer &channel, int trails = 8)¶ Deprecated, use asyncConnectInputChannel!
Connects an input channel to those as defined on the input channel’s configuration. The function is asynchronous, but gives no feedback about success or failure.
-
void
asyncConnectInputChannel
(const InputChannel::Pointer &channel, const std::function<void(bool)> &handler, const std::vector<std::string> &outputChannelsToIgnore = std::vector<std::string>(), )¶ Connect input channel to output channels defined in its configuration.
Proper asynchronous implementation with feedback handler
- Parameters
channel
: pointer to InputChannelhandler
: to report success or failure. In the latter case the argument is false and more information about the failure can be retrieved via try { throw; } catch (const std::exception&e) { const std::string reason(e.what());} in the same way as in SignalSlotable::AsyncErrorHandleroutputChannelsToIgnore
: outputChannels that shall not be connected, e.g. because they are already connected (defaults to empty vector)
-
void
connectInputChannels
(const boost::system::error_code &e)¶ Trigger connection of all not connected input channel connections
Re-triggers itself regularly via internal m_channelConnectTimer
- Parameters
e
: do nothing if evaluates to true
Protected Functions
-
string
fetchInstanceId
(const std::string &signalOrSlotId) const¶ Parses out the instanceId part of signalId or slotId
- Return
- A string representing the instanceId
- Parameters
signalOrSlotId
:
-
void
registerBroadcastHandler
(std::function<void(const karabo::data::Hash::Pointer &header, const karabo::data::Hash::Pointer &body)> handler)¶ Register a handler to be called for every received message that is addressed to everybody. NOTE: This is not thread safe - call before SignalSlotable::start starts receiving messages.
- Parameters
handler
: with header and body (as Hash::Pointer) of the message
Private Functions
- template <typename… Args>
-
SignalSlotable::SignalInstancePointer
addSignalIfNew
(const std::string &signalFunction)¶ Helper for registerSignal: If signalFunction is not yet known, creates a signal corresponding to the template argument signature and adds it to the internal container. Otherwise an empty pointer is returned.
- Return
- pointer to new Signal or empty pointer
- Parameters
signalFunction
:
-
void
ensureInstanceIdIsValid
(const std::string &instanceId)¶ If instanceId has invalid characters, throws SignalSlotException.
-
void
ensureInstanceIdIsUnique
(const std::string &instanceId)¶ If instanceId not unique in system, throws SignalSlotException.
-
std::tuple<karabo::data::Hash::Pointer, std::string, bool>
registerAsyncReply
()¶ Internal method to provide info for AsyncReply object
- Return
- tuple of slot header, slot name and whether it is a global slot call
-
void
reconnectSignals
(const std::string &newInstanceId)¶ Calls connect for all signal-slot connections that involve ‘newInstanceId’ (be it on signal or slot side) and for which this instance is responsible, i.e. its “connect” has been called for this connection before (with or without immediate success). Calling “disconnect” stops this responsibility.
-
void
registerForShortcutMessaging
()¶ Register myself for short-cut messaging (i.e. bypass broker if in same process). Must not be called before instance ID is checked to be unique in overall system.
-
void
deregisterFromShortcutMessaging
()¶ Deregister myself from short-cut messaging.
-
void
registerNewSlot
(const std::string &funcName, SlotInstancePointer instance)¶ Register a new slot instance under name funcName. This will raise an error if the slot already exists.
-
void
slotConnectToSignal
(const std::string &signalFunction, const std::string &slotInstanceId, const std::string &slotFunction)¶ Register signal-slot connection on signal side.
-
void
slotSubscribeRemoteSignal
(const std::string &signalInstanceId, const std::string &signalFunction)¶ Slot to subscribe to remote signal.
-
void
slotUnsubscribeRemoteSignal
(const std::string &signalInstanceId, const std::string &signalFunction)¶ Slot to un-subscribe from remote signal.
-
bool
instanceHasSlot
(const std::string &slotInstanceId, const std::string &unmangledSlotFunction)¶ True if instance with ID ‘slotInstanceId’ has slot ‘slotFunction’. Internally uses “slotHasSlot” for remote instances, but shortcuts if ID is the own one. Always true if ‘slotInstanceId == “*”’ (i.e. global slot).
-
void
slotHasSlot
(const std::string &unmangledSlotFunction)¶ Slot to tell whether instance has a slot of given name.
helper for connectInputChannels()
-
bool
tryToUnregisterSlot
(const std::string &signalFunction, const std::string &slotInstanceId, const std::string &slotFunction)¶ Try to undo registration of a slot “slotInstanceId.slotFunction”. Thread-safe, locks m_signalSlotInstancesMutex.
- Return
- bool true if signal existed and given slot was registered before
- Parameters
signalFunction
: name of local signalslotInstanceId
: instance id that carries the slotslotFunction
: the slot - if empty, all registered slots of slotInstanceId
For the given replyId of a ‘request.receiveAsync’, register error handling, i.e. the timer for timeout and the handler for remote exceptions.
Private Static Functions
-
void
callErrorHandler
(const AsyncErrorHandler &handler, const std::string &message)¶ Helper that calls ‘handler’ such that it can do
try { throw; } catch (const SignalSlotException &e) { <action>}
- Parameters
message
: text given to the SignalSlotException
-
class
AsyncReply
¶ - #include <SignalSlotable.hh>
A functor to place an asynchronous reply during slot execution.
- Create only inside a slot call of a SignalSlotable
- Can be copied around as wished
- Call either the operator or the error(..) method exactly once for one of the copies.
- Call these methods from another context/thread, i.e. between creation and use of its methods you have to leave the thread at least once)
- Must not be used once the SignalSlotable object that created it is (being) destructed (e.g. protect by bind_weak to member functions of the SignalSlotable)
Public Functions
-
AsyncReply
(SignalSlotable *signalSlotable)¶ Construct functor for an asynchronous reply. Create only within a slot call of a SignalSlotable
- Parameters
signalSlotable
: pointer to the SignalSlotable whose slot is currently executed (usually: this)
- template <typename… Args>
-
void
operator()
(const Args&... args) const¶ Place the reply - almost like using SignalSlotable::reply in the synchronous case. The difference is that here the reply is immediately sent and cannot be overwritten by a following call.
- Parameters
args
: 0-4 objects of the types known to serialisation, e.g. float, vector<long long>, Hash,…
-
void
error
(const std::string &message, const std::string &details = std::string()) const¶ If a proper reply cannot be placed, please use this to reply an error
- Parameters
message
: is the short text for the RemoteExceptiondetails
: further details, usually an exception trace (e.g. data::Exception::detailedMsg()), default is empty for backward compatibility reasons
-
class
Requestor
¶ Private Functions
-
std::pair<karabo::data::Hash::Pointer, karabo::data::Hash::Pointer>
receiveResponseHashes
()¶ Receives the reply for the current request, returning shared pointers to the reply’s header and body hashes.
- Return
- A pair with a shared pointer to the header hash of the reply in the first position and a shared pointer to the body in the second.
-
void
registerErrorHandler
(const AsyncErrorHandler &errorHandler)¶ Register handler for errors in async requests, e.g. timeout or remote exception.
-
void
getSignalInstanceId
(const karabo::data::Hash::Pointer &header, std::string &result)¶ Extracts the value of the SignalInstanceId path in a response header hash.
- Parameters
header
: response header hash from where the value will be extractedresult
: the string that will be assigned the value of the SignalInstanceId path
-
std::pair<karabo::data::Hash::Pointer, karabo::data::Hash::Pointer>
-
struct
SignalSlotConnection
¶ - #include <SignalSlotable.hh>
A structure to keep information of a signal-slot connection.
-
typedef std::function<void()>
-
class
Slot
¶ Subclassed by karabo::xms::SlotN< Ret, Args >
Public Functions
-
karabo::data::Hash::Pointer
getHeaderOfSender
() const¶ Return message header that triggered calling this slot Valid while callRegisteredSlotFunctions is processed.
-
karabo::data::Hash::Pointer
-
class
SLOT_ELEMENT
¶ Inherits from karabo::xms::SlotElementBase< SLOT_ELEMENT >
- template <class A1>
-
class
SLOT_ELEMENT1
¶ Inherits from karabo::xms::SlotElementBase< SLOT_ELEMENT1< A1 > >
- template <class Derived>
-
class
SlotElementBase
¶ Inherits from karabo::data::GenericElement< Derived >
Public Functions
-
Derived &
allowedStates
(const std::vector<karabo::data::State> &value)¶ The allowedStates method serves for setting up allowed states for the element
- Return
- reference to the Element (to allow method’s chaining)
- Parameters
states
: A string describing list of possible states.sep
: A separator symbol used for parsing previous argument for list of states
-
Derived &
- template <typename Ret, typename… Args>
-
class
SlotN
¶ Inherits from karabo::xms::Slot
Public Functions
-
virtual void
doCallRegisteredSlotFunctions
(const karabo::data::Hash &body)¶ To be called under protection of m_registeredSlotFunctionsMutex
- Parameters
body
: a Hash with up to four keys “a1” to “a4” with the expected types behind
-
virtual void
-
typedef InputChannelElement
The karabo::util Namespace¶
-
namespace
karabo
::
util
¶ Namespace for package packageName
Typedefs
-
using
karabo::util::InfluxResultSet = typedef std::pair< std::vector<std::string>, std::vector<std::vector<boost::optional<std::string> >> >
-
using
karabo::util::JsonType = typedef std::optional<nlohmann::json::value_t>
Functions
- template <class T>
-
void
addPointerToHash
(Hash &hash, const std::string &path, T *const &value, const Dims &dims, const char separator = Hash::k_defaultSep)¶
- template <class T>
-
void
getPointerFromHash
(const Hash &hash, const std::string &path, T *&value, Dims &dims, const char separator = data::Hash::k_defaultSep)¶
-
void
setDims
(Hash &hash, const std::string &path, const Dims &dims, const char separator = Hash::k_defaultSep)¶
-
data::Epochstamp
stringDoubleToEpochstamp
(const std::string ×tampAsDouble)¶ Convert an std::string that represents a double of the seconds since Unix epoch to an Epochstamp
-
void
getLeaves
(const data::Hash &configuration, const data::Schema &schema, std::vector<std::string> &result, const char separator)¶
-
void
getLeaves_r
(const data::Hash &hash, const data::Schema &schema, std::vector<std::string> &result, std::string prefix, const char separator, const bool fullPaths)¶
-
void
parseSingleJsonResult
(const nl::json &respObj, InfluxResultSet &influxResult, const std::string &columnPrefixToRemove)¶
-
void
jsonResultsToInfluxResultSet
(const std::string &jsonResult, InfluxResultSet &influxResult, const std::string &columnPrefixToRemove)¶ Utility function to convert a string into an InfluxResultSet
One or multiple concatenated JSON objects containing the results of an InfluxDB query are decoded and filled into a InfluxResultSet object.
- Parameters
jsonResult
: : the string containing the JSON object(s)influxResult
: : the resultcolumnPrefixToRemove
: : remove this prefix from the column names. InfluxQL selector functions (e.g. SAMPLE) are prepended to the column name. Use this argument to remove said prefixes.
- Exceptions
karabo::util::NotSupportedException
: in case the column mismatch nlohmann::json exceptions in case of malformatted JSON objects.
-
boost::optional<std::string>
jsonValueAsString
(nl::json value)¶
-
std::string
toInfluxDurationUnit
(const TIME_UNITS &karaboDurationUnit)¶
-
std::string
epochAsMicrosecString
(const data::Epochstamp &ep)¶
-
void
getLeaves
(const karabo::data::Hash &configuration, const karabo::data::Schema &schema, std::vector<std::string> &result, const char separator = karabo::data::Hash::k_defaultSep)¶
-
void
getLeaves_r
(const karabo::data::Hash &hash, const karabo::data::Schema &schema, std::vector<std::string> &result, std::string prefix, const char separator, const bool fullPaths)¶
-
std::string
toInfluxDurationUnit
(const karabo::data::TIME_UNITS &karaboDurationUnit)¶
-
std::string
epochAsMicrosecString
(const karabo::data::Epochstamp &ep)¶
-
boost::optional<std::string>
jsonValueAsString
(nlohmann::json value)¶ Utility function to convert a json object.
- Return
- a boost::optional<std::string> a null boost::optional<std::string> matches a null JSON value
- Parameters
value
:
-
void
processJson
(const nlohmann::json &j, Hash &result)¶
-
void
processJsonObject
(nlohmann::json::const_iterator &it, Hash &result)¶
-
void
processJsonArray
(nlohmann::json::const_iterator &it, Hash &result)¶
-
void
processJsonValue
(nlohmann::json::const_iterator &it, Hash &result)¶
-
JsonType
getArrayType
(const nlohmann::json &j)¶
-
karabo::data::Hash
jsonToHash
(const std::string &j)¶ Converts a JSON string representation into a Hash object.
This function parses a JSON string and constructs a corresponding Hash object representing the JSON structure. The JSON string is expected to follow the JSON standard format.
JSON Arrays of Mixed types are unsupported.
JSON types are mapped to C++ types as below:
- JSON Integer -> long long
- JSON Decimal -> double
- JSON empty array -> empty std::vector<std::string>
- Return
- A Hash object representing the parsed JSON structure.
- Parameters
jsonString
: A string containing the JSON representation to be converted.
- Exceptions
KARABO_PARAMETER_EXCEPTION
: if the provided JSON string is invalid or cannot be parsed into a Hash object.
-
Hash
generateAutoStartHash
(const karabo::data::Hash &initHash)¶ Generates an auto-start configuration Hash based on the provided initialization Hash.
This function takes an initialization Hash representing the initial configuration of components and constructs an auto-start configuration Hash based on it.
The initialization Hash is expected to have been generated from the JSON for the init string used to autostart devices on a karabo cpp server.
An example conversion should look like this:
initHash: ‘data_logger_manager_1’ + ‘classId’ => DataLoggerManager STRING ‘serverList’ => karabo/dataLogger STRING ‘schema_printer1’ + ‘classId’ => SchemaPrinter STRING
Above transforms to autoStart hash: ‘autoStart’ @ [0] ‘DataLoggerManager’ + ‘deviceId’ => data_logger_manager_1 STRING ‘serverList’ => karabo/dataLogger STRING [1] ‘SchemaPrinter’ + ‘deviceId’ => schema_printer1 STRING
- Return
- A Hash object representing the auto-start configuration based on the provided initialization Hash.
- Parameters
initHash
: The initialization Hash containing the initial configuration of components.
-
karabo::data::Hash
generateAutoStartHash
(const karabo::data::Hash &initHash) Generates an auto-start configuration Hash based on the provided initialization Hash.
This function takes an initialization Hash representing the initial configuration of components and constructs an auto-start configuration Hash based on it.
The initialization Hash is expected to have been generated from the JSON for the init string used to autostart devices on a karabo cpp server.
An example conversion should look like this:
initHash: ‘data_logger_manager_1’ + ‘classId’ => DataLoggerManager STRING ‘serverList’ => karabo/dataLogger STRING ‘schema_printer1’ + ‘classId’ => SchemaPrinter STRING
Above transforms to autoStart hash: ‘autoStart’ @ [0] ‘DataLoggerManager’ + ‘deviceId’ => data_logger_manager_1 STRING ‘serverList’ => karabo/dataLogger STRING [1] ‘SchemaPrinter’ + ‘deviceId’ => schema_printer1 STRING
- Return
- A Hash object representing the auto-start configuration based on the provided initialization Hash.
- Parameters
initHash
: The initialization Hash containing the initial configuration of components.
- template <typename Ret, typename… Args, typename Obj>
-
std::function<Ret(Args...)> karabo::util::exec_weak_impl(Ret(Obj::*)(Args...) const f, const Obj * o)
Provides a wrapper with the same signature as f, but securing shared ownership of an object of type Obj before execution of f. Class Obj needs to derive somewhere in its inheritance tree from enable_shared_from_this();
- Return
- a wrapped version of f.
- Parameters
f
: a const member function, can have any argument types and any return valueo
: a pointer to an object that has a member function f
- template <typename Ret, typename… Args, typename Obj>
-
std::function<Ret(Args...)> karabo::util::exec_weak_impl(Ret(Obj::*)(Args...) f, Obj * o)
Provides a wrapper with the same signature as f, but securing shared ownership of an object of type Obj before execution of f. Class Obj needs to derive somewhere in its inheritance tree from enable_shared_from_this();
- Return
- a wrapped version of f.
- Parameters
f
: a non-const member function, can have any argument types and any return valueo
: a pointer to an object that has a member function f
- template <typename F, typename Obj, typename… P>
-
auto
bind_weak
(const F &f, Obj *const o, const P... p)¶ Weakly binds member function f to an object of class Obj, but assures shared ownership of the object while f is executed. This means that during the lifetime of calling f, the object cannot be destroyed, but destruction is not blocked if f is not being executed but only bound. Class Obj needs to derive from std::enable_shared_from_this and the object pointer o has to be held by a shared_ptr. This means that you cannot use bind_weak within the constructor of Obj nor for objects constructed on the stack. Note that f may have any default constructable return type: If the bound functor will be called when the object is already destroyed, the functor returns a default constructed object of the return type.
Below is an example of how to bind to a boost::asio interface.
void Device::executeStepFunction(int arg, const boost::system::error_code& error) { …. m_timer.async_wait(bind_weak(&Device::executeStepFunction, this, arg + 1, _1)); …. }
- Return
- : bound functor, compatible with boost bind.
- Parameters
f
: function to bind, give as &Foo::baro
: pointer to object to bind top
: parameters as one would give to std::bind. Placeholders are fully supported.
- template <typename F, typename Tuple>
-
void
call
(F f, Tuple &&t)¶ Call a function f with arguments unpacked from a std::tuple
- Parameters
f
:t
:
- template <class… Ts>
-
void
unpack
(const Hash &hash, Ts&... args)¶ Unpack the hash (typically coming from the network) into the parameters given by reference.
- template <typename… Args>
-
auto
unpack
(const karabo::data::Hash &h)¶ Unpack parameters into a tuple holding only references
- Return
- std::tuple<Args&…>
- Parameters
h
: Hash with keys a1, a2, etc. encoding function arguments
- template <class… Ts>
-
void
pack
(Hash &hash, const Ts&... args)¶ Pack the parameters into a hash for transport over the network.
- Parameters
hash
: Will be filled with keys a1, a2, etc. and associated valuesargs
: Any type and number of arguments to associated to hash keys
-
std::ostream &
operator<<
(std::ostream &os, const TimeProfiler &profiler)¶
Variables
-
char const *const
INFLUX_DURATION_UNIT
= "u"¶
-
unsigned int const
INFLUX_PRECISION_FACTOR
= 1000000¶
-
char const *const
DATALOGMANAGER_ID
= "Karabo_DataLoggerManager_0"¶
-
char const *const
DATALOGGER_PREFIX
= "DataLogger-"¶
-
char const *const
DATALOGREADER_PREFIX
= "DataLogReader-"¶
-
char const *const
DATALOG_LINE_REGEX
= "^([TZ0-9\\.]+)\\|([0-9\\.]+)\\|([0-9]+)\\|(.+)\\|([A-Z][0-9A-Z_]+)\\|(.*)\\|([a-z0-9_]*)\\|([A-Z]+)$"¶
-
char const *const
DATALOG_LOGOUT_REGEX
= "^([TZ0-9\\.]+)\\|([0-9\\.]+)\\|([0-9]+)\\|\\.\\|(![\\s\\S])\\|(.*)\\|([a-z0-9_]*)\\|([A-Z]+)$"¶
-
char const *const
DATALOG_INDEX_LINE_REGEX
= "^([A-Z=\\+\\-]+)[\\s]+([TZ0-9\\.]+)[\\s]+([0-9\\.]+)[\\s]+(.+)$"¶
-
char const *const
DATALOG_INDEX_TAIL_REGEX
= "^([0-9]+)[\\s]+([0-9]+)[\\s]+([a-z0-9_\\.]*)[\\s]+([0-9]+)$"¶
-
char const *const
DATALOG_NEWLINE_MANGLE
= ".KRB_NEWLINE."¶
-
const unsigned int
MAX_INFLUX_VALUE_LENGTH
= 921600u¶
-
int
key
= 0¶
- template <typename From, typename To, typename = void>
-
struct
can_static_cast
¶ Inherits from false_type
- template <typename From, typename To>
-
template<>
structcan_static_cast
<From, To, std::void_t<decltype(static_cast<To>(std::declval<From>()))>>¶ Inherits from true_type
- template <typename Base, typename Derived>
-
struct
is_virtual_base_of
¶ Inherits from std::conjunction< std::is_base_of< Base, Derived >, std::negation< can_static_cast< Base *, Derived *> > >
-
struct
MetaData
¶ - #include <DataLogUtils.hh>
A structure defining meta data as used by the data loggers
-
struct
MetaSearchResult
¶ - #include <DataLogUtils.hh>
A structure defining meta data as used by the data logger’s search results
-
class
PluginLoader
¶ - #include <PluginLoader.hh>
The PluginLoader class.
- template <typename, typename>
-
struct
static_or_dyn_cast
¶ - #include <MetaTools.hh>
Helper functions to compile-time distinguish if a dynamic_cast is needed.
-
class
TimeProfiler
¶ Public Functions
-
TimeProfiler
(const std::string &name)¶ Constructor creates a profiler with a given name
- Parameters
name
: profiler’s name
-
const std::string &
getName
() const¶ Returns the profiler name
- Return
- std::string profiler’s name
-
void
open
()¶ Initialize the profiler internal structure
-
void
close
()¶ Finalize the profiler internal structure
-
void
startPeriod
()¶ Start a new unnamed period (i.e. detail) and append it to the current open period. Unnamed periods are leaves, thus do cover other sub-periods.
-
void
startPeriod
(const std::string &periodname)¶ Start an new period with the given name. Named periods can be nested, i.e. named periods can cover other named and anonymous periods
- Parameters
periodname
: period’s name
-
void
stopPeriod
()¶ Close the last open period
-
void
stopPeriod
(const std::string &periodname)¶ Stops period “periodname” and all nested periods
- Parameters
periodname
: period’s name
-
const TimePeriod
getPeriod
(const std::string &periodname) const¶ Return the time period period “periodname” as Hash
- Return
- TimePeriod object
- Parameters
periodname
: period’s name
-
const TimePeriod
getPeriod
() const¶ Returns the overall profiler period, i.e. from open to close.
- Return
- TimePeriod object
-
const Hash &
getPeriodAsHash
(const std::string &periodname) const¶ Return the time period period “periodname” as Hash
- Return
- Hash object
- Parameters
periodname
: period’s name
-
const Hash &
getPeriodAsHash
() const¶ Returns the overall profiler period.
- Return
- Hash object
-
operator karabo::data::Hash
()¶ Serialize the profiler into Hash object.
-
std::string
format
(const std::string &fmt, int level = std::numeric_limits<int>::max()) const¶ Serialize the profiler into string using specific time format.
- Return
- string object holding the string representation
- Parameters
format
: time formatlevel
: deepest level
-
std::string
sql
() const¶ Serialize the profiler as SQL insert query, in preparation to be inserted into database.
- Return
- string object holding the SQL query string
-
void
serialize
(std::ostream &os, int level = std::numeric_limits<int>::max()) const¶ Serialize the profiler into ostream object using default time format, i.e X.Y (where X is total seconds, and Y is fraction in nanoseconds)
- Parameters
os
: output stream objectformat
: time formatlevel
: deepest level
-
-
class
Version
¶ - #include <Version.hh>
A class providing versioning information for the Karabo framework.
Public Functions
Public Static Functions
-
const Version &
getKaraboVersion
()¶ Gets a Version object of the curent Karabo’s Framework
- Return
- Version object
-
std::string
getVersion
()¶ Returns a string describing the current version of the Framework Equivalent of calling. karabo::util::Version::getKaraboVersion().getString();
- Return
- std::string
-
const Version &
-
namespace
detail
¶ Functions
-
void
unpack_r
(const Hash &hash, char i)¶
- template <class Tfirst, class… Trest>
-
void
unpack_r
(const Hash &hash, char i, Tfirst &first, Trest&... rest)¶
-
void
pack_r
(Hash &hash, char i)¶
- template <class Tfirst, class… Trest>
-
void
pack_r
(Hash &hash, char i, const Tfirst &first, const Trest&... rest)¶
-
void
-
using