Skip to content

ana_tools

combine_constants(ctimes_a, ctimes_b)

Combine two numpy arrays of timestamp

:param ctimes_a: First numpy array of timestamps :param ctimes_b: Second numpy array of timestamps :return: Combined list of timestamps, Indexes of first and second lists

Two lists of timestamps comes from two lists of calibration constants, which should be grouped in one. E.g. SlopesFF and SlopesPC. Input lists may have different number of items with may have intersects values. Input lists are not ordered.

Output is a list of ordered values, which are taken from input lists and assigned to elements of the list of pairs.

Another output is a list of pairs of indexes, which corresponds to elements of input lists, that corresponding values closely match each other.

Example: .. code-block:: python

# Calibration constants SlopesFF and SlopesPC retrieved
# for a given module withoin certain time period
cdataPC_vs_time = np.array(dataPC["data"])
cdataFF_vs_time = np.array(dataFF["data"])

# Timespamps from SlopesFF
# [datetime.datetime(2018, 8, 2, 16, 32, 15)
#  datetime.datetime(2018, 6, 6, 14, 57, 29)
#  datetime.datetime(2018, 6, 6, 13, 6, 54)
#  datetime.datetime(2018, 6, 6, 10, 58, 6)
#  datetime.datetime(2018, 9, 6, 13, 24, 46)
#  datetime.datetime(2018, 8, 23, 16, 36, 46)]
ctimesFF = np.array(dataFF["ctime"])

# Timespamps from SlopesPC
# [datetime.datetime(2018, 6, 6, 13, 6, 54)
#  datetime.datetime(2018, 6, 6, 10, 58, 52)
#  datetime.datetime(2018, 6, 6, 9, 52, 32)
#  datetime.datetime(2018, 9, 14, 10, 41, 9)
#  datetime.datetime(2018, 8, 1, 10, 55, 45)]
ctimesPC = np.array(dataPC["ctime"])

# Get combined time stamps and indexes, which shows
# how to combine constants
ctime, icomb = combine_constants(ctimesFF, ctimesPC)

# ctime values:
# [datetime.datetime(2018, 6, 6, 9, 52, 32),
#  datetime.datetime(2018, 6, 6, 10, 58, 6),
#  datetime.datetime(2018, 6, 6, 10, 58, 52),
#  datetime.datetime(2018, 6, 6, 13, 6, 54),
#  datetime.datetime(2018, 6, 6, 13, 6, 54),
#  datetime.datetime(2018, 6, 6, 14, 57, 29),
#  datetime.datetime(2018, 8, 1, 10, 55, 45),
#  datetime.datetime(2018, 8, 2, 16, 32, 15),
#  datetime.datetime(2018, 8, 23, 16, 36, 46),
#  datetime.datetime(2018, 9, 6, 13, 24, 46)]

# icomb:
# [(3, 2), (3, 1), (2, 1), (2, 4), (2, 4), (1, 4), (0, 4),
#  (0, 3), (5, 3), (4, 3)]

