Source code for toolbox_scs.test.test_dssc_cls

import unittest
import logging
import os
import argparse
import shutil
from time import strftime

import numpy as np
import xarray as xr
import extra_data as ed

import toolbox_scs as tb
import toolbox_scs.detectors as tbdet

# Configure the root logger so that messages from DEBUG upwards are emitted.
logging.basicConfig(level=logging.DEBUG)
log_root = logging.getLogger(__name__)

# Named groups of TestDSSC method names; a group is picked by its key.
suites = {
    "no-processing": (
        "test_create",
        "test_use_xgm_tim",
    ),
    "processing": (
        "test_processing_quick",
        # "test_normalization_all",
    ),
}

# Directories handled by setup_tmp_dir() and cleanup_tmp_dir().
_temp_dirs = ['tmp']
def setup_tmp_dir():
    """Create every directory listed in ``_temp_dirs``.

    Directories that already exist are left as they are. Missing parent
    directories are created as well.

    Raises
    ------
    FileExistsError
        If one of the paths exists but is not a directory.
    """
    for tmp_path in _temp_dirs:
        # A single makedirs(exist_ok=True) call replaces the former
        # isdir()/mkdir() pair, so nothing can create the directory between
        # the check and the creation.
        os.makedirs(tmp_path, exist_ok=True)
def cleanup_tmp_dir():
    """Delete every directory listed in ``_temp_dirs`` and log each removal.

    Errors during deletion are ignored (``ignore_errors=True``), so the log
    entry is written whether or not the directory could be removed.
    """
    for tmp_path in _temp_dirs:
        shutil.rmtree(tmp_path, ignore_errors=True)
        log_root.info(f'remove {tmp_path}')
