ana_tools
combine_constants(ctimes_a, ctimes_b)
¶
Combine two numpy arrays of timestamps
:param ctimes_a: First numpy array of timestamps :param ctimes_b: Second numpy array of timestamps :return: Combined list of timestamps, Indexes of first and second lists
The two lists of timestamps come from two lists of calibration constants that should be grouped into one, e.g. SlopesFF and SlopesPC. The input lists may have different numbers of items and may share values. The input lists do not need to be ordered.
The first output is a list of ordered timestamps, taken from both input lists; each of them corresponds to one element of the list of pairs.
The second output is a list of pairs of indexes into the input lists, pointing at the elements whose timestamps closely match each other.
Example: .. code-block:: python
# Calibration constants SlopesFF and SlopesPC retrieved
# for a given module within a certain time period
cdataPC_vs_time = np.array(dataPC["data"])
cdataFF_vs_time = np.array(dataFF["data"])
# Timestamps from SlopesFF
# [datetime.datetime(2018, 8, 2, 16, 32, 15)
# datetime.datetime(2018, 6, 6, 14, 57, 29)
# datetime.datetime(2018, 6, 6, 13, 6, 54)
# datetime.datetime(2018, 6, 6, 10, 58, 6)
# datetime.datetime(2018, 9, 6, 13, 24, 46)
# datetime.datetime(2018, 8, 23, 16, 36, 46)]
ctimesFF = np.array(dataFF["ctime"])
# Timestamps from SlopesPC
# [datetime.datetime(2018, 6, 6, 13, 6, 54)
# datetime.datetime(2018, 6, 6, 10, 58, 52)
# datetime.datetime(2018, 6, 6, 9, 52, 32)
# datetime.datetime(2018, 9, 14, 10, 41, 9)
# datetime.datetime(2018, 8, 1, 10, 55, 45)]
ctimesPC = np.array(dataPC["ctime"])
# Get combined time stamps and indexes, which shows
# how to combine constants
ctime, icomb = combine_constants(ctimesFF, ctimesPC)
# ctime values:
# [datetime.datetime(2018, 6, 6, 9, 52, 32),
# datetime.datetime(2018, 6, 6, 10, 58, 6),
# datetime.datetime(2018, 6, 6, 10, 58, 52),
# datetime.datetime(2018, 6, 6, 13, 6, 54),
# datetime.datetime(2018, 6, 6, 13, 6, 54),
# datetime.datetime(2018, 6, 6, 14, 57, 29),
# datetime.datetime(2018, 8, 1, 10, 55, 45),
# datetime.datetime(2018, 8, 2, 16, 32, 15),
# datetime.datetime(2018, 8, 23, 16, 36, 46),
# datetime.datetime(2018, 9, 6, 13, 24, 46)]
# icomb:
# [(3, 2), (3, 1), (2, 1), (2, 4), (2, 4), (1, 4), (0, 4),
# (0, 3), (5, 3), (4, 3)]
# create a list of gain constants by combining SlopesPC and FF
gain_vs_time = []
for iFF, iPC in icomb:
gain_vs_time.append(cdataFF_vs_time[iFF] * cdataPC_vs_time[iPC])
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def combine_constants(ctimes_a, ctimes_b):
    """
    Merge two numpy arrays of timestamps into one ordered sequence.

    :param ctimes_a: First numpy array of timestamps
    :param ctimes_b: Second numpy array of timestamps
    :return: Combined list of timestamps, list of index pairs into the
             first and second input arrays

    The two inputs usually belong to two sets of calibration constants
    which have to be used together, e.g. SlopesFF and SlopesPC. They may
    differ in length, may share values and do not need to be ordered.

    The timestamps of both inputs are sorted together and all of them
    except the very last one are returned. Every returned timestamp is
    accompanied by a pair ``(index_a, index_b)`` of positions in the
    original (unsorted) inputs. The input that holds the timestamp
    contributes the index of its first matching element (``ctimes_a`` is
    checked first). The other input contributes the index of its first
    element strictly later than the timestamp, or of its latest element
    if there is no later one.

    Example:

    .. code-block:: python

        # Calibration constants SlopesFF and SlopesPC retrieved
        # for a given module within a certain time period
        cdataPC_vs_time = np.array(dataPC["data"])
        cdataFF_vs_time = np.array(dataFF["data"])

        # Timestamps from SlopesFF
        # [datetime.datetime(2018, 8, 2, 16, 32, 15)
        #  datetime.datetime(2018, 6, 6, 14, 57, 29)
        #  datetime.datetime(2018, 6, 6, 13, 6, 54)
        #  datetime.datetime(2018, 6, 6, 10, 58, 6)
        #  datetime.datetime(2018, 9, 6, 13, 24, 46)
        #  datetime.datetime(2018, 8, 23, 16, 36, 46)]
        ctimesFF = np.array(dataFF["ctime"])

        # Timestamps from SlopesPC
        # [datetime.datetime(2018, 6, 6, 13, 6, 54)
        #  datetime.datetime(2018, 6, 6, 10, 58, 52)
        #  datetime.datetime(2018, 6, 6, 9, 52, 32)
        #  datetime.datetime(2018, 9, 14, 10, 41, 9)
        #  datetime.datetime(2018, 8, 1, 10, 55, 45)]
        ctimesPC = np.array(dataPC["ctime"])

        # Get combined time stamps and indexes, which show
        # how to combine constants
        ctime, icomb = combine_constants(ctimesFF, ctimesPC)

        # ctime values:
        # [datetime.datetime(2018, 6, 6, 9, 52, 32),
        #  datetime.datetime(2018, 6, 6, 10, 58, 6),
        #  datetime.datetime(2018, 6, 6, 10, 58, 52),
        #  datetime.datetime(2018, 6, 6, 13, 6, 54),
        #  datetime.datetime(2018, 6, 6, 13, 6, 54),
        #  datetime.datetime(2018, 6, 6, 14, 57, 29),
        #  datetime.datetime(2018, 8, 1, 10, 55, 45),
        #  datetime.datetime(2018, 8, 2, 16, 32, 15),
        #  datetime.datetime(2018, 8, 23, 16, 36, 46),
        #  datetime.datetime(2018, 9, 6, 13, 24, 46)]
        # icomb:
        # [(3, 2), (3, 1), (2, 1), (2, 4), (2, 4), (1, 4), (0, 4),
        #  (0, 3), (5, 3), (4, 3)]

        # create a list of gain constants by combining SlopesPC and FF
        gain_vs_time = []
        for iFF, iPC in icomb:
            gain_vs_time.append(cdataFF_vs_time[iFF] * cdataPC_vs_time[iPC])
    """
    order_a = np.argsort(ctimes_a)
    order_b = np.argsort(ctimes_b)
    sorted_a = ctimes_a[order_a]
    sorted_b = ctimes_b[order_b]
    merged = np.array(sorted(np.hstack((sorted_a, sorted_b))))

    def first_later(sorted_times, stamp):
        # Position of the first entry strictly later than stamp, or the
        # last position if stamp is not earlier than the latest entry.
        if stamp >= sorted_times[-1]:
            return len(sorted_times) - 1
        return np.where(sorted_times > stamp)[0][0]

    ctime = []
    icomb = []
    # The last merged timestamp is not emitted (compare the example above)
    for stamp in merged[:-1]:
        ctime.append(stamp)
        if stamp in sorted_a:
            pos_a = np.where(sorted_a == stamp)[0][0]
            pos_b = first_later(sorted_b, stamp)
        else:
            pos_a = first_later(sorted_a, stamp)
            pos_b = np.where(sorted_b == stamp)[0][0]
        # Translate positions in the sorted arrays back to input indexes
        icomb.append((order_a[pos_a], order_b[pos_b]))
    return ctime, icomb
