Skip to content

calibrate

JobArgs

Command line arguments for running one calibration job

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
class JobArgs:
    """Command line arguments for running one calibration job

    Every argument is treated as a ``str.format`` template in which
    ``{python}`` is replaced when the command is built.
    """
    def __init__(self, args: List[str]):
        self.args = args

    def __repr__(self):
        return "JobArgs({})".format(self.args)

    def __eq__(self, other):
        # Only another JobArgs holding equal arguments compares equal
        if not isinstance(other, JobArgs):
            return False
        return self.args == other.args

    def format_cmd(self, python):
        """Return the arguments with ``python`` substituted into each one"""
        formatted = []
        for template in self.args:
            formatted.append(template.format(python=python))
        return formatted

    def run_direct(self, work_dir, python) -> int:
        """Run this job in a local process, return exit status"""
        command = self.format_cmd(python)
        return call(command, cwd=work_dir)

    def submit_job(
            self, work_dir, python, slurm_opts, after_ok=(), after_any=(), env=None
    ):
        """Submit this job to Slurm, return its job ID"""
        command = slurm_opts.get_launcher_command(work_dir, after_ok, after_any)
        command.extend(self.format_cmd(python))
        # sbatch propagates environment variables into the job by default
        raw_output = check_output(command, cwd=work_dir, env=env)
        # The job ID is whatever precedes the first ';' in the output
        job_id, _, _ = raw_output.decode('utf-8').partition(';')
        return job_id.strip()

run_direct(work_dir, python)

Run this job in a local process, return exit status

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def run_direct(self, work_dir, python) -> int:
    """Run this job in a local process, return exit status"""
    # Build the concrete command first, then run it inside work_dir
    command = self.format_cmd(python)
    return call(command, cwd=work_dir)

submit_job(work_dir, python, slurm_opts, after_ok=(), after_any=(), env=None)

Submit this job to Slurm, return its job ID

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def submit_job(
        self, work_dir, python, slurm_opts, after_ok=(), after_any=(), env=None
):
    """Submit this job to Slurm, return its job ID"""
    command = slurm_opts.get_launcher_command(work_dir, after_ok, after_any)
    command.extend(self.format_cmd(python))
    # sbatch propagates environment variables into the job by default
    raw_output = check_output(command, cwd=work_dir, env=env)
    # The job ID is whatever precedes the first ';' in the output
    job_id, _, _ = raw_output.decode('utf-8').partition(';')
    return job_id.strip()

JobChain

A collection of jobs to run for one call to xfel-calibrate

This is a chain of steps, each of which may contain several jobs. It also holds the work directory, where the parameterised notebooks are saved, and the path to the Python interpreter to run the notebooks.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
class JobChain:
    """A collection of jobs to run for one call to xfel-calibrate

    The chain is an ordered list of steps, and each step may hold several
    jobs. The chain also records the work directory, where the parameterised
    notebooks are saved, and the path to the Python interpreter used to run
    the notebooks.
    """
    def __init__(self, steps: List[Step], work_dir: Path, python):
        # Steps without any jobs are dropped
        self.steps = list(filter(lambda s: s.jobs, steps))
        self.work_dir = work_dir
        self.python = python

    @classmethod
    def from_dir(cls, work_dir: Path, python):
        """Load a JobChain from a work directory containing steps.json"""
        steps_file = work_dir / 'steps.json'
        saved = json.loads(steps_file.read_text('utf-8'))
        steps = []
        for step_dict in saved['steps']:
            steps.append(Step.from_dict(step_dict))
        return cls(steps, work_dir, python)

    def save(self):
        """Save the steps of this chain to steps.json in the work directory"""
        steps_file = self.work_dir / 'steps.json'
        with steps_file.open('w', encoding='utf-8') as fh:
            payload = {'steps': [s.to_dict() for s in self.steps]}
            json.dump(payload, fh, indent=2)

    def submit_jobs(self, slurm_opts: SlurmOptions, env=None):
        """Submit these jobs to Slurm, return a list of job IDs

        Slurm dependencies are used to manage the sequence of jobs.
        """
        print("Submitting jobs with Slurm options:")
        print(" ".join(slurm_opts.get_launcher_command(self.work_dir)))
        submitted = []
        previous_ids = ()  # IDs from the preceding step; none before the first
        for step in self.steps:
            # Steps flagged after_error only wait for the previous jobs to
            # finish; all others require them to succeed.
            dep_kind = 'after_any' if step.after_error else 'after_ok'
            current_ids = [
                job.submit_job(
                    self.work_dir, self.python, slurm_opts, env=env,
                    **{dep_kind: previous_ids}
                )
                for job in step.jobs
            ]
            submitted += current_ids
            previous_ids = current_ids
        return submitted

    def run_direct(self) -> bool:
        """Run these jobs in local processes, return True if any failed"""
        failed = False
        for index, step in enumerate(self.steps):
            if failed and not step.after_error:
                print(f"Not running step {index}, previous step failed")
                continue

            # Every job of the step is run before the exit codes are checked
            statuses = [
                job.run_direct(self.work_dir, self.python) for job in step.jobs
            ]
            failed = failed or any(status != 0 for status in statuses)

        return failed

