314 Read trigger configuration keys (SMK, L1PSK, HLTPSK) and DB info from OKS via WEBDAQ REST API.
316 This reads the keys from the partition's TriggerConfiguration object and its
317 related L1TriggerConfiguration and TriggerDBConnection objects.
320 - Partition -> TriggerConfiguration -> L1TriggerConfiguration (Lvl1PrescaleKey)
321 - Partition -> TriggerConfiguration -> TriggerDBConnection (SuperMasterKey)
322 - Partition -> TriggerConfiguration -> HLTImplementationDB (hltPrescaleKey)
325 partition: The partition name (default: from TDAQ_PARTITION env var)
326 webdaq_base: Base URL for webis_server (default: from TDAQ_WEBDAQ_BASE env var)
327 strict: If True, raise an exception if OKS read fails (for --online-environment)
330 dict with keys: SMK, L1PSK, HLTPSK, db_alias (values may be None if not found)
333 RuntimeError: If strict=True and OKS read fails
338 if webdaq_base
is None:
339 webdaq_base = os.environ.get(
'TDAQ_WEBDAQ_BASE')
342 msg =
"TDAQ_WEBDAQ_BASE not set, cannot read from OKS"
344 raise RuntimeError(msg +
" (required for --online-environment)")
346 return {
'SMK':
None,
'L1PSK':
None,
'HLTPSK':
None,
'db_alias':
None}
349 if partition
is None:
350 partition = os.environ.get(
'TDAQ_PARTITION',
'ATLAS')
352 log.info(
"Reading trigger configuration keys from OKS via WEBDAQ: %s (partition=%s)",
353 webdaq_base, partition)
355 result = {
'SMK':
None,
'L1PSK':
None,
'HLTPSK':
None,
'db_alias':
None}
def extract_oks_data(response_json):
    """Return (attributes, relationships) from an OKS compact response.

    The OKS compact format is a list [name, type, attributes, relationships],
    where the relationships element may be absent. Any payload that is not a
    list of at least three elements is passed through unchanged as the
    attributes, paired with an empty relationships dict.
    """
    is_compact = isinstance(response_json, list)
    attrs = response_json[2] if is_compact and len(response_json) >= 3 else response_json
    rels = response_json[3] if is_compact and len(response_json) >= 4 else {}
    return attrs, rels