# create a list of gain constants by combining SlopesPC and FF
gain_vs_time = []
for iFF, iPC in icomb:
    gain_vs_time.append(cdataFF_vs_time[iFF] * cdataPC_vs_time[iPC])
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def combine_constants(ctimes_a, ctimes_b):
    """
    Combine two numpy arrays of timestamp

    :param ctimes_a: First numpy array of timestamps
    :param ctimes_b: Second numpy array of timestamps
    :return: Combined list of timestamps, Indexes of first and second lists

    Two lists of timestamps comes from two lists of calibration constants,
    which should be grouped in one. E.g. SlopesFF and SlopesPC.
    Input lists may have different number of items with may have
    intersects values. Input lists are not ordered.

    Output is a list of ordered values, which are taken from
    input lists and assigned to elements of the list of pairs.

    Another output is a list of pairs of indexes, which corresponds
    to elements of input lists, that corresponding values closely match
    each other.

    Example:
    .. code-block:: python

        # Calibration constants SlopesFF and SlopesPC retrieved
        # for a given module withoin certain time period
        cdataPC_vs_time = np.array(dataPC["data"])
        cdataFF_vs_time = np.array(dataFF["data"])

        # Timespamps from SlopesFF
        # [datetime.datetime(2018, 8, 2, 16, 32, 15)
        #  datetime.datetime(2018, 6, 6, 14, 57, 29)
        #  datetime.datetime(2018, 6, 6, 13, 6, 54)
        #  datetime.datetime(2018, 6, 6, 10, 58, 6)
        #  datetime.datetime(2018, 9, 6, 13, 24, 46)
        #  datetime.datetime(2018, 8, 23, 16, 36, 46)]
        ctimesFF = np.array(dataFF["ctime"])

        # Timespamps from SlopesPC
        # [datetime.datetime(2018, 6, 6, 13, 6, 54)
        #  datetime.datetime(2018, 6, 6, 10, 58, 52)
        #  datetime.datetime(2018, 6, 6, 9, 52, 32)
        #  datetime.datetime(2018, 9, 14, 10, 41, 9)
        #  datetime.datetime(2018, 8, 1, 10, 55, 45)]
        ctimesPC = np.array(dataPC["ctime"])

        # Get combined time stamps and indexes, which shows
        # how to combine constants
        ctime, icomb = combine_constants(ctimesFF, ctimesPC)

        # ctime values:
        # [datetime.datetime(2018, 6, 6, 9, 52, 32),
        #  datetime.datetime(2018, 6, 6, 10, 58, 6),
        #  datetime.datetime(2018, 6, 6, 10, 58, 52),
        #  datetime.datetime(2018, 6, 6, 13, 6, 54),
        #  datetime.datetime(2018, 6, 6, 13, 6, 54),
        #  datetime.datetime(2018, 6, 6, 14, 57, 29),
        #  datetime.datetime(2018, 8, 1, 10, 55, 45),
        #  datetime.datetime(2018, 8, 2, 16, 32, 15),
        #  datetime.datetime(2018, 8, 23, 16, 36, 46),
        #  datetime.datetime(2018, 9, 6, 13, 24, 46)]

        # icomb:
        # [(3, 2), (3, 1), (2, 1), (2, 4), (2, 4), (1, 4), (0, 4),
        #  (0, 3), (5, 3), (4, 3)]

        # create a list of gain constants by combining SlopesPC and FF
        gain_vs_time = []
        for iFF, iPC in icomb:
            gain_vs_time.append(cdataFF_vs_time[iFF] * cdataPC_vs_time[iPC])

    """
    sort_ind_a = np.argsort(ctimes_a)
    sort_ind_b = np.argsort(ctimes_b)

    t_asort = ctimes_a[sort_ind_a]
    t_bsort = ctimes_b[sort_ind_b]

    t_sort = np.array(sorted(np.hstack((t_asort, t_bsort))))

    icomb = []
    ctime = []
    for time in t_sort:
        if len(ctime) == len(t_sort) - 1:
            break
        ctime.append(time)
        if time in t_asort:
            i_a = np.where(t_asort == time)[0][0]
            if time >= t_bsort[-1]:
                i_b = len(t_bsort) - 1
            else:
                i_b = np.where(t_bsort > time)[0][0]
        else:
            if time >= t_asort[-1]:
                i_a = len(t_asort) - 1
            else:
                i_a = np.where(t_asort > time)[0][0]
            i_b = np.where(t_bsort == time)[0][0]
        icomb.append((sort_ind_a[i_a], sort_ind_b[i_b]))

    return ctime, icomb

combine_lists(*args, names=None)

Combine several lists to a list of dictionary or a list of lists Each dictionary contain a set of elements, one from each list

:param args: several lists to be combined :param names: list of names (the length equal to the number of args) :return: list of dictionary of list of lists

