nb_args
Manipulating notebooks & translating parameters to command-line options
NBDetails
dataclass
¶
Details of a notebook-based workflow to run
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/nb_args.py
@dataclass
class NBDetails:
    """Details of a notebook-based workflow to run"""
    detector: str  # e.g. AGIPD
    caltype: str  # e.g. CORRECT
    path: Path  # Path of the main notebook
    pre_paths: List[Path]  # Notebooks to run before the main notebook
    dep_paths: List[Path]  # Notebooks to run after the main notebook
    contents: nbformat.NotebookNode  # The main notebook as loaded by nbformat
    default_params: List[Parameter]  # Parameters extracted from the notebook
    concurrency: Dict[str, Any]  # Contents as in notebooks.py
    user_venv: Optional[Path]  # None unless the notebook config names a venv
add_args_from_nb(parms, parser, cvar=None, no_required=False)
¶
Add argparse arguments for parameters in the first cell of a notebook.
Uses nbparameterise to extract the parameter information. Each foo_bar parameter gets a --foo-bar command line option. Boolean parameters get a pair of flags like --abc and --no-abc.
Parameters:
- parms: List of nbparameterise Parameter objects.
- parser: argparse.ArgumentParser instance to modify.
- cvar (str): Name of the concurrency parameter.
- no_required (bool): If True, none of the added options are required.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/nb_args.py
def add_args_from_nb(parms, parser, cvar=None, no_required=False):
    """Add argparse arguments for parameters in the first cell of a notebook.

    Uses nbparameterise to extract the parameter information. Each foo_bar
    parameter gets a --foo-bar command line option.
    Boolean parameters get a pair of flags like --abc and --no-abc.

    A parameter is treated as required when its comment contains the word
    "required", unless ``no_required`` is set or it is the concurrency
    parameter. List parameters (and the concurrency parameter) accept one
    or more values; a comment containing "RANGE ALLOWED" (case-insensitive)
    additionally enables range expressions via ``make_intelli_list``.

    :param parms: List of nbparameterise Parameter objects
    :param parser: argparse.ArgumentParser instance to modify
    :param str cvar: Name of the concurrency parameter.
    :param bool no_required: If True, none of the added options are required.
    """
    for p in parms:
        if p.name == 'metadata_folder':
            continue  # Comes from xfel-calibrate machinery, can't be supplied

        option = f"--{consolize_name(p.name)}"

        helpstr = ("Default: %(default)s" if not p.comment
                   else "{}. Default: %(default)s".format(p.comment.replace("#", " ").strip()))
        required = (p.comment is not None
                    and "required" in p.comment
                    and not no_required
                    and p.name != cvar)

        # This may be not a public API
        # May require reprogramming in case of argparse updates
        pars_group = parser._action_groups[2 if required else 1]

        # A required option must not silently fall back to the notebook value
        default = p.value if (not required) else None

        if issubclass(p.type, list) or p.name == cvar:
            if issubclass(p.type, list):
                # The element type is inferred from the first default value.
                # An empty default list carries no type information, so fall
                # back to plain strings instead of raising IndexError.
                ltype = type(p.value[0]) if p.value else str
            else:
                ltype = p.type
            range_allowed = "RANGE ALLOWED" in p.comment.upper() if p.comment else False
            pars_group.add_argument(option,
                                    nargs='+',
                                    type=ltype if not range_allowed else str,
                                    default=default,
                                    help=helpstr,
                                    required=required,
                                    action=make_intelli_list(ltype) if range_allowed else None)
        elif issubclass(p.type, bool):
            # For a boolean, make --XYZ and --no-XYZ options.
            alt_group = pars_group.add_mutually_exclusive_group(required=required)
            alt_group.add_argument(option,
                                   action="store_true",
                                   default=default,
                                   help=helpstr,
                                   dest=p.name)
            alt_group.add_argument(f"--no-{consolize_name(p.name)}",
                                   action="store_false",
                                   default=default,
                                   help=f"Opposite of {option}",
                                   dest=p.name)
        else:
            pars_group.add_argument(option,
                                    type=p.type,
                                    default=default,
                                    help=helpstr,
                                    required=required)