combine_lists(*args, names=None)
¶
Combine several lists into a list of dictionaries or a list of lists. Each dictionary contains a set of elements, one from each list.
:param args: several lists to be combined :param names: list of names (its length must equal the number of args) :return: list of dictionaries or list of lists
Example: .. code-block:: python
# define input lists
a = [100,200]
b = [0,1,2]
c = [25]
comb = combine_lists(a,b,c)
# comb: [[100, 0, 25], [100, 1, 25], [100, 2, 25],
# [200, 0, 25], [200, 1, 25], [200, 2, 25]]
comb = combine_lists(a,b,c, names=['Voltage', 'Gain', 'Temperature'])
# comb: [{'Voltage': 100, 'Gain': 0, 'Temperature': 25},
# {'Voltage': 100, 'Gain': 1, 'Temperature': 25},
# {'Voltage': 100, 'Gain': 2, 'Temperature': 25},
# {'Voltage': 200, 'Gain': 0, 'Temperature': 25},
# {'Voltage': 200, 'Gain': 1, 'Temperature': 25},
# {'Voltage': 200, 'Gain': 2, 'Temperature': 25}]
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def combine_lists(*args, names=None):
    """
    Build every combination that takes one element from each input list.

    :param args: several lists (any iterables) to be combined
    :param names: list or tuple of names, one per positional argument.
                  Any other value (e.g. None) is ignored.
    :return: list of lists if no names are given, otherwise a list of
             dictionaries mapping each name to the element taken from
             the corresponding input
    :raises AssertionError: if `names` is a list or tuple whose length
                            differs from the number of inputs

    The combinations are ordered with the last input varying fastest.
    Calling the function without inputs returns ``[[]]``.

    Example:

    .. code-block:: python

        # define input lists
        a = [100,200]
        b = [0,1,2]
        c = [25]
        comb = combine_lists(a,b,c)
        # comb: [[100, 0, 25], [100, 1, 25], [100, 2, 25],
        #        [200, 0, 25], [200, 1, 25], [200, 2, 25]]

        comb = combine_lists(a,b,c, names=['Voltage', 'Gain', 'Temperature'])
        # comb: [{'Voltage': 100, 'Gain': 0, 'Temperature': 25},
        #        {'Voltage': 100, 'Gain': 1, 'Temperature': 25},
        #        {'Voltage': 100, 'Gain': 2, 'Temperature': 25},
        #        {'Voltage': 200, 'Gain': 0, 'Temperature': 25},
        #        {'Voltage': 200, 'Gain': 1, 'Temperature': 25},
        #        {'Voltage': 200, 'Gain': 2, 'Temperature': 25}]
    """
    from itertools import product

    possible_params = [list(combo) for combo in product(*args)]

    if isinstance(names, (list, tuple)):
        # Raised explicitly (instead of a bare assert) so the check is
        # not stripped under `python -O`, where zip() would otherwise
        # silently truncate to the shorter sequence.
        if len(names) != len(args):
            raise AssertionError(
                'names has {} entries but {} lists were given'.format(
                    len(names), len(args)))
        return [dict(zip(names, par)) for par in possible_params]
    return possible_params