Example: .. code-block:: python

# define input lists
a = [100,200]
b = [0,1,2]
c = [25]

comb = combine_lists(a,b,c)
# comb: [[100, 0, 25], [100, 1, 25], [100, 2, 25],
#        [200, 0, 25], [200, 1, 25], [200, 2, 25]]

comb = combine_lists(a,b,c, names=['Voltage', 'Gain', 'Temperature'])
# comb: [{'Voltage': 100, 'Gain': 0, 'Temperature': 25},
#        {'Voltage': 100, 'Gain': 1, 'Temperature': 25},
#        {'Voltage': 100, 'Gain': 2, 'Temperature': 25},
#        {'Voltage': 200, 'Gain': 0, 'Temperature': 25},
#        {'Voltage': 200, 'Gain': 1, 'Temperature': 25},
#        {'Voltage': 200, 'Gain': 2, 'Temperature': 25}]
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def combine_lists(*args, names=None):
    """
    Combine several lists to a list of dictionary or a list of lists
    Each dictionary contain a set of elements, one from each list

    :param args: several lists to be combined
    :param names: list of names (the length equal to the number of args)
    :return: list of dictionary of list of lists

    Example:
    .. code-block:: python

        # define input lists
        a = [100,200]
        b = [0,1,2]
        c = [25]

        comb = combine_lists(a,b,c)
        # comb: [[100, 0, 25], [100, 1, 25], [100, 2, 25],
        #        [200, 0, 25], [200, 1, 25], [200, 2, 25]]

        comb = combine_lists(a,b,c, names=['Voltage', 'Gain', 'Temperature'])
        # comb: [{'Voltage': 100, 'Gain': 0, 'Temperature': 25},
        #        {'Voltage': 100, 'Gain': 1, 'Temperature': 25},
        #        {'Voltage': 100, 'Gain': 2, 'Temperature': 25},
        #        {'Voltage': 200, 'Gain': 0, 'Temperature': 25},
        #        {'Voltage': 200, 'Gain': 1, 'Temperature': 25},
        #        {'Voltage': 200, 'Gain': 2, 'Temperature': 25}]
    """

    params = list(map(tuple, args))
    possible_params = [[]]
    for param in params:
        possible_params = [x + [y] for x in possible_params for y in param]

    if isinstance(names, (list, tuple)):
        assert len(names) == len(args)
        return [dict(zip(names, par)) for par in possible_params]
    return possible_params

from_multiset(x)

Convert multiset to sorted list

:param x: Multi-set to be converted :return: Sorted list

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def from_multiset(x):
    """
    Convert multiset to sorted list

    :param x: Multi-set to be converted
    :return: Sorted list
    """
    return sorted([elt for elt, n in x])

get_range(data, scale, threshold=-1000)

Return a range calculated by median absolute deviations

:param data: numpy.array of data points :param scale: range in units of median absolute deviations :param threshold: lower threshold for data to be considered :return: Range [min, max] calculated by median absolute deviations

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def get_range(data, scale, threshold = -1000):
    """
    Return a range calculated by median absolute deviations

    :param data: numpy.array of data points
    :param scale: range in units of median absolute deviations
    :param threshold: lower threshold for data to be considered
    :return: Range [min, max] calculated by median absolute deviations
    """
    med = np.nanmedian(data[data>threshold])
    mad = np.nanmedian(np.abs(data[data>threshold].flatten() - med))
    return med - scale * mad, med + scale * mad

hm_combine(data, mem_cells=32, fname=None, htype=None, **kwargs)

Plot heatmap for calibration report

Plotting is based on pyDetLib heatmapPlot with additional insets depending on heatmap type. HMType.INSET_1D insets additional 1D histogram for each line of the heatmap HMType.INSET_AXIS insets axis for memory cells

