Source code for imageSourcePy.ImageSourceMdl

#############################################################################
# Author: parenti
# Created on April 5, 2024, 07:37 AM
# Copyright (C) European XFEL GmbH Schenefeld. All rights reserved.
#############################################################################

from karabo.middlelayer import (
    Configurable, DaqDataType, Device, EncodingType, Image, ImageData, Node,
    OutputChannel, UInt16)

from ._version import version as deviceVersion


def channelSchema(shape, encoding, dtype):
    """ Build the schema class used by an image output channel

    :param shape: the image shape, passed on to the 'Image' element
    :param encoding: the image encoding, passed on to the 'Image' element
    :param dtype: the image data type, passed on to the 'Image' element
    :return: a 'Configurable' subclass with a single 'data' node, which
        in turn contains the 'image' element
    """
    # Options of the image element, gathered in one place
    image_options = {
        "displayedName": "Image",
        "shape": shape,
        "encoding": encoding,
        "dtype": dtype,
    }

    class DataNode(Configurable):
        # Content of the 'data' node; its daqDataType is set to TRAIN
        daqDataType = DaqDataType.TRAIN
        image = Image(**image_options)

    class ChannelNode(Configurable):
        # Top level of the channel schema: the image sits in 'data.image'
        data = Node(DataNode)

    return ChannelNode


class ImageSource(Device):
    """ Base class for image sources.

    It provides two output channels - 'output' and 'daqOutput' - for
    sending out images, and three functions - 'update_output_schema',
    'write_channels' and 'signal_eos'.

    The function 'update_output_schema' will update the schema for the
    output channels and make it fit for the DAQ.

    The function 'write_channels' will write the input data to both the
    output channels, taking care of reshaping them for the DAQ.

    The function 'signal_eos' will send an end-of-stream signal to both
    the output channels.
    """
    # provide version for classVersion property
    __version__ = deviceVersion

    # Schema parameters used until 'update_output_schema' is called
    INITIAL_SHAPE = [1, 1]
    INITIAL_ENCODING = EncodingType.GRAY
    INITIAL_DTYPE = UInt16

    output = OutputChannel(
        channelSchema(INITIAL_SHAPE, INITIAL_ENCODING, INITIAL_DTYPE),
        displayedName="Output")

    # Second output channel for the DAQ; its schema has the reversed shape
    daq_shape = list(reversed(INITIAL_SHAPE))
    daqOutput = OutputChannel(
        channelSchema(daq_shape, INITIAL_ENCODING, INITIAL_DTYPE),
        displayedName="DAQ Output")

    def __init__(self, configuration):
        """ Initialize the device and remember the initial schema parameters

        :param configuration: the device configuration, passed on to the
            base class
        """
        super().__init__(configuration)
        # Parameters of the schema currently set on the output channels;
        # 'update_output_schema' compares against them
        self._shape = self.INITIAL_SHAPE
        self._encoding = self.INITIAL_ENCODING
        self._dtype = self.INITIAL_DTYPE

    async def update_output_schema(self, shape, encoding, dtype):
        """ Update the schema of 'output' and 'daqOutput' channels

        The channels are left untouched if shape, encoding and data type
        are the same as the ones currently set.

        :param shape: the shape of image, e.g. (height, width)
        :param encoding: the encoding of the image. e.g. EncodingType.GRAY
        :param dtype: the data type, e.g. UInt16
        :return: None
        """
        # Compare shapes as tuples: a list never equals a tuple in Python,
        # so e.g. (1, 1) against the initial [1, 1] would otherwise always
        # be seen as a change and trigger a needless schema update.
        unchanged = (
            tuple(shape) == tuple(self._shape)
            and encoding == self._encoding
            and dtype == self._dtype)
        if unchanged:
            # Nothing to be done
            return

        output_schema = channelSchema(shape, encoding, dtype)
        # The DAQ channel gets the same schema, but with reversed shape
        daq_shape = list(reversed(shape))
        daq_output_schema = channelSchema(daq_shape, encoding, dtype)
        await self.setOutputSchema(
            "output", output_schema,
            "daqOutput", daq_output_schema)

        # Remember the new parameters only once the schema has been set
        self._shape = shape
        self._encoding = encoding
        self._dtype = dtype

    async def write_channels(
            self, data, binning=None, bpp=None, encoding=None,
            roi_offsets=None, timestamp=None):
        """ Write an image to 'output' and 'daqOutput' channels

        :param data: the image data as numpy.ndarray
        :param binning: the image binning, e.g. (1, 1)
        :param bpp: the bits-per-pixel, e.g. 12
        :param encoding: the image encoding, e.g. EncodingType.GRAY
        :param roi_offsets: the ROI offset, e.g. (0, 0)
        :param timestamp: the image timestamp - if none the current
            timestamp will be used
        :return: None
        """
        image_data = ImageData(
            data, binning=binning, bitsPerPixel=bpp, encoding=encoding,
            roiOffsets=roi_offsets)
        self.output.schema.data.image = image_data
        await self.output.writeData(timestamp)

        # Reshape image for DAQ
        # NB DAQ wants shape in CImg order, eg (width, height)
        # This is a reshape to the reversed shape, not a transpose.
        data = data.reshape(*reversed(data.shape))
        image_data = ImageData(
            data, binning=binning, bitsPerPixel=bpp, encoding=encoding,
            roiOffsets=roi_offsets)
        self.daqOutput.schema.data.image = image_data
        await self.daqOutput.writeData(timestamp)

    async def signal_eos(self):
        """ Send an end-of-stream signal to 'output' and 'daqOutput' channels

        :return: None
        """
        await self.output.writeEndOfStream()
        await self.daqOutput.writeEndOfStream()