class TestDSSC(unittest.TestCase):
    """Tests for the DSSC binner and formatter of ``toolbox_scs.detectors``.

    Every test loads a fixed proposal/run pair; the processing tests write
    their output files below ``./tmp/``.

    The test methods are described with comments instead of docstrings on
    purpose: unittest adds the first docstring line of a test method to its
    verbose report, which would alter the runner output.
    """

    @staticmethod
    def _frame_numbers(n_frames):
        # Integer indices 0 .. n_frames - 1, used as pulse coordinates.
        return np.linspace(0, n_frames - 1, n_frames, dtype=int)

    @classmethod
    def setUpClass(cls):
        """Prepare the temporary directories for the whole test class."""
        log_root.info("Start global setup")
        setup_tmp_dir()
        log_root.info("Finished global setup, start tests")

    @classmethod
    def tearDownClass(cls):
        """Remove the temporary directories again."""
        log_root.info("Clean up test environment....")
        cleanup_tmp_dir()

    def test_create(self):
        # Creates DSSCBinner objects for proposal 2212 / run 235, attaches
        # binners and an XGM pulse mask, then checks that a missing run
        # raises FileNotFoundError with the expected message.
        proposal_nb = 2212
        run_nb = 235

        run = tb.load_run(proposal_nb, run_nb, include='*DA*')
        info = tbdet.load_dssc_info(proposal_nb, run_nb)
        train_buckets = tb.get_array(run, 'PP800_PhaseShifter', 0.04)
        pulse_buckets = ['pumped', 'unpumped'] * 10

        train_binner = tbdet.create_dssc_bins(
            "trainId", info['trainIds'], train_buckets.values)
        pulse_binner = tbdet.create_dssc_bins(
            "pulse", self._frame_numbers(20), pulse_buckets)
        # These two dictionaries are built but not used further in this test.
        binners = {'trainId': train_binner, 'pulse': pulse_binner}
        params = {'binners': binners}

        # normal
        run235 = tbdet.DSSCBinner(proposal_nb, run_nb)
        del run235
        run235 = tbdet.DSSCBinner(proposal_nb, run_nb, dssc_coords_stride=1)
        run235.add_binner('trainId', train_binner)
        run235.add_binner('pulse', pulse_binner)
        xgm_threshold = (300.0, 8000.0)
        run235.create_pulsemask('xgm', xgm_threshold)

        self.assertIsNotNone(run235.get_xgm_binned())
        self.assertEqual(run235.binners['trainId'].values[0],
                         np.float32(7585.52))

        # expected fails
        with self.assertRaises(FileNotFoundError) as cm:
            run235 = tbdet.DSSCBinner(proposal_nb, 2354)
        err_msg = ("[Errno 2] No such file or directory: "
                   "'/gpfs/exfel/exp/SCS/201901/p002212/raw/r2354'")
        self.assertEqual(str(cm.exception), err_msg)

    def test_use_xgm_tim(self):
        # Loads XGM and TIM data through a DSSCBinner for proposal 2599 /
        # run 103 and checks that both binned results are not None.
        proposal_nb = 2599
        run_nb = 103

        info = tbdet.load_dssc_info(proposal_nb, run_nb)
        fpt = info['frames_per_train']
        n_trains = info['number_of_trains']
        train_ids = info['trainIds']

        train_buckets = ['chunk1'] * n_trains
        pulse_buckets = ['image_unpumped', 'dark', 'image_pumped', 'dark']

        train_binner = tbdet.create_dssc_bins(
            "trainId", train_ids, train_buckets)
        pulse_binner = tbdet.create_dssc_bins(
            "pulse", self._frame_numbers(fpt), pulse_buckets)
        binners = {'trainId': train_binner, 'pulse': pulse_binner}

        dssc_frame_coords = np.linspace(0, 2, 2, dtype=np.uint64)
        binner = tbdet.DSSCBinner(
            proposal_nb,
            run_nb,
            binners=binners,
            dssc_coords_stride=dssc_frame_coords,
        )
        binner.load_xgm()
        binner.load_tim()

        # Fetch both results before asserting on either of them.
        xgm_binned = binner.get_xgm_binned()
        tim_binned = binner.get_tim_binned()
        self.assertIsNotNone(xgm_binned)
        self.assertIsNotNone(tim_binned)

    def test_processing_quick(self):
        # Processes module 2 of proposal 2530 / run 49, then combines,
        # annotates, saves and reloads the formatted result.
        proposal_nb = 2530
        module_list = [2]
        run_nb = 49

        info = tbdet.load_dssc_info(proposal_nb, run_nb)
        fpt = info['frames_per_train']
        n_trains = info['number_of_trains']
        train_ids = info['trainIds']

        train_buckets = np.zeros(n_trains)
        pulse_buckets = ['image', 'dark'] * 99 + ['image_last']

        train_binner = tbdet.create_dssc_bins(
            "trainId", train_ids, train_buckets)
        pulse_binner = tbdet.create_dssc_bins(
            "pulse", self._frame_numbers(fpt), pulse_buckets)
        binners = {'trainId': train_binner, 'pulse': pulse_binner}

        binner = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
        binner.process_data(
            modules=module_list, filepath='./tmp/', chunksize=248)

        module_file = f'./tmp/run_{run_nb}_module{module_list[0]}.h5'
        self.assertTrue(os.path.isfile(module_file))

        formatter = tbdet.DSSCFormatter('./tmp/')
        formatter.combine_files()
        attrs = {
            'run_type': 'useful description',
            'comment': 'blabla',
            'run_number': run_nb,
        }
        formatter.add_attributes(attrs)
        formatter.save_formatted_data(f'./tmp/run_{run_nb}_formatted.h5')

        data = tbdet.load_xarray(f'./tmp/run_{run_nb}_formatted.h5')
        self.assertIsNotNone(data)

    def test_normalization_all(self):
        # Processes run 49 as dark run, then processes run 50 with XGM
        # normalization using the saved dark data (proposal 2530, module 2).
        proposal_nb = 2530
        module_list = [2]

        # dark
        run_nb = 49
        info = tbdet.load_dssc_info(proposal_nb, run_nb)
        fpt = info['frames_per_train']
        n_trains = info['number_of_trains']
        train_ids = info['trainIds']

        train_buckets = np.zeros(n_trains)
        train_binner = tbdet.create_dssc_bins(
            "trainId", train_ids, train_buckets)
        pulse_binner = tbdet.create_dssc_bins(
            "pulse", self._frame_numbers(fpt), self._frame_numbers(fpt))
        binners = {'trainId': train_binner, 'pulse': pulse_binner}

        binner = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
        binner.process_data(
            modules=module_list, filepath='./tmp/', chunksize=248)

        module_file = f'./tmp/run_{run_nb}_module{module_list[0]}.h5'
        self.assertTrue(os.path.isfile(module_file))

        formatter = tbdet.DSSCFormatter('./tmp/')
        formatter.combine_files()
        attrs = {
            'run_type': 'useful description',
            'comment': 'blabla',
            'run_number': run_nb,
        }
        formatter.add_attributes(attrs)
        formatter.save_formatted_data(f'./tmp/run_{run_nb}_formatted.h5')

        # main run
        run_nb = 50
        info = tbdet.load_dssc_info(proposal_nb, run_nb)
        fpt = info['frames_per_train']
        n_trains = info['number_of_trains']
        train_ids = info['trainIds']

        train_buckets = np.zeros(n_trains)
        pulse_buckets = ['image', 'dark'] * 99 + ['image_last']

        train_binner = tbdet.create_dssc_bins(
            "trainId", train_ids, train_buckets)
        pulse_binner = tbdet.create_dssc_bins(
            "pulse", self._frame_numbers(fpt), pulse_buckets)
        binners = {'trainId': train_binner, 'pulse': pulse_binner}

        binner = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)

        dark = tbdet.load_xarray('./tmp/run_49_formatted.h5')
        binner.process_data(
            modules=module_list,
            chunksize=248,
            filepath='./tmp/',
            xgm_normalization=True,
            normevery=2,
            dark_image=dark['data'],
        )

        module_file = f'./tmp/run_{run_nb}_module{module_list[0]}.h5'
        self.assertTrue(os.path.isfile(module_file))