from_dir(work_dir, python) classmethod

Load a JobChain from a work directory containing steps.json

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
@classmethod
def from_dir(cls, work_dir: Path, python):
    """Load a JobChain from a work directory containing steps.json"""
    steps_file = work_dir / 'steps.json'
    saved = json.loads(steps_file.read_text('utf-8'))
    # Rebuild each Step from its saved dictionary, keeping the stored order
    steps = []
    for step_dict in saved['steps']:
        steps.append(Step.from_dict(step_dict))
    return cls(steps, work_dir, python)

run_direct()

Run these jobs in local processes, return True if any failed

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def run_direct(self) -> bool:
    """Run these jobs in local processes, return True if any failed"""
    failed = False
    for index, step in enumerate(self.steps):
        # After a failure, only steps flagged after_error are still run
        if failed and not step.after_error:
            print(f"Not running step {index}, previous step failed")
            continue

        # Every job of the step is run before the exit codes are checked
        statuses = [
            job.run_direct(self.work_dir, self.python) for job in step.jobs
        ]
        failed = failed or any(status != 0 for status in statuses)

    return failed

save()

Save the steps of this chain to steps.json in the work directory

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def save(self):
    """Save the steps of this chain to steps.json in the work directory"""
    steps_file = self.work_dir / 'steps.json'
    with steps_file.open('w', encoding='utf-8') as fh:
        # One dictionary per step, in chain order, under the 'steps' key
        payload = {'steps': [s.to_dict() for s in self.steps]}
        json.dump(payload, fh, indent=2)

submit_jobs(slurm_opts, env=None)

Submit these jobs to Slurm, return a list of job IDs

Slurm dependencies are used to manage the sequence of jobs.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def submit_jobs(self, slurm_opts: SlurmOptions, env=None):
    """Submit these jobs to Slurm, return a list of job IDs

    Slurm dependencies are used to manage the sequence of jobs.
    """
    print("Submitting jobs with Slurm options:")
    print(" ".join(slurm_opts.get_launcher_command(self.work_dir)))
    submitted = []
    previous_ids = ()  # IDs from the preceding step; none before the first
    for step in self.steps:
        # Steps flagged after_error only wait for the previous jobs to
        # finish; all others require them to succeed.
        dep_kind = 'after_any' if step.after_error else 'after_ok'
        current_ids = [
            job.submit_job(
                self.work_dir, self.python, slurm_opts, env=env,
                **{dep_kind: previous_ids}
            )
            for job in step.jobs
        ]
        submitted += current_ids
        previous_ids = current_ids
    return submitted

SlurmOptions

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
class SlurmOptions:
    """Options used to build the Slurm launcher command for a job"""
    def __init__(
            self, job_name=None, nice=None, mem=None, partition=None, reservation=None,
    ):
        # A missing (or empty) job name falls back to 'xfel_calibrate'
        self.job_name = job_name or 'xfel_calibrate'
        self.nice = nice
        self.mem = mem
        self.partition = partition
        self.reservation = reservation

    def get_partition_or_reservation(self) -> List[str]:
        """Return sbatch arguments to use a partition or reservation

        --reservation and --slurm-partition options have precedence.
        Otherwise, a default partition is used.
        """
        if self.reservation:
            return ['--reservation', self.reservation]
        return ['--partition', self.partition or sprof]

    def get_launcher_command(self, log_dir, after_ok=(), after_any=()) -> List[str]:
        """
        Return a slurm launcher command
        :param log_dir: Where Slurm .out log files should go
        :param after_ok: A list of jobs which must succeed first
        :param after_any: A list of jobs which must finish first, but may fail
        :return: List of commands and parameters to be used by subprocess
        """
        # Split the template into arguments *before* substituting the log
        # directory, so a path containing whitespace stays one argument.
        launcher_slurm = [
            token.format(temp_path=log_dir) for token in launcher_command.split()
        ]

        launcher_slurm += self.get_partition_or_reservation()

        launcher_slurm += ["--job-name", self.job_name]

        if self.nice:
            launcher_slurm.append(f"--nice={self.nice}")

        if self.mem:
            # The memory value is passed with a 'G' suffix
            launcher_slurm.append(f"--mem={self.mem}G")

        # Both dependency kinds are merged into one --dependency option
        deps = []
        if after_ok:
            deps.append("afterok:" + ":".join(str(j) for j in after_ok))
        if after_any:
            deps.append("afterany:" + ":".join(str(j) for j in after_any))
        if deps:
            launcher_slurm.append("--dependency=" + ",".join(deps))
        return launcher_slurm

