files
ControlSource
¶
Bases: h5py.Group
Group for a control source ("slow data").
This class extends h5py.Group with methods specific to writing data of a control source in the European XFEL file format. The internal state does not depend on using any of these methods, and the underlying file may be manipulated by any of the regular h5py methods, too.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
class ControlSource(h5py.Group):
    """Group for a control source ("slow data").

    This class extends h5py.Group with methods specific to writing data
    of a control source in the European XFEL file format. The internal
    state does not depend on using any of these methods, and the
    underlying file may be manipulated by any of the regular h5py
    methods, too.
    """

    # Variable-length ASCII string dtype used for string-valued keys,
    # matching the DAQ's on-disk representation.
    ascii_dt = h5py.string_dtype('ascii')

    def __init__(self, group_id, source):
        # group_id: low-level h5py group identifier for CONTROL/<source>.
        # source: Karabo device ID this group represents.
        super().__init__(group_id)

        self.__source = source
        # The matching RUN section group is created eagerly, as every
        # control source is expected to have RUN entries.
        self.__run_group = self.file.create_group(f'RUN/{source}')

    def get_run_group(self):
        # Return the RUN/<source> group created in __init__.
        return self.__run_group

    def get_index_group(self):
        # INDEX/<source> is created lazily on first access.
        return self.file.require_group(f'INDEX/{self.__source}')

    def create_key(self, key, values, timestamps=None, run_entry=None):
        """Create datasets for a key varying each train.

        Args:
            key (str): Source key, dots are automatically replaced by
                slashes.
            values (array_like): Source values for each train.
            timestamps (array_like, optional): Timestamps for each
                source value, 0 if omitted.
            run_entry (tuple of array_like, optional): Value and
                timestamp for the corresponding value in the RUN
                section. The first entry for the train values is used if
                omitted.

        Returns:
            None
        """

        key = escape_key(key)

        if timestamps is None:
            # Default to all-zero timestamps with the same shape as values.
            timestamps = np.zeros_like(values, dtype=np.uint64)
        elif len(values) != len(timestamps):
            raise ValueError('values and timestamp must be the same length')

        self.create_dataset(f'{key}/value', data=values)
        self.create_dataset(f'{key}/timestamp', data=timestamps)

        if run_entry is None:
            # Mirror the first train's value/timestamp into RUN.
            run_entry = (values[0], timestamps[0])

        self.create_run_key(key, *run_entry)

    def create_run_key(self, key, value, timestamp=None):
        """Create datasets for a key constant over a run.

        Args:
            key (str): Source key, dots are automatically replaced by
                slashes.
            value (Any): Key value.
            timestamp (int, optional): Timestamp of the value,
                0 if omitted.

        Returns:
            None
        """

        # TODO: Some types/shapes are still not fully correct here.
        key = escape_key(key)

        if timestamp is None:
            timestamp = 0

        # Determine on-disk shape and dtype from the Python value.
        if isinstance(value, list):
            shape = (1, len(value))

            try:
                dtype = type(value[0])
            except IndexError:
                # Assume empty lists are string-typed.
                dtype = self.ascii_dt
        elif isinstance(value, np.ndarray):
            shape = value.shape
            dtype = value.dtype
        else:
            shape = (1,)
            dtype = type(value)

        # Python str has no native HDF5 mapping; use the ASCII dtype.
        if dtype is str:
            dtype = self.ascii_dt

        self.__run_group.create_dataset(
            f'{key}/value', data=value, shape=shape, dtype=dtype)
        self.__run_group.create_dataset(
            f'{key}/timestamp', data=timestamp, shape=(1,), dtype=np.uint64)

    def create_index(self, num_trains):
        """Create source-specific INDEX datasets.

        Depending on whether this source has train-varying data or not,
        different count/first datasets are written.

        Args:
            num_trains (int): Total number of trains in this file.

        Returns:
            None
        """

        # len(self) > 0 means at least one key was written for this
        # source, so each train gets exactly one entry.
        if len(self) > 0:
            count_func = np.ones
            first_func = np.arange
        else:
            count_func = np.zeros
            first_func = np.zeros

        index_group = self.get_index_group()
        index_group.create_dataset(
            'count', data=count_func(num_trains, dtype=np.uint64))
        index_group.create_dataset(
            'first', data=first_func(num_trains, dtype=np.uint64))
create_index(num_trains)
¶
Create source-specific INDEX datasets.
Depending on whether this source has train-varying data or not, different count/first datasets are written.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
num_trains |
int
|
Total number of trains in this file. |
required |
Returns:
Type | Description |
---|---|
None |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def create_index(self, num_trains):
    """Create source-specific INDEX datasets.

    Depending on whether this source has train-varying data or not,
    different count/first datasets are written.

    Args:
        num_trains (int): Total number of trains in this file.

    Returns:
        None
    """

    # A source with at least one key written gets one entry per train;
    # an empty source gets zero entries for every train.
    has_data = len(self) > 0
    counts = (np.ones if has_data else np.zeros)(num_trains, dtype=np.uint64)
    firsts = (np.arange if has_data else np.zeros)(num_trains, dtype=np.uint64)

    index_group = self.get_index_group()
    index_group.create_dataset('count', data=counts)
    index_group.create_dataset('first', data=firsts)