:param data: 2D numpy.array of data points to plot :param mem_cells: number of memory cells shown in data :param fname: file name of output file :param htype: type of figure :param kwargs: other keywords supported by pyDetLib heatmapPlot

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def hm_combine(data, mem_cells=32, fname=None, htype=None, **kwargs):
    """
    Plot heatmap for calibration report

    Plotting is based on pyDetLib heatmapPlot with additional
    insets depending on heatmap type.
    HMType.INSET_1D insets additional 1D histogram for each line
    of the heatmap
    HMType.INSET_AXIS insets axis for memory cells

    :param data: 2D numpy.array of data points to plot
    :param mem_cells: number of memory cells shown in data
    :param fname: file name of output file
    :param htype: type of figure
    :param kwargs: other keywords supported by pyDetLib heatmapPlot
    """

    from XFELDetAna import xfelpyanatools as xana

    fig = plt.figure(figsize=(10, 12))
    ax = fig.add_subplot(111)

    data_txt = np.full(data.shape, '').astype(str)
    data_txt[data == IMType.NO_CONST.value] = 'X'
    data_txt[data == IMType.ALL_NAN.value] = 'n'
    data_txt[data == IMType.ALL_BAD.value] = 'B'
    data_txt[data == IMType.STRANGE_VAL.value] = '0'
    data_txt[data == IMType.NO_BPMAP.value] = 'N'

    data[data == IMType.NO_CONST.value] = np.nan
    data[data == IMType.ALL_NAN.value] = np.nan
    data[data == IMType.ALL_BAD.value] = np.nan
    data[data == IMType.STRANGE_VAL.value] = np.nan
    data[data == IMType.NO_BPMAP.value] = np.nan

    xana.heatmapPlot(data, add_panels=False, cmap='viridis',
                     cb_pad=0.6 if htype == HMType.INSET_AXIS else 0.1,
                     use_axis=ax,
                     **kwargs)

    plt.setp(ax.yaxis.get_majorticklabels(), rotation=90)

    pad = kwargs.get('pad', [0.125, 0.125, 0.1, 0.18])

    pad_b = pad[0]
    pad_l = pad[1]
    pad_t = pad[2]
    pad_r = pad[3]
    mar = 0.003

    vmin = kwargs.get('vmin', None)
    vmax = kwargs.get('vmax', None)

    h_frame = 1 - pad_b - pad_t
    w_frame = 1 - pad_l - pad_r

    # Loop over data dimensions and create text annotations.
    for i in range(data.shape[0]):
        for j in range(data.shape[1]):
            _ = ax.text(j+0.5, i+0.5, data_txt[i, j],
                        horizontalalignment="center",
                        verticalalignment="center",
                        color="black")

    if htype == HMType.INSET_1D:
        ax.tick_params(axis='y', which='major', pad=50)
        for y in range(data.shape[0]):

            h_frame = (1 - pad_b - pad_t) / data.shape[0]
            w_frame = 1 - pad_l - pad_r

            mean = np.nanmean(data[y])
            if vmin and vmax:
                mean = np.nanmean(
                    data[y, np.where((data[y] > vmin) & (data[y] < vmax))]
                )

            ax = plt.axes(
                [pad_l, pad_b + h_frame * y + mar, w_frame, h_frame - 2 * mar],
                frame_on=False,
                yticks=[mean],
                yticklabels=['{:4.2f}'.format(mean)],
                xticks=[],
                xticklabels=[]
            )
            ax.tick_params(axis='y', which='major', pad=25)

            if vmin and vmax:
                ax.set_ylim(vmin, vmax)

            d = [{'x': np.arange(data.shape[1]),
                  'y': data[y],
                  'drawstyle': 'steps-mid',
                  'linestyle': 'dotted',
                  'linewidth': 2,
                  'color': 'white',
                  },
                 {'x': np.array([0, data.shape[1] - 1]),
                  'y': np.array([mean, mean]),
                  'drawstyle': 'steps-mid',
                  'linewidth': 1.0,
                  'linestyle': 'dotted',
                  'color': 'white',
                  },
                 ]

            y_lab = ''
            # Locate label in a middle of the axis
            if y == int(data.shape[0] / 2):
                y_lab = 'Average over time'

            _ = xana.simplePlot(d,
                                x_label="",
                                y_label=y_lab,
                                use_axis=ax,
                                y_log=False)
    if htype == HMType.INSET_AXIS:
        # Group y axis by pixels and memory cells
        # Axis inset shows memory cells
        nypix = data.shape[0] / mem_cells
        ax2 = plt.axes([pad_l, pad_b, w_frame, h_frame / nypix],
                       frame_on=False, yticks=range(2),
                       yticklabels=[mem_cells, 0],
                       xticks=[], xticklabels=[]
                       )
        ax2.yaxis.set_label_position("right")
        ax2.set_ylabel('Memory cell ID', color='r')
        ax2.tick_params('y', colors='r', right=True, left=False,
                        labelright=True, labelleft=False)

    if fname:
        fig.savefig(fname, bbox_inches='tight')