from_multiset(x)
¶
Convert multiset to sorted list
:param x: Multi-set to be converted :return: Sorted list
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
get_range(data, scale, threshold=-1000)
¶
Return a range calculated by median absolute deviations
:param data: numpy.array of data points :param scale: range in units of median absolute deviations :param threshold: lower threshold for data to be considered :return: Range [min, max] calculated by median absolute deviations
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def get_range(data, scale, threshold=-1000):
    """
    Compute a range centred on the median, sized by the median absolute
    deviation (MAD).

    :param data: numpy.array of data points
    :param scale: half-width of the range in units of the MAD
    :param threshold: only values strictly above this threshold are used
    :return: tuple (min, max) = (median - scale * MAD, median + scale * MAD)
    """
    selected = data[data > threshold]
    center = np.nanmedian(selected)
    deviation = np.nanmedian(np.abs(selected.flatten() - center))
    half_width = scale * deviation
    return center - half_width, center + half_width
hm_combine(data, mem_cells=32, fname=None, htype=None, **kwargs)
¶
Plot heatmap for calibration report
Plotting is based on pyDetLib heatmapPlot with additional insets depending on heatmap type. HMType.INSET_1D insets additional 1D histogram for each line of the heatmap HMType.INSET_AXIS insets axis for memory cells
:param data: 2D numpy.array of data points to plot :param mem_cells: number of memory cells shown in data :param fname: file name of output file :param htype: type of figure :param kwargs: other keywords supported by pyDetLib heatmapPlot
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def hm_combine(data, mem_cells=32, fname=None, htype=None, **kwargs):
    """
    Plot heatmap for calibration report

    Plotting is based on pyDetLib heatmapPlot with additional
    insets depending on heatmap type.
    HMType.INSET_1D insets an additional 1D histogram for each line
    of the heatmap.
    HMType.INSET_AXIS insets an axis for memory cells.

    Cells of `data` equal to one of the IMType marker values
    (NO_CONST, ALL_NAN, ALL_BAD, STRANGE_VAL, NO_BPMAP) are annotated
    with a one-letter label ('X', 'n', 'B', '0', 'N') and replaced by
    NaN. Note that this replacement modifies `data` in place.

    :param data: 2D numpy.array of data points to plot
    :param mem_cells: number of memory cells shown in data
    :param fname: file name of output file; nothing is saved if empty
    :param htype: type of figure
    :param kwargs: other keywords supported by pyDetLib heatmapPlot.
                   'pad' ([bottom, left, top, right]), 'vmin' and 'vmax'
                   are additionally used to place and scale the insets.
                   The insets use 'vmin'/'vmax' only if both are given;
                   a limit of 0 counts as given.
    """
    from XFELDetAna import xfelpyanatools as xana

    fig = plt.figure(figsize=(10, 12))
    ax = fig.add_subplot(111)

    # Text overlay: one label per cell that holds a marker value
    data_txt = np.full(data.shape, '').astype(str)
    data_txt[data == IMType.NO_CONST.value] = 'X'
    data_txt[data == IMType.ALL_NAN.value] = 'n'
    data_txt[data == IMType.ALL_BAD.value] = 'B'
    data_txt[data == IMType.STRANGE_VAL.value] = '0'
    data_txt[data == IMType.NO_BPMAP.value] = 'N'

    # Replace the marker values by NaN (in place, the caller's array is
    # modified)
    data[data == IMType.NO_CONST.value] = np.nan
    data[data == IMType.ALL_NAN.value] = np.nan
    data[data == IMType.ALL_BAD.value] = np.nan
    data[data == IMType.STRANGE_VAL.value] = np.nan
    data[data == IMType.NO_BPMAP.value] = np.nan

    xana.heatmapPlot(data, add_panels=False, cmap='viridis',
                     cb_pad=0.6 if htype == HMType.INSET_AXIS else 0.1,
                     use_axis=ax,
                     **kwargs)
    plt.setp(ax.yaxis.get_majorticklabels(), rotation=90)

    # Figure padding in the order bottom, left, top, right
    pad = kwargs.get('pad', [0.125, 0.125, 0.1, 0.18])
    pad_b = pad[0]
    pad_l = pad[1]
    pad_t = pad[2]
    pad_r = pad[3]
    mar = 0.003
    vmin = kwargs.get('vmin', None)
    vmax = kwargs.get('vmax', None)
    # Explicit None checks: a limit of 0 is a valid value and must not
    # be mistaken for "not provided".
    has_limits = vmin is not None and vmax is not None
    h_frame = 1 - pad_b - pad_t
    w_frame = 1 - pad_l - pad_r

    # Loop over data dimensions and create text annotations.
    for i in range(data.shape[0]):
        for j in range(data.shape[1]):
            _ = ax.text(j+0.5, i+0.5, data_txt[i, j],
                        horizontalalignment="center",
                        verticalalignment="center",
                        color="black")

    if htype == HMType.INSET_1D:
        ax.tick_params(axis='y', which='major', pad=50)
        # One inset axes per heatmap row, stacked from the bottom
        for y in range(data.shape[0]):
            h_frame = (1 - pad_b - pad_t) / data.shape[0]
            w_frame = 1 - pad_l - pad_r
            mean = np.nanmean(data[y])
            if has_limits:
                # Mean of the values strictly inside (vmin, vmax) only
                mean = np.nanmean(
                    data[y, np.where((data[y] > vmin) & (data[y] < vmax))]
                )

            ax = plt.axes(
                [pad_l, pad_b + h_frame * y + mar, w_frame, h_frame - 2 * mar],
                frame_on=False,
                yticks=[mean],
                yticklabels=['{:4.2f}'.format(mean)],
                xticks=[],
                xticklabels=[]
            )
            ax.tick_params(axis='y', which='major', pad=25)
            if has_limits:
                ax.set_ylim(vmin, vmax)

            # Row values plus a horizontal line at the row mean
            d = [{'x': np.arange(data.shape[1]),
                  'y': data[y],
                  'drawstyle': 'steps-mid',
                  'linestyle': 'dotted',
                  'linewidth': 2,
                  'color': 'white',
                  },
                 {'x': np.array([0, data.shape[1] - 1]),
                  'y': np.array([mean, mean]),
                  'drawstyle': 'steps-mid',
                  'linewidth': 1.0,
                  'linestyle': 'dotted',
                  'color': 'white',
                  },
                 ]

            y_lab = ''
            # Locate label in a middle of the axis
            if y == int(data.shape[0] / 2):
                y_lab = 'Average over time'

            _ = xana.simplePlot(d,
                                x_label="",
                                y_label=y_lab,
                                use_axis=ax,
                                y_log=False)

    if htype == HMType.INSET_AXIS:
        # Group y axis by pixels and memory cells
        # Axis inset shows memory cells
        nypix = data.shape[0] / mem_cells
        ax2 = plt.axes([pad_l, pad_b, w_frame, h_frame / nypix],
                       frame_on=False, yticks=range(2),
                       yticklabels=[mem_cells, 0],
                       xticks=[], xticklabels=[]
                       )
        ax2.yaxis.set_label_position("right")
        ax2.set_ylabel('Memory cell ID', color='r')
        ax2.tick_params('y', colors='r', right=True, left=False,
                        labelright=True, labelleft=False)

    if fname:
        fig.savefig(fname, bbox_inches='tight')