create_key(key, values, timestamps=None, run_entry=None)
¶
Create datasets for a key varying each train.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str
|
Source key, dots are automatically replaced by slashes. |
required |
values |
array_like
|
Source values for each train. |
required |
timestamps |
array_like
|
Timestamps for each source value, 0 if omitted. |
None
|
run_entry |
tuple of array_like
|
Value and timestamp for the corresponding value in the RUN section. The first entry for the train values is used if omitted. |
None
|
Returns:
Type | Description |
---|---|
None |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def create_key(self, key, values, timestamps=None, run_entry=None):
    """Create datasets for a key varying each train.

    Args:
        key (str): Source key, dots are automatically replaced by
            slashes.
        values (array_like): Source values for each train.
        timestamps (array_like, optional): Timestamps for each
            source value, 0 if omitted.
        run_entry (tuple of array_like, optional): Value and
            timestamp for the corresponding value in the RUN
            section. The first entry for the train values is used if
            omitted.

    Returns:
        None
    """

    key = escape_key(key)

    if timestamps is None:
        # Default timestamps: zeros with the same shape as values.
        timestamps = np.zeros_like(values, dtype=np.uint64)
    elif len(timestamps) != len(values):
        raise ValueError('values and timestamp must be the same length')

    for suffix, data in (('value', values), ('timestamp', timestamps)):
        self.create_dataset(f'{key}/{suffix}', data=data)

    # The RUN entry defaults to the first train's value/timestamp pair.
    entry = run_entry if run_entry is not None else (values[0], timestamps[0])
    self.create_run_key(key, *entry)
create_run_key(key, value, timestamp=None)
¶
Create datasets for a key constant over a run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str
|
Source key, dots are automatically replaced by slashes. |
required |
value |
Any
|
Key value. |
required |
timestamp |
int
|
Timestamp of the value, 0 if omitted. |
None
|
Returns:
Type | Description |
---|---|
None |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def create_run_key(self, key, value, timestamp=None):
    """Create datasets for a key constant over a run.

    Args:
        key (str): Source key, dots are automatically replaced by
            slashes.
        value (Any): Key value.
        timestamp (int, optional): Timestamp of the value,
            0 if omitted.

    Returns:
        None
    """

    # TODO: Some types/shapes are still not fully correct here.
    key = escape_key(key)
    timestamp = 0 if timestamp is None else timestamp

    # Derive the on-disk shape and dtype from the Python value.
    if isinstance(value, np.ndarray):
        shape, dtype = value.shape, value.dtype
    elif isinstance(value, list):
        shape = (1, len(value))
        # Assume empty lists are string-typed.
        dtype = type(value[0]) if value else self.ascii_dt
    else:
        shape, dtype = (1,), type(value)

    # Plain str maps to the variable-length ASCII HDF5 dtype.
    if dtype is str:
        dtype = self.ascii_dt

    self.__run_group.create_dataset(
        f'{key}/value', data=value, shape=shape, dtype=dtype)
    self.__run_group.create_dataset(
        f'{key}/timestamp', data=timestamp, shape=(1,), dtype=np.uint64)