get_launcher_command(log_dir, after_ok=(), after_any=())

Return a slurm launcher command :param log_dir: Where Slurm .out log files should go :param after_ok: A list of jobs which must succeed first :param after_any: A list of jobs which must finish first, but may fail :return: List of commands and parameters to be used by subprocess

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def get_launcher_command(self, log_dir, after_ok=(), after_any=()) -> List[str]:
    """
    Return a slurm launcher command
    :param log_dir: Where Slurm .out log files should go
    :param after_ok: A list of jobs which must succeed first
    :param after_any: A list of jobs which must finish first, but may fail
    :return: List of commands and parameters to be used by subprocess
    """
    # Split the template into arguments *before* substituting the log
    # directory, so a path containing whitespace stays one argument.
    launcher_slurm = [
        token.format(temp_path=log_dir) for token in launcher_command.split()
    ]

    launcher_slurm += self.get_partition_or_reservation()

    launcher_slurm += ["--job-name", self.job_name]

    if self.nice:
        launcher_slurm.append(f"--nice={self.nice}")

    if self.mem:
        # The memory value is passed with a 'G' suffix
        launcher_slurm.append(f"--mem={self.mem}G")

    # Both dependency kinds are merged into one --dependency option
    deps = []
    if after_ok:
        deps.append("afterok:" + ":".join(str(j) for j in after_ok))
    if after_any:
        deps.append("afterany:" + ":".join(str(j) for j in after_any))
    if deps:
        launcher_slurm.append("--dependency=" + ",".join(deps))
    return launcher_slurm

get_partition_or_reservation()

Return sbatch arguments to use a partition or reservation

--reservation and --slurm-partition options have precedence. Otherwise, a default partition is used.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def get_partition_or_reservation(self) -> List[str]:
    """Return sbatch arguments to use a partition or reservation

    A reservation, when set, wins over any partition. Without one, the
    configured partition is used, falling back to the default partition.
    """
    if not self.reservation:
        return ['--partition', self.partition or sprof]
    return ['--reservation', self.reservation]

Step

A group of jobs which may run in parallel

If after_error is True, this step should run even if previous steps failed. Otherwise, it will only run if the previous steps succeed.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
class Step:
    """A group of jobs which may run in parallel

    A step with after_error set to True should run even when earlier steps
    failed; otherwise it will only run if the previous steps succeed.
    """
    def __init__(self, jobs: List[JobArgs], after_error=False):
        self.jobs = jobs
        self.after_error = after_error

    def to_dict(self):
        """Return a plain dictionary holding each job's arguments and the flag"""
        job_args = [job.args for job in self.jobs]
        return dict(jobs=job_args, after_error=self.after_error)

    @classmethod
    def from_dict(cls, d):
        """Build a Step from a dictionary with 'jobs' and 'after_error' keys"""
        jobs = [JobArgs(job_args) for job_args in d['jobs']]
        return cls(jobs, d['after_error'])

balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes=8)

Return a balanced list of sequences to be executed on Slurm nodes. The total list of sequences is split across several nodes based on sequences_per_node. If the number of required nodes is more than max_nodes, the number of sequences_per_node is adjusted.