load_data_from_hdf5(filelist)
¶
Load Saved calibration constant from h5 file
:param filelist: Glob pattern of the files to be loaded (expanded with glob) :return: dictionary with calibration constants
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def load_data_from_hdf5(filelist):
    """
    Load saved calibration constants from h5 files

    All files matching `filelist` are read and merged into one nested
    dictionary ``data[constant][module][key]``, where ``constant`` and
    ``module`` are the names of the first and second group level of the
    files. For each module:

    * entries of a third-level item named 'ctime' are parsed into
      datetime objects and collected in the list ``"ctime"``;
    * any other third-level item that is a dataset is stored as is
      under its own name;
    * any other third-level group is traversed: its 'mdata' child is
      read into a dictionary and appended to the list ``"mdata"``, every
      other child is appended to a list stored under the child's name
      (children named 'ctime' are parsed into datetime objects).

    :param filelist: Glob pattern of the files to be loaded
                     (passed to glob.glob)
    :return: dictionary with calibration constants
    """
    data = {}
    files = glob.glob(filelist)
    # Loop over files
    for filename in files:
        with h5py.File(filename, 'r') as f:
            # Loop over calibration constants
            for cKey in f.keys():
                if cKey not in data:
                    data[cKey] = {}
                # Loop over modules
                for mKey in f.get(cKey):
                    if mKey not in data[cKey]:
                        data[cKey][mKey] = {}
                    # Loop over module data
                    for tKey in f.get('{}/{}'.format(cKey, mKey)):

                        # Creation times stored directly under the
                        # module: extend the list gathered so far
                        # (possibly from previously read files)
                        if tKey == 'ctime':
                            ctime = data[cKey][mKey].get("ctime", [])
                            for timeKey in f.get("/".join((cKey, mKey, tKey))):
                                path = "/".join((cKey, mKey, tKey, timeKey))
                                value = f.get(path)[()]
                                value = dateutil.parser.parse(value)
                                ctime.append(value)
                            data[cKey][mKey]["ctime"] = ctime
                            continue

                        # Load ndarray if they are there
                        # (a later file overwrites an earlier value)
                        item = f.get("/".join((cKey, mKey, tKey)))
                        if isinstance(item, h5py.Dataset):
                            data[cKey][mKey][tKey] = item[()]
                            continue

                        # Loop over stored data
                        for dKey in f.get("/".join((cKey, mKey, tKey))):

                            # Loop over metadata
                            if dKey == "mdata":
                                mdata_d = {}
                                for mdKey in f.get(
                                        "/".join((cKey, mKey, tKey, 'mdata'))):
                                    mdata_d[mdKey] = f.get(
                                        "/".join(
                                            (cKey, mKey, tKey, 'mdata',
                                             mdKey)))[()]
                                # One metadata dictionary is appended
                                # per third-level group
                                mdata_l = data[cKey][mKey].get("mdata", [])
                                mdata_l.append(mdata_d)
                                data[cKey][mKey]["mdata"] = mdata_l
                                continue

                            # Every other entry is accumulated in a
                            # list kept under the entry name
                            if dKey not in data[cKey][mKey]:
                                data[cKey][mKey][dKey] = []
                            value = f.get(
                                "/".join((cKey, mKey, tKey, dKey)))[()]

                            # Stored time strings are converted back
                            # to datetime objects
                            if dKey == "ctime":
                                value = dateutil.parser.parse(value)

                            data[cKey][mKey][dKey].append(value)
    return data
