The Pipeline Interface¶
The conveyor belt example given in the introduction of the C++ and Python APIs is typical for a device that represents some hardware to be controlled and monitored. Such devices produce no data; only property changes are communicated, and this is done via the central message broker.
Other devices like cameras produce large amounts of data that cannot be distributed this way. Instead, the data should be sent directly from the device producing it to one or more other devices that have registered themselves with the producer for that purpose. In the Karabo framework this is done using a point-to-point protocol between so-called output and input channels of devices.
Sending Data: Output Channel¶
First, we have to tell the framework what data is to be sent via the output channel, i.e. to declare its schema. This is done inside the expectedParameters method. Here is an example of a device sending a 32-bit integer, a string and a vector of 64-bit integers:
Schema data;
INT32_ELEMENT(data).key("dataId")
.readOnly()
.commit();
STRING_ELEMENT(data).key("string")
.readOnly()
.commit();
VECTOR_INT64_ELEMENT(data).key("vector_int64")
.readOnly()
.commit();
Next (but still within expectedParameters), the output channel has to be declared. Here we create one with the key output:
OUTPUT_CHANNEL(expected).key("output")
.displayedName("Output")
.dataSchema(data)
.commit();
Whenever the device should write data to this output channel, a Hash has to be created that matches this schema:
Hash data;
data.set("dataId", 5);
data.set("string", std::string("This is a string to be sent."));
std::vector<long long> vec = ...; // create and fill the array here
data.set("vector_int64", vec);
Note that Karabo does not check that the data sent matches the declared schema.
Finally, the data is sent by calling the device method
this->writeChannel("output", data);
with the key of the channel as the first and the Hash object as the second argument. The current timestamp will be added to the data as metadata.
Once the data stream is finished, i.e. no further data is to be sent, the end-of-stream method has to be called with the output channel key as argument to inform all input channels that receive the data:
this->signalEndOfStream("output");
Receiving Data: Input Channel¶
For input channels one also needs to declare what data they expect to receive. This is done in exactly the same way as for output channels inside the expectedParameters method. Declaring the input channel is also analogous to the way an output channel is declared:
INPUT_CHANNEL(expected).key("input")
.displayedName("Input")
.description("Input channel: client") // optional, for GUI
.dataSchema(data)
.commit();
The next step is to prepare a member function of the device that should be called whenever new data arrives. The signature of that function has to be
void onData(const karabo::util::Hash& data,
const karabo::xms::InputChannel::MetaData& meta);
Inside the function the data sent can be unpacked from the Hash:
int id = data.get<int>("dataId");
const std::string& str = data.get<std::string>("string");
const std::vector<long long>& vec = data.get<std::vector<long long> >("vector_int64");
Finally, the framework has to be informed that this method should be called whenever data arrives. This has to be done in the initialize() member function (or, more precisely, in the function registered in the constructor using the KARABO_INITIAL_FUNCTION macro) in the following way:
KARABO_ON_DATA("input", onData);
with the key of the input channel as the first and the function name as the second argument.
A similar macro can be used to register a member function that should be called when the data stream terminates, i.e. when the sending device calls
this->signalEndOfStream("<output channel name>");
The registration reads:
KARABO_ON_EOS("input", onEndOfStream);
The signature of this member function has to be
void onEndOfStream(const karabo::xms::InputChannel::Pointer& input);
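For illustration, a device that itself owns an output channel might forward the notification downstream; a minimal sketch, assuming the device's own output channel has the key output (whether forwarding is appropriate depends on the device logic):
void onEndOfStream(const karabo::xms::InputChannel::Pointer& input) {
    // Finalize any pending processing here, then inform the devices
    // connected to this device's own output channel that the stream ended:
    this->signalEndOfStream("output");
}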
Note
A simple way of ensuring that input and output channels work with the same data schema is to move schema creation to a static function which is available to all devices working on this type of data, e.g. by means of a dependency or library.
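Such a helper could look like the following sketch (the class name MyDataFormat is illustrative, not part of the framework):
class MyDataFormat {
public:
    // Schema shared by the sending and the receiving device
    static karabo::util::Schema dataSchema() {
        using namespace karabo::util;
        Schema data;
        INT32_ELEMENT(data).key("dataId")
                .readOnly()
                .commit();
        STRING_ELEMENT(data).key("string")
                .readOnly()
                .commit();
        return data;
    }
};
Both devices can then pass MyDataFormat::dataSchema() to .dataSchema(...) of their channel elements in expectedParameters.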
Hierarchies in the Schema¶
The data that is sent from an output to an input channel can have a hierarchical structure. This structure is declared in the usual way in expectedParameters, for both input and output channels:
Schema data;
// Add whatever data on first hierarchy level:
// ...
// First level done - now add second level:
NODE_ELEMENT(data).key("node")
.commit();
FLOAT_ELEMENT(data).key("node.afloat")
.readOnly()
.commit();
When writing to an output channel, one first has to create and fill the node. Then the node can be added and the data can be sent:
Hash data; // top level data structure
// Here e.g. fill top level content:
// ...
Hash node;
float floatValue = 1.3f; // or whatever...
node.set("afloat", floatValue);
data.set("node", node);
this->writeChannel("output", data);
In the onData member function of a device receiving the data in an input channel, the node can be unpacked in the following way:
void onData(const karabo::util::Hash& data,
            const karabo::xms::InputChannel::MetaData& meta)
{
    // ...
    const Hash& node = data.get<Hash>("node");
    const float afloat = node.get<float>("afloat");
    // ...
}
Treatment of Array Data¶
- Arrays are described in Karabo using the
NDArray
class. An
NDArray
consists of typed data and a shape.
It is meant to map directly to a numpy.ndarray
object in the Bound API,
so its interface closely matches numpy.ndarray
.
NDARRAY_ELEMENT(expected).key("arrayStack")
.shape("-1,100,100") // Variable dimension along the slowest axis
.readOnly().noInitialValue()
.commit();
In the above example, -1 in the shape definition indicates that this dimension has variable size, i.e. here the first dimension may vary from one data token to the next. If the shape contains no negative numbers, the array is said to have a 'fixed' shape. In Python, a transparent conversion to and from numpy.ndarray elements is performed:
a = np.ones((10, 100, 100))
self.set("arrayStack", a)
b = self.get("arrayStack")
type(b)  # -> numpy.ndarray
c = np.ones((10, 10, 100))
self.set("arrayStack", c)
# raises ValueError("Setting 'arrayStack' failed because dimension 2 in
#  (10, 10, 100) mismatched array shape definition (-1, 100, 100)")
The NDArray C++ class is a convenience class meant to simplify supporting n-dimensional arrays within the Device and Hash classes. In C++ the Device::set method is overloaded to accept NDArray objects directly:
typedef std::vector<double> DoubleVector;
typedef boost::shared_ptr<DoubleVector> DoubleVectorPtr;
DoubleVectorPtr v(new DoubleVector(10*100*100, 1));
NDArray<double> arr(v, Dims(10, 100, 100));
set("arrayStack", arr);
// ... Then access the array
NDArray<double> a = get<NDArray<double> >("arrayStack");
const Dims & d = a.getDims();
DoubleVectorPtr v1 = a.getData();
Using the above constructor no copy of the data is performed. Alternatively, a copying constructor may be used:
typedef std::vector<double> DoubleVector;
typedef boost::shared_ptr<DoubleVector> DoubleVectorPtr;
DoubleVector v(10*100*100, 1);
NDArray<double> arr(v, Dims(10, 100, 100));
set("arrayStack", arr);
// ... Then access the array
NDArray<double> a = get<NDArray<double> >("arrayStack");
const Dims & d = a.getDims();
DoubleVectorPtr v1 = a.getData();
In this case NDArray will create a copy of the data, but internally it also maintains the data as a boost::shared_ptr, thus avoiding additional copies from there on. In either case access to the data is via a boost::shared_ptr using getData(). Internally, NDArray uses a ByteArray to hold its data, while additionally defining the shape as an attribute in a standardized fashion. NDArray objects can be placed into and retrieved from Hashes in the accustomed way.
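For illustration, a minimal sketch of this, reusing the DoubleVectorPtr v from above and assuming the templated NDArray interface shown there:
Hash h;
h.set("arrayStack", NDArray<double>(v, Dims(10, 100, 100)));
NDArray<double> back = h.get<NDArray<double> >("arrayStack");
const Dims& dims = back.getDims();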
Treatment of Image Data¶
As with array data, image data can be sent using the class ImageData, which extends the NDArray class with some predefined properties, i.e. it serves as a special node with convenience methods for conversions to and from more common image data formats. The schema of an output channel for image data is defined in expectedParameters as follows:
Schema data;
IMAGEDATA(data).key("image")
.encodingType(karabo::xms::Encoding::RGBA)
.bitsPerPixel(12)
.isBigEndian(true)
.commit();
OUTPUT_CHANNEL(expected).key("output") // or any other key
.displayedName("Output") // or whatever name you choose
.dataSchema(data)
.commit();
For input channels simply replace OUTPUT_CHANNEL by INPUT_CHANNEL.
Image data refers to array-like data from camera interfaces. It may be represented as an IMAGEDATA (or IMAGEDATA_ELEMENT) element, which has fixed properties appropriate to the camera origin of the data. These are:
- pixels: The N-dimensional array containing the pixels
- dims: The length of the array reflects the total dimensionality and each element the extension in this dimension
- dimTypes: Any dimension should have an enumerated type
- dimScales: Dimension scales
- encoding: Describes the color space of pixel encoding of the data (e.g. GRAY, RGB, JPG, PNG etc.)
- bitsPerPixel: The number of bits needed for each pixel
- roiOffsets: Describes the offset of the Region-of-Interest; it will contain zeros if the image has no ROI defined
- geometry: Optional hierarchical detector geometry information
- header: Hash containing user-defined header data
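Writing an image then works like writing any other element. A minimal sketch, assuming an ImageData constructor that takes an NDArray of pixel values (the exact constructor signature may differ between Karabo versions):
karabo::util::Hash data;
karabo::xms::ImageData image(arr); // arr: an NDArray holding the pixel values
data.set("image", image);
this->writeChannel("output", data);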
Interface per TCP Message¶
Point-to-point communication in the Karabo framework generally uses TCP for
data transfer between devices.
Whenever writeChannel is called for an output channel, the data is sent as a separate message to all connected input channels.
There might be circumstances where it is advantageous to pack more than one data item into a TCP message. For this a lower-level API is provided, as described in the following.
To send several data items in a single TCP message, the following few lines of code should be used instead of this->writeChannel(channelName, data):
data.attachTimestamp(this->getActualTimestamp());
karabo::xms::OutputChannel::Pointer channel = this->getOutputChannel(channelName);
channel->write(data);
Once there is enough data accumulated to be actually sent,
channel->update();
has to be called.
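Putting both steps together, a sketch of sending several items in one message (the container items is illustrative):
std::vector<karabo::util::Hash> items = ...; // data items accumulated by the device
karabo::xms::OutputChannel::Pointer channel = this->getOutputChannel("output");
for (size_t i = 0; i < items.size(); ++i) {
    items[i].attachTimestamp(this->getActualTimestamp());
    channel->write(items[i]); // accumulates; nothing is sent yet
}
channel->update(); // now all items go out in a single TCP message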
For a device with an input channel it does not matter much whether several data items that it receives have been sent in a single TCP message or not. A member function registered with KARABO_ON_DATA will be called for each item. Nevertheless, in case it matters which data items are sent together (which should not be the case), the device can register a method that receives all data items in one go. Instead of using KARABO_ON_DATA, such a method has to be registered using KARABO_ON_INPUT. The signature of this method has to be
void onInput(const karabo::xms::InputChannel::Pointer& input);
Inside the method one has to loop over the data items. Finally one has to tell the InputChannel that reading the data is done by calling update() at the very end of the method:
for (size_t i = 0; i < input->size(); ++i) {
Hash data(input->read(i));
    // ... whatever you want to do with the data
}
// Tell the input channel that you are done with all data
input->update();
Compliance with Data Management¶
While the pipeline processing interface generally allows free-form Hashes to be passed between devices, leaving it up to the device logic to correctly interpret these, there are limitations if data is to be written to or retrieved from the data management system. Specifically, Hashes need to follow a certain structure, and the concept of metadata needs to be understood.
Meta Data¶
So far we have simply written to output channels and ignored the fact that each data token written has metadata pertinent to it. This metadata currently contains source and timing information, but is extensible by design. If not explicitly set, the source corresponds to the writing device's id and the output channel name, and the timing information to the train and timestamp for when the data was written. Frequently, though, the original source information should be maintained, i.e. when the writing device is not the data producer. In this case we explicitly set the source or forward existing metadata:
using namespace karabo::xms;
OutputChannel::Pointer channel = this->getOutputChannel(channelName);
for (size_t i = 0; i < input->size(); ++i) {
    Hash data;
    const InputChannel::MetaData& meta = input->read(i, data);
    // ... whatever you want to do with the data
    // Either forward the existing metadata unchanged:
    channel->write(data, meta);
    // or explicitly set the source on a copy of it:
    InputChannel::MetaData meta2(meta);
    meta2.setSource("myProcessor");
    channel->write(data, meta2);
}
// Tell the input channel that you are done with all data
input->update();
Metadata can be accessed either via read or by index. Data tokens for the same source can be written subsequently to the same output channel, allowing e.g. bunching multiple trains before actually writing data out to the network.
Warning
The data management service expects only one train per data token per source to be sent to it.
In all cases the source information will be used by the data management system to correlate incoming data with data producers.
Hash Structure¶
For data interacting with the data management system an additional restriction applies in terms of Hash structure. Generally, data of similar type is organized hierarchically in nodes. The following data types exist:
Train Data¶
Train data occurs on a per-train basis. It can be in the form of scalars, vectors or arrays. For interaction with the data management system a data token written to the channel always corresponds to one train. The Hash that is written must match the following policy:
- on the root hierarchy level an unsigned long long element giving the trainId exists
- freely named enclosing nodes exist on the same hierarchy level, which have an attribute daqDataType set to TRAIN; underneath these nodes scalar, vector and array elements exist
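For illustration, a minimal sketch of a conforming Hash (the node and element names are made up, and the attribute value is assumed to be the string "TRAIN"):
Hash data;
data.set("trainId", 1234ULL);
Hash node;
node.set("temperature", 25.3f);
data.set("trainData", node);
data.setAttribute("trainData", "daqDataType", std::string("TRAIN"));
this->writeChannel("output", data);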
Pulse Data¶
Pulse data is data that has pulse resolution. Data can either exist for all pulses of a train or for a subset. In either case the following limitations apply:
- the enclosing node element has the daqDataType attribute set to PULSE
- a vector unsigned long long element with key pulseId needs to be located directly underneath this node element
- any number of freely named vector elements with the same length as pulseId may reside underneath this node, or in further sub-nodes. There is a 1:1 relation between the index in these elements and the pulse id given in pulseId at this index
- any number of freely named NDArray elements with the last dimension of the same length as pulseId may reside underneath this node, or in further sub-nodes. There is a 1:1 relation between the index of the last dimension in these elements and the pulse id given in pulseId at this index. The other dimensions may not change from data token to data token
There may be any number of these node elements, all following the above structure. They may be freely named, except that the key masterPulseData is reserved.
All train and pulse data elements must always be present in all hashes, even if the arrays or vectors are empty.
All train and pulse data elements must be specified in the output channel's data schema. Adding additional elements between data tokens, and specifically between runs, is not allowed.
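For illustration, a sketch of a pulse-resolved node following this policy (the node and element names are made up, and the attribute value is assumed to be the string "PULSE"):
Hash pulses;
std::vector<unsigned long long> pulseId = {0, 2, 5};
pulses.set("pulseId", pulseId);
// one value per entry in pulseId:
pulses.set("intensity", std::vector<float>(pulseId.size(), 0.f));
Hash data;
data.set("trainId", 1234ULL);
data.set("pulseData", pulses);
data.setAttribute("pulseData", "daqDataType", std::string("PULSE"));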
Defining and Configuring Topologies¶
Pipelined processing in Karabo supports a variety of recurring topologies defining how data is passed through the system.
Copying vs. Sharing Data¶
An input channel may select whether it would like to receive data in copy or in shared mode. In the first case it will receive a copy of all data sent by the output channel it is connected to. In shared mode, the output channel is allowed to balance the data distribution according to how it is configured. There are two options on the output channel:
- round robin
distributes data evenly over all connected input channels operating in shared mode. As indicated by the option name, the channels get data sent to them in turn. If the next channel in line is not yet available, writes to the output channel block until the data can be sent to this channel.
- load-balanced
distributes data on all connected input channels but does not enforce a particular distribution order. Upon writing data to an output channel it is sent to the next available input channel. This scenario should be used if data recipients are expected to have different processing times per data package.
Best-Effort and Assured Delivery Configuration¶
Both input and output channels may be configured as to what to do if the counterpart is not available, i.e. no input is ready to receive data from a given output. The options are to
- throw
an exception.
- queue
the data and deliver it once an input becomes available. The write call to the output channel will not block.
- wait
for an input to become available, effectively blocking the write call to the output channel.
- drop
the data being sent out, i.e. do not send it but proceed without blocking the write call to the output.
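These delivery policies are part of the channel configuration rather than of the device code. As a hedged sketch, assuming the configuration keys dataDistribution and onSlowness found in recent Karabo versions (both key names are assumptions here), an input channel named input could be configured like this:
Hash config;
// "copy" or "shared" mode for this input channel (assumed key name):
config.set("input.dataDistribution", "shared");
// behavior if this input cannot keep up (assumed key name):
config.set("input.onSlowness", "drop"); // or "throw", "queue", "wait"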
Note
Queuing data may involve significant memory usage and thus should be used with care if large amounts of data are passed.
By default the channels are configured to the wait behavior, which assures delivery but has the side effect of possibly stalling a complete processing pipeline through back-propagation. If a pipeline device with input and output channels is used as a pipeline end-point, it is important to configure the last, unconnected output to drop, to prevent this scenario.