load_data_from_hdf5(filelist)

Load Saved calibration constant from h5 file

:param filelist: List of file to be parsed with glob :return: dictionary with calibration constant

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def load_data_from_hdf5(filelist):
    """
    Load Saved calibration constant from h5 file

    :param filelist: List of file to be parsed with glob
    :return: dictionary with calibration constant
    """
    data = {}
    files = glob.glob(filelist)
    # Loop over files
    for filename in files:
        with h5py.File(filename, 'r') as f:
            # Loop over calibration constants
            for cKey in f.keys():
                if cKey not in data:
                    data[cKey] = {}
                # Loop over modules
                for mKey in f.get(cKey):
                    if mKey not in data[cKey]:
                        data[cKey][mKey] = {}
                    # Loop over module data
                    for tKey in f.get('{}/{}'.format(cKey, mKey)):

                        if tKey == 'ctime':
                            ctime = data[cKey][mKey].get("ctime", [])
                            for timeKey in f.get("/".join((cKey, mKey, tKey))):
                                path = "/".join((cKey, mKey, tKey, timeKey))
                                value = f.get(path)[()]
                                value = dateutil.parser.parse(value)
                                ctime.append(value)
                            data[cKey][mKey]["ctime"] = ctime
                            continue

                        # Load ndarray if they are there
                        item = f.get("/".join((cKey, mKey, tKey)))
                        if isinstance(item, h5py.Dataset):
                            data[cKey][mKey][tKey] = item[()]
                            continue

                        # Loop over stored data
                        for dKey in f.get("/".join((cKey, mKey, tKey))):

                            # Loop over metadata
                            if dKey == "mdata":
                                mdata_d = {}
                                for mdKey in f.get(
                                        "/".join((cKey, mKey, tKey, 'mdata'))):
                                    mdata_d[mdKey] = f.get(
                                        "/".join(
                                            (cKey, mKey, tKey, 'mdata',
                                             mdKey)))[()]
                                mdata_l = data[cKey][mKey].get("mdata", [])
                                mdata_l.append(mdata_d)
                                data[cKey][mKey]["mdata"] = mdata_l
                                continue

                            if dKey not in data[cKey][mKey]:
                                data[cKey][mKey][dKey] = []

                            value = f.get(
                                "/".join((cKey, mKey, tKey, dKey)))[()]

                            if dKey == "ctime":
                                value = dateutil.parser.parse(value)

                            data[cKey][mKey][dKey].append(value)
    return data

multi_intersect(a, b)

Combine two lists in one. Keep only elements present in both lists.

:param a: First list :param b: Second list :return: Combined list.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def multi_intersect(a, b):
    """
    Combine two lists in one. Keep only elements present in both lists.

    :param a: First list
    :param b: Second list
    :return: Combined list.
    """
    aa = to_multiset(a)
    bb = to_multiset(b)
    return from_multiset(aa & bb)