multi_intersect(a, b)
¶
Combine two lists in one. Keep only elements present in both lists.
:param a: First list :param b: Second list :return: Combined list.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
multi_union(a, b)
¶
Combine two lists in one. If the same element is present in both lists, it counts only once.
:param a: First list :param b: Second list :return: Combined list.
Example:
.. code-block:: python
a = [5,5]
b = [4,4,4,5]
comb = multi_union(a, b)
# comb: [4,4,4,5,5]
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def multi_union(a, b):
    """
    Merge two lists into one, treating them as multisets.

    An element occurring in both lists is not duplicated: it appears in
    the result as often as in the list that contains it most.

    :param a: First list
    :param b: Second list
    :return: Combined list.

    Example:

    .. code-block:: python

        a = [5,5]
        b = [4,4,4,5]
        comb = multi_union(a, b)
        # comb: [4,4,4,5,5]
    """
    return from_multiset(to_multiset(a) | to_multiset(b))
recursively_save_dict_contents_to_group(h5file, path, dic)
¶
Save dictionary to h5 file recursively
:param h5file: h5 file :param path: Path in h5 file :param dic: Dictionary to be saved
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def recursively_save_dict_contents_to_group(h5file, path, dic):
    """
    Save dictionary to h5 file recursively

    Every entry of `dic` is written below `path`, using the key
    (converted to str) as name:

    * dictionaries are stored recursively as sub-paths;
    * lists and tuples are stored element by element under
      ``<key>/<index>``; their elements must be dictionaries or
      datetime objects;
    * datetime objects are stored as ISO-format strings;
    * scalars, strings and numpy arrays are stored as they are.

    :param h5file: h5 file (any object supporting ``h5file[path] = value``)
    :param path: Path in h5 file
    :param dic: Dictionary to be saved
    :raises ValueError: for a list element that is neither a dictionary
                        nor a datetime, or for a value of any other
                        unsupported type
    """
    for key, item in dic.items():
        newpath = '/'.join((path, str(key)))

        # Prepare datetime to save: the resulting string is written by
        # the scalar/string branch below
        if isinstance(item, datetime.datetime):
            item = item.isoformat()

        # Iterate over dictionary
        if isinstance(item, dict):
            recursively_save_dict_contents_to_group(h5file, newpath, item)

        # Iterate over list
        elif isinstance(item, (list, tuple)):
            for i, x in enumerate(item):
                newpath = '/'.join((path, str(key), str(i)))
                if isinstance(x, dict):
                    recursively_save_dict_contents_to_group(h5file, newpath, x)
                elif isinstance(x, datetime.datetime):
                    h5file[newpath] = x.isoformat()
                else:
                    # Report the type of the offending element, not the
                    # type of the enclosing list
                    raise ValueError('Cannot save a list of %s ' % type(x))

        # Save strings, scalars or array of scalars
        elif (np.isscalar(item) or
              isinstance(item, (str, np.ndarray))):
            h5file[newpath] = item

        # Other types cannot be saved and will result in an error
        else:
            raise ValueError('Cannot save %s type.' % type(item))