consolize_name(name)
¶
Names of console parameters don't have underscores
deconsolize_args(args)
¶
Variable names have underscores
extend_params(nb, extend_func_name, argv)
¶
Add parameters in the first code cell by calling a function in the notebook
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/nb_args.py
def extend_params(nb, extend_func_name, argv):
    """Add parameters in the first code cell by calling a function in the notebook

    The function named ``extend_func_name`` is looked up in the notebook,
    executed, and called with the current values of the command line
    arguments matching its parameter names. Whatever it returns is appended
    to the source of the notebook's first code cell (modified in place).

    :param nb: notebook to modify
    :param str extend_func_name: name of the function defined in the notebook
    :param argv: full command line, including the program name at index 0
    :return: None. A RuntimeWarning is issued if the function is not found.
    """
    func = get_notebook_function(nb, extend_func_name)

    if func is None:
        warnings.warn(
            f"Didn't find parameter extension function {extend_func_name} in notebook",
            RuntimeWarning
        )
        return

    # Make a temporary parser that won't exit if it sees -h or --help
    pre_parser = make_initial_parser(add_help=False)
    params = extract_parameters(nb, lang='python')
    add_args_from_nb(params, pre_parser, no_required=True)
    known, _ = pre_parser.parse_known_args(argv[1:])
    args = deconsolize_args(vars(known))

    # NOTE(security): this executes code taken from the notebook itself;
    # only use with notebooks from a trusted source.
    namespace = {}
    exec(func, namespace)
    f = namespace[extend_func_name]
    sig = inspect.signature(f)

    # Every parameter of the function must correspond to a parsed argument
    extension = f(*[args[p] for p in sig.parameters])
    fcc = first_code_cell(nb)
    # A bare newline is appended when the function returns nothing
    fcc["source"] += ("\n" + extension) if extension else "\n"
first_code_cell(nb)
¶
Return the first code cell of a notebook
first_markdown_cell(nb)
¶
Return the first markdown cell of a notebook
get_cell_n(nb, cell_type, cell_n)
¶
Return notebook cell with given number and given type
Parameters:
- nb: Jupyter notebook.
- cell_type: cell type, 'code' or 'markdown'.
- cell_n: cell number (counting from 0).

Returns: the notebook cell.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/nb_args.py
def get_cell_n(nb, cell_type, cell_n):
    """
    Return notebook cell with given number and given type

    Cells are counted separately per type, so ``cell_n`` is the position
    among the cells of ``cell_type`` only.

    :param nb: jupyter notebook
    :param cell_type: cell type, 'code' or 'markdown'
    :param cell_n: cell number (count from 0)
    :return: notebook cell, or None if there is no such cell
    """
    same_type = (c for c in nb.cells if c.cell_type == cell_type)
    for position, candidate in enumerate(same_type):
        if position == cell_n:
            return candidate
    return None
make_epilog(nb, caltype=None)
¶
Make an epilog from the notebook to add to parser help
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/nb_args.py
def make_epilog(nb, caltype=None):
    """Make an epilog from the notebook to add to parser help

    The text comes from the notebook's first markdown cell. Its first line
    becomes the heading (prefixed with ``caltype`` when given); the
    remaining lines are laid out with pprint and stripped of the list
    punctuation pprint adds.

    :param nb: notebook to take the description from
    :param caltype: optional calibration type shown in front of the heading
    :return: the epilog text, ending with an empty line
    """
    header_cell = first_markdown_cell(nb)
    if header_cell is None:
        lines = ['']
    else:
        lines = header_cell.source.split("\n")

    heading, remainder = lines[0], lines[1:]
    indent = 17 if caltype else 0

    if caltype:
        pieces = ["{:<15} {}".format(caltype, heading) + "\n"]
    else:
        pieces = ["{}".format(heading) + "\n"]

    if remainder:
        printer = pprint.PrettyPrinter(indent=indent)
        # Drop the enclosing brackets of the formatted list
        formatted = printer.pformat(remainder)[1:-1]
        for raw in formatted.split("\n"):
            # Remove the opening quote of the first string on the line
            text = raw.replace("'", "", 1)
            text = text.replace("', '", " " * indent, 1)
            # Remove the last quote on the line
            text = text[::-1].replace("'", "", 1)[::-1]
            text = text.replace(" ,", " ")
            if len(text) > 1 and text[0] == ",":
                text = text[1:]
            pieces.append(text + "\n")

    pieces.append("\n")
    return "".join(pieces)