:param in_folder: Path to the input raw data without the run number. :param run: Run number. :param sequences: List of sequences. [-1] for obtaining all. :param sequences_per_node: Number of sequences to process per node. :param karabo_da: Karabo data aggregator used as data file inset. :param max_nodes: Maximum number of maxwell nodes to use. :return: Balanced list of sequences.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def balance_sequences(in_folder: str, run: int, sequences: List[int],
                      sequences_per_node: int, karabo_da: Union[list, str],
                      max_nodes: int = 8):
    """Return a balanced list of sequences to be executed on slurm nodes.

    The sequences found for the run are split into groups based on
    sequences_per_node. If that would need more than max_nodes groups,
    sequences_per_node is increased so that max_nodes groups are enough.
    The sequences are always returned in ascending order.

    :param in_folder: Path to the input raw data without the run number.
    :param run: Run number.
    :param sequences: List of sequences. [-1] for obtaining all.
    :param sequences_per_node: Number of sequences to process per node.
    :param karabo_da: Karabo data aggregator used as data file inset.
    :param max_nodes: Maximum number of maxwell nodes to use.
    :return: Balanced list of sequences (a list of lists of ints).
    :raises TypeError: If karabo_da is neither a string nor a list.
    :raises ValueError: If none of the selected sequences is available.
    """
    # TODO: some small detector notebooks have karabo_da as a list.
    # remove this str check after unifying the expected type across
    # correction notebooks.
    if isinstance(karabo_da, str):
        karabo_da = [karabo_da]
    elif not isinstance(karabo_da, list):
        raise TypeError("Balance sequences expects `karabo_da` as a string or list.")

    # data-mapping for LPD mini and GH2 25um uses karabo-da names like
    # LPDMINI00/2 or DA01/2 to identify individual modules. The /2 is not
    # part of the file name
    karabo_da = list({kda.split('/')[0] for kda in karabo_da})

    in_path = Path(in_folder, f"r{run:04d}")

    # TODO: remove ["-1"] after karabo_da refactor
    if karabo_da in [["-1"], ["all"]]:
        karabo_da = [""]

    # Get all possible sequences for the selected karabo_da
    sequence_files = []
    for k_da in karabo_da:
        sequence_files.extend(in_path.glob(f"*{k_da}-S*.h5"))

    # Extract sequences from input files: the last 5 characters of the
    # file stem are parsed as the sequence number.
    available = {int(sf.stem[-5:]) for sf in sequence_files}

    # Validate selected sequences with sequences in in_folder
    if sequences != [-1]:
        available = available.intersection(sequences)
        if not available:
            raise ValueError(
                f"Selected sequences {sequences} are not "
                f"available in {in_path}"
            )

    # Always sort: iterating a set yields hash order, which would hand
    # out-of-order sequences to the splitting below.
    seq_nums = sorted(available)

    # Validate required nodes with max_nodes
    # NOTE(review): floor division folds any remainder into the existing
    # splits, so a split can hold more than sequences_per_node sequences.
    # Confirm this is intended.
    nsplits = len(seq_nums) // sequences_per_node
    if nsplits > max_nodes:
        sequences_per_node = math.ceil(len(seq_nums)/max_nodes)
        nsplits = max_nodes
        print(f"Changed to {sequences_per_node} sequences per node")
        print(f"to have a maximum of {max_nodes} concurrent jobs")
    elif nsplits == 0:
        nsplits = 1

    # Empty chunks are dropped
    return [chunk.tolist() for chunk in np.array_split(seq_nums, nsplits)
            if chunk.size > 0]

create_finalize_script(fmt_args, cal_work_dir, job_list)

Create a finalize script to produce output report :param fmt_args: Dictionary of fmt arguments :param cal_work_dir: Path to temporary folder to run slurm job :param job_list: List of slurm jobs :return: The path of the created script

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def create_finalize_script(fmt_args, cal_work_dir, job_list) -> str:
    """
    Create a finalize script to produce output report

    The script is written to ``finalize.py`` inside ``cal_work_dir`` as
    UTF-8, which is the encoding Python assumes for source files.

    :param fmt_args: Dictionary of fmt arguments. It is updated in place:
        its 'joblist' entry is set to ``job_list``.
    :param cal_work_dir: Path to temporary folder to run slurm job
    :param job_list: List of slurm jobs
    :return: The path of the created script
    """
    tmpl = Template("""\
                    #!/usr/bin/env python3
                    import os
                    from xfel_calibrate.finalize import finalize

                    finalize(joblist={{joblist}},
                             finaljob=os.environ.get('SLURM_JOB_ID', ''),
                             cal_work_dir='{{cal_work_dir}}',
                             out_path='{{out_path}}',
                             version='{{version}}',
                             title='{{title}}',
                             author='{{author}}',
                             report_to='{{report_to}}',
                             data_path='{{in_folder}}',
                             request_time='{{request_time}}',
                             submission_time='{{submission_time}}')

                    """)

    fmt_args['joblist'] = job_list
    f_name = os.path.join(cal_work_dir, "finalize.py")
    # Write explicitly as UTF-8 rather than the locale's default encoding,
    # so values such as title or author survive on any locale.
    with open(f_name, "w", encoding="utf-8") as finfile:
        finfile.write(textwrap.dedent(tmpl.render(**fmt_args)))

    # change rights of the file to be:
    # executed and writable for user, readable for user, group and others
    all_stats = stat.S_IXUSR | stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    os.chmod(f_name, all_stats)
    return f_name

extract_title_author(nb)

Tries to extract title, author from markdown.