save_dict_to_hdf5(dic, filename)
¶
Save dictionary to h5 file
:param dic: Python dictionary to be saved :param filename: Filename including path
Dictionary may contain other dictionaries, lists, scalars, strings, datetime and ndarray of scalars.
Dictionaries and lists are iterated to be stored in h5 file. datetime is converted to an ISO-format string.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def save_dict_to_hdf5(dic, filename):
    """
    Write a dictionary to an h5 file

    The file is opened in write mode ('w') and the dictionary is stored
    starting at the root path '/'.

    :param dic: Python dictionary to be saved
    :param filename: Filename including path

    The dictionary may contain other dictionaries, lists, scalars,
    strings, datetime objects and ndarrays of scalars.
    Dictionaries and lists are iterated to be stored in the h5 file.
    datetime objects are stored as ISO-format strings.
    """
    with h5py.File(filename, 'w') as out_file:
        recursively_save_dict_contents_to_group(out_file, '/', dic)
to_multiset(x)
¶
Convert list to multiset
:param x: List to be converted :return: ordered multiset: Item of list, index of this element
E.g. (2,3,3,4,2) gives {(2, 0), (2, 1), (3, 0), (3, 1), (4, 0)}
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def to_multiset(x):
    """
    Convert list to multiset

    Each element is paired with the number of equal elements that
    precede it, so that repeated elements stay distinguishable inside
    an ordinary set.

    :param x: List (or other iterable) of hashable items to be converted
    :return: set of tuples (item of list, occurrence index of this item)

    E.g. (2,3,3,4,2) gives {(2, 0), (2, 1), (3, 0), (3, 1), (4, 0)}
    """
    result = set()
    # Number of times each element has been seen so far; this gives the
    # next free occurrence index directly instead of probing
    # (elt, 0), (elt, 1), ... for every element.
    seen = {}
    for elt in x:
        n = seen.get(elt, 0)
        result.add((elt, n))
        seen[elt] = n + 1
    return result