DataFile
¶
Bases: h5py.File
European XFEL HDF5 data file.
This class extends the h5py.File with methods specific to writing data in the European XFEL file format. The internal state does not depend on using any of these methods, and the underlying file may be manipulated by any of the regular h5py methods, too.
Please refer to https://extra-data.readthedocs.io/en/latest/data_format.html for details of the file format.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
class DataFile(h5py.File):
    """European XFEL HDF5 data file.

    This class extends the h5py.File with methods specific to writing
    data in the European XFEL file format. The internal state does not
    depend on using any of these methods, and the underlying file may be
    manipulated by any of the regular h5py methods, too.

    Please refer to
    https://extra-data.readthedocs.io/en/latest/data_format.html for
    details of the file format.
    """

    # e.g. CORR-R0123-AGIPD00-S00001.h5
    filename_format = '{prefix}-R{run:04d}-{aggregator}-S{sequence:05d}.h5'
    # Aggregator names: two or more uppercase letters plus two digits.
    aggregator_pattern = re.compile(r'^[A-Z]{2,}\d{2}$')
    # Instrument sources: <device path>:<pipeline name>.
    instrument_source_pattern = re.compile(r'^[\w\/-]+:[\w.]+$')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Sources registered via create_control_source /
        # create_instrument_source; picked up by create_metadata().
        self.__control_sources = set()
        self.__instrument_sources = set()

        # Run/sequence numbers, set by from_details() when used.
        self.__run = 0
        self.__sequence = 0

    @classmethod
    def from_details(cls, folder, aggregator, run, sequence, prefix='CORR',
                     mode='w', *args, **kwargs):
        """Open or create a file based on European XFEL details.

        This method is a wrapper to construct the filename based on its
        components.

        Args:
            folder (Path, str): Parent location for this file.
            aggregator (str): Name of the data aggregator, must satisfy
                DataFile.aggregator_pattern.
            run (int): Run number.
            sequence (int): Sequence number.
            prefix (str, optional): First filename component, 'CORR' by
                default.
            args, kwargs: Any additional arguments are passed on to
                h5py.File

        Returns:
            (DataFile) Opened file object.
        """

        if not isinstance(folder, Path):
            folder = Path(folder)

        if not cls.aggregator_pattern.match(aggregator):
            raise ValueError(f'invalid aggregator format, must satisfy '
                             f'{cls.aggregator_pattern.pattern}')

        filename = cls.filename_format.format(
            prefix=prefix, aggregator=aggregator, run=run, sequence=sequence)

        self = cls((folder / filename).resolve(), mode, *args, **kwargs)
        # Remember run/sequence so create_metadata() can default to them.
        self.__run = run
        self.__sequence = sequence

        return self

    def create_index(self, train_ids, timestamps=None, flags=None,
                     origins=None, from_file=None):
        """Create global INDEX datasets.

        These datasets are agnostic of any source and describe the
        trains contained in this file.

        Args:
            train_ids (array_like): Train IDs contained in this file.
            timestamps (array_like, optional): Timestamp of each train,
                0 if omitted.
            flags (array_like, optional): Whether the time server is the
                initial origin of each train, 1 if omitted.
            origins (array_like, optional): Which source is the initial
                origin of each train, -1 (time server) if omitted.
            from_file (str, Path or extra_data.FileAccess, optional):
                Existing data file to take timestamps, flags and origins
                information from if present.

        Returns:
            None
        """

        if from_file is not None:
            # Deferred import: extra_data is only needed on this path.
            from extra_data import FileAccess

            if not isinstance(from_file, FileAccess):
                from_file = FileAccess(from_file)

            # Select only the trains of from_file present in this file.
            sel_trains = np.isin(from_file.train_ids, train_ids)

            if 'INDEX/timestamp' in from_file.file:
                timestamps = from_file.file['INDEX/timestamp'][sel_trains]

            # Flags are always taken from the source file on this path,
            # overwriting any flags argument passed.
            flags = from_file.validity_flag[sel_trains]

            if 'INDEX/origin' in from_file.file:
                origins = from_file.file['INDEX/origin'][sel_trains]

        self.create_dataset('INDEX/trainId', data=train_ids, dtype=np.uint64)

        if timestamps is None:
            timestamps = np.zeros_like(train_ids, dtype=np.uint64)
        elif len(timestamps) != len(train_ids):
            raise ValueError('timestamps and train_ids must be same length')

        self.create_dataset('INDEX/timestamp', data=timestamps,
                            dtype=np.uint64)

        if flags is None:
            flags = np.ones_like(train_ids, dtype=np.int32)
        elif len(flags) != len(train_ids):
            raise ValueError('flags and train_ids must be same length')

        self.create_dataset('INDEX/flag', data=flags, dtype=np.int32)

        if origins is None:
            # -1 denotes the time server as origin.
            origins = np.full_like(train_ids, -1, dtype=np.int32)
        elif len(origins) != len(train_ids):
            raise ValueError('origins and train_ids must be same length')

        self.create_dataset('INDEX/origin', data=origins, dtype=np.int32)

    def create_control_source(self, source):
        """Create group for a control source ("slow data").

        Control sources created via this method are not required to be
        passed to create_metadata() again.

        Args:
            source (str): Karabo device ID.

        Returns:
            (ControlSource) Created group in CONTROL.
        """

        self.__control_sources.add(source)
        return ControlSource(self.create_group(f'CONTROL/{source}').id, source)

    def create_instrument_source(self, source):
        """Create group for an instrument source ("fast data").

        Instrument sources created via this method are not required to be
        passed to create_metadata() again.

        Args:
            source (str): Karabo pipeline path, must satisfy
                DataFile.instrument_source_pattern.

        Returns:
            (InstrumentSource) Created group in INSTRUMENT.
        """

        if not self.instrument_source_pattern.match(source):
            raise ValueError(f'invalid source format, must satisfy '
                             f'{self.instrument_source_pattern.pattern}')

        self.__instrument_sources.add(source)
        return InstrumentSource(self.create_group(f'INSTRUMENT/{source}').id,
                                source)

    def create_metadata(self, like=None, *,
                        creation_date=None, update_date=None, proposal=0,
                        run=0, sequence=None, daq_library='1.x',
                        karabo_framework='2.x', control_sources=(),
                        instrument_channels=()):
        """Create METADATA datasets.

        Args:
            like (DataCollection, optional): Take proposal, run,
                daq_library, karabo_framework from an EXtra-data data
                collection, overwriting any of these arguments passed.
            creation_date (datetime, optional): Creation date and time,
                Unix epoch if omitted, current time if True.
            update_date (datetime, optional): Update date and time,
                same as creation_date if omitted, current time if True.
            proposal (int, optional): Proposal number, 0 if omitted and
                no DataCollection passed.
            run (int, optional): Run number, 0 if omitted, no
                DataCollection is passed or object not created via
                from_details.
            sequence (int, optional): Sequence number, 0 if omitted and
                object not created via from_details.
            daq_library (str, optional): daqLibrary field, '1.x' if
                omitted and no DataCollection passed.
            karabo_framework (str, optional): karaboFramework field,
                '2.x' if omitted and no DataCollection is passed.
            control_sources (Iterable, optional): Control sources in
                this file, sources created via create_control_source are
                automatically included.
            instrument_channels (Iterable, optional): Instrument
                channels (source and first component of data hash) in
                this file, channels created via create_instrument_source
                are automatically included.

        Returns:
            None
        """

        if like is not None:
            # Pull metadata fields from the EXtra-data collection,
            # falling back to the passed arguments where missing.
            metadata = like.run_metadata()

            proposal = metadata.get('proposalNumber', proposal)
            run = metadata.get('runNumber', run)
            daq_library = metadata.get('daqLibrary', daq_library)
            karabo_framework = metadata.get('karaboFramework',
                                            karabo_framework)

        else:
            if run is None:
                run = self.__run

            if sequence is None:
                sequence = self.__sequence

        # None -> epoch, True -> now, otherwise the passed datetime.
        if creation_date is None:
            creation_date = datetime.fromtimestamp(0, tz=timezone.utc)
        elif creation_date is True:
            creation_date = datetime.now(timezone.utc)

        if update_date is None:
            update_date = creation_date
        elif update_date is True:
            update_date = datetime.now(timezone.utc)

        md_group = self.require_group('METADATA')
        md_group.create_dataset(
            'creationDate', shape=(1,),
            data=creation_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii'))
        md_group.create_dataset('daqLibrary', shape=(1,),
                                data=daq_library.encode('ascii'))
        md_group.create_dataset('dataFormatVersion', shape=(1,), data=b'1.2')

        # Start with the known and specified control sources
        sources = {name: 'CONTROL'
                   for name in chain(self.__control_sources, control_sources)}

        # Add in the specified instrument data channels.
        sources.update({full_channel: 'INSTRUMENT'
                        for full_channel in instrument_channels})

        # Add in those already in the file, if not already passed.
        sources.update({f'{name}/{channel}': 'INSTRUMENT'
                        for name in self.__instrument_sources
                        for channel in self[f'INSTRUMENT/{name}']})

        source_names = sorted(sources.keys())
        data_sources_shape = (len(sources),)

        md_group.create_dataset('dataSources/dataSourceId',
                                shape=data_sources_shape,
                                data=[f'{sources[name]}/{name}'.encode('ascii')
                                      for name in source_names])
        md_group.create_dataset('dataSources/deviceId',
                                shape=data_sources_shape,
                                data=[name.encode('ascii')
                                      for name in source_names])
        md_group.create_dataset('dataSources/root', shape=data_sources_shape,
                                data=[sources[name].encode('ascii')
                                      for name in source_names])

        md_group.create_dataset(
            'karaboFramework', shape=(1,),
            data=karabo_framework.encode('ascii'))
        md_group.create_dataset(
            'proposalNumber', shape=(1,), dtype=np.uint32, data=proposal)
        md_group.create_dataset(
            'runNumber', shape=(1,), dtype=np.uint32, data=run)
        md_group.create_dataset(
            'sequenceNumber', shape=(1,), dtype=np.uint32, data=sequence)
        md_group.create_dataset(
            'updateDate', shape=(1,),
            data=update_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii'))
create_control_source(source)
¶
Create group for a control source ("slow data").
Control sources created via this method are not required to be passed to create_metadata() again.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str
|
Karabo device ID. |
required |
Returns:
Type | Description |
---|---|
(ControlSource) Created group in CONTROL. |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def create_control_source(self, source):
    """Create group for a control source ("slow data").

    Control sources created via this method are not required to be
    passed to create_metadata() again.

    Args:
        source (str): Karabo device ID.

    Returns:
        (ControlSource) Created group in CONTROL.
    """

    # Register the source so create_metadata() picks it up automatically.
    self.__control_sources.add(source)
    group = self.create_group(f'CONTROL/{source}')
    return ControlSource(group.id, source)
create_index(train_ids, timestamps=None, flags=None, origins=None, from_file=None)
¶
Create global INDEX datasets.
These datasets are agnostic of any source and describe the trains contained in this file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
train_ids |
array_like
|
Train IDs contained in this file. |
required |
timestamps |
array_like
|
Timestamp of each train, 0 if omitted. |
None
|
flags |
array_like
|
Whether the time server is the initial origin of each train, 1 if omitted. |
None
|
origins |
array_like
|
Which source is the initial origin of each train, -1 (time server) if omitted. |
None
|
from_file |
str, Path or extra_data.FileAccess
|
Existing data file to take timestamps, flags and origins information from if present. |
None
|
Returns:
Type | Description |
---|---|
None |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def create_index(self, train_ids, timestamps=None, flags=None,
                 origins=None, from_file=None):
    """Create global INDEX datasets.

    These datasets are agnostic of any source and describe the
    trains contained in this file.

    Args:
        train_ids (array_like): Train IDs contained in this file.
        timestamps (array_like, optional): Timestamp of each train,
            0 if omitted.
        flags (array_like, optional): Whether the time server is the
            initial origin of each train, 1 if omitted.
        origins (array_like, optional): Which source is the initial
            origin of each train, -1 (time server) if omitted.
        from_file (str, Path or extra_data.FileAccess, optional):
            Existing data file to take timestamps, flags and origins
            information from if present.

    Returns:
        None
    """

    if from_file is not None:
        # Deferred import: extra_data only required for this path.
        from extra_data import FileAccess

        if not isinstance(from_file, FileAccess):
            from_file = FileAccess(from_file)

        # Restrict the source file's index data to our trains.
        sel_trains = np.isin(from_file.train_ids, train_ids)

        if 'INDEX/timestamp' in from_file.file:
            timestamps = from_file.file['INDEX/timestamp'][sel_trains]

        flags = from_file.validity_flag[sel_trains]

        if 'INDEX/origin' in from_file.file:
            origins = from_file.file['INDEX/origin'][sel_trains]

    self.create_dataset('INDEX/trainId', data=train_ids, dtype=np.uint64)

    def _write_index(name, data, default, dtype):
        # Fall back to the default array, or validate the given length.
        if data is None:
            data = default
        elif len(data) != len(train_ids):
            raise ValueError(f'{name}s and train_ids must be same length')
        self.create_dataset(f'INDEX/{name}', data=data, dtype=dtype)

    _write_index('timestamp', timestamps,
                 np.zeros_like(train_ids, dtype=np.uint64), np.uint64)
    _write_index('flag', flags,
                 np.ones_like(train_ids, dtype=np.int32), np.int32)
    # -1 denotes the time server as origin.
    _write_index('origin', origins,
                 np.full_like(train_ids, -1, dtype=np.int32), np.int32)
create_instrument_source(source)
¶
Create group for an instrument source ("fast data").
Instrument sources created via this method are not required to be passed to create_metadata() again.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str
|
Karabo pipeline path, must satisfy DataFile.instrument_source_pattern. |
required |
Returns:
Type | Description |
---|---|
(InstrumentSource) Created group in INSTRUMENT. |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def create_instrument_source(self, source):
    """Create group for an instrument source ("fast data").

    Instrument sources created via this method are not required to be
    passed to create_metadata() again.

    Args:
        source (str): Karabo pipeline path, must satisfy
            DataFile.instrument_source_pattern.

    Returns:
        (InstrumentSource) Created group in INSTRUMENT.
    """

    if self.instrument_source_pattern.match(source) is None:
        raise ValueError(f'invalid source format, must satisfy '
                         f'{self.instrument_source_pattern.pattern}')

    # Register the source so create_metadata() picks it up automatically.
    self.__instrument_sources.add(source)
    group = self.create_group(f'INSTRUMENT/{source}')
    return InstrumentSource(group.id, source)
create_metadata(like=None, *, creation_date=None, update_date=None, proposal=0, run=0, sequence=None, daq_library='1.x', karabo_framework='2.x', control_sources=(), instrument_channels=())
¶
Create METADATA datasets.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
like |
DataCollection
|
Take proposal, run, daq_library, karabo_framework from an EXtra-data data collection, overwriting any of these arguments passed. |
None
|
creation_date |
datetime
|
Creation date and time, now if omitted. |
None
|
update_date |
datetime
|
Update date and time, now if omitted. |
None
|
proposal |
int
|
Proposal number, 0 if omitted and no DataCollection passed. |
0
|
run |
int
|
Run number, 0 if omitted, no DataCollection is passed or object not created via from_details. |
0
|
sequence |
int
|
Sequence number, 0 if omitted and object not created via from_details. |
None
|
daq_library |
str
|
daqLibrary field, '1.x' if omitted and no DataCollection passed. |
'1.x'
|
karabo_framework |
str
|
karaboFramework field, '2.x' if omitted and no DataCollection is passed. |
'2.x'
|
control_sources |
Iterable
|
Control sources in this file, sources created via create_control_source are automatically included. |
()
|
instrument_channels |
Iterable
|
Instrument channels (source and first component of data hash) in this file, channels created via create_instrument_source are automatically included. |
()
|
Returns:
Type | Description |
---|---|
None |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def create_metadata(self, like=None, *,
                    creation_date=None, update_date=None, proposal=0,
                    run=0, sequence=None, daq_library='1.x',
                    karabo_framework='2.x', control_sources=(),
                    instrument_channels=()):
    """Create METADATA datasets.

    Args:
        like (DataCollection, optional): Take proposal, run,
            daq_library, karabo_framework from an EXtra-data data
            collection, overwriting any of these arguments passed.
        creation_date (datetime, optional): Creation date and time,
            Unix epoch if omitted, current time if True.
        update_date (datetime, optional): Update date and time,
            same as creation_date if omitted, current time if True.
        proposal (int, optional): Proposal number, 0 if omitted and
            no DataCollection passed.
        run (int, optional): Run number, 0 if omitted, no
            DataCollection is passed or object not created via
            from_details.
        sequence (int, optional): Sequence number, 0 if omitted and
            object not created via from_details.
        daq_library (str, optional): daqLibrary field, '1.x' if
            omitted and no DataCollection passed.
        karabo_framework (str, optional): karaboFramework field,
            '2.x' if omitted and no DataCollection is passed.
        control_sources (Iterable, optional): Control sources in
            this file, sources created via create_control_source are
            automatically included.
        instrument_channels (Iterable, optional): Instrument
            channels (source and first component of data hash) in
            this file, channels created via create_instrument_source
            are automatically included.

    Returns:
        None
    """

    if like is not None:
        # Pull metadata fields from the EXtra-data collection, falling
        # back to the passed arguments where missing.
        metadata = like.run_metadata()

        proposal = metadata.get('proposalNumber', proposal)
        run = metadata.get('runNumber', run)
        daq_library = metadata.get('daqLibrary', daq_library)
        karabo_framework = metadata.get('karaboFramework',
                                        karabo_framework)

    else:
        if run is None:
            run = self.__run

        if sequence is None:
            sequence = self.__sequence

    # None -> epoch, True -> now, otherwise the passed datetime.
    if creation_date is None:
        creation_date = datetime.fromtimestamp(0, tz=timezone.utc)
    elif creation_date is True:
        creation_date = datetime.now(timezone.utc)

    if update_date is None:
        update_date = creation_date
    elif update_date is True:
        update_date = datetime.now(timezone.utc)

    md_group = self.require_group('METADATA')
    md_group.create_dataset(
        'creationDate', shape=(1,),
        data=creation_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii'))
    md_group.create_dataset('daqLibrary', shape=(1,),
                            data=daq_library.encode('ascii'))
    md_group.create_dataset('dataFormatVersion', shape=(1,), data=b'1.2')

    # Start with the known and specified control sources
    sources = {name: 'CONTROL'
               for name in chain(self.__control_sources, control_sources)}

    # Add in the specified instrument data channels.
    sources.update({full_channel: 'INSTRUMENT'
                    for full_channel in instrument_channels})

    # Add in those already in the file, if not already passed.
    sources.update({f'{name}/{channel}': 'INSTRUMENT'
                    for name in self.__instrument_sources
                    for channel in self[f'INSTRUMENT/{name}']})

    source_names = sorted(sources.keys())
    data_sources_shape = (len(sources),)

    md_group.create_dataset('dataSources/dataSourceId',
                            shape=data_sources_shape,
                            data=[f'{sources[name]}/{name}'.encode('ascii')
                                  for name in source_names])
    md_group.create_dataset('dataSources/deviceId',
                            shape=data_sources_shape,
                            data=[name.encode('ascii')
                                  for name in source_names])
    md_group.create_dataset('dataSources/root', shape=data_sources_shape,
                            data=[sources[name].encode('ascii')
                                  for name in source_names])

    md_group.create_dataset(
        'karaboFramework', shape=(1,),
        data=karabo_framework.encode('ascii'))
    md_group.create_dataset(
        'proposalNumber', shape=(1,), dtype=np.uint32, data=proposal)
    md_group.create_dataset(
        'runNumber', shape=(1,), dtype=np.uint32, data=run)
    md_group.create_dataset(
        'sequenceNumber', shape=(1,), dtype=np.uint32, data=sequence)
    md_group.create_dataset(
        'updateDate', shape=(1,),
        data=update_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii'))
from_details(folder, aggregator, run, sequence, prefix='CORR', mode='w', *args, **kwargs)
classmethod
¶
Open or create a file based on European XFEL details.
This method is a wrapper to construct the filename based on its components.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
folder |
Path, str
|
Parent location for this file. |
required |
aggregator |
str
|
Name of the data aggregator, must satisfy DataFile.aggregator_pattern. |
required |
run |
int
|
Run number. |
required |
sequence |
int
|
Sequence number. |
required |
prefix |
str
|
First filename component, 'CORR' by default. |
'CORR'
|
args, |
kwargs
|
Any additional arguments are passed on to h5py.File |
required |
Returns:
Type | Description |
---|---|
(DataFile) Opened file object. |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
@classmethod
def from_details(cls, folder, aggregator, run, sequence, prefix='CORR',
                 mode='w', *args, **kwargs):
    """Open or create a file based on European XFEL details.

    This method is a wrapper to construct the filename based on its
    components.

    Args:
        folder (Path, str): Parent location for this file.
        aggregator (str): Name of the data aggregator, must satisfy
            DataFile.aggregator_pattern.
        run (int): Run number.
        sequence (int): Sequence number.
        prefix (str, optional): First filename component, 'CORR' by
            default.
        args, kwargs: Any additional arguments are passed on to
            h5py.File

    Returns:
        (DataFile) Opened file object.
    """

    folder = folder if isinstance(folder, Path) else Path(folder)

    if cls.aggregator_pattern.match(aggregator) is None:
        raise ValueError(f'invalid aggregator format, must satisfy '
                         f'{cls.aggregator_pattern.pattern}')

    filename = cls.filename_format.format(
        prefix=prefix, aggregator=aggregator, run=run, sequence=sequence)

    self = cls((folder / filename).resolve(), mode, *args, **kwargs)
    # Remember run/sequence so create_metadata() can default to them.
    self.__run = run
    self.__sequence = sequence

    return self
InstrumentSource
¶
Bases: h5py.Group
Group for an instrument source ("fast data").
This class extends h5py.Group with methods specific to writing data of an instrument source in the European XFEL file format. The internal state does not depend on using any of these methods, and the underlying file may be manipulated by any of the regular h5py methods, too.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
class InstrumentSource(h5py.Group):
    """Group for an instrument source ("fast data").

    This class extends h5py.Group with methods specific to writing data
    of an instrument source in the European XFEL file format. The
    internal state does not depend on using any of these methods, and
    the underlying file may be manipulated by any of the regular h5py
    methods, too.
    """

    # Instrument keys must be <channel>/<component...>, slash-separated.
    key_pattern = re.compile(r'^\w+\/[\w\/]+$')

    def __init__(self, group_id, source):
        # group_id: low-level h5py group identifier for INSTRUMENT/<source>.
        # source: Karabo pipeline path this group represents.
        super().__init__(group_id)

        self.__source = source

    def get_index_group(self, channel):
        # Instrument INDEX groups are per channel, created lazily.
        return self.file.require_group(f'INDEX/{self.__source}/{channel}')

    def create_key(self, key, data=None, **kwargs):
        """Create dataset for a key.

        Args:
            key (str): Source key, dots are automatically replaced by
                slashes.
            data (array_like, optional): Key data to initialize the
                dataset to.
            kwargs: Any additional keyword arguments are passed to
                create_dataset.

        Returns:
            (h5py.Dataset) Created dataset
        """

        key = escape_key(key)

        if not self.key_pattern.match(key):
            raise ValueError(f'invalid key format, must satisfy '
                             f'{self.key_pattern.pattern}')

        return self.create_dataset(key, data=data, **kwargs)

    def create_compressed_key(self, key, data, comp_threads=8):
        """Create a compressed dataset for a key.

        This method makes use of lower-level access in h5py to compress
        the data separately in multiple threads and write it directly to
        file rather than go through HDF's compression filters.

        Args:
            key (str): Source key, dots are automatically replaced by
                slashes.
            data (np.ndarray): Key data.
            comp_threads (int, optional): Number of threads to use for
                compression, 8 by default.

        Returns:
            (h5py.Dataset) Created dataset
        """

        key = escape_key(key)

        if not self.key_pattern.match(key):
            raise ValueError(f'invalid key format, must satisfy '
                             f'{self.key_pattern.pattern}')

        # Deferred import to avoid a circular dependency at module load.
        from cal_tools.tools import write_compressed_frames
        return write_compressed_frames(data, self, key,
                                       comp_threads=comp_threads)

    def create_index(self, *args, **channels):
        """Create source-specific INDEX datasets.

        Instrument data is indexed by channel, which is the first
        component in its key. If channels have already been created, the
        index may be applied to all channels by passing them as a
        positional argument.
        """

        if not channels:
            try:
                count = int(args[0])
            except IndexError:
                raise ValueError('positional arguments required if no '
                                 'explicit channels are passed') from None
            # Allow ValueError to propagate directly.

            # Apply the same count to every channel already in the file.
            channels = {channel: count for channel in self}

        for channel, count in channels.items():
            index_group = self.get_index_group(channel)
            index_group.create_dataset('count', data=count, dtype=np.uint64)
            index_group.create_dataset(
                'first', data=get_pulse_offsets(count), dtype=np.uint64)
create_compressed_key(key, data, comp_threads=8)
¶
Create a compressed dataset for a key.
This method makes use of lower-level access in h5py to compress the data separately in multiple threads and write it directly to file rather than go through HDF's compression filters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str
|
Source key, dots are automatically replaced by slashes. |
required |
data |
np.ndarray
|
Key data. |
required |
comp_threads |
int
|
Number of threads to use for compression, 8 by default. |
8
|
Returns:
Type | Description |
---|---|
(h5py.Dataset) Created dataset |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def create_compressed_key(self, key, data, comp_threads=8):
    """Create a compressed dataset for a key.

    This method makes use of lower-level access in h5py to compress
    the data separately in multiple threads and write it directly to
    file rather than go through HDF's compression filters.

    Args:
        key (str): Source key, dots are automatically replaced by
            slashes.
        data (np.ndarray): Key data.
        comp_threads (int, optional): Number of threads to use for
            compression, 8 by default.

    Returns:
        (h5py.Dataset) Created dataset

    Raises:
        ValueError: If the escaped key does not match key_pattern.
    """
    key = escape_key(key)

    if not self.key_pattern.match(key):
        raise ValueError(f'invalid key format, must satisfy '
                         f'{self.key_pattern.pattern}')

    # Imported here rather than at module level, presumably to
    # avoid a circular import - TODO confirm.
    from cal_tools.tools import write_compressed_frames
    return write_compressed_frames(data, self, key,
                                   comp_threads=comp_threads)
create_index(*args, **channels)
¶
Create source-specific INDEX datasets.
Instrument data is indexed by channel, which is the first component in its key. If channels have already been created, the index may be applied to all channels by passing them as a positional argument.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def create_index(self, *args, **channels):
    """Create source-specific INDEX datasets.

    Instrument data is indexed by channel, which is the first
    component in its key. If channels have already been created, the
    index may be applied to all channels by passing them as a
    positional argument.
    """
    if not channels:
        try:
            shared_count = int(args[0])
        except IndexError:
            raise ValueError('positional arguments required if no '
                             'explicit channels are passed') from None
        # A ValueError raised by int() is deliberately not caught.

        # Use the same count for every channel already in this group.
        channels = dict.fromkeys(self, shared_count)

    for name, count in channels.items():
        grp = self.get_index_group(name)
        grp.create_dataset('count', data=count, dtype=np.uint64)
        grp.create_dataset(
            'first', data=get_pulse_offsets(count), dtype=np.uint64)
create_key(key, data=None, **kwargs)
¶
Create dataset for a key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str
|
Source key, dots are automatically replaced by slashes. |
required |
data |
array_like
|
Key data to initialize the dataset to. |
None
|
kwargs |
Any additional keyword arguments are passed to create_dataset. |
{}
|
Returns:
Type | Description |
---|---|
(h5py.Dataset) Created dataset |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def create_key(self, key, data=None, **kwargs):
    """Create dataset for a key.

    Args:
        key (str): Source key, dots are automatically replaced by
            slashes.
        data (array_like, optional): Key data to initialize the
            dataset to.
        kwargs: Any additional keyword arguments are passed to
            create_dataset.

    Returns:
        (h5py.Dataset) Created dataset

    Raises:
        ValueError: If the escaped key does not match key_pattern.
    """
    hdf_key = escape_key(key)

    if self.key_pattern.match(hdf_key) is None:
        raise ValueError(f'invalid key format, must satisfy '
                         f'{self.key_pattern.pattern}')

    return self.create_dataset(hdf_key, data=data, **kwargs)
escape_key(key)
¶
Escapes a key name from Karabo to HDF notation.
get_pulse_offsets(pulses_per_train)
¶
Compute pulse offsets from pulse counts.
Given an array of number of pulses per train (INDEX/&lt;source&gt;/count), computes the offsets (INDEX/&lt;source&gt;/first) for the first pulse of a train in the data array.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pulses_per_train |
array_like
|
Pulse count per train. |
required |
Returns:
Type | Description |
---|---|
(array_like) Offset of first pulse for each train. |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def get_pulse_offsets(pulses_per_train):
    """Compute pulse offsets from pulse counts.

    Given an array of number of pulses per train (INDEX/<source>/count),
    computes the offsets (INDEX/<source>/first) for the first pulse of a
    train in the data array.

    Args:
        pulses_per_train (array_like): Pulse count per train.

    Returns:
        (array_like) Offset of first pulse for each train.
    """
    pulse_offsets = np.zeros_like(pulses_per_train)
    # Exclusive cumulative sum: offset[i] is the number of pulses in
    # all trains before train i; offset[0] stays 0.
    np.cumsum(pulses_per_train[:-1], out=pulse_offsets[1:])

    return pulse_offsets
sequence_pulses(train_ids, pulses_per_train=1, pulse_offsets=None, trains_per_sequence=256)
¶
Split trains into sequences.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
train_ids |
array_like
|
Train IDs to sequence. |
required |
pulses_per_train |
int or array_like
|
Pulse count per train. If scalar, it is assumed to be constant for all trains. If omitted, it is 1 by default. |
1
|
pulse_offsets |
array_like
|
Offsets for the first pulse in each train, computed from pulses_per_train if omitted. |
None
|
trains_per_sequence |
int
|
Number of trains per sequence, 256 by default. |
256
|
Yields:
Type | Description |
---|---|
(int, array_like, array_like) Current sequence ID, train mask, pulse mask. |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def sequence_pulses(train_ids, pulses_per_train=1, pulse_offsets=None,
                    trains_per_sequence=256):
    """Split trains into sequences.

    Args:
        train_ids (array_like): Train IDs to sequence.
        pulses_per_train (int or array_like, optional): Pulse count per
            train. If scalar, it is assumed to be constant for all
            trains. If omitted, it is 1 by default.
        pulse_offsets (array_like, optional): Offsets for the first
            pulse in each train, computed from pulses_per_train if
            omitted.
        trains_per_sequence (int, optional): Number of trains
            per sequence, 256 by default.

    Yields:
        (int, array_like, array_like)
            Current sequence ID, train mask, pulse mask.
    """
    # Expand a scalar count to one entry per train.
    if isinstance(pulses_per_train, Integral):
        pulses_per_train = np.full_like(train_ids, pulses_per_train,
                                        dtype=np.uint64)

    if pulse_offsets is None:
        pulse_offsets = get_pulse_offsets(pulses_per_train)

    for seq_id, train_mask in sequence_trains(train_ids, trains_per_sequence):
        first_train = train_mask.start
        last_train = train_mask.stop - 1
        # Pulses span from the first train's offset up to (and
        # including) all pulses of the last train in the sequence.
        begin = pulse_offsets[first_train]
        end = pulse_offsets[last_train] + pulses_per_train[last_train]
        yield seq_id, train_mask, slice(begin, end)
sequence_trains(train_ids, trains_per_sequence=256)
¶
Iterate over sequences for a list of trains.
For pulse-resolved data, sequence_pulses may be used instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
train_ids |
array_like
|
Train IDs to sequence. |
required |
trains_per_sequence |
int
|
Number of trains per sequence, 256 by default. |
256
|
Yields:
Type | Description |
---|---|
(int, slice) Current sequence ID, train mask. |
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/files.py
def sequence_trains(train_ids, trains_per_sequence=256):
    """Iterate over sequences for a list of trains.

    For pulse-resolved data, sequence_pulses may be used instead.

    Args:
        train_ids (array_like): Train IDs to sequence.
        trains_per_sequence (int, optional): Number of trains
            per sequence, 256 by default.

    Yields:
        (int, slice) Current sequence ID, train mask.
    """
    total = len(train_ids)

    for seq_id, begin in enumerate(range(0, total, trains_per_sequence)):
        # Clamp the final sequence to the number of available trains;
        # the slice is fully normalized (explicit start, stop, step).
        end = min(begin + trains_per_sequence, total)
        yield seq_id, slice(begin, end, 1)