multi_union(a, b)

Combine two lists in one. If the same element is present in both lists, it counts only once.

:param a: First list :param b: Second list :return: Combined list.

Example:

.. code-block:: python

a = [5,5]
b = [4,4,4,5]
comb = multi_union(a, b)
# comb: [4,4,4,5,5]
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def multi_union(a, b):
    """
    Combine two lists in one.
    If the same element is present in both lists, it counts only once.

    :param a: First list
    :param b: Second list
    :return: Combined list.

        Example:
    .. code-block:: python

        a = [5,5]
        b = [4,4,4,5]
        comb = multi_union(a, b)
        # comb: [4,4,4,5,5]

    """
    aa = to_multiset(a)
    bb = to_multiset(b)
    return from_multiset(aa | bb)

recursively_save_dict_contents_to_group(h5file, path, dic)

Save dictionary to h5 file recursively

:param h5file: h5 file :param path: Path in h5 file :param dic: Dictionary to be saved

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def recursively_save_dict_contents_to_group(h5file, path, dic):
    """
    Save dictionary to h5 file recursively

    :param h5file: h5 file
    :param path: Path in h5 file
    :param dic: Dictionary to be saved

    """
    for key, item in dic.items():
        newpath = '/'.join((path, str(key)))

        # Prepare datatime to save
        if isinstance(item, datetime.datetime):
            item = item.isoformat()

        # Iterate over dictionary
        if isinstance(item, dict):
            recursively_save_dict_contents_to_group(h5file, newpath, item)

        # Iterate over list
        elif isinstance(item, (list, tuple)):
            for i, x in enumerate(item):
                newpath = '/'.join((path, str(key), str(i)))
                if isinstance(x, dict):
                    recursively_save_dict_contents_to_group(h5file, newpath, x)
                elif isinstance(x, datetime.datetime):
                    h5file[newpath] = x.isoformat()
                else:
                    raise ValueError('Cannot save a list of %s ' % type(item))

        # Save strings, scalars or array of scalars
        elif (np.isscalar(item) or
              isinstance(item, (str, np.ndarray))):
            h5file[newpath] = item

        # Other types cannot be saved and will result in an error
        else:
            raise ValueError('Cannot save %s type.' % type(item))

save_dict_to_hdf5(dic, filename)

Save dictionary to h5 file

:param dic: Python dictionary to be saved :param filename: Filename including path

Dictionary may contain other dictionary, lists, scalars, strings, datatime and ndarray of scalars.

Dictionaries and lists are iterated to be stored in h5 file. datatime is converted to integer.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def save_dict_to_hdf5(dic, filename):
    """
    Save dictionary to h5 file

    :param dic: Python dictionary to be saved
    :param filename: Filename including path

    Dictionary may contain other dictionary, lists, scalars, strings,
    datatime and ndarray of scalars.

    Dictionaries and lists are iterated to be stored in h5 file.
    datatime is converted to integer.
    """
    with h5py.File(filename, 'w') as h5file:
        recursively_save_dict_contents_to_group(h5file, '/', dic)

to_multiset(x)

Convert list to multiset

:param x: List to be converted :return: ordered multiset: Item of list, index of this element

E.g. (2,3,3,4,2) gives {(2, 0), (2, 1), (3, 0), (3, 1), (4, 0)}

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/cal_tools/ana_tools.py
def to_multiset(x):
    """
    Convert list to multiset

    :param x: List to be converted
    :return: ordered multiset: Item of list, index of this element

    E.g. (2,3,3,4,2) gives {(2, 0), (2, 1), (3, 0), (3, 1), (4, 0)}
    """
    result = set()
    max_rep = len(x)
    for elt in x:
        for n in range(max_rep):
            n_elt = (elt, n)
            if n_elt not in result:
                result.add(n_elt)
                break
    return result