def list_suites():
    """Print the keys of ``suites`` framed by two 79-character rulers."""
    ruler = "-" * 79
    report = ["\nPossible test suites:\n" + ruler]
    report.extend(str(suite_name) for suite_name in suites)
    report.append(ruler + "\n")
    # One print of the joined lines emits the same text as printing each
    # line separately.
    print("\n".join(report))
def suite(*tests):
    """Bundle the named ``TestDSSC`` test methods into one test suite.

    Parameters
    ----------
    *tests : str
        Names of ``TestDSSC`` methods, added in the order given.

    Returns
    -------
    unittest.TestSuite
        Suite holding one ``TestDSSC`` instance per name.
    """
    bundle = unittest.TestSuite()
    bundle.addTests(TestDSSC(method_name) for method_name in tests)
    return bundle
def main(*cliargs):
    """Run the requested test suites with a verbose text runner.

    Parameters
    ----------
    *cliargs : str
        Suite names. A name that is not a key of ``suites`` is reported
        with a warning and skipped.

    Notes
    -----
    Any exception raised while running is logged together with its
    traceback and is not re-raised; suites after the failing one are not
    run, because the whole loop sits inside the ``try`` block.
    """
    try:
        for suite_name in cliargs:
            if suite_name not in suites:
                log_root.warning(
                    "Unknown suite: '{}'".format(suite_name))
                continue
            runner = unittest.TextTestRunner(verbosity=2)
            runner.run(suite(*suites[suite_name]))
    except Exception as err:
        # Deliberate best-effort boundary: report the failure, do not crash.
        log_root.error("Unexpected error: {}".format(err),
                       exc_info=True)
if __name__ == '__main__':
    # Command-line entry point: optionally list the suite names, and
    # optionally run the suites named after --run-suites.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--list-suites',
        action='store_true',
        help='list possible test suites',
    )
    parser.add_argument(
        '--run-suites',
        metavar='S',
        nargs='+',
        action='store',
        help='a list of valid test suites',
    )
    args = parser.parse_args()

    # Both options may be given together; listing happens first.
    if args.list_suites:
        list_suites()
    if args.run_suites:
        main(*args.run_suites)