files

ControlSource

Bases: h5py.Group

Group for a control source ("slow data").

This class extends h5py.Group with methods specific to writing data of a control source in the European XFEL file format. The internal state does not depend on using any of these methods, and the underlying file may be manipulated by any of the regular h5py methods, too.
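
A minimal sketch of typical usage, assuming an open DataFile (see below); the output path, device ID, key name and data are purely illustrative:

import numpy as np
from cal_tools.files import DataFile

train_ids = np.arange(10001, 10011, dtype=np.uint64)

with DataFile('example.h5', 'w') as outp:  # hypothetical output path
    # Create the CONTROL and RUN groups for an illustrative device ID.
    ctrl_src = outp.create_control_source('MY/MDL/DEVICE')

    # One value per train, plus the matching entry in RUN.
    ctrl_src.create_key('actualValue', np.zeros(len(train_ids)))

    # One count/first entry per train in INDEX.
    ctrl_src.create_index(len(train_ids))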

Source code in cal_tools/files.py
class ControlSource(h5py.Group):
    """Group for a control source ("slow data").

    This class extends h5py.Group with methods specific to writing data
    of a control source in the European XFEL file format. The internal
    state does not depend on using any of these methods, and the
    underlying file may be manipulated by any of the regular h5py
    methods, too.
    """

    ascii_dt = h5py.string_dtype('ascii')

    def __init__(self, group_id, source):
        super().__init__(group_id)

        self.__source = source
        self.__run_group = self.file.create_group(f'RUN/{source}')

    def get_run_group(self):
        return self.__run_group

    def get_index_group(self):
        return self.file.require_group(f'INDEX/{self.__source}')

    def create_key(self, key, values, timestamps=None, run_entry=None):
        """Create datasets for a key varying each train.

        Args:
            key (str): Source key, dots are automatically replaced by
                slashes.
            values (array_like): Source values for each train.
            timestamps (array_like, optional): Timestamps for each
                source value, 0 if omitted.
            run_entry (tuple of array_like, optional): Value and
                timestamp for the corresponding value in the RUN
                section. The first entry for the train values is used if
                omitted.

        Returns:
            None
        """

        key = escape_key(key)

        if timestamps is None:
            timestamps = np.zeros_like(values, dtype=np.uint64)
        elif len(values) != len(timestamps):
            raise ValueError('values and timestamp must be the same length')

        self.create_dataset(f'{key}/value', data=values)
        self.create_dataset(f'{key}/timestamp', data=timestamps)

        if run_entry is None:
            run_entry = (values[0], timestamps[0])

        self.create_run_key(key, *run_entry)

    def create_run_key(self, key, value, timestamp=None):
        """Create datasets for a key constant over a run.

        Args:
            key (str): Source key, dots are automatically replaced by
                slashes.
            value (Any): Key value.
            timestamp (int, optional): Timestamp of the value,
                0 if omitted.

        Returns:
            None
        """

        # TODO: Some types/shapes are still not fully correct here.

        key = escape_key(key)

        if timestamp is None:
            timestamp = 0

        if isinstance(value, list):
            shape = (1, len(value))

            try:
                dtype = type(value[0])
            except IndexError:
                # Assume empty lists are string-typed.
                dtype = self.ascii_dt
        elif isinstance(value, np.ndarray):
            shape = value.shape
            dtype = value.dtype
        else:
            shape = (1,)
            dtype = type(value)

        if dtype is str:
            dtype = self.ascii_dt

        self.__run_group.create_dataset(
            f'{key}/value', data=value, shape=shape, dtype=dtype)
        self.__run_group.create_dataset(
            f'{key}/timestamp', data=timestamp, shape=(1,), dtype=np.uint64)

    def create_index(self, num_trains):
        """Create source-specific INDEX datasets.


        Depending on whether this source has train-varying data or not,
        different count/first datasets are written.

        Args:
            num_trains (int): Total number of trains in this file.

        Returns:
            None
        """

        if len(self) > 0:
            count_func = np.ones
            first_func = np.arange
        else:
            count_func = np.zeros
            first_func = np.zeros

        index_group = self.get_index_group()
        index_group.create_dataset(
            'count', data=count_func(num_trains, dtype=np.uint64))
        index_group.create_dataset(
            'first', data=first_func(num_trains, dtype=np.uint64))

create_index(num_trains)

Create source-specific INDEX datasets.

Depending on whether this source has train-varying data or not, different count/first datasets are written.

Parameters:

    num_trains (int, required): Total number of trains in this file.

Returns:

    None

Source code in cal_tools/files.py
def create_index(self, num_trains):
    """Create source-specific INDEX datasets.


    Depending on whether this source has train-varying data or not,
    different count/first datasets are written.

    Args:
        num_trains (int): Total number of trains in this file.

    Returns:
        None
    """

    if len(self) > 0:
        count_func = np.ones
        first_func = np.arange
    else:
        count_func = np.zeros
        first_func = np.zeros

    index_group = self.get_index_group()
    index_group.create_dataset(
        'count', data=count_func(num_trains, dtype=np.uint64))
    index_group.create_dataset(
        'first', data=first_func(num_trains, dtype=np.uint64))

create_key(key, values, timestamps=None, run_entry=None)

Create datasets for a key varying each train.

Parameters:

    key (str, required): Source key, dots are automatically replaced by slashes.
    values (array_like, required): Source values for each train.
    timestamps (array_like, default None): Timestamps for each source value, 0 if omitted.
    run_entry (tuple of array_like, default None): Value and timestamp for the corresponding value in the RUN section. The first entry for the train values is used if omitted.

Returns:

    None
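
A short sketch, assuming ctrl_src was returned by DataFile.create_control_source() and using an illustrative key name and dummy data:

import numpy as np

values = np.array([1.0, 1.5, 2.0])                       # one value per train
timestamps = np.array([100, 200, 300], dtype=np.uint64)  # optional

# Writes actualPosition/value and actualPosition/timestamp in CONTROL and,
# since run_entry is omitted, copies the first train's entry into RUN.
ctrl_src.create_key('actualPosition', values, timestamps=timestamps)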

Source code in cal_tools/files.py
def create_key(self, key, values, timestamps=None, run_entry=None):
    """Create datasets for a key varying each train.

    Args:
        key (str): Source key, dots are automatically replaced by
            slashes.
        values (array_like): Source values for each train.
        timestamps (array_like, optional): Timestamps for each
            source value, 0 if omitted.
        run_entry (tuple of array_like, optional): Value and
            timestamp for the corresponding value in the RUN
            section. The first entry for the train values is used if
            omitted.

    Returns:
        None
    """

    key = escape_key(key)

    if timestamps is None:
        timestamps = np.zeros_like(values, dtype=np.uint64)
    elif len(values) != len(timestamps):
        raise ValueError('values and timestamp must be the same length')

    self.create_dataset(f'{key}/value', data=values)
    self.create_dataset(f'{key}/timestamp', data=timestamps)

    if run_entry is None:
        run_entry = (values[0], timestamps[0])

    self.create_run_key(key, *run_entry)

create_run_key(key, value, timestamp=None)

Create datasets for a key constant over a run.

Parameters:

    key (str, required): Source key, dots are automatically replaced by slashes.
    value (Any, required): Key value.
    timestamp (int, default None): Timestamp of the value, 0 if omitted.

Returns:

    None
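
A short sketch, again assuming ctrl_src is a ControlSource; the key names and values are illustrative:

# Scalar values end up as single-entry datasets in the RUN section only.
ctrl_src.create_run_key('classId', 'Motor')
ctrl_src.create_run_key('targetVelocity', 0.5, timestamp=1234567890)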

Source code in cal_tools/files.py
def create_run_key(self, key, value, timestamp=None):
    """Create datasets for a key constant over a run.

    Args:
        key (str): Source key, dots are automatically replaced by
            slashes.
        value (Any): Key value.
        timestamp (int, optional): Timestamp of the value,
            0 if omitted.

    Returns:
        None
    """

    # TODO: Some types/shapes are still not fully correct here.

    key = escape_key(key)

    if timestamp is None:
        timestamp = 0

    if isinstance(value, list):
        shape = (1, len(value))

        try:
            dtype = type(value[0])
        except IndexError:
            # Assume empty lists are string-typed.
            dtype = self.ascii_dt
    elif isinstance(value, np.ndarray):
        shape = value.shape
        dtype = value.dtype
    else:
        shape = (1,)
        dtype = type(value)

    if dtype is str:
        dtype = self.ascii_dt

    self.__run_group.create_dataset(
        f'{key}/value', data=value, shape=shape, dtype=dtype)
    self.__run_group.create_dataset(
        f'{key}/timestamp', data=timestamp, shape=(1,), dtype=np.uint64)

DataFile

Bases: h5py.File

European XFEL HDF5 data file.

This class extends h5py.File with methods specific to writing data in the European XFEL file format. The internal state does not depend on using any of these methods, and the underlying file may be manipulated by any of the regular h5py methods, too.

Please refer to https://extra-data.readthedocs.io/en/latest/data_format.html for details of the file format.
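
A minimal end-to-end sketch of writing such a file; all source, key, proposal and run identifiers below are purely illustrative and the arrays contain dummy data:

import numpy as np
from cal_tools.files import DataFile

train_ids = np.arange(10001, 10006, dtype=np.uint64)
frames_per_train = 10

with DataFile.from_details('.', 'AGIPD00', run=42, sequence=0) as outp:
    # Slow data.
    ctrl_src = outp.create_control_source('MY/MDL/DEVICE')
    ctrl_src.create_key('someValue', np.zeros(len(train_ids)))
    ctrl_src.create_index(len(train_ids))

    # Fast data, one channel ('image') with several frames per train.
    inst_src = outp.create_instrument_source('MY/DET/DEVICE:daqOutput')
    inst_src.create_key(
        'image.data',
        data=np.zeros((len(train_ids) * frames_per_train, 128, 128),
                      dtype=np.uint16))
    inst_src.create_index(image=np.full(len(train_ids), frames_per_train))

    # Global train index and METADATA section.
    outp.create_index(train_ids)
    outp.create_metadata(proposal=900000, run=42)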

Source code in cal_tools/files.py
class DataFile(h5py.File):
    """European XFEL HDF5 data file.

    This class extends the h5py.File with methods specific to writing
    data in the European XFEL file format. The internal state does not
    depend on using any of these methods, and the underlying file may be
    manipulated by any of the regular h5py methods, too.

    Please refer to
    https://extra-data.readthedocs.io/en/latest/data_format.html for
    details of the file format.
    """

    filename_format = '{prefix}-R{run:04d}-{aggregator}-S{sequence:05d}.h5'
    aggregator_pattern = re.compile(r'^[A-Z]{2,}\d{2}$')
    instrument_source_pattern = re.compile(r'^[\w\/-]+:[\w.]+$')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__control_sources = set()
        self.__instrument_sources = set()
        self.__run = 0
        self.__sequence = 0

    @classmethod
    def from_details(cls, folder, aggregator, run, sequence, prefix='CORR',
                     mode='w', *args, **kwargs):
        """Open or create a file based on European XFEL details.

        This method is a wrapper to construct the filename based on its
        components.

        Args:
            folder (Path, str): Parent location for this file.
            aggregator (str): Name of the data aggregator, must satisfy
                DataFile.aggregator_pattern.
            run (int): Run number.
            sequence (int): Sequence number.
            prefix (str, optional): First filename component, 'CORR' by
                default.
            args, kwargs: Any additional arguments are passed on to
                h5py.File


        Returns:
            (DataFile) Opened file object.
        """

        if not isinstance(folder, Path):
            folder = Path(folder)

        if not cls.aggregator_pattern.match(aggregator):
            raise ValueError(f'invalid aggregator format, must satisfy '
                             f'{cls.aggregator_pattern.pattern}')

        filename = cls.filename_format.format(
            prefix=prefix, aggregator=aggregator, run=run, sequence=sequence)

        self = cls((folder / filename).resolve(), mode, *args, **kwargs)
        self.__run = run
        self.__sequence = sequence

        return self

    def create_index(self, train_ids, timestamps=None, flags=None,
                     origins=None, from_file=None):
        """Create global INDEX datasets.

        These datasets are agnostic of any source and describe the
        trains contained in this file.

        Args:
            train_ids (array_like): Train IDs contained in this file.
            timestamps (array_like, optional): Timestamp of each train,
                0 if omitted.
            flags (array_like, optional): Whether the time server is the
                initial origin of each train, 1 if omitted.
            origins (array_like, optional): Which source is the initial
                origin of each train, -1 (time server) if omitted.
            from_file (str, Path or extra_data.FileAccess, optional):
                Existing data file to take timestamps, flags and origins
                information from if present.

        Returns:
            None
        """

        if from_file is not None:
            from extra_data import FileAccess

            if not isinstance(from_file, FileAccess):
                from_file = FileAccess(from_file)

            sel_trains = np.isin(from_file.train_ids, train_ids)

            if 'INDEX/timestamp' in from_file.file:
                timestamps = from_file.file['INDEX/timestamp'][sel_trains]

            flags = from_file.validity_flag[sel_trains]

            if 'INDEX/origin' in from_file.file:
                origins = from_file.file['INDEX/origin'][sel_trains]

        self.create_dataset('INDEX/trainId', data=train_ids, dtype=np.uint64)

        if timestamps is None:
            timestamps = np.zeros_like(train_ids, dtype=np.uint64)
        elif len(timestamps) != len(train_ids):
            raise ValueError('timestamps and train_ids must be same length')

        self.create_dataset('INDEX/timestamp', data=timestamps,
                            dtype=np.uint64)

        if flags is None:
            flags = np.ones_like(train_ids, dtype=np.int32)
        elif len(flags) != len(train_ids):
            raise ValueError('flags and train_ids must be same length')

        self.create_dataset('INDEX/flag', data=flags, dtype=np.int32)

        if origins is None:
            origins = np.full_like(train_ids, -1, dtype=np.int32)
        elif len(origins) != len(train_ids):
            raise ValueError('origins and train_ids must be same length')

        self.create_dataset('INDEX/origin', data=origins, dtype=np.int32)

    def create_control_source(self, source):
        """Create group for a control source ("slow data").

        Control sources created via this method are not required to be
        passed to create_metadata() again.

        Args:
            source (str): Karabo device ID.

        Returns:
            (ControlSource) Created group in CONTROL.
        """

        self.__control_sources.add(source)
        return ControlSource(self.create_group(f'CONTROL/{source}').id, source)

    def create_instrument_source(self, source):
        """Create group for an instrument source ("fast data").

        Instrument sources created via this method are not required to be
        passed to create_metadata() again.

        Args:
            source (str): Karabo pipeline path, must satisfy
                DataFile.instrument_source_pattern.

        Returns:
            (InstrumentSource) Created group in INSTRUMENT.
        """

        if not self.instrument_source_pattern.match(source):
            raise ValueError(f'invalid source format, must satisfy '
                             f'{self.instrument_source_pattern.pattern}')

        self.__instrument_sources.add(source)
        return InstrumentSource(self.create_group(f'INSTRUMENT/{source}').id,
                                source)

    def create_metadata(self, like=None, *,
                        creation_date=None, update_date=None, proposal=0,
                        run=0, sequence=None, daq_library='1.x',
                        karabo_framework='2.x', control_sources=(),
                        instrument_channels=()):
        """Create METADATA datasets.

        Args:
            like (DataCollection, optional): Take proposal, run,
                daq_library, karabo_framework from an EXtra-data data
                collection, overwriting any of these arguments passed.
            creation_date (datetime, optional): Creation date and time,
                now if omitted.
            update_date (datetime, optional): Update date and time,
                now if omitted.
            proposal (int, optional): Proposal number, 0 if omitted and
                no DataCollection passed.
            run (int, optional): Run number, 0 if omitted, no
                DataCollection is passed or object not created via
                from_details.
            sequence (int, optional): Sequence number, 0 if omitted and
                object not created via from_details.
            daq_library (str, optional): daqLibrary field, '1.x' if
                omitted and no DataCollection passed.
            karabo_framework (str, optional): karaboFramework field,
                '2.x' if omitted and no DataCollection is passed.
            control_sources (Iterable, optional): Control sources in
                this file, sources created via create_control_source are
                automatically included.
            instrument_channels (Iterable, optional): Instrument
                channels (source and first component of data hash) in
                this file, channels created via create_instrument_source
                are automatically included.

        Returns:
            None
        """

        if like is not None:
            metadata = like.run_metadata()
            proposal = metadata.get('proposalNumber', proposal)
            run = metadata.get('runNumber', run)
            daq_library = metadata.get('daqLibrary', daq_library)
            karabo_framework = metadata.get('karaboFramework',
                                            karabo_framework)

        else:
            if run is None:
                run = self.__run

            if sequence is None:
                sequence = self.__sequence

        if creation_date is None:
            creation_date = datetime.fromtimestamp(0, tz=timezone.utc)
        elif creation_date is True:
            creation_date = datetime.now(timezone.utc)

        if update_date is None:
            update_date = creation_date
        elif update_date is True:
            update_date = datetime.now(timezone.utc)

        md_group = self.require_group('METADATA')
        md_group.create_dataset(
            'creationDate', shape=(1,),
            data=creation_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii'))
        md_group.create_dataset('daqLibrary', shape=(1,),
                                data=daq_library.encode('ascii'))
        md_group.create_dataset('dataFormatVersion', shape=(1,), data=b'1.2')

        # Start with the known and specified control sources
        sources = {name: 'CONTROL'
                   for name in chain(self.__control_sources, control_sources)}

        # Add in the specified instrument data channels.
        sources.update({full_channel: 'INSTRUMENT'
                        for full_channel in instrument_channels})

        # Add in those already in the file, if not already passed.
        sources.update({f'{name}/{channel}': 'INSTRUMENT'
                        for name in self.__instrument_sources
                        for channel in self[f'INSTRUMENT/{name}']})

        source_names = sorted(sources.keys())
        data_sources_shape = (len(sources),)
        md_group.create_dataset('dataSources/dataSourceId',
                                shape=data_sources_shape,
                                data=[f'{sources[name]}/{name}'.encode('ascii')
                                      for name in source_names])
        md_group.create_dataset('dataSources/deviceId',
                                shape=data_sources_shape,
                                data=[name.encode('ascii')
                                      for name in source_names])
        md_group.create_dataset('dataSources/root', shape=data_sources_shape,
                                data=[sources[name].encode('ascii')
                                      for name in source_names])

        md_group.create_dataset(
            'karaboFramework', shape=(1,),
            data=karabo_framework.encode('ascii'))
        md_group.create_dataset(
            'proposalNumber', shape=(1,), dtype=np.uint32, data=proposal)
        md_group.create_dataset(
            'runNumber', shape=(1,), dtype=np.uint32, data=run)
        md_group.create_dataset(
            'sequenceNumber', shape=(1,), dtype=np.uint32, data=sequence)
        md_group.create_dataset(
            'updateDate', shape=(1,),
            data=update_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii'))

create_control_source(source)

Create group for a control source ("slow data").

Control sources created via this method are not required to be passed to create_metadata() again.

Parameters:

    source (str, required): Karabo device ID.

Returns:

    (ControlSource) Created group in CONTROL.

Source code in cal_tools/files.py
def create_control_source(self, source):
    """Create group for a control source ("slow data").

    Control sources created via this method are not required to be
    passed to create_metadata() again.

    Args:
        source (str): Karabo device ID.

    Returns:
        (ControlSource) Created group in CONTROL.
    """

    self.__control_sources.add(source)
    return ControlSource(self.create_group(f'CONTROL/{source}').id, source)

create_index(train_ids, timestamps=None, flags=None, origins=None, from_file=None)

Create global INDEX datasets.

These datasets are agnostic of any source and describe the trains contained in this file.

Parameters:

    train_ids (array_like, required): Train IDs contained in this file.
    timestamps (array_like, default None): Timestamp of each train, 0 if omitted.
    flags (array_like, default None): Whether the time server is the initial origin of each train, 1 if omitted.
    origins (array_like, default None): Which source is the initial origin of each train, -1 (time server) if omitted.
    from_file (str, Path or extra_data.FileAccess, default None): Existing data file to take timestamps, flags and origins information from if present.

Returns:

    None
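
A short sketch, assuming outp is an open DataFile and using dummy train IDs; the raw file path in the second variant is illustrative:

import numpy as np

train_ids = np.arange(10001, 10011, dtype=np.uint64)

# All trains flagged as valid, zero timestamps and origin -1 (time server).
outp.create_index(train_ids)

# Alternatively, copy timestamps, flags and origins from an existing file:
# outp.create_index(train_ids, from_file='/path/to/RAW-R0042-AGIPD00-S00000.h5')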

Source code in cal_tools/files.py
def create_index(self, train_ids, timestamps=None, flags=None,
                 origins=None, from_file=None):
    """Create global INDEX datasets.

    These datasets are agnostic of any source and describe the
    trains contained in this file.

    Args:
        train_ids (array_like): Train IDs contained in this file.
        timestamps (array_like, optional): Timestamp of each train,
            0 if omitted.
        flags (array_like, optional): Whether the time server is the
            initial origin of each train, 1 if omitted.
        origins (array_like, optional): Which source is the initial
            origin of each train, -1 (time server) if omitted.
        from_file (str, Path or extra_data.FileAccess, optional):
            Existing data file to take timestamps, flags and origins
            information from if present.

    Returns:
        None
    """

    if from_file is not None:
        from extra_data import FileAccess

        if not isinstance(from_file, FileAccess):
            from_file = FileAccess(from_file)

        sel_trains = np.isin(from_file.train_ids, train_ids)

        if 'INDEX/timestamp' in from_file.file:
            timestamps = from_file.file['INDEX/timestamp'][sel_trains]

        flags = from_file.validity_flag[sel_trains]

        if 'INDEX/origin' in from_file.file:
            origins = from_file.file['INDEX/origin'][sel_trains]

    self.create_dataset('INDEX/trainId', data=train_ids, dtype=np.uint64)

    if timestamps is None:
        timestamps = np.zeros_like(train_ids, dtype=np.uint64)
    elif len(timestamps) != len(train_ids):
        raise ValueError('timestamps and train_ids must be same length')

    self.create_dataset('INDEX/timestamp', data=timestamps,
                        dtype=np.uint64)

    if flags is None:
        flags = np.ones_like(train_ids, dtype=np.int32)
    elif len(flags) != len(train_ids):
        raise ValueError('flags and train_ids must be same length')

    self.create_dataset('INDEX/flag', data=flags, dtype=np.int32)

    if origins is None:
        origins = np.full_like(train_ids, -1, dtype=np.int32)
    elif len(origins) != len(train_ids):
        raise ValueError('origins and train_ids must be same length')

    self.create_dataset('INDEX/origin', data=origins, dtype=np.int32)

create_instrument_source(source)

Create group for an instrument source ("fast data").

Instrument sources created via this method are not required to be passed to create_metadata() again.

Parameters:

    source (str, required): Karabo pipeline path, must satisfy DataFile.instrument_source_pattern.

Returns:

    (InstrumentSource) Created group in INSTRUMENT.
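
For example, assuming outp is an open DataFile and using an illustrative pipeline path:

inst_src = outp.create_instrument_source('SPB_DET_AGIPD1M-1/DET/0CH0:xtdf')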

Source code in cal_tools/files.py
def create_instrument_source(self, source):
    """Create group for an instrument source ("fast data").

    Instrument sources created via this method are not required to be
    passed to create_metadata() again.

    Args:
        source (str): Karabo pipeline path, must satisfy
            DataFile.instrument_source_pattern.

    Returns:
        (InstrumentSource) Created group in INSTRUMENT.
    """

    if not self.instrument_source_pattern.match(source):
        raise ValueError(f'invalid source format, must satisfy '
                         f'{self.instrument_source_pattern.pattern}')

    self.__instrument_sources.add(source)
    return InstrumentSource(self.create_group(f'INSTRUMENT/{source}').id,
                            source)

create_metadata(like=None, *, creation_date=None, update_date=None, proposal=0, run=0, sequence=None, daq_library='1.x', karabo_framework='2.x', control_sources=(), instrument_channels=())

Create METADATA datasets.

Parameters:

    like (DataCollection, default None): Take proposal, run, daq_library, karabo_framework from an EXtra-data data collection, overwriting any of these arguments passed.
    creation_date (datetime, default None): Creation date and time, now if omitted.
    update_date (datetime, default None): Update date and time, now if omitted.
    proposal (int, default 0): Proposal number, 0 if omitted and no DataCollection passed.
    run (int, default 0): Run number, 0 if omitted, no DataCollection is passed or object not created via from_details.
    sequence (int, default None): Sequence number, 0 if omitted and object not created via from_details.
    daq_library (str, default '1.x'): daqLibrary field, '1.x' if omitted and no DataCollection passed.
    karabo_framework (str, default '2.x'): karaboFramework field, '2.x' if omitted and no DataCollection is passed.
    control_sources (Iterable, default ()): Control sources in this file, sources created via create_control_source are automatically included.
    instrument_channels (Iterable, default ()): Instrument channels (source and first component of data hash) in this file, channels created via create_instrument_source are automatically included.

Returns:

    None
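
A short sketch, assuming outp is an open DataFile whose sources have already been created; the proposal and run numbers are dummy values and the raw path is illustrative:

outp.create_metadata(proposal=900000, run=42, sequence=0)

# Or take proposal, run and DAQ details from an EXtra-data run instead:
# from extra_data import RunDirectory
# outp.create_metadata(like=RunDirectory('/path/to/raw/r0042'))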

Source code in cal_tools/files.py
def create_metadata(self, like=None, *,
                    creation_date=None, update_date=None, proposal=0,
                    run=0, sequence=None, daq_library='1.x',
                    karabo_framework='2.x', control_sources=(),
                    instrument_channels=()):
    """Create METADATA datasets.

    Args:
        like (DataCollection, optional): Take proposal, run,
            daq_library, karabo_framework from an EXtra-data data
            collection, overwriting any of these arguments passed.
        creation_date (datetime, optional): Creation date and time,
            now if omitted.
        update_date (datetime, optional): Update date and time,
            now if omitted.
        proposal (int, optional): Proposal number, 0 if omitted and
            no DataCollection passed.
        run (int, optional): Run number, 0 if omitted, no
            DataCollection is passed or object not created via
            from_details.
        sequence (int, optional): Sequence number, 0 if omitted and
            object not created via from_details.
        daq_library (str, optional): daqLibrary field, '1.x' if
            omitted and no DataCollection passed.
        karabo_framework (str, optional): karaboFramework field,
            '2.x' if omitted and no DataCollection is passed.
        control_sources (Iterable, optional): Control sources in
            this file, sources created via create_control_source are
            automatically included.
        instrument_channels (Iterable, optional): Instrument
            channels (source and first component of data hash) in
            this file, channels created via create_instrument_source
            are automatically included.

    Returns:
        None
    """

    if like is not None:
        metadata = like.run_metadata()
        proposal = metadata.get('proposalNumber', proposal)
        run = metadata.get('runNumber', run)
        daq_library = metadata.get('daqLibrary', daq_library)
        karabo_framework = metadata.get('karaboFramework',
                                        karabo_framework)

    else:
        if run is None:
            run = self.__run

        if sequence is None:
            sequence = self.__sequence

    if creation_date is None:
        creation_date = datetime.fromtimestamp(0, tz=timezone.utc)
    elif creation_date is True:
        creation_date = datetime.now(timezone.utc)

    if update_date is None:
        update_date = creation_date
    elif update_date is True:
        update_date = datetime.now(timezone.utc)

    md_group = self.require_group('METADATA')
    md_group.create_dataset(
        'creationDate', shape=(1,),
        data=creation_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii'))
    md_group.create_dataset('daqLibrary', shape=(1,),
                            data=daq_library.encode('ascii'))
    md_group.create_dataset('dataFormatVersion', shape=(1,), data=b'1.2')

    # Start with the known and specified control sources
    sources = {name: 'CONTROL'
               for name in chain(self.__control_sources, control_sources)}

    # Add in the specified instrument data channels.
    sources.update({full_channel: 'INSTRUMENT'
                    for full_channel in instrument_channels})

    # Add in those already in the file, if not already passed.
    sources.update({f'{name}/{channel}': 'INSTRUMENT'
                    for name in self.__instrument_sources
                    for channel in self[f'INSTRUMENT/{name}']})

    source_names = sorted(sources.keys())
    data_sources_shape = (len(sources),)
    md_group.create_dataset('dataSources/dataSourceId',
                            shape=data_sources_shape,
                            data=[f'{sources[name]}/{name}'.encode('ascii')
                                  for name in source_names])
    md_group.create_dataset('dataSources/deviceId',
                            shape=data_sources_shape,
                            data=[name.encode('ascii')
                                  for name in source_names])
    md_group.create_dataset('dataSources/root', shape=data_sources_shape,
                            data=[sources[name].encode('ascii')
                                  for name in source_names])

    md_group.create_dataset(
        'karaboFramework', shape=(1,),
        data=karabo_framework.encode('ascii'))
    md_group.create_dataset(
        'proposalNumber', shape=(1,), dtype=np.uint32, data=proposal)
    md_group.create_dataset(
        'runNumber', shape=(1,), dtype=np.uint32, data=run)
    md_group.create_dataset(
        'sequenceNumber', shape=(1,), dtype=np.uint32, data=sequence)
    md_group.create_dataset(
        'updateDate', shape=(1,),
        data=update_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii'))

from_details(folder, aggregator, run, sequence, prefix='CORR', mode='w', *args, **kwargs) classmethod

Open or create a file based on European XFEL details.

This method is a wrapper to construct the filename based on its components.

Parameters:

    folder (Path, str; required): Parent location for this file.
    aggregator (str, required): Name of the data aggregator, must satisfy DataFile.aggregator_pattern.
    run (int, required): Run number.
    sequence (int, required): Sequence number.
    prefix (str, default 'CORR'): First filename component, 'CORR' by default.
    args, kwargs: Any additional arguments are passed on to h5py.File.

Returns:

    (DataFile) Opened file object.
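
For example, the following call creates ./CORR-R0042-AGIPD00-S00000.h5 in the current directory (aggregator, run and sequence are illustrative):

from cal_tools.files import DataFile

outp = DataFile.from_details('.', 'AGIPD00', run=42, sequence=0)
outp.close()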

Source code in cal_tools/files.py
@classmethod
def from_details(cls, folder, aggregator, run, sequence, prefix='CORR',
                 mode='w', *args, **kwargs):
    """Open or create a file based on European XFEL details.

    This method is a wrapper to construct the filename based on its
    components.

    Args:
        folder (Path, str): Parent location for this file.
        aggregator (str): Name of the data aggregator, must satisfy
            DataFile.aggregator_pattern.
        run (int): Run number.
        sequence (int): Sequence number.
        prefix (str, optional): First filename component, 'CORR' by
            default.
        args, kwargs: Any additional arguments are passed on to
            h5py.File


    Returns:
        (DataFile) Opened file object.
    """

    if not isinstance(folder, Path):
        folder = Path(folder)

    if not cls.aggregator_pattern.match(aggregator):
        raise ValueError(f'invalid aggregator format, must satisfy '
                         f'{cls.aggregator_pattern.pattern}')

    filename = cls.filename_format.format(
        prefix=prefix, aggregator=aggregator, run=run, sequence=sequence)

    self = cls((folder / filename).resolve(), mode, *args, **kwargs)
    self.__run = run
    self.__sequence = sequence

    return self

InstrumentSource

Bases: h5py.Group

Group for an instrument source ("fast data").

This class extends h5py.Group with methods specific to writing data of an instrument source in the European XFEL file format. The internal state does not depend on using any of these methods, and the underlying file may be manipulated by any of the regular h5py methods, too.
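
A minimal sketch of typical usage, assuming an open DataFile outp; the source, channel and key names are illustrative and the data is dummy data:

import numpy as np

inst_src = outp.create_instrument_source('MY/DET/DEVICE:daqOutput')

# 5 trains with 10 frames each in the 'image' channel.
inst_src.create_key('image.data',
                    data=np.zeros((50, 128, 128), dtype=np.uint16))
inst_src.create_index(image=np.full(5, 10))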

Source code in cal_tools/files.py
class InstrumentSource(h5py.Group):
    """Group for an instrument source ("fast data").

    This class extends h5py.Group with methods specific to writing data
    of an instrument source in the European XFEL file format. The internal
    state does not depend on using any of these methods, and the
    underlying file may be manipulated by any of the regular h5py
    methods, too.
    """

    key_pattern = re.compile(r'^\w+\/[\w\/]+$')

    def __init__(self, group_id, source):
        super().__init__(group_id)

        self.__source = source

    def get_index_group(self, channel):
        return self.file.require_group(f'INDEX/{self.__source}/{channel}')

    def create_key(self, key, data=None, **kwargs):
        """Create dataset for a key.

        Args:
            key (str): Source key, dots are automatically replaced by
                slashes.
            data (array_like, optional): Key data to initialize the
                dataset to.
            kwargs: Any additional keyword arguments are passed to
                create_dataset.

        Returns:
            (h5py.Dataset) Created dataset
        """

        key = escape_key(key)

        if not self.key_pattern.match(key):
            raise ValueError(f'invalid key format, must satisfy '
                             f'{self.key_pattern.pattern}')

        return self.create_dataset(key, data=data, **kwargs)

    def create_compressed_key(self, key, data, comp_threads=8):
        """Create a compressed dataset for a key.

        This method makes use of lower-level access in h5py to compress
        the data separately in multiple threads and write it directly to
        file rather than go through HDF's compression filters.

        Args:
            key (str): Source key, dots are automatically replaced by
                slashes.
            data (np.ndarray): Key data.
            comp_threads (int, optional): Number of threads to use for
                compression, 8 by default.

        Returns:
            (h5py.Dataset) Created dataset
        """

        key = escape_key(key)

        if not self.key_pattern.match(key):
            raise ValueError(f'invalid key format, must satisfy '
                             f'{self.key_pattern.pattern}')

        from cal_tools.tools import write_compressed_frames
        return write_compressed_frames(data, self, key,
                                       comp_threads=comp_threads)

    def create_index(self, *args, **channels):
        """Create source-specific INDEX datasets.

        Instrument data is indexed by channel, which is the first
        component in its key. If channels have already been created, the
        index may be applied to all channels by passing them as a
        positional argument.
        """

        if not channels:
            try:
                count = int(args[0])
            except IndexError:
                raise ValueError('positional arguments required if no '
                                 'explicit channels are passed') from None
                # Allow ValueError to propagate directly.

            channels = {channel: count for channel in self}

        for channel, count in channels.items():
            index_group = self.get_index_group(channel)
            index_group.create_dataset('count', data=count, dtype=np.uint64)
            index_group.create_dataset(
                'first', data=get_pulse_offsets(count), dtype=np.uint64)

create_compressed_key(key, data, comp_threads=8)

Create a compressed dataset for a key.

This method makes use of lower-level access in h5py to compress the data separately in multiple threads and write it directly to file rather than go through HDF's compression filters.

Parameters:

    key (str, required): Source key, dots are automatically replaced by slashes.
    data (np.ndarray, required): Key data.
    comp_threads (int, default 8): Number of threads to use for compression, 8 by default.

Returns:

    (h5py.Dataset) Created dataset
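
A short sketch, assuming inst_src is an InstrumentSource; the key name is illustrative and the gain array is dummy data:

import numpy as np

gain = np.zeros((50, 128, 128), dtype=np.uint8)
inst_src.create_compressed_key('image.gain', gain)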

Source code in cal_tools/files.py
def create_compressed_key(self, key, data, comp_threads=8):
    """Create a compressed dataset for a key.

    This method makes use of lower-level access in h5py to compress
    the data separately in multiple threads and write it directly to
    file rather than go through HDF's compression filters.

    Args:
        key (str): Source key, dots are automatically replaced by
            slashes.
        data (np.ndarray): Key data.
        comp_threads (int, optional): Number of threads to use for
            compression, 8 by default.

    Returns:
        (h5py.Dataset) Created dataset
    """

    key = escape_key(key)

    if not self.key_pattern.match(key):
        raise ValueError(f'invalid key format, must satisfy '
                         f'{self.key_pattern.pattern}')

    from cal_tools.tools import write_compressed_frames
    return write_compressed_frames(data, self, key,
                                   comp_threads=comp_threads)

create_index(*args, **channels)

Create source-specific INDEX datasets.

Instrument data is indexed by channel, which is the first component of its key. Pulse counts are passed per channel as keyword arguments; if the channels have already been created, the same count may be applied to all of them by passing it as a positional argument.
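
For example, assuming inst_src is an InstrumentSource with an 'image' channel holding data for five trains:

import numpy as np

# Number of entries per train for the 'image' channel.
inst_src.create_index(image=np.array([10, 10, 8, 10, 10], dtype=np.uint64))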

Source code in cal_tools/files.py
def create_index(self, *args, **channels):
    """Create source-specific INDEX datasets.

    Instrument data is indexed by channel, which is the first
    component in its key. If channels have already been created, the
    index may be applied to all channels by passing them as a
    positional argument.
    """

    if not channels:
        try:
            count = int(args[0])
        except IndexError:
            raise ValueError('positional arguments required if no '
                             'explicit channels are passed') from None
            # Allow ValueError to propagate directly.

        channels = {channel: count for channel in self}

    for channel, count in channels.items():
        index_group = self.get_index_group(channel)
        index_group.create_dataset('count', data=count, dtype=np.uint64)
        index_group.create_dataset(
            'first', data=get_pulse_offsets(count), dtype=np.uint64)

create_key(key, data=None, **kwargs)

Create dataset for a key.

Parameters:

    key (str, required): Source key, dots are automatically replaced by slashes.
    data (array_like, default None): Key data to initialize the dataset to.
    kwargs: Any additional keyword arguments are passed to create_dataset.

Returns:

    (h5py.Dataset) Created dataset

Source code in cal_tools/files.py
def create_key(self, key, data=None, **kwargs):
    """Create dataset for a key.

    Args:
        key (str): Source key, dots are automatically replaced by
            slashes.
        data (array_like, optional): Key data to initialize the
            dataset to.
        kwargs: Any additional keyword arguments are passed to
            create_dataset.

    Returns:
        (h5py.Dataset) Created dataset
    """

    key = escape_key(key)

    if not self.key_pattern.match(key):
        raise ValueError(f'invalid key format, must satisfy '
                         f'{self.key_pattern.pattern}')

    return self.create_dataset(key, data=data, **kwargs)

escape_key(key)

Escapes a key name from Karabo to HDF notation.
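
For example:

from cal_tools.files import escape_key

escape_key('image.data')  # -> 'image/data'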

Source code in cal_tools/files.py
def escape_key(key):
    """Escapes a key name from Karabo to HDF notation."""
    return key.replace('.', '/')

get_pulse_offsets(pulses_per_train)

Compute pulse offsets from pulse counts.

Given an array of the number of pulses per train (INDEX/<source>/count), computes the offsets (INDEX/<source>/first) for the first pulse of each train in the data array.

Parameters:

    pulses_per_train (array_like, required): Pulse count per train.

Returns:

    (array_like) Offset of first pulse for each train.
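
A small worked example with dummy counts:

import numpy as np
from cal_tools.files import get_pulse_offsets

counts = np.array([2, 3, 0, 1])    # pulses per train
get_pulse_offsets(counts)          # -> array([0, 2, 5, 5])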

Source code in cal_tools/files.py
def get_pulse_offsets(pulses_per_train):
    """Compute pulse offsets from pulse counts.

    Given an array of number of pulses per train (INDEX/<source>/count),
    computes the offsets (INDEX/<source>/first) for the first pulse of a
    train in the data array.

    Args:
        pulses_per_train (array_like): Pulse count per train.

    Returns:
        (array_like) Offset of first pulse for each train.
    """

    pulse_offsets = np.zeros_like(pulses_per_train)
    np.cumsum(pulses_per_train[:-1], out=pulse_offsets[1:])

    return pulse_offsets

sequence_pulses(train_ids, pulses_per_train=1, pulse_offsets=None, trains_per_sequence=256)

Split trains into sequences.

Parameters:

    train_ids (array_like, required): Train IDs to sequence.
    pulses_per_train (int or array_like, default 1): Pulse count per train. If scalar, it is assumed to be constant for all trains. If omitted, it is 1 by default.
    pulse_offsets (array_like, default None): Offsets for the first pulse in each train, computed from pulses_per_train if omitted.
    trains_per_sequence (int, default 256): Number of trains per sequence, 256 by default.

Yields:

    (int, array_like, array_like) Current sequence ID, train mask, pulse mask.
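
A short sketch with dummy train IDs and a constant pulse count per train:

import numpy as np
from cal_tools.files import sequence_pulses

train_ids = np.arange(10001, 10601, dtype=np.uint64)   # 600 dummy trains
pulses_per_train = np.full(len(train_ids), 10)

for seq_id, train_mask, pulse_mask in sequence_pulses(
        train_ids, pulses_per_train, trains_per_sequence=256):
    # train_mask slices per-train arrays, pulse_mask slices per-pulse arrays.
    print(seq_id, train_ids[train_mask].shape, pulse_mask)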

Source code in cal_tools/files.py
def sequence_pulses(train_ids, pulses_per_train=1, pulse_offsets=None,
                    trains_per_sequence=256):
    """Split trains into sequences.

    Args:
        train_ids (array_like): Train IDs to sequence.
        pulses_per_train (int or array_like, optional): Pulse count per
            train. If scalar, it is assumed to be constant for all
            trains. If omitted, it is 1 by default.
        pulse_offsets (array_like, optional): Offsets for the first
            pulse in each train, computed from pulses_per_train if
            omitted.
        trains_per_sequence (int, optional): Number of trains
            per sequence, 256 by default.

    Yields:
        (int, array_like, array_like)
            Current sequence ID, train mask, pulse mask.
    """

    if isinstance(pulses_per_train, Integral):
        pulses_per_train = np.full_like(train_ids, pulses_per_train,
                                        dtype=np.uint64)

    if pulse_offsets is None:
        pulse_offsets = get_pulse_offsets(pulses_per_train)

    for seq_id, train_mask in sequence_trains(train_ids, trains_per_sequence):
        start = train_mask.start
        stop = train_mask.stop - 1
        pulse_mask = np.s_[
            pulse_offsets[start]:pulse_offsets[stop]+pulses_per_train[stop]]

        yield seq_id, train_mask, pulse_mask

sequence_trains(train_ids, trains_per_sequence=256)

Iterate over sequences for a list of trains.

For pulse-resolved data, sequence_pulses may be used instead.

Parameters:

    train_ids (array_like, required): Train IDs to sequence.
    trains_per_sequence (int, default 256): Number of trains per sequence, 256 by default.

Yields:

    (int, slice) Current sequence ID, train mask.
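
A short sketch with dummy train IDs:

import numpy as np
from cal_tools.files import sequence_trains

train_ids = np.arange(10001, 10601, dtype=np.uint64)   # 600 dummy trains

for seq_id, train_mask in sequence_trains(train_ids, trains_per_sequence=256):
    print(seq_id, train_mask)
# 0 slice(0, 256, 1)
# 1 slice(256, 512, 1)
# 2 slice(512, 600, 1)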

Source code in cal_tools/files.py
def sequence_trains(train_ids, trains_per_sequence=256):
    """Iterate over sequences for a list of trains.

    For pulse-resolved data, sequence_pulses may be used instead.

    Args:
        train_ids (array_like): Train IDs to sequence.
        trains_per_sequence (int, optional): Number of trains
            per sequence, 256 by default.

    Yields:
        (int, slice) Current sequence ID, train mask.
    """

    num_trains = len(train_ids)

    for seq_id, start in enumerate(range(0, num_trains, trains_per_sequence)):
        train_mask = slice(
            *np.s_[start:start+trains_per_sequence].indices(num_trains))
        yield seq_id, train_mask