import unittest
import logging
import os
import argparse
import shutil
from time import strftime
import numpy as np
import xarray as xr
import extra_data as ed
import toolbox_scs as tb
import toolbox_scs.detectors as tbdet
logging.basicConfig(level=logging.DEBUG)
[docs]log_root = logging.getLogger(__name__)
[docs]suites = {"no-processing": (
"test_create",
"test_use_xgm_tim",
),
"processing": (
"test_processing_quick",
#"test_normalization_all",
)
}
[docs]def setup_tmp_dir():
for d in _temp_dirs:
if not os.path.isdir(d):
os.mkdir(d)
[docs]def cleanup_tmp_dir():
for d in _temp_dirs:
shutil.rmtree(d, ignore_errors=True)
log_root.info(f'remove {d}')
[docs]class TestDSSC(unittest.TestCase):
@classmethod
[docs] def setUpClass(cls):
log_root.info("Start global setup")
setup_tmp_dir()
log_root.info("Finished global setup, start tests")
@classmethod
[docs] def tearDownClass(cls):
log_root.info("Clean up test environment....")
cleanup_tmp_dir()
[docs] def test_create(self):
proposal_nb = 2212
run_nb = 235
run = tb.load_run(proposal_nb, run_nb, include='*DA*')
run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
bins_trainId = tb.get_array(run,
'PP800_PhaseShifter',
0.04)
bins_pulse = ['pumped', 'unpumped'] * 10
binner1 = tbdet.create_dssc_bins("trainId",
run_info['trainIds'],
bins_trainId.values)
binner2 = tbdet.create_dssc_bins("pulse",
np.linspace(0,19,20, dtype=int),
bins_pulse)
binners = {'trainId': binner1, 'pulse': binner2}
params = {'binners': binners}
# normal
run235 = tbdet.DSSCBinner(proposal_nb, run_nb)
del(run235)
run235 = tbdet.DSSCBinner(2212, 235, dssc_coords_stride=1)
run235.add_binner('trainId', binner1)
run235.add_binner('pulse', binner2)
xgm_threshold=(300.0, 8000.0)
run235.create_pulsemask('xgm', xgm_threshold)
self.assertIsNotNone(run235.get_xgm_binned())
self.assertEqual(run235.binners['trainId'].values[0],
np.float32(7585.52))
# expected fails
with self.assertRaises(FileNotFoundError) as cm:
run235 = tbdet.DSSCBinner(2212, 2354)
err_msg = "[Errno 2] No such file or directory: " \
"'/gpfs/exfel/exp/SCS/201901/p002212/raw/r2354'"
self.assertEqual(str(cm.exception), err_msg)
[docs] def test_use_xgm_tim(self):
proposal_nb = 2599
run_nb = 103
run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
fpt = run_info['frames_per_train']
n_trains = run_info['number_of_trains']
trainIds = run_info['trainIds']
buckets_train = ['chunk1']*n_trains
buckets_pulse = ['image_unpumped', 'dark',
'image_pumped', 'dark']
binner1 = tbdet.create_dssc_bins("trainId",trainIds,buckets_train)
binner2 = tbdet.create_dssc_bins("pulse",
np.linspace(0,fpt-1,fpt, dtype=int),
buckets_pulse)
binners = {'trainId': binner1, 'pulse': binner2}
dssc_frame_coords = np.linspace(0,2,2, dtype=np.uint64)
bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb,
binners=binners,
dssc_coords_stride=dssc_frame_coords)
bin_obj.load_xgm()
bin_obj.load_tim()
xgm_binned = bin_obj.get_xgm_binned()
tim_binned = bin_obj.get_tim_binned()
self.assertIsNotNone(xgm_binned)
self.assertIsNotNone(tim_binned)
[docs] def test_processing_quick(self):
proposal_nb = 2530
module_list=[2]
run_nb = 49
run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
fpt = run_info['frames_per_train']
n_trains = run_info['number_of_trains']
trainIds = run_info['trainIds']
buckets_train = np.zeros(n_trains)
buckets_pulse = ['image', 'dark'] * 99 + ['image_last']
binner1 = tbdet.create_dssc_bins("trainId",
trainIds,
buckets_train)
binner2 = tbdet.create_dssc_bins("pulse",
np.linspace(0,fpt-1,fpt, dtype=int),
buckets_pulse)
binners = {'trainId': binner1, 'pulse': binner2}
bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
bin_obj.process_data(
modules=module_list, filepath='./tmp/', chunksize=248)
filename = f'./tmp/run_{run_nb}_module{module_list[0]}.h5'
self.assertTrue(os.path.isfile(filename))
run_formatted = tbdet.DSSCFormatter('./tmp/')
run_formatted.combine_files()
attrs = {'run_type':'useful description',
'comment':'blabla',
'run_number':run_nb}
run_formatted.add_attributes(attrs)
run_formatted.save_formatted_data(
f'./tmp/run_{run_nb}_formatted.h5')
data = tbdet.load_xarray(f'./tmp/run_{run_nb}_formatted.h5')
self.assertIsNotNone(data)
[docs] def test_normalization_all(self):
proposal_nb = 2530
module_list=[2]
# dark
run_nb = 49
run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
fpt = run_info['frames_per_train']
n_trains = run_info['number_of_trains']
trainIds = run_info['trainIds']
buckets_train = np.zeros(n_trains)
binner1 = tbdet.create_dssc_bins("trainId",
trainIds,
buckets_train)
binner2 = tbdet.create_dssc_bins("pulse",
np.linspace(0,fpt-1,fpt, dtype=int),
np.linspace(0,fpt-1,fpt, dtype=int))
binners = {'trainId': binner1, 'pulse': binner2}
bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
bin_obj.process_data(
modules=module_list, filepath='./tmp/', chunksize=248)
filename = f'./tmp/run_{run_nb}_module{module_list[0]}.h5'
self.assertTrue(os.path.isfile(filename))
run_formatted = tbdet.DSSCFormatter('./tmp/')
run_formatted.combine_files()
attrs = {'run_type':'useful description',
'comment':'blabla',
'run_number':run_nb}
run_formatted.add_attributes(attrs)
run_formatted.save_formatted_data(
f'./tmp/run_{run_nb}_formatted.h5')
# main run
run_nb = 50
run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
fpt = run_info['frames_per_train']
n_trains = run_info['number_of_trains']
trainIds = run_info['trainIds']
buckets_train = np.zeros(n_trains)
buckets_pulse = ['image', 'dark'] * 99 + ['image_last']
binner1 = tbdet.create_dssc_bins("trainId",
trainIds,
buckets_train)
binner2 = tbdet.create_dssc_bins("pulse",
np.linspace(0,fpt-1,fpt, dtype=int),
buckets_pulse)
binners = {'trainId': binner1, 'pulse': binner2}
bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
dark = tbdet.load_xarray('./tmp/run_49_formatted.h5')
bin_params = {'modules':module_list,
'chunksize':248,
'filepath':'./tmp/',
'xgm_normalization':True,
'normevery':2,
'dark_image':dark['data']
}
bin_obj.process_data(**bin_params)
filename = f'./tmp/run_{run_nb}_module{module_list[0]}.h5'
self.assertTrue(os.path.isfile(filename))
[docs]def list_suites():
print("\nPossible test suites:\n" + "-" * 79)
for key in suites:
print(key)
print("-" * 79 + "\n")
[docs]def suite(*tests):
suite = unittest.TestSuite()
for test in tests:
suite.addTest(TestDSSC(test))
return suite
[docs]def main(*cliargs):
try:
for test_suite in cliargs:
if test_suite in suites:
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite(*suites[test_suite]))
else:
log_root.warning(
"Unknown suite: '{}'".format(test_suite))
pass
except Exception as err:
log_root.error("Unecpected error: {}".format(err),
exc_info=True)
pass
if __name__ == '__main__':
[docs] parser = argparse.ArgumentParser()
parser.add_argument('--list-suites',
action='store_true',
help='list possible test suites')
parser.add_argument('--run-suites', metavar='S',
nargs='+', action='store',
help='a list of valid test suites')
args = parser.parse_args()
if args.list_suites:
list_suites()
if args.run_suites:
main(*args.run_suites)