Code reference¶
Core¶
-
class
metropc.
ViewStage
¶ View execution stage.
A view may be executed at different stages in a pipeline, each with different semantics and limitations.
-
FRONTEND
¶ Within the frontend prior to dispatch to pipeline (NYI).
-
POOL
¶ In the first pipeline stage possibly parallelized on an event boundary. All expensive computations required for each event should be done here, if possible.
-
REDUCE
¶ In the second pipeline stage processing all events synchronously to possibly combine data from several events.
-
ASYNC
¶ Additional stage running asynchronously alongside the reduce stage for very expensive and long running computations across several events, which are only executed when idling (NYI).
-
STEP
¶ Virtual stage running within the reduce stage at the end of an operator step.
-
ACTION
¶ Virtual stage running within the reduce stage when requested by the frontend.
-
-
class
metropc.
ViewOutput
¶ View output type.
The output type defines what kind of output is expected from a view. There are two kinds of output types, generic and hinted. A hinted output type derives from a generic output type, but includes further information about how to handle or visualize the output.
Comparisons between generic and hinted output types evaluate to True if the hinted type derives from the generic type. Comparisons between hinted types evaluate to True only if they are actually identical, i.e. they yield False for different hinted types, even if both derive from the same generic type.
Generic types should be single words with derived hinted types using the same word and further describe it after an underscore. The value of generic types must be smaller than 10, and any corresponding hinted type must use ten times this value as an offset, adding less than 10 itself. For the value x of the generic type, it must follow the equation: 10x + y with x, y < 10.
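The numbering rule and the comparison semantics described above can be illustrated with a small stand-in enum. This is only a sketch: the class OutputKind, its numeric values and the matches() helper are hypothetical and not taken from metropc.

```python
from enum import Enum


class OutputKind(Enum):
    """Hypothetical stand-in for an output type enum (values are made up)."""

    VECTOR = 2                 # generic type, value x < 10
    VECTOR_LINE = 20           # hinted type, 10 * x + 0
    VECTOR_DISTRIBUTION = 21   # hinted type, 10 * x + 1
    MATRIX = 3                 # another generic type

    @property
    def is_generic(self):
        return self.value < 10

    @property
    def generic(self):
        """Return the generic type this member is or derives from."""
        return self if self.is_generic else OutputKind(self.value // 10)

    def matches(self, other):
        """Compare two members following the rules described above."""
        if self.is_generic or other.is_generic:
            # Generic vs. anything: match if both share the same generic type.
            return self.generic is other.generic
        # Hinted vs. hinted: only identical members match.
        return self is other


print(OutputKind.VECTOR.matches(OutputKind.VECTOR_LINE))               # True
print(OutputKind.VECTOR_LINE.matches(OutputKind.VECTOR_DISTRIBUTION))  # False
```

The integer division by ten recovers the generic type from a hinted value, which is what the 10x + y rule makes possible.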
-
COMPUTE
¶ Arbitrary output with no obvious or intended visualization, whose results are not sent to any client.
-
SCALAR
¶ Each result is a scalar value, which is typically visualized as a waveform.
-
VECTOR
¶ Each result is a one dimensional vector value, which is typically visualized as a line plot.
-
VECTOR_LINE
¶ Same as
VECTOR
, but with the additional hint that the visualization should be a line plot.
-
VECTOR_DISTRIBUTION
¶ Same as
VECTOR
, but with the additional hint that the visualization should be a bar plot.
-
MATRIX
¶ Each value is a two dimensional matrix value, which is typically visualized as a color- or heatmap.
-
POINTS
¶ Each value is one or a list of data point(s) consisting of one or more scalars each, which are typically displayed as a binned result.
-
-
class
metropc.
ContextStage
¶ Context stages.
Each pipeline stage (and all its instances) possesses a corresponding context object. This type only covers the actual stages with distinct instances unlike
ViewStage
, which also includes virtual stages that execute views but run in the context of a different stage (e.g. STEP and ACTION in REDUCE).
-
FRONTEND
¶
-
POOL
¶ Primary pipeline stage for event processing, possibly parallelized across a large number of instances.
-
REDUCE
¶ Secondary processing stage for reduction across events with exactly one instance.
-
ASYNC
¶ Tertiary processing stage for long-running reduction events only performed when idle (NYI).
-
-
class
metropc.core.
Context
(source, path='<ctx>', stage=<ContextStage.FRONTEND: 0>, version=0, parameters=None, features=[], event_alias='event', event_id_alias=None, event_counter_alias=None)¶ Online analysis context.
The context defines the programmable stages of the online analysis pipeline in the form of views. These views allow a Python callable to be executed with the data obtained for any number of devices or other views and return a result.
The context is typically loaded from a file and may contain any valid Python code, e.g. imports that help implement its functionality. Several methods of the context object are available in the global scope of this file when evaluated, in addition to the object itself via the ‘ctx’ symbol.
-
stage
¶ Type: metropc.ContextStage Stage at which this context object is created.
-
increase_view_counts
(view_name, delta)¶ Increase view counts.
The view passed to this method does not need to actually exist, but may be used just for counting purposes. The count value is restricted to an int64. It may only be called from within a view execution process.
Parameters: - view_name (str) – Name of the view to update.
- delta (int) – Relative change to counts.
Returns: None
-
require_feature
(*features)¶ Require support for a specific feature.
Feature strings indicate the presence of particular optional features supported by either metropc or its frontend in the current configuration.
Parameters: *features – All features required to run this context code. Raises: ContextError
– If any required feature is not supported.
-
require_version
(version_str)¶ Require a minimal metropc version.
Parsing and comparing the version strings is performed via pkg_resources.parse_version() and follows its semantics.
Parameters: version_str (str) – Version string to compare against. Returns: None Raises: ContextError
– If the version requirement is not satisfied.
-
set_view_docs
(name, label, docstring=None)¶ Set view documentation.
The view passed to this method does not need to actually exist, but may also be part of an actual view’s path.
This call only takes effect if called in the reduce stage.
Parameters: - name (str) – Name of the view to update.
- label (str) – New label.
- docstring (str, optional) – New docstring, ignored for actual views.
Returns: None
-
Builtin view implementations¶
-
class
metropc.builtin.
HistogramView
¶ Delegates to
metropc.builtin.VectorHistogramView
or metropc.builtin.MatrixHistogramView
depending on output type.
-
class
metropc.builtin.
VectorHistogramView
(*args, bin_step=None, bin_min=None, bin_max=None, bin_count=None, bin_limit=50000, bin_margin=1, dtype=<class 'numpy.float64'>, dim_label='bin', count_clipped=False, output_every=10, clear_every=None, **kwargs)¶ View implementation to bin 1D data into histograms.
Accepts kernel data as a generator, single scalars or any Iterable of values.
-
__init__
(*args, bin_step=None, bin_min=None, bin_max=None, bin_count=None, bin_limit=50000, bin_margin=1, dtype=<class 'numpy.float64'>, dim_label='bin', count_clipped=False, output_every=10, clear_every=None, **kwargs)¶ Initialize a vector histogram view.
All bin_* arguments are identical to
HistogramView.BinnedAxis
.
Parameters: - dtype (data-type, optional) – Data type for the resulting histogram buffer, float64 by default.
- dim_label (str, optional) – Name of the xarray dimension, ‘bin’ by default.
- count_clipped (bool, optional) – Whether to count values clipped beyond the binning range (off by default). This may occur with fixed bin counts or when the limit is reached.
- output_every (int, optional) – Output data every nth event with any input, 10 by default.
- clear_every (int, optional) – Clear data every nth event with any input, disabled by default.
-
-
class
metropc.builtin.
MatrixHistogramView
(*args, x_step=None, x_min=None, x_max=None, x_count=None, x_limit=1000, x_label='x', y_step=None, y_min=None, y_max=None, y_count=None, y_limit=1000, y_label='y', bandwidth_limit=10485760, dtype=<class 'numpy.float64'>, count_clipped=False, output_every=10, clear_every=None, **kwargs)¶ View implementation to bin 2D data into histograms.
Accepts kernel data as a generator of individual points (two scalars per iteration), tuple of axis arrays (two 1D arrays, one for each axis), other Iterable of points (two scalars per item) or (N,2) shaped ndarray.
This view uses a bandwidth limit to not send excessive amounts of data to the client. It is computed by the product of x_limit, y_limit, the dtype’s item size and output_every with an assumed input rate of 10 Hz. The default limit is 10 MiB/s, which equates to a square output matrix of 1144x1144 bins for float64.
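The quoted figure of 1144x1144 bins can be checked with a few lines of arithmetic. The sketch assumes that with the default output_every=10 and the assumed 10 Hz input exactly one matrix is sent per second; how metropc combines these factors internally is not shown here.

```python
import math

bandwidth_limit = 10 * 1024 * 1024   # default limit in bytes per second (10 MiB/s)
itemsize = 8                          # bytes per float64 element
input_rate = 10                       # assumed input rate in Hz
output_every = 10                     # default: one output per 10 events

# Assumption: outputs per second = input rate / output_every.
outputs_per_second = input_rate // output_every

# Largest number of elements per matrix that stays within the limit.
max_elements = bandwidth_limit // (itemsize * outputs_per_second)

# Side length of the largest square matrix with at most that many elements.
side = math.isqrt(max_elements)
print(side)  # 1144
```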
-
__init__
(*args, x_step=None, x_min=None, x_max=None, x_count=None, x_limit=1000, x_label='x', y_step=None, y_min=None, y_max=None, y_count=None, y_limit=1000, y_label='y', bandwidth_limit=10485760, dtype=<class 'numpy.float64'>, count_clipped=False, output_every=10, clear_every=None, **kwargs)¶ Initialize a matrix histogram view.
All arguments prefixed with x_*, y_* (except *_label) are identical to their respective bin_* equivalents of
HistogramView.BinnedAxis
.
Parameters: - x_label (str, optional) – Name of the xarray dimension, ‘x’ by default.
- y_label (str, optional) – Name of the xarray dimension, ‘y’ by default.
- bandwidth_limit (int, optional) – Maximum output data rate for the view assuming 10 Hz input, 10 MiB/s by default.
- dtype (data-type, optional) – Data type for the resulting histogram buffer, float64 by default.
- count_clipped (bool, optional) – Whether to count values clipped beyond the binning range (off by default). This may occur with fixed bin counts or when the limit is reached.
- output_every (int, optional) – Output data every nth event with any input, 10 by default.
- clear_every (int, optional) – Clear data every nth event with any input, disabled by default.
-
-
class
metropc.builtin.HistogramView.
BinnedAxis
(bin_step, bin_min, bin_max, bin_count, bin_limit)¶ Manages coordinate axis for data binning.
Creates and manages a coordinate axis for binned data according to a predefined set of specifications. It may be set to grow automatically up to a certain limit or fixed on one or two boundaries. After creation, the align() method adapts it to any additional data points added. This class only covers a single dimension, i.e. 2D data may use two instances for the width and height, respectively.
Bins must have a uniform width with no gaps. The value of a bin is defined as its center with its edges at +/- bin_step/2.
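The center-based bin definition can be made concrete with a short lookup function. This is a minimal sketch, not the BinnedAxis implementation: bin_index() is a hypothetical helper, and treating the lower edge as inclusive and the upper edge as exclusive is an assumption.

```python
import math


def bin_index(value, first_center, bin_step):
    """Index of the bin whose centered interval contains value.

    Assumption: the lower edge is inclusive and the upper edge exclusive.
    """
    return math.floor((value - first_center) / bin_step + 0.5)


# Bins centered at 0.0, 0.5, 1.0, ... with edges at +/- 0.25 around each center.
print(bin_index(0.2, 0.0, 0.5))   # 0, since 0.2 lies in [-0.25, 0.25)
print(bin_index(0.3, 0.0, 0.5))   # 1, since 0.3 lies in [0.25, 0.75)
print(bin_index(-0.3, 0.0, 0.5))  # -1, below the first bin
```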
-
__init__
(bin_step, bin_min, bin_max, bin_count, bin_limit)¶ Initialize a binned axis.
All arguments except for bin_limit may be None to indicate they are unspecified. While conceptually like a default value, all arguments are positional and must be passed explicitly. This results in several different combinations to realize different operating modes:
- No argument given
- Use bin_step=1.0 and automatically expand the coordinates up to bin_limit.
- Only bin_step
- Use the supplied bin_step and automatically expand the coordinates up to bin_limit.
- bin_min or bin_max
- Use the supplied bin_step or 1.0 by default and automatically expand the coordinates up to bin_limit in any non-specified direction.
- Exactly two of bin_count, bin_min, bin_max
- Use the supplied bin_step or 1.0 by default and construct fixed coordinates, ignore bin_limit.
- bin_count and bin_min and bin_max
- Compute bin_step and construct fixed coordinates, ignore bin_limit.
The buffer is auto-expanding in one or both directions if bin_count is None. However, bin_count may become implicitly not None if both bin_min and bin_max are specified.
Parameters: - bin_step (float or None) – Size of a single bin, 1.0 by default if it cannot be derived from the other arguments.
- bin_min (float or None) – Minimum value to include in binning, expanding by default.
- bin_max (float or None) – Maximum value to include in binning, expanding by default. Note that the coordinates will end at bin_max - bin_size.
- bin_count (int or None) – Number of bins, expanding by default up to bin_limit.
- bin_limit (int) – Maximum number of bins when auto-expanding, but ignored for some configurations.
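The combinations listed above can be summarized as a small decision function. This is only a sketch of the documented modes: describe_axis_mode() is a hypothetical helper, and computing the step as (bin_max - bin_min) / bin_count is an assumption that matches the note that coordinates end one bin before bin_max.

```python
def describe_axis_mode(bin_step, bin_min, bin_max, bin_count):
    """Hypothetical helper naming the operating mode for a set of arguments."""
    fixed_args = sum(arg is not None for arg in (bin_min, bin_max, bin_count))

    if fixed_args == 3:
        # Assumption: the step follows from the range and the bin count.
        step = (bin_max - bin_min) / bin_count
        return f"fixed coordinates, computed bin_step={step}"

    step = 1.0 if bin_step is None else bin_step
    if fixed_args == 2:
        return f"fixed coordinates, bin_step={step}"
    if bin_count is not None:
        return "bin_count alone is not covered by the list above"
    if bin_min is not None:
        return f"expanding upwards from bin_min, bin_step={step}"
    if bin_max is not None:
        return f"expanding downwards from bin_max, bin_step={step}"
    return f"expanding in both directions, bin_step={step}"


print(describe_axis_mode(None, None, None, None))
print(describe_axis_mode(0.5, 0.0, None, None))
print(describe_axis_mode(None, 0.0, 10.0, 20))
```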
-
-
class
metropc.builtin.
LocalAverageView
(*args, N=10, **kwargs)¶ View implementation to get local average of view results.
A local average collects a set number of results from view invocations, suppressing their output, and returns their average when the buffer is full. It is then cleared and collection starts again. The buffer size therefore determines the output rate of this view implementation.
Accepts any scalar or ndarray value as input, either directly or multiple values as a generator. An array axis may also be treated as multiple values if reduce_axis is passed to its constructor.
There can only be one result per event at most, so if more inputs are generated than required for the next average, only the first complete average is returned and any further inputs discarded.
-
__init__
(*args, N=10, **kwargs)¶ Initialize a local average view.
Parameters: - N (int, optional) – Number of view results to average, 10 by default.
- reduce_axis (int, optional) – Array axis index to reduce when averaging, i.e. add multiple values to the average, disabled by default (the full input is averaged).
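The collect, average and clear cycle of a local average can be modelled in plain Python. This is a minimal sketch for scalar inputs only; LocalAverageSketch is a hypothetical class and not the LocalAverageView implementation.

```python
class LocalAverageSketch:
    """Hypothetical model of a local average over n scalar results."""

    def __init__(self, n=10):
        self.n = n
        self.buffer = []

    def add(self, value):
        """Collect one result; return the average once n were collected."""
        self.buffer.append(value)
        if len(self.buffer) < self.n:
            return None  # output suppressed while the buffer fills

        average = sum(self.buffer) / self.n
        self.buffer.clear()  # collection starts again
        return average


avg = LocalAverageSketch(n=3)
print([avg.add(v) for v in [1.0, 2.0, 3.0, 4.0]])  # [None, None, 2.0, None]
```

One output appears for every n inputs, which is why the buffer size sets the output rate.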
-
-
class
metropc.builtin.
GlobalAverageView
(*args, output_every=10, throttling=None, **kwargs)¶ View implementation to get global average of view results.
A global average continuously collects the result of view invocations, suppressing their output, and periodically returns their average at that time. The intermediate result can only be cleared explicitly.
Accepts any scalar or ndarray value as input, either directly or multiple values as a generator. An array axis may also be treated as multiple values if reduce_axis is passed to its constructor.
-
__init__
(*args, output_every=10, throttling=None, **kwargs)¶ Initialize a global average view.
Parameters: - output_every (int, optional) – Output data every nth event with any input, 10 by default.
- throttling (int, optional) – Deprecated name of output_every for compatibility with previous context code, None and ignored by default.
- reduce_axis (int, optional) – Array axis index to reduce when averaging, i.e. add multiple values to the average, disabled by default (the full input is averaged).
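A global average with periodic output can be modelled as a running mean. This is a sketch for scalar inputs under assumptions: GlobalAverageSketch and its clear() method are hypothetical, and the incremental mean update is a choice made for this example rather than a statement about how GlobalAverageView stores its state.

```python
class GlobalAverageSketch:
    """Hypothetical model of a global average with periodic output."""

    def __init__(self, output_every=10):
        self.output_every = output_every
        self.count = 0
        self.mean = 0.0

    def add(self, value):
        """Fold one result into the mean; output only every nth input."""
        self.count += 1
        # Incremental mean update, avoids storing all previous inputs.
        self.mean += (value - self.mean) / self.count
        if self.count % self.output_every == 0:
            return self.mean
        return None

    def clear(self):
        """Explicitly reset the intermediate result."""
        self.count = 0
        self.mean = 0.0


glob = GlobalAverageSketch(output_every=2)
print([glob.add(v) for v in [1.0, 3.0, 5.0, 7.0]])  # [None, 2.0, None, 4.0]
```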
-
-
class
metropc.builtin.
MovingAverageView
(*args, N=100, accumulating_input=False, smooth_start=True, output_every=10, throttling=None, **kwargs)¶ View implementation to get moving average of view results.
A moving average continuously collects the result of view invocations, suppressing their output, and periodically returns the average over a set of recent results. It is similar to a local average, but decouples averaging and output, so the averaging window can be larger than the output rate may cover. If both windows are identical (i.e. N == output_every), it is identical to the local average.
Accepts any scalar or ndarray value as input.
-
__init__
(*args, N=100, accumulating_input=False, smooth_start=True, output_every=10, throttling=None, **kwargs)¶ Initialize a moving average view.
Parameters: - N (int, optional) – Number of view results to average, 100 by default.
- accumulating_input (bool, optional) – Whether each input data event represents an individual measurement result (default) or the accumulation of results. In the latter case, the difference between input events is averaged.
- smooth_start (bool, optional) – Whether the result is scaled to regular signal intensity before the first full buffer, enabled by default. May be disabled for very large buffers to avoid an allocation, copy and rescaling on each output before the buffer is filled.
- output_every (int, optional) – Output data every nth event with any input, 10 by default.
- throttling (int, optional) – Deprecated name of output_every for compatibility with previous context code, None and ignored by default.
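The decoupling of averaging window and output rate can be modelled with a bounded deque. This is a sketch for scalar inputs: MovingAverageSketch is a hypothetical class, and dividing by the number of results collected so far (instead of N) before the buffer is full is an assumption about what smooth_start=True means.

```python
from collections import deque


class MovingAverageSketch:
    """Hypothetical model of a moving average with periodic output."""

    def __init__(self, n=100, output_every=10):
        self.window = deque(maxlen=n)  # oldest result drops out automatically
        self.output_every = output_every
        self.seen = 0

    def add(self, value):
        """Collect one result; output the window average every nth input."""
        self.window.append(value)
        self.seen += 1
        if self.seen % self.output_every != 0:
            return None
        # Assumption: scale by the results collected so far, not by n.
        return sum(self.window) / len(self.window)


moving = MovingAverageSketch(n=4, output_every=2)
print([moving.add(float(v)) for v in range(1, 7)])
# [None, 1.5, None, 2.5, None, 4.5]
```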
-
-
class
metropc.builtin.
ExtremumView
(*args, op, **kwargs)¶ View implementation only returning extreme results.
Each input is expected to consist of two values, one actual result and one comparative value, which is passed to the comparison operator. If the operator evaluates to True when comparing the current value and the previous extremum, the result is returned.
Accepts any number of 2-tuples either directly or as a generator.
-
__init__
(*args, op, **kwargs)¶ Initialize an extreme value view.
Parameters: op (Callable) – Comparison operator taking two values and returning True if the current value is more extreme than the previous extreme (e.g. greater-than always yields the largest values).
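The role of the comparison operator can be shown with operator.gt from the standard library. This is a minimal sketch: ExtremumSketch is a hypothetical class, and treating the very first input as an extremum is an assumption.

```python
import operator


class ExtremumSketch:
    """Hypothetical model of a view that only returns extreme results."""

    def __init__(self, op):
        self.op = op
        self.extremum = None

    def add(self, result, comparative):
        """Return result if comparative is more extreme than before."""
        # Assumption: the first input always counts as an extremum.
        if self.extremum is None or self.op(comparative, self.extremum):
            self.extremum = comparative
            return result
        return None


largest = ExtremumSketch(op=operator.gt)
inputs = [("a", 1.0), ("b", 0.5), ("c", 2.0)]
print([largest.add(result, value) for result, value in inputs])  # ['a', None, 'c']
```

Passing operator.lt instead would keep the results belonging to the smallest comparative values.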
-
-
class
metropc.builtin.
StepAverageView
(*args, **kwargs)¶ View implementation to get the step average of view results.
A step average contains all results from view invocations that occurred during an operator step.
Accepts any scalar or ndarray value as input.
-
__init__
(*args, **kwargs)¶ Initialize a step average view.
-
-
class
metropc.builtin.
StepStackedView
(*args, dim_label='step', **kwargs)¶ View implementation to stack results from each step.
This view is always executed on the step boundary, taking the current result from its arguments and stacking the view result along a new axis, prepended to any existing axes.
Accepts any ndarray value as input.
-
__init__
(*args, dim_label='step', **kwargs)¶ Initialize a step stacked view.
Parameters: dim_label (str, optional) – Name of the xarray dimension the data per step is stacked on, ‘step’ by default.
-
Frontend API¶
Client API¶
-
metropc.client.
decode_protocol
()¶ Identical to
metropc.protocol.decode_protocol()
, but automatically decodes b'index'
opcodes into metropc.client.IndexEntry
objects.
ZeroMQ Protocol¶
metropc protocol.
All communication between the frontend, pipeline stages and clients uses ZeroMQ. This allows the distribution of these components to scale transparently from threads (via inproc transport) over processes (ipc) to nodes (tcp).
A metropc message consists of an opcode or path followed by the data payload in the form of a dict-like mapping of strings to arbitrary values. If possible, an optimized encoding/decoding fastpath for the data values is provided. This fastpath currently supports only the first level of this dictionary (i.e. not nested dictionaries) and values of these types, either directly or in tuples of fewer than 256 elements: bytes, bool, int up to 64 bit, float, ndarray, DataArray, None. If the fastpath cannot be applied, the value is pickled.
A message always begins with two fixed message frames:
- Frame 0: UTF8-encoded opcode/path
- Frame 1: Type IDs of data values
For messages between the frontend and pipeline stages, the first frame is an opcode denoting an operation to perform. Between the output and clients, it is the data path that the content of the message is about. The type IDs describe the types of the values for fastpath serialization in the data mapping, in the same order as they are encoded in the message. Each data key entry begins with the number of values for this key, to represent tuples with up to 255 elements, followed by a single character for each encoded type. For example, the type IDs for a data value like (42, b’foo’) are encoded as b’ib’ with b’i’ for an integer and b’b’ for a bytes string. The type IDs for all keys are directly concatenated to form a single long string.
The header is followed by the message content with one frame per UTF-8 encoded key and a type-dependent number x of frames for its value:
- Frame N: UTF8-encoded key
- Frame N+1 … N+x: Encoded value
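The frame layout described above can be illustrated by separating the two header frames from the content frames. This is only a sketch: split_frames() is a hypothetical helper, the example frames are made up, and the values are not decoded because their encoding is not specified here. Real code should use the decode functions of the API modules instead.

```python
def split_frames(frames):
    """Split raw message frames into header fields and content frames."""
    frames = list(frames)
    if len(frames) < 2:
        raise ValueError("a message needs at least the two header frames")

    opcode_or_path = bytes(frames[0]).decode("utf-8")  # frame 0
    type_ids = bytes(frames[1])                        # frame 1
    content = frames[2:]                               # key and value frames
    return opcode_or_path, type_ids, content


# Made-up frames for illustration only, not a real metropc message.
head, type_ids, content = split_frames([b"some/path", b"", b"key", b"value"])
print(head, type_ids, content)
```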
This module defines the basic encoding/decoding scheme and should not be used directly by any code outside of metropc, but rather via the imports in the corresponding API modules frontend and client. These add additional functionality required for these components, e.g. identity control in the frontend.
-
metropc.protocol.
encode_protocol
(opcode, data={}, parts=None)¶ Encode metropc message.
This function should not be used directly outside of metropc, e.g. in a frontend or client implementation, but instead the symbols defined in the corresponding API module.
Parameters: - opcode (bytes) – Opcode/path of this message.
- data (Mapping) – Data payload mapping strings to values.
- parts (list, optional) – List to fill encoded frames into, a new list is created if None (default).
Returns: Message frames. Return type: list of ByteString
-
metropc.protocol.
decode_protocol
(frames)¶ Decode metropc message.
This function should not be used directly outside of metropc, e.g. in a frontend or client implementation, but instead the symbols defined in the corresponding API module.
Parameters: frames (Iterable or Iterator of ByteString) – Received message frames.
Returns: - (bytes) – Opcode/path of this message.
- (Mapping) – Contained data payload.