make_intelli_list(ltype)
¶
Parses a list from range and comma expressions.
An expression of the form "1-5,6" will be parsed into the following list: [1,2,3,4,6]. Note that the end of a range is exclusive, so "1-5" yields 1 to 4.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/nb_args.py
def make_intelli_list(ltype):
    """ Parses a list from range and comma expressions.

    An expression of the form "1-5,6" will be parsed into the following
    list: [1,2,3,4,6]

    The end of a range is exclusive, as with the built-in ``range``.

    :param ltype: callable applied to every parsed integer to produce the
        elements of the resulting list (e.g. ``int`` or ``str``)
    :return: an ``argparse.Action`` subclass to pass as ``action=``
    """

    class IntelliListAction(argparse.Action):
        element_type = ltype

        def __call__(self, parser, namespace, values, option_string=None):
            # argparse hands over a list when nargs is set and a single
            # value otherwise. Normalise both to one comma separated string;
            # joining a bare string would split it into characters.
            if not isinstance(values, str):
                if not isinstance(values, (list, tuple)):
                    values = [values]
                values = ",".join(str(v) for v in values)

            parsed_values = []
            for rcomp in values.split(","):
                if "-" in rcomp:
                    start, end = rcomp.split("-")
                    parsed_values += list(range(int(start), int(end)))
                else:
                    parsed_values += [int(rcomp)]

            parsed_values = [self.element_type(p) for p in parsed_values]
            print("Parsed input {} to {}".format(values, parsed_values))
            setattr(namespace, self.dest, parsed_values)

    return IntelliListAction