The version is taken from git.

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def extract_title_author(nb):
    """ Tries to extract title, author from markdown.

    Only the first markdown cell of the notebook is searched.
    Returns a (title, author) tuple; an entry is None when it is not found.
    """
    cell = first_markdown_cell(nb)
    if cell is None:
        return None, None

    text = cell["source"]
    # The title is the text enclosed between two runs of '#'
    titles = re.findall(r'#+\s*(.*)\s*#+', text)
    # The author follows "author:" and ends at ',', '?' or the word "version"
    authors = re.findall(
        r'author[\s]*[:][\s]*(.*?)\s*(?:[,?]|version)', text, flags=re.IGNORECASE)

    # Keep the first match of each pattern, if any
    return (titles[0] if titles else None), (authors[0] if authors else None)

flatten_list(l)

Make a string representation of a list :param l: List or a string :return: Same string or string with first and last entry of a list

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def flatten_list(l):
    """
    Make a string representation of a list
    :param l: List or a string
    :return: str(l) when l is not a list, the single entry of a one-element
      list, the first and last entry joined by '-' for longer lists,
      or an empty string for an empty list
    """
    if not isinstance(l, list):
        return str(l)
    if not l:
        return ''
    if len(l) == 1:
        return f'{l[0]}'
    return f'{l[0]}-{l[-1]}'

get_par_attr(parms, key, attr, default=None)

Return an attribute of the parameter with name key :param parms: List of parameters :param key: Name of the parameter to be considered :param attr: Name of the parameter attribute (e.g. value, type) :param default: Value to be returned if the parameter or its attribute is not found :return: The attribute of the parameter

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def get_par_attr(parms, key, attr, default=None):
    """
    Return an attribute of the first parameter with name key
    :param parms: List of parameters
    :param key: Name of the parameter to be considered
    :param attr: Name of the parameter attribute (e.g. value, type)
    :param default: Value to be returned if no parameter has that name or
      the parameter lacks the attribute
    :return: The attribute of the parameter
    """
    not_found = object()
    match = next((p for p in parms if p.name == key), not_found)
    if match is not_found:
        return default
    return getattr(match, attr, default)

make_par_table(parms)

Create a RST table with input parameters of the notebook

:param parms: parameters of the notebook

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def make_par_table(parms):
    """
    Create a RST table with input parameters of the notebook

    :param parms: parameters of the notebook; the ``name``, ``value``,
        ``type`` and ``comment`` attributes of each one are used
    :return: RST text containing a raw LaTeX ``longtable``
    """

    # Add space in long strings without line breakers ` ,-/` to
    # wrap them in latex
    def split_len(seq, length):
        r"""
        Splits a sequence into smaller segments of a specified length,
        concatenates them, and adds line-breaking characters
        to ensure proper line breaks in LaTeX.

        Args:
            seq (str): The sequence to be split.
            length (int): The desired length of each segment.

        Returns:
            str: The concatenated line with line-breaking characters.

        Examples:
            >>> split_len("slurm_prof_230711_095647.832671_0", 10)
            'slurm_prof\\-\\_230711_09\\-5647.83267\\-1_0\\-'
        """
        lbc = set(' ,-/')
        line = ''
        for i in range(0, len(seq), length):
            sub_line = seq[i:i + length]
            # Ensure proper line break if the
            # start of the new line begins with `_`.
            # endswith() is safe when `line` is still empty (first segment),
            # where indexing line[-1] would raise IndexError.
            if sub_line[0] == '_' and line.endswith('-'):
                line += '\\'
            line += sub_line.replace('/', '/\\-')
            # Segments without a natural break character get a
            # discretionary hyphen appended
            if not any(c in lbc for c in sub_line):
                line += '\\-'
        return line

    # Prepare strings and estimate their length
    l_parms = []
    len_parms = [0, 0]
    max_len = [20, 20]
    for p in parms:
        name = p.name.replace('_', '-')
        if len(name) > max_len[0]:
            len_parms[0] = max_len[0]
            name = split_len(name, max_len[0])

        value = tex_escape(str(p.value))
        if len(value) > max_len[1]:
            len_parms[1] = max_len[1]
            value = split_len(value, max_len[1])
        if issubclass(p.type, str):
            # LaTeX-style opening and closing quotes around string values
            value = "``{}''".format(value)
        # The first character of the comment is dropped
        comment = tex_escape(str(p.comment)[1:])
        l_parms.append([name, value, comment])

    # Fix column width is needed: a column holding any over-long entry
    # switches to the fixed-width paragraph column type
    col_type = ['l', 'c', 'p{.3\\textwidth}']
    if len_parms[0] == max_len[0]:
        col_type[0] = col_type[2]
    if len_parms[1] == max_len[1]:
        col_type[1] = col_type[2]

    tmpl = Template('''
                    Input of the calibration pipeline
                    =================================

                    .. raw:: latex

                        \\begin{longtable}{ {% for k in p %}{{k}}{%- endfor %} }
                        \\hline
                        {% for k in lines %}
                        {{ k[0] }} & {{ k[1] }} & {{ k[2] }} \\\\
                        {%- endfor %}
                        \\hline
                        \\end{longtable}
                    ''')

    return textwrap.dedent(tmpl.render(p=col_type, lines=l_parms))

