Code reference

Core

class metropc.ViewStage

View execution stage.

A view may be executed at different stages in a pipeline, each with different semantics and limitations.

FRONTEND

Within the frontend prior to dispatch to pipeline (NYI).

POOL

In the first pipeline stage possibly parallelized on an event boundary. All expensive computations required for each event should be done here, if possible.

REDUCE

In the second pipeline stage, processing all events synchronously to possibly combine data from several events.

ASYNC

Additional stage running asynchronously alongside the reduce stage for very expensive and long running computations across several events, which are only executed when idling (NYI).

STEP

Virtual stage running within the reduce stage at the end of an operator step.

ACTION

Virtual stage running within the reduce stage when requested by the frontend.
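The split between the POOL and REDUCE stages can be illustrated with a minimal sketch using only the standard library. It does not use metropc; the names pool_view, reduce_views and run_pipeline are hypothetical and only model the idea that per-event work may run in parallel while the reduction sees all events in order.

```python
from concurrent.futures import ThreadPoolExecutor


def pool_view(event):
    # Per-event work: depends on a single event only, so it may be parallelized.
    return sum(event) / len(event)


def reduce_views(per_event_results):
    # Cross-event work: processes the results one after another, in order.
    running_means = []
    total = 0.0
    for count, result in enumerate(per_event_results, start=1):
        total += result
        running_means.append(total / count)
    return running_means


def run_pipeline(events):
    # Executor.map returns results in input order, even when run concurrently.
    with ThreadPoolExecutor(max_workers=4) as pool:
        per_event = list(pool.map(pool_view, events))
    return reduce_views(per_event)


means = run_pipeline([[1, 3], [2, 4], [6, 6]])
```

Here the expensive part lives in pool_view, in line with the recommendation to do per-event computations in the first stage.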

class metropc.ViewOutput

View output type.

The output type defines what kind of output is expected from a view. There are two kinds of output types, generic and hinted. A hinted output type derives from a generic output type, but includes further information about how to handle or visualize the output.

Comparisons between generic and hinted output types evaluate to True if the hinted type derives from the generic type. Comparisons between hinted types evaluate to True only if they are actually identical, i.e. they yield False if the types are different hinted types, even if both derive from the same generic type.

Generic types should be single words with derived hinted types using the same word and further describe it after an underscore. The value of generic types must be smaller than 10, and any corresponding hinted type must use ten times this value as an offset, adding less than 10 itself. For the value x of the generic type, it must follow the equation: 10x + y with x, y < 10.
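The numbering scheme and the comparison rules can be sketched with a small stand-alone enum. This is not the metropc implementation: the class OutputKind, its numeric values and the matches() method are assumptions chosen only to satisfy the 10x + y rule described above.

```python
from enum import Enum


class OutputKind(Enum):
    # Hypothetical values: generic types are < 10, hinted types are 10x + y.
    SCALAR = 1
    SCALAR_FAST = 10
    SCALAR_SLOW = 11
    VECTOR = 2
    VECTOR_LINE = 20

    @property
    def is_generic(self):
        return self.value < 10

    @property
    def generic(self):
        # Integer division by 10 recovers x from 10x + y.
        return self if self.is_generic else OutputKind(self.value // 10)

    def matches(self, other):
        # Generic vs. hinted: True if the hinted type derives from the generic.
        if self.is_generic != other.is_generic:
            return self.generic is other.generic
        # Generic vs. generic or hinted vs. hinted: only identical types match.
        return self is other
```

With these values, OutputKind.SCALAR.matches(OutputKind.SCALAR_FAST) holds, while two different hinted types of the same generic type do not match each other.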

COMPUTE

Arbitrary output with no obvious or intended visualization, whose results are not sent to any client.

SCALAR

Each result is a scalar value, which is typically visualized as a waveform.

SCALAR_FAST

Same as SCALAR, but with the additional hint that the value is changing fast.

SCALAR_SLOW

Same as SCALAR, but with the additional hint that the value is changing slowly.

VECTOR

Each result is a one dimensional vector value, which is typically visualized as a line plot.

VECTOR_LINE

Same as VECTOR, but with the additional hint that the visualization should be a line plot.

VECTOR_DISTRIBUTION

Same as VECTOR, but with the additional hint that the visualization should be a bar plot.

MATRIX

Each value is a two dimensional matrix value, which is typically visualized as a color- or heatmap.

IMAGE

Identical to MATRIX.

POINTS

Each value is one or a list of data point(s) consisting of one or more scalars each, which are typically displayed as a binned result.

POINTS_BINNED

Same as POINTS, but with the additional hint that the visualization should bin all data points into a single result.

POINTS_SCATTER

Same as POINTS, but with the additional hint that the visualization should be a scatter plot.

class metropc.ContextStage

Context stages.

Each pipeline stage (and all its instances) possesses a corresponding context object. Unlike ViewStage, this type only covers the actual stages with distinct instances. ViewStage also includes virtual stages, which execute views but run in the context of a different stage (e.g. STEP and ACTION in REDUCE).

FRONTEND
POOL

Primary pipeline stage for event processing, possibly parallelized across a large number of instances.

REDUCE

Secondary processing stage for reduction across events with exactly one instance.

ASYNC

Tertiary processing stage for long-running reduction events only performed when idle (NYI).

class metropc.core.Context(source, path='<ctx>', stage=<ContextStage.FRONTEND: 0>, version=0, parameters=None, features=[], event_alias='event', event_id_alias=None, event_counter_alias=None)

Online analysis context.

The context defines the programmable stages of the online analysis pipeline in the form of views. These views allow a Python callable to be executed with the data obtained for any number of devices or other views and return a result.

The context is typically loaded from a file and may contain any valid Python code, e.g. imports to help implement its functionality. When the file is evaluated, several methods of the context object are available in its global scope, in addition to the object itself via the ‘ctx’ symbol.

stage
Type: metropc.ContextStage

Stage at which this context object is created.

increase_view_counts(view_name, delta)

Increase view counts.

The view passed to this method does not need to actually exist, but may be used just for counting purposes. The count value is restricted to an int64. It may only be called from within a view execution process.

Parameters:
  • view_name (str) – Name of the view to update.
  • delta (int) – Relative change to counts.
Returns:

None

require_feature(*features)

Require support for a specific feature.

Feature strings indicate the presence of particular optional features supported by either metropc or its frontend in the current configuration.

Parameters: *features – All features required to run this context code.
Raises: ContextError – If any required feature is not supported.
require_version(version_str)

Require a minimal metropc version.

Parsing and comparing the version strings is performed via pkg_resources.parse_version() and follows its semantics.

Parameters: version_str (str) – Version string to compare against.
Returns: None
Raises: ContextError – If the version requirement is not satisfied.
set_view_docs(name, label, docstring=None)

Set view documentation.

The view passed to this method does not need to actually exist, but may also be part of an actual view’s path.

This call only takes effect if called in the reduce stage.

Parameters:
  • name (str) – Name of the view to update.
  • label (str) – New label.
  • docstring (str, optional) – New docstring, ignored for actual views.
Returns:

None

Builtin view implementations

class metropc.builtin.HistogramView

Delegates to metropc.builtin.VectorHistogramView or metropc.builtin.MatrixHistogramView depending on output type.

class metropc.builtin.VectorHistogramView(*args, bin_step=None, bin_min=None, bin_max=None, bin_count=None, bin_limit=50000, bin_margin=1, dtype=<class 'numpy.float64'>, dim_label='bin', count_clipped=False, output_every=10, clear_every=None, **kwargs)

View implementation to bin 1D data into histograms.

Accepts kernel data as a generator, single scalars or any Iterable of values.

__init__(*args, bin_step=None, bin_min=None, bin_max=None, bin_count=None, bin_limit=50000, bin_margin=1, dtype=<class 'numpy.float64'>, dim_label='bin', count_clipped=False, output_every=10, clear_every=None, **kwargs)

Initialize a vector histogram view.

All bin_* arguments are identical to HistogramView.BinnedAxis.

Parameters:
  • dtype (data-type, optional) – Data type for the resulting histogram buffer, float64 by default.
  • dim_label (str, optional) – Name of the xarray dimension, ‘bin’ by default.
  • count_clipped (bool, optional) – Whether to count values clipped beyond the binning range (off by default). This may occur with fixed bin counts or when the limit is reached.
  • output_every (int, optional) – Output data every nth event with any input, 10 by default.
  • clear_every (int, optional) – Clear data every nth event with any input, disabled by default.
class metropc.builtin.MatrixHistogramView(*args, x_step=None, x_min=None, x_max=None, x_count=None, x_limit=1000, x_label='x', y_step=None, y_min=None, y_max=None, y_count=None, y_limit=1000, y_label='y', bandwidth_limit=10485760, dtype=<class 'numpy.float64'>, count_clipped=False, output_every=10, clear_every=None, **kwargs)

View implementation to bin 2D data into histograms.

Accepts kernel data as a generator of individual points (two scalars per iteration), tuple of axis arrays (two 1D arrays, one for each axis), other Iterable of points (two scalars per item) or (N,2) shaped ndarray.

This view uses a bandwidth limit to not send excessive amounts of data to the client. It is computed by the product of x_limit, y_limit, the dtype’s item size and output_every with an assumed input rate of 10 Hz. The default limit is 10 MiB/s, which equates to a square output matrix of 1144x1144 bins for float64.
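The 1144x1144 figure can be reproduced with a short calculation. The formula below is an interpretation rather than metropc's code: it assumes that one output is sent every output_every events at 10 Hz input, so the data rate is the matrix size in bytes times 10 / output_every. The function names are hypothetical.

```python
import math

ASSUMED_INPUT_RATE_HZ = 10  # input rate stated in the description above


def output_rate(x_bins, y_bins, itemsize, output_every):
    # Bytes per second sent to the client under the assumption above.
    outputs_per_second = ASSUMED_INPUT_RATE_HZ / output_every
    return x_bins * y_bins * itemsize * outputs_per_second


def max_square_side(bandwidth_limit, itemsize, output_every):
    # Largest n such that an n x n matrix stays within the bandwidth limit.
    bytes_per_output = bandwidth_limit * output_every // ASSUMED_INPUT_RATE_HZ
    return math.isqrt(bytes_per_output // itemsize)


# Defaults from the signature: 10 MiB/s limit, float64 (8 bytes), output_every=10.
side = max_square_side(10 * 1024**2, itemsize=8, output_every=10)
default_rate = output_rate(1000, 1000, itemsize=8, output_every=10)
```

Under this reading, side evaluates to 1144, and the default limits of 1000 bins per axis give 8,000,000 bytes per second, which is below the default bandwidth limit.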

__init__(*args, x_step=None, x_min=None, x_max=None, x_count=None, x_limit=1000, x_label='x', y_step=None, y_min=None, y_max=None, y_count=None, y_limit=1000, y_label='y', bandwidth_limit=10485760, dtype=<class 'numpy.float64'>, count_clipped=False, output_every=10, clear_every=None, **kwargs)

Initialize a matrix histogram view.

All arguments prefixed with x_*, y_* (except *_label) are identical to their respective bin_* equivalents of HistogramView.BinnedAxis.

Parameters:
  • x_label (str, optional) – Name of the xarray dimension, ‘x’ by default.
  • y_label (str, optional) – Name of the xarray dimension, ‘y’ by default.
  • bandwidth_limit (int, optional) – Maximum output data rate for the view assuming 10 Hz input, 10 MiB/s by default.
  • dtype (data-type, optional) – Data type for the resulting histogram buffer, float64 by default.
  • count_clipped (bool, optional) – Whether to count values clipped beyond the binning range (off by default). This may occur with fixed bin counts or when the limit is reached.
  • output_every (int, optional) – Output data every nth event with any input, 10 by default.
  • clear_every (int, optional) – Clear data every nth event with any input, disabled by default.
class metropc.builtin.HistogramView.BinnedAxis(bin_step, bin_min, bin_max, bin_count, bin_limit)

Manages coordinate axis for data binning.

Creates and manages a coordinate axis for binned data according to a predefined set of specifications. It may be set to grow automatically up to a certain limit or fixed on one or two boundaries. After creation, the align() method allows adapting it to any new set of additional data points. This class only covers a single dimension, i.e. 2D data may use two instances for the width and height, respectively.

Bins must have a uniform width with no gaps. The value of a bin is defined as its center with its edges at +/- bin_step/2.
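The relation between a bin's value and its edges can be written out as a small sketch. The helper names are hypothetical, and treating the lower edge as inclusive and the upper edge as exclusive is an assumption that the text above does not state.

```python
import math


def bin_edges(center, bin_step):
    # A bin's value is its center, with edges at +/- bin_step / 2.
    half = bin_step / 2
    return center - half, center + half


def bin_index(value, first_center, bin_step):
    # Index of the bin containing value, counted from the bin centered at
    # first_center. Assumption: lower edge inclusive, upper edge exclusive.
    return math.floor((value - first_center) / bin_step + 0.5)


edges = bin_edges(2.0, 1.0)
```

For a step of 1.0 and a first center of 0.0, the value 2.4 falls into bin 2, while 2.5 lies on the shared edge and is assigned to bin 3 under the stated assumption.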

__init__(bin_step, bin_min, bin_max, bin_count, bin_limit)

Initialize a binned axis.

All arguments except for bin_limit may be None to indicate they are unspecified. While conceptually like a default value, all arguments are positional and must be passed explicitly. This results in several different combinations to realize different operating modes:

  • No argument given
    Use bin_step=1.0 and automatically expand the coordinates up to bin_limit.
  • Only bin_step
    Use the supplied bin_step and automatically expand the coordinates up to bin_limit.
  • bin_min or bin_max
    Use the supplied bin_step or 1.0 by default and automatically expand the coordinates up to bin_limit in any non-specified direction.
  • Exactly two of bin_count, bin_min, bin_max
    Use the supplied bin_step or 1.0 by default and construct fixed coordinates, ignoring bin_limit.
  • bin_count and bin_min and bin_max
    Compute bin_step and construct fixed coordinates, ignoring bin_limit.

The buffer is auto-expanding in one or both directions if bin_count is None. However, bin_count may become implicitly not None if both bin_min and bin_max are specified.
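The operating modes listed above can be summarized as a decision function. This is a sketch of the documented rules only, not the BinnedAxis code; resolve_axis is a hypothetical helper that returns the effective bin step and whether the coordinates are fixed. The case of bin_count alone is not described in the list, so the sketch refuses it instead of guessing.

```python
def resolve_axis(bin_step, bin_min, bin_max, bin_count):
    # Count how many of bin_count, bin_min and bin_max were specified.
    n_given = sum(v is not None for v in (bin_count, bin_min, bin_max))

    if n_given == 3:
        # All three given: the step follows from the range and the count.
        return (bin_max - bin_min) / bin_count, True

    step = 1.0 if bin_step is None else bin_step

    if n_given == 2:
        # Exactly two given: fixed coordinates, bin_limit is ignored.
        return step, True

    if bin_count is not None:
        # Not covered by the list of modes, so no behavior is assumed here.
        raise NotImplementedError("bin_count alone is not described")

    # Nothing, only bin_step, or a single boundary: auto-expanding axis.
    return step, False
```

For example, resolve_axis(None, 0.0, 10.0, 20) yields a step of 0.5 with fixed coordinates, while resolve_axis(2.0, 0.0, None, None) keeps the step of 2.0 and remains expanding.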

Parameters:
  • bin_step (float or None) – Size of a single bin, 1.0 by default if not determined unambiguously by the other definitions.
  • bin_min (float or None) – Minimum value to include in binning, expanding by default.
  • bin_max (float or None) – Maximum value to include in binning, expanding by default. Note that the coordinates will end at bin_max - bin_size.
  • bin_count (int or None) – Number of bins, expanding by default up to bin_limit.
  • bin_limit (int) – Maximum number of bins when auto-expanding, but ignored for some configurations.
class metropc.builtin.LocalAverageView(*args, N=10, **kwargs)

View implementation to get local average of view results.

A local average collects a set number of results from view invocations, suppressing their output, and returns their average when the buffer is full. It is then cleared and collection starts again. The buffer size therefore determines the output rate of this view implementation.

Accepts any scalar or ndarray value as input, either directly or multiple values as a generator. An array axis may also be treated as multiple values if reduce_axis is passed to its constructor.

There can only be one result per event at most, so if more inputs are generated than required for the next average, only the first complete average is returned and any further inputs discarded.
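The buffering behavior can be modeled in a few lines of plain Python. The class LocalAverage below is a hypothetical stand-in for scalar inputs, not the metropc view; returning None models a suppressed output.

```python
class LocalAverage:
    def __init__(self, n=10):
        self.n = n          # number of results per average
        self.buffer = []

    def add(self, value):
        # Collect the value; return None while the buffer is still filling.
        self.buffer.append(value)
        if len(self.buffer) < self.n:
            return None
        # Buffer full: emit the average and start collecting again.
        average = sum(self.buffer) / len(self.buffer)
        self.buffer.clear()
        return average


avg = LocalAverage(n=3)
outputs = [avg.add(v) for v in (1.0, 2.0, 3.0, 4.0)]
```

With n=3, only every third input produces an output, which is how the buffer size sets the output rate.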

__init__(*args, N=10, **kwargs)

Initialize a local average view.

Parameters:
  • N (int, optional) – Number of view results to average, 10 by default.
  • reduce_axis (int, optional) – Array axis index to reduce when averaging, i.e. add multiple values to the average, disabled by default (the full input is averaged).
class metropc.builtin.GlobalAverageView(*args, output_every=10, throttling=None, **kwargs)

View implementation to get global average of view results.

A global average continuously collects the result of view invocations, suppressing their output, and periodically returns their average at that time. The intermediate result can only be cleared explicitly.

Accepts any scalar or ndarray value as input, either directly or multiple values as a generator. An array axis may also be treated as multiple values if reduce_axis is passed to its constructor.
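A global average does not need to store the individual results, since the mean can be updated incrementally. The sketch below illustrates that idea for scalar inputs. Whether metropc uses this update rule internally is an assumption, and the class GlobalAverage is hypothetical.

```python
class GlobalAverage:
    def __init__(self, output_every=10):
        self.output_every = output_every
        self.count = 0
        self.mean = 0.0

    def add(self, value):
        # Incremental mean: new_mean = mean + (value - mean) / count.
        self.count += 1
        self.mean += (value - self.mean) / self.count
        # Output only on every nth input, otherwise suppress (None).
        if self.count % self.output_every == 0:
            return self.mean
        return None

    def clear(self):
        # The intermediate result is only reset explicitly.
        self.count = 0
        self.mean = 0.0


glob = GlobalAverage(output_every=2)
outputs = [glob.add(v) for v in (1.0, 3.0, 5.0, 7.0)]
```

In contrast to the local average, the result keeps covering all inputs since the last explicit clear.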

__init__(*args, output_every=10, throttling=None, **kwargs)

Initialize a global average view.

Parameters:
  • output_every (int, optional) – Output data every nth event with any input, 10 by default.
  • throttling (int, optional) – Deprecated name of output_every for compatibility with previous context code, None and ignored by default.
  • reduce_axis (int, optional) – Array axis index to reduce when averaging, i.e. add multiple values to the average, disabled by default (the full input is averaged).
class metropc.builtin.MovingAverageView(*args, N=100, accumulating_input=False, smooth_start=True, output_every=10, throttling=None, **kwargs)

View implementation to get moving average of view results.

A moving average continuously collects the result of view invocations, suppressing their output, and periodically returns the average over a set of recent results. It is similar to a local average, but decouples averaging and output, so the averaging window can be larger than the output rate may cover. If both windows are identical (i.e. N == output_every), it is identical to the local average.

Accepts any scalar or ndarray value as input.
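The decoupling of averaging window and output rate can be sketched with a bounded deque. The class MovingAverage is hypothetical and handles scalars only. Before the window is full it divides by the number of values collected so far, which is one possible reading of a smooth start and not necessarily what metropc does.

```python
from collections import deque


class MovingAverage:
    def __init__(self, n=100, output_every=10):
        # A deque with maxlen drops the oldest value once n values are held.
        self.window = deque(maxlen=n)
        self.output_every = output_every
        self.seen = 0

    def add(self, value):
        self.window.append(value)
        self.seen += 1
        # Output only on every nth input, otherwise suppress (None).
        if self.seen % self.output_every != 0:
            return None
        return sum(self.window) / len(self.window)


moving = MovingAverage(n=2, output_every=1)
outputs = [moving.add(v) for v in (1.0, 3.0, 5.0)]
```

Choosing n larger than output_every yields overlapping averages, while n equal to output_every reproduces the non-overlapping blocks of a local average.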

__init__(*args, N=100, accumulating_input=False, smooth_start=True, output_every=10, throttling=None, **kwargs)

Initialize a moving average view.

Parameters:
  • N (int, optional) – Number of view results to average, 100 by default
  • accumulating_input (bool, optional) – Whether each input data event represents an individual measurement result (default) or the accumulation of results. In the latter case, the difference between input events is averaged.
  • smooth_start (bool, optional) – Whether the result is scaled to regular signal intensity before the first full buffer, enabled by default. May be disabled for very large buffers to avoid an allocation, copy and rescaling on each output before the buffer is filled.
  • output_every (int, optional) – Output data every nth event with any input, 10 by default.
  • throttling (int, optional) – Deprecated name of output_every for compatibility with previous context code, None and ignored by default.
class metropc.builtin.ExtremumView(*args, op, **kwargs)

View implementation only returning extreme results.

Each input is expected to be two values: one actual result and one comparative value, which is passed to the comparison operator. If the operator evaluates to True when comparing the current value and the previous extremum, the result is returned.

Accepts any number of 2-tuples either directly or as a generator.
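The selection logic can be sketched with the standard operator module. The class Extremum is a hypothetical model, and accepting the very first input unconditionally is an assumption, since there is no previous extremum to compare against at that point.

```python
import operator

_UNSET = object()  # marker for "no extremum seen yet"


class Extremum:
    def __init__(self, op):
        self.op = op
        self.best = _UNSET

    def add(self, result, comparative):
        # Assumption: the first input always becomes the initial extremum.
        if self.best is _UNSET or self.op(comparative, self.best):
            self.best = comparative
            return result
        # Not more extreme than the previous extremum: suppress the output.
        return None


largest = Extremum(operator.gt)
outputs = [largest.add(r, c) for r, c in (("a", 1), ("b", 0), ("c", 5))]
```

Passing operator.lt instead would keep the results belonging to the smallest comparative values.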

__init__(*args, op, **kwargs)

Initialize an extreme value view.

Parameters: op (Callable) – Comparison operator taking two values and returning True if the current value is more extreme than the previous extreme (e.g. greater-than always yields the largest values).
class metropc.builtin.StepAverageView(*args, **kwargs)

View implementation to get step average of view results.

A step average contains all results from view invocations that occurred during an operator step.

Accepts any scalar or ndarray value as input.

__init__(*args, **kwargs)

Initialize a step average view.

class metropc.builtin.StepStackedView(*args, dim_label='step', **kwargs)

View implementation to stack results from each step.

This view is always executed on the step boundary, taking the current result from its arguments and stacking the view result along a new axis, prepended to any existing axes.

Accepts any ndarray value as input.

__init__(*args, dim_label='step', **kwargs)

Initialize a step stacked view.

Parameters: dim_label (str, optional) – Name of the xarray dimension the data per step is stacked on, ‘step’ by default.

Frontend API

Client API

metropc.client.decode_protocol()

Identical to metropc.protocol.decode_protocol(), but automatically decodes b'index' opcodes into metropc.client.IndexEntry objects.

metropc.protocol.decode_protocol(frames)

Decode metropc message.

This function should not be used directly outside of metropc, e.g. in a frontend or client implementation, but instead the symbols defined in the corresponding API module.

Parameters: frames (Iterable or Iterator of ByteString) – Received message frames.
Returns: (bytes) Opcode/path of this message, (Mapping) Contained data payload.

ZeroMQ Protocol

metropc protocol.

All communication between the frontend, pipeline stages and clients uses ZeroMQ. This allows the distribution of these components to scale transparently from threads (via inproc transport) over processes (ipc) to nodes (tcp).

A metropc message consists of an opcode or path followed by the data payload in the form of a dict-like mapping of strings to arbitrary values. If possible, an optimized encoding/decoding fastpath for the data values is provided. This fastpath currently supports only the first level of this dictionary (i.e. not for nested dictionaries) and values of these types, either directly or in tuples of less than 256 elements: bytes, bool, int up to 64 bit, float, ndarray, DataArray, None. If the fastpath cannot be applied, the value is pickled.

A message always begins with two fixed message frames:

  • Frame 0: UTF8-encoded opcode/path
  • Frame 1: Type IDs of data values

For messages between the frontend and pipeline stages, the first frame is an opcode denoting an operation to perform. Between the output and clients, it is the data path that the content of the message is about. The type IDs describe the types of the values for fastpath serialization in the data mapping, in the same order as they are encoded in the message. Each data key entry begins with the number of values for this key, to represent tuples with up to 255 elements, followed by a single character for each encoded type. For example, the type IDs for a data value like (42, b'foo') are encoded as b'ib', with b'i' for an integer and b'b' for a bytes string. The type IDs for all keys are directly concatenated to form a single long string.

The header is followed by the message content with one frame per UTF-8 encoded key and a type-dependent number x of frames for its value:

  • Frame N: UTF8-encoded key
  • Frame N+1 … N+x: Encoded value
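The frame layout can be illustrated with a toy encoder. This is not metropc.protocol.encode_protocol and does not produce wire-compatible messages. The function names are hypothetical, only the b'i' and b'b' type characters are taken from the text above, and both the single raw byte used for the value count and the 8-byte little-endian integer encoding are assumptions.

```python
import struct


def encode_value(value):
    # Toy subset of the fastpath types: bytes and 64-bit integers only.
    if isinstance(value, bytes):
        return b"b", value
    if isinstance(value, int) and not isinstance(value, bool):
        return b"i", struct.pack("<q", value)  # assumed integer encoding
    raise TypeError("type not covered by this sketch")


def encode_message(opcode, data):
    type_ids = bytearray()
    content = []
    for key, value in data.items():
        values = value if isinstance(value, tuple) else (value,)
        if len(values) > 255:
            raise ValueError("at most 255 values per key")
        type_ids.append(len(values))        # assumed: count as one raw byte
        content.append(key.encode("utf-8"))  # frame N: key
        for item in values:
            type_char, frame = encode_value(item)
            type_ids += type_char
            content.append(frame)           # frames N+1 ... N+x: value
    # Frame 0: opcode/path, frame 1: type IDs, then the content frames.
    return [opcode, bytes(type_ids)] + content


frames = encode_message(b"index", {"x": (42, b"foo")})
```

In this sketch the message for a single key holding (42, b'foo') consists of five frames: the opcode, the type IDs, the key and one frame for each of the two values.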

This module defines the basic encoding/decoding scheme and should not be used directly by any code outside of metropc, which should rather use the imports via the corresponding API modules frontend and client. These add additional functionality required for these components, e.g. identity control in the frontend.

metropc.protocol.encode_protocol(opcode, data={}, parts=None)

Encode metropc message.

This function should not be used directly outside of metropc, e.g. in a frontend or client implementation, but instead the symbols defined in the corresponding API module.

Parameters:
  • opcode (bytes) – Opcode/path of this message.
  • data (Mapping) – Data payload mapping strings to values.
  • parts (list, optional) – List to fill encoded frames into, a new list is created if None (default).
Returns: Message frames.
Return type: list of ByteString