369 """Extract object ID from a relationship reference."""
370 if isinstance(ref, list)
and len(ref) >= 2:
372 elif isinstance(ref, dict)
and 'id' in ref:
374 elif isinstance(ref, str):
383 url = f
"{webdaq_base}/info/current/{partition}/oks/Partition/{partition}?format=compact"
384 log.debug(
"Fetching Partition from OKS: %s", url)
386 response = requests.get(url, timeout=10)
387 if response.status_code == 200:
388 part_attrs, part_rels = extract_oks_data(response.json())
389 log.debug(
"Partition attributes: %s", part_attrs)
390 log.debug(
"Partition relationships: %s", part_rels)
394 if 'TriggerConfiguration' in part_rels:
395 trig_conf_id = get_ref_id(part_rels[
'TriggerConfiguration'])
398 log.debug(
"TriggerConfiguration ID: %s", trig_conf_id)
401 url = f
"{webdaq_base}/info/current/{partition}/oks/TriggerConfiguration/{trig_conf_id}?format=compact"
402 response = requests.get(url, timeout=10)
403 if response.status_code == 200:
404 trig_attrs, trig_rels = extract_oks_data(response.json())
405 log.debug(
"TriggerConfiguration attributes: %s", trig_attrs)
406 log.debug(
"TriggerConfiguration relationships: %s", trig_rels)
409 if 'l1' in trig_rels:
410 l1_id = get_ref_id(trig_rels[
'l1'])
412 url = f
"{webdaq_base}/info/current/{partition}/oks/L1TriggerConfiguration/{l1_id}?format=compact"
413 resp = requests.get(url, timeout=10)
414 if resp.status_code == 200:
415 l1_attrs, _ = extract_oks_data(resp.json())
416 log.debug(
"L1TriggerConfiguration attributes: %s", l1_attrs)
417 if 'Lvl1PrescaleKey' in l1_attrs:
418 result[
'L1PSK'] = int(l1_attrs[
'Lvl1PrescaleKey'])
419 log.info(
"Got L1PSK=%d from OKS", result[
'L1PSK'])
422 if 'TriggerDBConnection' in trig_rels:
423 db_id = get_ref_id(trig_rels[
'TriggerDBConnection'])
425 url = f
"{webdaq_base}/info/current/{partition}/oks/TriggerDBConnection/{db_id}?format=compact"
426 resp = requests.get(url, timeout=10)
427 if resp.status_code == 200:
428 db_attrs, _ = extract_oks_data(resp.json())
429 log.debug(
"TriggerDBConnection attributes: %s", db_attrs)
430 if 'SuperMasterKey' in db_attrs:
431 result[
'SMK'] = int(db_attrs[
'SuperMasterKey'])
432 log.info(
"Got SMK=%d from OKS", result[
'SMK'])
433 if 'Alias' in db_attrs:
434 result[
'db_alias'] = db_attrs[
'Alias']
435 log.info(
"Got db_alias=%s from OKS", result[
'db_alias'])
438 if 'hlt' in trig_rels:
439 hlt_id = get_ref_id(trig_rels[
'hlt'])
441 url = f
"{webdaq_base}/info/current/{partition}/oks/HLTImplementationDB/{hlt_id}?format=compact"
442 resp = requests.get(url, timeout=10)
443 if resp.status_code == 200:
444 hlt_attrs, _ = extract_oks_data(resp.json())
445 log.debug(
"HLTImplementationDB attributes: %s", hlt_attrs)
446 if 'hltPrescaleKey' in hlt_attrs:
447 result[
'HLTPSK'] = int(hlt_attrs[
'hltPrescaleKey'])
448 log.info(
"Got HLTPSK=%d from OKS", result[
'HLTPSK'])
450 msg = f
"Failed to fetch Partition from OKS: HTTP {response.status_code}"
452 raise RuntimeError(msg +
" (required for --online-environment)")
455 except requests.exceptions.RequestException
as e:
456 msg = f
"Error fetching trigger keys from OKS: {e}"
458 raise RuntimeError(msg +
" (required for --online-environment)")
460 except (ValueError, KeyError, TypeError)
as e:
461 msg = f
"Error parsing trigger keys from OKS: {e}"
463 raise RuntimeError(msg +
" (required for --online-environment)")
468 missing = [k
for k
in [
'SMK',
'L1PSK',
'HLTPSK']
if result.get(k)
is None]
470 raise RuntimeError(f
"Failed to get {', '.join(missing)} from OKS (required for --online-environment)")
815 """Update run parameters from IS, file, or conditions DB"""
818 if getattr(args,
'online_environment',
False):
819 log.info(
"Reading run parameters from Information Service via WEBDAQ")
823 solenoid_override = getattr(args,
'solenoid_current',
None)
824 toroids_override = getattr(args,
'toroids_current',
None)
827 partition=getattr(args,
'partition',
None),
828 webdaq_base=getattr(args,
'webdaq_base',
None),
830 solenoid_current_override=solenoid_override,
831 toroids_current_override=toroids_override)
833 if args.run_number
is None and run_params.run_number
is not None:
834 args.run_number = run_params.run_number
835 log.info(
"Using run_number=%d from IS", args.run_number)
836 if args.lb_number
is None and run_params.lb_number
is not None:
837 args.lb_number = run_params.lb_number
838 log.info(
"Using lb_number=%d from IS", args.lb_number)
839 if args.sor_time
is None and run_params.sor_time
is not None:
840 args.sor_time = run_params.sor_time
841 log.info(
"Using sor_time=%s from IS", args.sor_time)
842 if args.detector_mask
is None and run_params.detector_mask
is not None:
843 args.detector_mask = run_params.detector_mask
844 log.info(
"Using detector_mask=%s from IS", args.detector_mask)
846 args.solenoid_current = run_params.solenoid_current
847 args.toroids_current = run_params.toroids_current
849 if (args.run_number
is not None and args.lb_number
is None)
or (args.run_number
is None and args.lb_number
is not None):
850 log.error(
"Both or neither of the options -R (--run-number) and -L (--lb-number) have to be specified")
852 if args.run_number
is None and args.file:
853 from eformat
import EventStorage
854 dr = EventStorage.pickDataReader(args.file[0])
855 args.run_number = dr.runNumber()
856 args.lb_number = dr.lumiblockNumber()
859 if (args.sor_time
is None or args.detector_mask
is None)
and args.run_number
is not None:
860 sor_params = AthHLT.get_sor_params(args.run_number)
861 log.debug(
'SOR parameters: %s', sor_params)
862 if sor_params
is None:
863 log.error(
"Run %d does not exist. If you want to use this run-number specify "
864 "remaining run parameters, e.g.: --sor-time=now --detector-mask=all", args.run_number)
867 if args.sor_time
is None and sor_params
is not None:
868 args.sor_time =
arg_sor_time(str(sor_params[
'SORTime']))
870 if args.detector_mask
is None and sor_params
is not None:
871 dmask = sor_params[
'DetectorMask']
872 if args.run_number < AthHLT.CondDB._run2:
878 if getattr(args,
'solenoid_current',
None)
is None:
879 args.solenoid_current = RunParams.DEFAULT_SOLENOID_CURRENT
880 log.debug(
"Using default solenoid_current=%.1f", args.solenoid_current)
881 if getattr(args,
'toroids_current',
None)
is None:
882 args.toroids_current = RunParams.DEFAULT_TOROIDS_CURRENT
883 log.debug(
"Using default toroids_current=%.1f", args.toroids_current)
973 parser = argparse.ArgumentParser(prog=
'athenaEF.py', formatter_class=
974 lambda prog : argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=32, width=100),
975 usage =
'%(prog)s [OPTION]... -f FILE jobOptions',
977 parser.expert_groups = []
980 g = parser.add_argument_group(
'Options')
981 g.add_argument(
'jobOptions', nargs=
'?', help=
'job options: CA module (package.module:function), pickle file (.pkl), or JSON file (.json)')
982 g.add_argument(
'--threads', metavar=
'N', type=int, default=1, help=
'number of threads')
983 g.add_argument(
'--concurrent-events', metavar=
'N', type=int, help=
'number of concurrent events if different from --threads')
984 g.add_argument(
'--log-level',
'-l', metavar=
'LVL', type=arg_log_level, default=
'INFO,ERROR', help=
'OutputLevel of athena,POOL')
985 g.add_argument(
'--precommand',
'-c', metavar=
'CMD', action=
'append', default=[],
986 help=
'Python commands executed before job options')
987 g.add_argument(
'--postcommand',
'-C', metavar=
'CMD', action=
'append', default=[],
988 help=
'Python commands executed after job options')
989 g.add_argument(
'--interactive',
'-i', action=
'store_true', help=
'interactive mode')
990 g.add_argument(
'--help',
'-h', nargs=
'?', choices=[
'all'], action=MyHelp, help=
'show help')
992 g = parser.add_argument_group(
'Input/Output')
993 g.add_argument(
'--file',
'--filesInput',
'-f', action=
'append', help=
'input RAW file')
994 g.add_argument(
'--save-output',
'-o', metavar=
'FILE', help=
'output file name')
995 g.add_argument(
'--number-of-events',
'--evtMax',
'-n', metavar=
'N', type=int, default=-1, help=
'processes N events (default: -1, means all)')
996 g.add_argument(
'--skip-events',
'--skipEvents',
'-k', metavar=
'N', type=int, default=0, help=
'skip N first events')
997 g.add_argument(
'--loop-files', action=
'store_true', help=
'loop over input files if no more events')
1000 g = parser.add_argument_group(
'Performance and debugging')
1001 g.add_argument(
'--perfmon', action=
'store_true', help=
'enable PerfMon')
1002 g.add_argument(
'--tcmalloc', action=
'store_true', default=
True, help=
'use tcmalloc')
1003 g.add_argument(
'--stdcmalloc', action=
'store_true', help=
'use stdcmalloc')
1004 g.add_argument(
'--stdcmath', action=
'store_true', help=
'use stdcmath library')
1005 g.add_argument(
'--imf', action=
'store_true', default=
True, help=
'use Intel math library')
1006 g.add_argument(
'--show-includes',
'-s', action=
'store_true', help=
'show printout of included files')
1009 g = parser.add_argument_group(
'Conditions')
1010 g.add_argument(
'--run-number',
'-R', metavar=
'RUN', type=int,
1011 help=
'run number (if None, read from first event)')
1012 g.add_argument(
'--lb-number',
'-L', metavar=
'LBN', type=int,
1013 help=
'lumiblock number (if None, read from first event)')
1014 g.add_argument(
'--conditions-run', metavar=
'RUN', type=int, default=
None,
1015 help=
'reference run number for conditions lookup (use when IS run number has no COOL data)')
1016 g.add_argument(
'--sor-time', type=arg_sor_time,
1017 help=
'The Start Of Run time. Three formats are accepted: '
1018 '1) the string "now", for current time; '
1019 '2) the number of nanoseconds since epoch (e.g. 1386355338658000000 or int(time.time() * 1e9)); '
1020 '3) human-readable "20/11/18 17:40:42.3043". If not specified the sor-time is read from the conditions DB')
1021 g.add_argument(
'--detector-mask', metavar=
'MASK', type=arg_detector_mask,
1022 help=
'detector mask (if None, read from the conditions DB), use string "all" to enable all detectors')
1025 g = parser.add_argument_group(
'Database')
1026 g.add_argument(
'--use-database',
'-b', action=
'store_true',
1027 help=
'configure from trigger database using SMK')
1028 g.add_argument(
'--db-server', metavar=
'DB', default=
'TRIGGERDB_RUN3', help=
'DB server name (alias)')
1029 g.add_argument(
'--smk', type=int, default=
None, help=
'Super Master Key')
1030 g.add_argument(
'--l1psk', type=int, default=
None, help=
'L1 prescale key')
1031 g.add_argument(
'--hltpsk', type=int, default=
None, help=
'HLT prescale key')
1032 g.add_argument(
'--use-crest', action=
'store_true', default=
False,
1033 help=
'Use CREST for trigger configuration')
1034 g.add_argument(
'--crest-server', metavar=
'URL', default=
None,
1035 help=
'CREST server URL (defaults to flags.Trigger.crestServer)')
1036 g.add_argument(
'--dump-config', action=
'store_true', help=
'Dump joboptions JSON file')
1037 g.add_argument(
'--dump-config-exit', action=
'store_true', help=
'Dump joboptions JSON file and exit')
1040 g = parser.add_argument_group(
'Magnets')
1041 g.add_argument(
'--solenoid-current', type=float, default=
None,
1042 help=
'Solenoid current in Amperes (default: nominal current for offline running, required from IS online)')
1043 g.add_argument(
'--toroids-current', type=float, default=
None,
1044 help=
'Toroids current in Amperes (default: nominal current for offline running, required from IS online)')
1047 g = parser.add_argument_group(
'Online')
1048 g.add_argument(
'--online-environment', action=
'store_true',
1049 help=
'Enable online environment: read run parameters from IS and trigger '
1050 'configuration keys (SMK, L1PSK, HLTPSK) from OKS via WEBDAQ REST API')
1051 g.add_argument(
'--partition', metavar=
'NAME', default=
None,
1052 help=
'TDAQ partition name (defaults to TDAQ_PARTITION environment variable)')
1053 g.add_argument(
'--webdaq-base', metavar=
'URL', default=
None,
1054 help=
'WEBDAQ base URL (defaults to TDAQ_WEBDAQ_BASE environment variable)')
1057 g = parser.add_argument_group(
'Online Histogramming')
1058 g.add_argument(
'--oh-monitoring',
'-M', action=
'store_true', default=
False,
1059 help=
'enable online histogram publishing via WebdaqHistSvc')
1062 g = parser.add_argument_group(
'Expert')
1063 parser.expert_groups.append(g)
1064 (args, unparsed_args) = parser.parse_known_args()
1068 from ROOT
import gROOT
1072 import AthenaCommon.Logging
1073 AthenaCommon.Logging.log.setLevel(getattr(logging, args.log_level[0]))
1074 AthenaCommon.Logging.log.setFormat(
"%(asctime)s Py:%(name)-31s %(levelname)7s %(message)s")
1075 if args.show_includes:
1076 from AthenaCommon.Include
import include
1077 include.setShowIncludes(
True )
1080 if not args.concurrent_events:
1081 args.concurrent_events = args.threads
1084 from TrigPSC
import PscConfig
1085 from TrigPSC.PscDefaultFlags
import defaultOnlineFlags
1088 flags = defaultOnlineFlags()
1091 if args.oh_monitoring:
1092 flags.Trigger.Online.useOnlineWebdaqHistSvc =
True
1093 log.info(
"Enabled WebdaqHistSvc for online histogram publishing")
1096 log.info(
"Using CREST for trigger configuration: %s", args.use_crest)
1098 flags.Trigger.useCrest =
True
1099 if args.crest_server:
1100 flags.Trigger.crestServer = args.crest_server
1102 args.crest_server = flags.Trigger.crestServer
1106 if args.use_database:
1110 PscConfig.forcePSK = (args.hltpsk
is not None)
or args.online_environment
1115 if not args.use_database
and args.jobOptions
and not args.jobOptions.endswith(
'.json'):
1116 PscConfig.unparsedArguments = unparsed_args
1117 for flag_arg
in unparsed_args:
1118 flags.fillFromString(flag_arg)
1120 PscConfig.interactive = args.interactive
1121 PscConfig.exitAfterDump = args.dump_config_exit
1129 if args.conditions_run
is not None:
1130 log.info(
"Using conditions from reference run %d (overriding run %s for IOV lookup)",
1131 args.conditions_run, args.run_number)
1132 flags.Input.ConditionsRunNumber = args.conditions_run
1135 if args.number_of_events > 0:
1136 flags.Exec.MaxEvents = args.number_of_events
1139 if args.skip_events > 0:
1140 flags.Exec.SkipEvents = args.skip_events
1146 flags.PerfMon.doFastMonMT = args.perfmon
1150 flags.Trigger.Online.useEFByteStreamSvc =
True
1151 ef = flags.Trigger.Online.EFInterface
1152 ef_files = args.file
if args.file
else []
1154 ef.LoopFiles = args.loop_files
1155 ef.NumEvents = args.number_of_events
1156 ef.SkipEvents = args.skip_events
1157 ef.RunNumber = args.run_number
1161 log.info(
"Executing precommand(s)")
1162 for cmd
in args.precommand:
1163 log.info(
" %s", cmd)
1164 exec(cmd, globals(), {
'flags': flags})
1167 is_database = args.use_database
1171 if not is_database
and args.jobOptions:
1172 jobOptions = args.jobOptions
1173 is_pickle = jobOptions.endswith(
'.pkl')
1174 is_json = jobOptions.endswith(
'.json')
1180 crestconn = TriggerCrestUtil.getCrestConnection(args.db_server)
1181 db_alias = f
"{args.crest_server}/{crestconn}"
1182 log.info(
"Loading configuration via CREST from %s with SMK %d", db_alias, args.smk)
1184 db_alias = args.db_server
1185 log.info(
"Loading configuration from database %s with SMK %d", db_alias, args.smk)
1190 num_threads=args.threads, num_slots=args.concurrent_events,
1192 log.info(
"Configuration loaded from database")
1196 log.info(
"Loading configuration from pickle file: %s", jobOptions)
1197 with open(jobOptions,
'rb')
as f:
1198 acc = pickle.load(f)
1199 log.info(
"Configuration loaded from pickle")
1203 log.info(
"Loading configuration from JSON file: %s", jobOptions)
1207 num_threads=args.threads, num_slots=args.concurrent_events,
1209 log.info(
"Configuration loaded from JSON")
1217 log.info(
"Loading CA configuration from: %s", jobOptions)
1220 from AthenaConfiguration.ComponentAccumulator
import ComponentAccumulator
1221 from AthenaConfiguration.MainServicesConfig
import addMainSequences
1222 from TrigServices.TriggerUnixStandardSetup
import commonServicesCfg
1223 from AthenaConfiguration.ComponentFactory
import CompFactory
1225 locked_flags = flags.clone()
1229 cfg = ComponentAccumulator(CompFactory.AthSequencer(
"AthMasterSeq", Sequential=
True))
1230 cfg.setAppProperty(
'ExtSvcCreates',
False)
1231 cfg.setAppProperty(
"MessageSvcType",
"TrigMessageSvc")
1232 cfg.setAppProperty(
"JobOptionsSvcType",
"TrigConf::JobOptionsSvc")
1235 addMainSequences(locked_flags, cfg)
1236 cfg.merge(commonServicesCfg(locked_flags))
1239 cfg_func = AthHLT.getCACfg(jobOptions)
1240 cfg.merge(cfg_func(flags))
1243 if args.postcommand:
1244 log.info(
"Executing postcommand(s)")
1245 for cmd
in args.postcommand:
1246 log.info(
" %s", cmd)
1247 exec(cmd, globals(), {
'flags': flags,
'cfg': cfg})
1248 args.postcommand = []
1251 fname =
"HLTJobOptions_EF"
1252 log.info(
"Dumping configuration to %s.pkl and %s.json", fname, fname)
1253 with open(f
"{fname}.pkl",
"wb")
as f:
1256 from TrigConfIO.JsonUtils
import create_joboptions_json
1257 create_joboptions_json(f
"{fname}.pkl", f
"{fname}.json")
1260 if args.dump_config_exit:
1261 log.info(
"Configuration dumped to %s.json. Exiting...", fname)
1265 log.info(
"Loading configuration from %s.json via TrigConf::JobOptionsSvc", fname)
1269 num_threads=args.threads, num_slots=args.concurrent_events,
1272 log.info(
"Configuration loaded with HLT online services")
1275 if args.postcommand:
1276 log.info(
"Executing postcommand(s)")
1277 for cmd
in args.postcommand:
1278 log.info(
" %s", cmd)
1279 exec(cmd, globals(), {
'flags': flags,
'acc': acc})
1282 if args.dump_config
or args.dump_config_exit:
1283 fname =
"HLTJobOptions_EF"
1287 from TrigConfIO.HLTTriggerConfigAccess
import HLTJobOptionsAccess
1288 log.info(
"Fetching configuration from database for dump...")
1289 jo_access = HLTJobOptionsAccess(dbalias=acc.db_server, smkey=acc.smk)
1290 props = jo_access.algorithms()
1292 log.info(
"Dumping configuration to %s.json", fname)
1293 hlt_json = {
'filetype':
'joboptions',
'properties': props}
1294 with open(f
"{fname}.json",
"w")
as f:
1295 json.dump(hlt_json, f, indent=4, sort_keys=
True, ensure_ascii=
True)
1299 props = acc.properties
1301 log.info(
"Dumping configuration to %s.json", fname)
1302 hlt_json = {
'filetype':
'joboptions',
'properties': props}
1303 with open(f
"{fname}.json",
"w")
as f:
1304 json.dump(hlt_json, f, indent=4, sort_keys=
True, ensure_ascii=
True)
1306 log.warning(
"No properties available to dump")
1310 app_props, msg_props, comp_props = acc.gatherProps()
1311 props = {
"ApplicationMgr": app_props,
"MessageSvc": msg_props}
1312 for comp, name, value
in comp_props:
1313 props.setdefault(comp, {})[name] = value
1315 log.info(
"Dumping configuration to %s.json", fname)
1316 hlt_json = {
'filetype':
'joboptions',
'properties': props}
1317 with open(f
"{fname}.json",
"w")
as f:
1318 json.dump(hlt_json, f, indent=4, sort_keys=
True, ensure_ascii=
True)
1323 if args.dump_config_exit:
1324 log.info(
"Configuration dumped. Exiting...")
1328 log.info(
"Starting Athena execution...")
1333 worker_dir = os.path.join(os.getcwd(),
"athenaHLT_workers",
"athenaHLT-01")
1334 if not os.path.exists(worker_dir):
1335 log.info(
"Creating worker directory: %s", worker_dir)
1336 os.makedirs(worker_dir, exist_ok=
True)
1338 if args.interactive:
1339 log.info(
"Interactive mode - call acc.run() to execute")
1341 code.interact(local={
'acc': acc,
'flags': flags})
1344 from AthenaCommon
import ExitCodes
1348 sc = acc.run(args.number_of_events)
1350 exitcode = ExitCodes.EXE_ALG_FAILURE
1351 except SystemExit
as e:
1352 exitcode = ExitCodes.EXE_ALG_FAILURE
if e.code == 1
else e.code
1354 traceback.print_exc()
1355 exitcode = ExitCodes.UNKNOWN_EXCEPTION
1357 log.info(
'Leaving with code %d: "%s"', exitcode, ExitCodes.what(exitcode))