remove_duplications(l)

Remove duplicated elements in the list

:param l: Input list :return: Output list of unique elements

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def remove_duplications(l) -> list:
    """
    Remove duplicated elements in the list

    The first occurrence of each element is kept, in its original order.
    A message is printed when the result differs from the input.

    :param l: Input list
    :return: Output list of unique elements
    """
    kept = []
    for item in l:
        if item in kept:
            continue
        kept.append(item)
    if kept != l:
        print("Duplicated concurrency parameters were removed")
    return kept

run(argv=None)

Run a calibration task with parser arguments

Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/calibrate.py
def run(argv=None):
    """Run a calibration task with parser arguments

    Parses the command line and loads the notebook, creates a working
    directory holding the input parameter table, a copy of the invocation,
    the notebook runner script and the calibration metadata, prepares one job
    per pre-notebook, per main notebook / concurrency value and per dependent
    notebook, and then either runs these jobs directly or submits them to
    Slurm, followed by a finalize step.

    :param argv: Command line argument list; defaults to ``sys.argv``.
    :return: ``None`` when only preparing files (``prepare_only``);
        otherwise ``int(errors)``, where ``errors`` is the result of
        ``job_chain.run_direct()`` for direct runs and ``False`` (i.e. 0)
        when the jobs were submitted to Slurm.
    :raises ValueError: if no values can be determined for the concurrency
        parameter, or if splitting for concurrency yields no jobs.
    """
    # Ensure files are opened as UTF-8 by default, regardless of environment.
    # Skipped when the interpreter path contains "readthedocs.org"
    # (presumably this locale is not available on the docs builder - confirm).
    if "readthedocs.org" not in sys.executable:
        locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))

    if argv is None:
        argv = sys.argv

    args, nb_details = parse_argv_and_load_nb(argv)

    concurrency = nb_details.concurrency
    # A concurrency parameter given on the command line takes precedence over
    # the one configured for the notebook.
    concurrency_par = args["concurrency_par"] or concurrency['parameter']
    if concurrency_par == concurrency['parameter']:
        # Use the defaults from notebook.py to split the work into several jobs
        concurrency_defval = concurrency.get('default concurrency', None)
        concurrency_func = concurrency.get('use function', None)
    else:
        # --concurrency-par specified something different from notebook.py:
        # don't use the associated settings from there.
        concurrency_defval = concurrency_func = None

    notebook_path = nb_details.path
    nb = nb_details.contents

    title, author = extract_title_author(nb)
    version = get_pycalib_version()

    # Fall back to generic values for anything that could not be determined
    if not title:
        title = f"{nb_details.detector} {nb_details.caltype} Calibration"
    if not author:
        author = "anonymous"
    if not version:
        version = ""

    title = title.rstrip()

    # "Now" yields a timezone-aware UTC timestamp; any other value is parsed
    # as an ISO 8601 string exactly as given.
    # NOTE(review): a string without a UTC offset gives a naive datetime,
    # presumably meant as local time - confirm against the callers.
    if args["request_time"] == "Now":
        request_time = datetime.now(tz=timezone.utc)
    else:
        request_time = datetime.fromisoformat(args["request_time"])

    # check if concurrency parameter is given and we run concurrently
    # (this only emits a warning; execution continues)
    if concurrency_par is not None and not any(
            p.name == concurrency_par for p in nb_details.default_params
    ):
        msg = f"Notebook cannot be run concurrently: no {concurrency_par} parameter"
        warnings.warn(msg, RuntimeWarning)

    # If not explicitly specified, use a new profile for ipcluster
    # ("not specified" is detected as: the value equals the notebook default)
    default_params_by_name = {p.name: p.value for p in nb_details.default_params}
    if 'cluster_profile' in default_params_by_name:
        if args.get("cluster_profile") == default_params_by_name["cluster_profile"]:
            args['cluster_profile'] = f"slurm_prof_{request_time:%y%m%d_%H%M%S.%f}"

    # Default output directory:
    # <default_report_path>/<detector>/<caltype>/<current timestamp>.
    # When try_report_to_output is set, the 'out_folder' argument is used
    # instead, if it was given.
    out_path = Path(default_report_path) / nb_details.detector / nb_details.caltype / datetime.now().isoformat()
    if try_report_to_output:
        if "out_folder" in args:
            out_path = Path(args["out_folder"]).absolute()
        else:
            print(f"No 'out_folder' given, outputting to '{out_path}' instead.")

    out_path.mkdir(parents=True, exist_ok=True)

    # Use given report name, or automatic unique name if not specified
    det_name = args.get('karabo_id', nb_details.detector)
    unique_name = f"{det_name}-{nb_details.caltype}-{request_time:%y%m%d_%H%M%S.%f}"
    if args['skip_report']:
        # Empty string (falsy) marks that no report will be produced
        report_to = ''
    elif args["report_to"] is None:
        report_to = out_path / f"{unique_name}.pdf"
        print(f"report_to not specified, will use {report_to}")
    else:
        report_to = Path(args["report_to"]).with_suffix('.pdf').absolute()

    if report_to:
        # Work dir matching report file but without .pdf
        cal_work_dir = report_to.with_suffix('')
    else:
        cal_work_dir = out_path / unique_name
    # No exist_ok here: an already existing work directory raises
    # FileExistsError instead of being reused.
    cal_work_dir.mkdir(parents=True)

    # Write all input parameters to rst file to be included to final report
    parms = parameter_values(nb_details.default_params, **args)
    (cal_work_dir / "InputParameters.rst").write_text(make_par_table(parms))
    # And save the invocation of this script itself
    (cal_work_dir / "run_calibrate.sh").write_text(
        f'# pycalibration version: {version}\n' + shlex.join(argv)
    )

    # Copy the bash script which will be used to run notebooks
    shutil.copy2(
        os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"),
        cal_work_dir / "pycalib-run-nb.sh"
    )

    # Pick the Python interpreter the notebooks will run with
    if nb_details.user_venv:
        print("Using specified venv:", nb_details.user_venv)
        python_exe = str(nb_details.user_venv / 'bin' / 'python')
    else:
        python_exe = python_path

    # Write metadata about calibration job to output folder
    metadata = cal_tools.tools.CalibrationMetadata(cal_work_dir, new=True)

    parm_subdict = metadata.setdefault("calibration-configurations", {})
    for p in parms:
        name = consolize_name(p.name)
        parm_subdict[name] = p.value

    metadata["pycalibration-version"] = version
    metadata["report-path"] = str(report_to) if report_to \
        else '# REPORT SKIPPED #'
    metadata['reproducible'] = not args['not_reproducible']
    metadata["concurrency"] = {
        'parameter': concurrency_par,
        'default': concurrency_defval,
        'function': concurrency_func,
    }
    metadata["notebook"] = {
        'title': title,
        'author': author,
    }
    metadata['python-environment'] = {
        'path': python_exe,
        'python-version': get_python_version(python_exe),
    }
    # Optionally take over the "retrieved-constants" section of an existing
    # YAML file
    if args["constants_from"]:
        with open(args["constants_from"], "r", encoding='utf-8') as f:
            d = yaml.safe_load(f)
        metadata["retrieved-constants"] = d["retrieved-constants"]
    metadata.save()

    # Record installed Python packages for reproducing the environment
    if not args['skip_env_freeze']:
        with (cal_work_dir / 'requirements.txt').open('wb') as f:
            check_call([python_exe, '-m', 'pip', 'freeze'], stdout=f)

    folder = get_par_attr(parms, 'in_folder', 'value', '')

    pre_jobs = []
    cluster_cores = concurrency.get("cluster cores", 8)
    # Check if there are pre-notebooks
    for pre_notebook_path in nb_details.pre_paths:
        lead_nb = nbformat.read(pre_notebook_path, as_version=4)
        pre_jobs.append(prepare_job(
            cal_work_dir, lead_nb, pre_notebook_path, args,
            cluster_cores=cluster_cores
        ))

    main_jobs = []
    if concurrency_par is None:
        # No concurrency parameter: the main notebook is a single job
        main_jobs.append(prepare_job(
            cal_work_dir, nb,
            notebook_path, args,
            cluster_cores=cluster_cores,
        ))
    else:
        # Determine the values to split over. Sources, in order:
        # command line, default concurrency setting, notebook parameter
        # default; a concurrency function may then recompute them.
        cvals = args.get(concurrency_par, None)

        # Consider [-1] as None
        if (cvals is None or cvals == [-1]) and concurrency_defval is not None:
            print(f"Concurrency parameter '{concurrency_par}' "
                  f"is taken from notebooks.py")
            # A non-sequence default is treated as a count: range(count)
            cvals = concurrency_defval if isinstance(concurrency_defval, (list, tuple)) else range(concurrency_defval)

        if cvals is None:
            defcval = get_par_attr(parms, concurrency_par, 'value')
            if defcval is not None:
                print(f"Concurrency parameter '{concurrency_par}' "
                      f"is taken from '{notebook_path}'")
                cvals = defcval if isinstance(defcval, (list, tuple)) else [defcval]

        if concurrency_func:
            func = get_notebook_function(nb, concurrency_func)
            if func is None:
                warnings.warn(
                    f"Didn't find concurrency function {concurrency_func} in notebook",
                    RuntimeWarning
                )
            else:
                # Execute the function's source taken from the notebook in a
                # fresh namespace and pick the function object out of it.
                # NOTE(review): exec() runs notebook code in this process;
                # notebooks are presumably trusted - confirm.
                df = {}
                exec(func, df)
                f = df[concurrency_func]
                import inspect
                sig = inspect.signature(f)
                if cvals:
                    # in case default needs to be used for function call
                    args[concurrency_par] = cvals
                # The function's parameter names are looked up in args; a
                # name missing from args raises KeyError here.
                callargs = [args[arg] for arg in sig.parameters]
                cvals = f(*callargs)
                print(f"Split concurrency into {cvals}")

        if cvals is None:
            raise ValueError(
                f"No values found for {concurrency_par} (concurrency parameter)"
            )

        # get expected type
        cvtype = get_par_attr(parms, concurrency_par, 'type', list)
        cvals = remove_duplications(cvals)

        if not cvals:
            raise ValueError("Splitting data for concurrency gave 0 jobs")

        for cnum, cval in enumerate(cvals):
            # Only the first job is prepared with show_title=True
            show_title = cnum == 0
            # Wrap a single value in a list when the parameter's type is list
            cval = [cval, ] if not isinstance(cval, list) and cvtype is list else cval

            main_jobs.append(prepare_job(
                cal_work_dir, nb, notebook_path, args,
                concurrency_par, cval,
                cluster_cores=cluster_cores,
                show_title=show_title,
            ))

    # Prepare dependent notebooks (e.g. summaries after correction)
    dep_jobs = []
    for i, dep_notebook_path in enumerate(nb_details.dep_paths):
        dep_nb = nbformat.read(dep_notebook_path, as_version=4)
        dep_jobs.append(prepare_job(
            cal_work_dir, dep_nb, dep_notebook_path, args,
            cluster_cores=cluster_cores,
        ))

    # Steps are listed in order: pre-notebooks, main notebook(s), dependents.
    # NOTE(review): after_error=True presumably lets the dependent notebooks
    # run even if an earlier step failed - confirm against Step/JobChain.
    job_chain = JobChain([
        Step(pre_jobs),
        Step(main_jobs),
        Step(dep_jobs, after_error=True)
    ], Path(cal_work_dir), python_exe)

    # Save information about jobs for reproducibility
    job_chain.save()

    if args['prepare_only']:
        # Everything is written to disk; tell the user how to execute it
        # later and return without running or submitting anything.
        print("Files prepared, not executing now (--prepare-only option).")
        print("To execute the notebooks, run:")
        rpt_opts = ''
        if nb_details.user_venv is not None:
            rpt_opts = f'--python {python_exe}'
        print(f"  python -m xfel_calibrate.repeat {cal_work_dir} {rpt_opts}")
        return

    print("Calibration work directory (including Slurm .out files):")
    print(" ", cal_work_dir)

    submission_time = datetime.now(tz=timezone.utc)

    # Launch the calibration work
    if args["no_cluster_job"]:
        print("Running notebooks directly, not via Slurm...")
        errors = job_chain.run_direct()
        joblist = []
    else:
        print("Submitting jobs to Slurm...")

        joblist = job_chain.submit_jobs(SlurmOptions(
            job_name=args.get('slurm_name', 'xfel_calibrate'),
            nice=args['slurm_scheduling'],
            mem=args['slurm_mem'],
            reservation=args['reservation'],
            partition=args['slurm_partition'],
        ))
        # Submission only; no error status is collected for Slurm jobs here
        errors = False

    # Values handed over to the finalize step
    fmt_args = {'cal_work_dir': cal_work_dir,
                'out_path': out_path,
                'version': version,
                'title': title,
                'author': author,
                'report_to': report_to,
                'in_folder': folder,
                'request_time': request_time.isoformat(),
                'submission_time': submission_time.isoformat(),
                }

    # The result of run_finalize is appended to the job list so that it is
    # reported together with the other jobs below.
    joblist.append(run_finalize(
        fmt_args=fmt_args,
        cal_work_dir=cal_work_dir,
        job_list=joblist,
        sequential=args["no_cluster_job"],
        partition=args["slurm_partition"] or "exfel",
    ))

    # Only report job IDs if there is at least one non-None entry
    if any(j is not None for j in joblist):
        print("Submitted the following SLURM jobs: {}".format(",".join(joblist)))

    return int(errors)