parse_argv_and_load_nb(argv)
¶
Parse command-line arguments for xfel-calibrate to run a notebook
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/nb_args.py
def parse_argv_and_load_nb(argv) -> Tuple[Dict, NBDetails]:
    """Parse command-line arguments for xfel-calibrate to run a notebook

    Depending on how much of the command line is given this either prints
    help and exits, or loads the notebook selected by detector and
    calibration type and builds the full parser from its parameters.

    :param argv: full command line, including the program name at index 0
    :return: tuple of the parsed arguments (as a dict keyed by variable
        name) and the NBDetails describing the notebook to run
    :raises KeyError: while building the detector help, if a notebook entry
        has neither a packaged nor a user notebook path
    """
    # extend the parser according to user input
    # the first case is if a detector was given, but no calibration type
    if len(argv) == 3 and "-h" in argv[2]:
        detector = argv[1].upper()
        try:
            det_notebooks = notebooks[detector]
        except KeyError:
            # TODO: This should really go to stderr not stdout
            print("Not one of the known detectors: {}".format(notebooks.keys()))
            sys.exit(1)

        msg = "Options for detector {}\n".format(detector)
        msg += "*" * len(msg) + "\n\n"

        # basically, this creates help in the form of
        #
        # TYPE        some description that is
        #             indented for this type.
        #
        # The information is extracted from the first markdown cell of
        # the notebook.
        for caltype, notebook in det_notebooks.items():
            if notebook.get("notebook") is None:
                if notebook.get("user", {}).get("notebook") is None:
                    raise KeyError(
                        f"`{detector}` does not have a notebook path, for "
                        "notebooks that are stored in pycalibration set the "
                        "`notebook` key to a relative path or set the "
                        "`['user']['notebook']` key to an absolute path/path "
                        "pattern. Notebook configuration dictionary contains "
                        f"only: `{notebook}`"
                        ""
                    )
                # Everything should be indented by 17 spaces
                msg += caltype.ljust(17) + "User defined notebook, arguments may vary\n"
                msg += " "*17 + "User notebook expected to be at path:\n"
                msg += " "*17 + notebook["user"]["notebook"] + "\n"
            else:
                nbpath = os.path.join(PKG_DIR, notebook["notebook"])
                nb = nbformat.read(nbpath, as_version=4)
                msg += make_epilog(nb, caltype=caltype)

        make_initial_parser(epilog=msg).parse_args(argv[1:])
        sys.exit()  # parse_args should already exit for --help
    elif len(argv) <= 3:
        make_initial_parser().parse_args(argv[1:])
        sys.exit()  # parse_args should already exit - not enough args

    # A detector and type was given. We derive the arguments
    # from the corresponding notebook
    args, _ = make_initial_parser(add_help=False).parse_known_args(argv[1:])
    try:
        nb_info = notebooks[args.detector.upper()][args.type.upper()]
    except KeyError:
        print("Not one of the known calibrations or detectors")
        sys.exit(1)

    # Pick out any arguments that may prevent reproducibility from
    # working, sorted alphabetically and converted back to their
    # canonical representation.
    not_reproducible_args = sorted(
        ('--' + x.replace('_', '-')
         for x in ['skip_env_freeze']
         if getattr(args, x))
    )

    # If any of these arguments are set, present a warning.
    if not_reproducible_args:
        print('WARNING: One or more command line arguments ({}) may prevent '
              'this specific correction result from being reproducible based '
              'on its metadata. It may not be possible to restore identical '
              'output data files when they have been deleted or lost. Please '
              'ensure that the data retention policy of the chosen storage '
              'location is sufficient for your '
              'needs.'.format(', '.join(not_reproducible_args)))

        if not args.not_reproducible:
            # If not explicitly specified that reproducibility may be
            # broken, remind the user and exit.
            print('To proceed, you can explicitly allow reproducibility to '
                  'be broken by adding --not-reproducible')
            sys.exit(1)

    # Use .get() as the help branch above does: an entry that only defines a
    # user notebook may omit the "notebook" key rather than set it to None.
    if nb_info.get("notebook"):
        notebook = os.path.join(PKG_DIR, nb_info["notebook"])
    else:
        # If `"notebook"` entry is None, then set it to the user provided
        # notebook TODO: This is a very hacky workaround, better implementation
        # is not really possible with the current state of this module
        user_notebook_path = nb_info["user"]["notebook"]
        # Pull out the variables in the templated path string, and get values
        # from command line args (e.g. --proposal 1234 -> {proposal})
        user_notebook_variables = [
            name for (_, name, _, _) in string.Formatter().parse(user_notebook_path)
            if name is not None
        ]

        user_notebook_parser = argparse.ArgumentParser(add_help=False)
        for var in user_notebook_variables:
            user_notebook_parser.add_argument(f"--{var}")

        user_notebook_args, _ = user_notebook_parser.parse_known_args(argv[1:])

        notebook = user_notebook_path.format(**vars(user_notebook_args))

    concurrency = nb_info.get("concurrency", {'parameter': None})

    nb = nbformat.read(notebook, as_version=4)

    # extend parameters if needed
    ext_func = nb_info.get("extend parms", None)
    if ext_func is not None:
        extend_params(nb, ext_func, argv)

    default_params = extract_parameters(nb, lang='python')

    # The final parser knows the notebook's parameters, so this parse
    # validates the complete command line.
    parser = make_initial_parser()
    parser.description = make_epilog(nb)
    add_args_from_nb(default_params, parser, cvar=concurrency['parameter'])

    arg_dict = deconsolize_args(vars(parser.parse_args(argv[1:])))

    # The venv path may be a template filled in from the parsed arguments
    user_venv = nb_info.get("user", {}).get("venv")
    if user_venv is not None:
        user_venv = Path(user_venv.format(**arg_dict))

    return arg_dict, NBDetails(
        detector=args.detector.upper(),
        caltype=args.type.upper(),
        path=Path(notebook),
        pre_paths=[Path(PKG_DIR, p) for p in nb_info.get('pre_notebooks', [])],
        dep_paths=[Path(PKG_DIR, p) for p in nb_info.get('dep_notebooks', [])],
        contents=nb,
        default_params=default_params,
        concurrency=concurrency,
        user_venv=user_venv,
    )
set_figure_format(nb, enable_vector_format)
¶
Set svg format in inline backend for figures
If the parameter enable_vector_format is set to True, the svg format will be used for figures when the notebook is rendered. As a result, vector graphics figures will be used in the report.
Source code in /usr/src/app/checkouts/readthedocs.org/user_builds/european-xfel-offline-calibration/envs/latest/lib/python3.8/site-packages/xfel_calibrate/nb_args.py
def set_figure_format(nb, enable_vector_format):
    """Set svg format in inline backend for figures

    When enable_vector_format is true, a line configuring the inline
    backend to produce svg figures is appended to the second code cell of
    the notebook, so that the rendered notebook and the report built from
    it use vector graphics. Otherwise the notebook is left untouched.
    """
    if not enable_vector_format:
        return

    svg_config = "\n%config InlineBackend.figure_formats = ['svg']\n"
    # Index 1 selects the second code cell of the notebook
    target_cell = get_cell_n(nb, 'code', 1)
    target_cell.source += svg_config