1012 parser = argparse.ArgumentParser(prog='athenaEF.py', formatter_class=
1013 lambda prog : argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=32, width=100),
1014 usage = '%(prog)s [OPTION]... -f FILE jobOptions',
1015 add_help=False)
1016 parser.expert_groups = []
1017
1018
1019 g = parser.add_argument_group('Options')
1020 g.add_argument('jobOptions', nargs='?', help='job options: CA module (package.module:function), pickle file (.pkl), or JSON file (.json)')
1021 g.add_argument('--threads', metavar='N', type=int, default=1, help='number of threads')
1022 g.add_argument('--concurrent-events', metavar='N', type=int, help='number of concurrent events if different from --threads')
1023 g.add_argument('--log-level', '-l', metavar='LVL', type=arg_log_level, default='INFO,ERROR', help='OutputLevel of athena,POOL')
1024 g.add_argument('--precommand', '-c', metavar='CMD', action='append', default=[],
1025 help='Python commands executed before job options')
1026 g.add_argument('--postcommand', '-C', metavar='CMD', action='append', default=[],
1027 help='Python commands executed after job options')
1028 g.add_argument('--interactive', '-i', action='store_true', help='interactive mode')
1029 g.add_argument('--help', '-h', nargs='?', choices=['all'], action=MyHelp, help='show help')
1030
1031 g = parser.add_argument_group('Input/Output')
1032 g.add_argument('--file', '--filesInput', '-f', action='append', help='input RAW file')
1033 g.add_argument('--save-output', '-o', metavar='FILE', help='output file name')
1034 g.add_argument('--number-of-events', '--evtMax', '-n', metavar='N', type=int, default=-1, help='processes N events (default: -1, means all)')
1035 g.add_argument('--skip-events', '--skipEvents', '-k', metavar='N', type=int, default=0, help='skip N first events')
1036 g.add_argument('--loop-files', action='store_true', help='loop over input files if no more events')
1037 g.add_argument('--efdf-interface-library', metavar='LIB', default='TrigDFEmulator',
1038 help='name of the EFDF interface shared library to load')
1039
1040
1041 g = parser.add_argument_group('Performance and debugging')
1042 g.add_argument('--perfmon', action='store_true', help='enable PerfMon')
1043 g.add_argument('--tcmalloc', action='store_true', default=True, help='use tcmalloc')
1044 g.add_argument('--stdcmalloc', action='store_true', help='use stdcmalloc')
1045 g.add_argument('--stdcmath', action='store_true', help='use stdcmath library')
1046 g.add_argument('--imf', action='store_true', default=True, help='use Intel math library')
1047 g.add_argument('--show-includes', '-s', action='store_true', help='show printout of included files')
1048
1049
1050 g = parser.add_argument_group('Conditions')
1051 g.add_argument('--run-number', '-R', metavar='RUN', type=int,
1052 help='run number (if None, read from first event)')
1053 g.add_argument('--lb-number', '-L', metavar='LBN', type=int,
1054 help='lumiblock number (if None, read from first event)')
1055 g.add_argument('--conditions-run', metavar='RUN', type=int, default=None,
1056 help='reference run number for conditions lookup (use when IS run number has no COOL data)')
1057 g.add_argument('--sor-time', type=arg_sor_time,
1058 help='The Start Of Run time. Three formats are accepted: '
1059 '1) the string "now", for current time; '
1060 '2) the number of nanoseconds since epoch (e.g. 1386355338658000000 or int(time.time() * 1e9)); '
1061 '3) human-readable "20/11/18 17:40:42.3043". If not specified the sor-time is read from the conditions DB')
1062 g.add_argument('--detector-mask', metavar='MASK', type=arg_detector_mask,
1063 help='detector mask (if None, read from the conditions DB), use string "all" to enable all detectors')
1064
1065
1066 g = parser.add_argument_group('Database')
1067 g.add_argument('--use-database', '-b', action='store_true',
1068 help='configure from trigger database using SMK')
1069 g.add_argument('--db-server', metavar='DB', default='TRIGGERDB_RUN3', help='DB server name (alias)')
1070 g.add_argument('--smk', type=int, default=None, help='Super Master Key')
1071 g.add_argument('--l1psk', type=int, default=None, help='L1 prescale key')
1072 g.add_argument('--hltpsk', type=int, default=None, help='HLT prescale key')
1073 g.add_argument('--use-crest', action='store_true', default=False,
1074 help='Use CREST for trigger configuration')
1075 g.add_argument('--crest-server', metavar='URL', default=None,
1076 help='CREST server URL (defaults to flags.Trigger.crestServer)')
1077 g.add_argument('--dump-config', action='store_true', help='Dump joboptions JSON file')
1078 g.add_argument('--dump-config-exit', action='store_true', help='Dump joboptions JSON file and exit')
1079
1080
1081 g = parser.add_argument_group('Magnets')
1082 g.add_argument('--solenoid-current', type=float, default=None,
1083 help='Solenoid current in Amperes (default: nominal current for offline running, required from IS online)')
1084 g.add_argument('--toroids-current', type=float, default=None,
1085 help='Toroids current in Amperes (default: nominal current for offline running, required from IS online)')
1086
1087
1088 g = parser.add_argument_group('Online')
1089 g.add_argument('--online-environment', action='store_true',
1090 help='Enable online environment: read run parameters from IS and trigger '
1091 'configuration keys (SMK, L1PSK, HLTPSK) from OKS via WEBDAQ REST API')
1092 g.add_argument('--partition', metavar='NAME', default=None,
1093 help='TDAQ partition name (defaults to TDAQ_PARTITION environment variable)')
1094 g.add_argument('--webdaq-base', metavar='URL', default=None,
1095 help='WEBDAQ base URL (defaults to TDAQ_WEBDAQ_BASE environment variable)')
1096
1097
1098 g = parser.add_argument_group('Online Histogramming')
1099 g.add_argument('--oh-monitoring', '-M', action='store_true', default=False,
1100 help='enable online histogram publishing via WebdaqHistSvc')
1101
1102
1103 g = parser.add_argument_group('Expert')
1104 parser.expert_groups.append(g)
1105 (args, unparsed_args) = parser.parse_known_args()
1106 check_args(parser, args)
1107
1108
1109 from PyUtils.Helpers import ROOTSetup
1110 ROOTSetup(batch=True)
1111
1112
1113 import ROOT
1114 ROOT.ROOT.EnableThreadSafety()
1115
1116
1117 import AthenaCommon.Logging
1118 AthenaCommon.Logging.log.setLevel(getattr(logging, args.log_level[0]))
1119 AthenaCommon.Logging.log.setFormat("%(asctime)s Py:%(name)-31s %(levelname)7s %(message)s")
1120 if args.show_includes:
1121 from AthenaCommon.Include import include
1122 include.setShowIncludes( True )
1123
1124
1125 if not args.concurrent_events:
1126 args.concurrent_events = args.threads
1127
1128
1129 from TrigPSC import PscConfig
1130 from TrigPSC.PscDefaultFlags import defaultOnlineFlags
1131
1132
1133 flags = defaultOnlineFlags()
1134
1135
1136 if args.oh_monitoring:
1137 flags.Trigger.Online.useOnlineWebdaqHistSvc = True
1138 log.info("Enabled WebdaqHistSvc for online histogram publishing")
1139
1140
1141 log.info("Using CREST for trigger configuration: %s", args.use_crest)
1142 if args.use_crest:
1143 flags.Trigger.useCrest = True
1144 if args.crest_server:
1145 flags.Trigger.crestServer = args.crest_server
1146 else:
1147 args.crest_server = flags.Trigger.crestServer
1148
1149 update_run_params(args, flags)
1150
1151 if args.use_database:
1152
1153
1154
1155 PscConfig.forcePSK = (args.hltpsk is not None) or args.online_environment
1156
1157 update_trigconf_keys(args, flags)
1158
1159
1160 if not args.use_database and args.jobOptions and not args.jobOptions.endswith('.json'):
1161 PscConfig.unparsedArguments = unparsed_args
1162 for flag_arg in unparsed_args:
1163 flags.fillFromString(flag_arg)
1164
1165 PscConfig.interactive = args.interactive
1166 PscConfig.exitAfterDump = args.dump_config_exit
1167
1168
1169
1170
1171
1172
1173
1174 if args.conditions_run is not None:
1175 log.info("Using conditions from reference run %d (overriding run %s for IOV lookup)",
1176 args.conditions_run, args.run_number)
1177 flags.Input.ConditionsRunNumber = args.conditions_run
1178
1179
1180 if args.number_of_events > 0:
1181 flags.Exec.MaxEvents = args.number_of_events
1182
1183
1184 if args.skip_events > 0:
1185 flags.Exec.SkipEvents = args.skip_events
1186
1187
1188
1189
1190
1191 flags.PerfMon.doFastMonMT = args.perfmon
1192
1193
1194
1195 flags.Trigger.Online.useEFByteStreamSvc = True
1196 ef = flags.Trigger.Online.EFInterface
1197 ef_files = args.file if args.file else []
1198 ef.Files = ef_files
1199 ef.OutputFileName = f"athenaEF_{args.save_output}" if args.save_output else ""
1200 ef.LoopFiles = args.loop_files
1201 ef.NumEvents = args.number_of_events
1202 ef.SkipEvents = args.skip_events
1203 ef.RunNumber = args.run_number
1204 ef.T0ProjectTag = args.T0_project_tag
1205 ef.BeamType = args.beam_type
1206 ef.BeamEnergy = args.beam_energy
1207 ef.TriggerType = args.trigger_type
1208 ef.Stream = args.stream
1209 ef.Lumiblock = args.lumiblock
1210 ef.DetMask = args.file_detector_mask
1211 ef.LibraryName = args.efdf_interface_library
1212
1213
1214 if args.precommand:
1215 log.info("Executing precommand(s)")
1216 for cmd in args.precommand:
1217 log.info(" %s", cmd)
1218 exec(cmd, globals(), {'flags': flags})
1219
1220
1221 is_database = args.use_database
1222 is_pickle = False
1223 is_json = False
1224
1225 if not is_database and args.jobOptions:
1226 jobOptions = args.jobOptions
1227 is_pickle = jobOptions.endswith('.pkl')
1228 is_json = jobOptions.endswith('.json')
1229
1230 if is_database:
1231
1232
1233 if args.use_crest:
1234 crestconn = TriggerCrestUtil.getCrestConnection(args.db_server)
1235 db_alias = f"{args.crest_server}/{crestconn}"
1236 log.info("Loading configuration via CREST from %s with SMK %d", db_alias, args.smk)
1237 else:
1238 db_alias = args.db_server
1239 log.info("Loading configuration from database %s with SMK %d", db_alias, args.smk)
1240
1241
1242 run_params = get_run_params(args).to_dict()
1243 acc = load_from_database(db_alias, args.smk, args.l1psk, args.hltpsk, run_params,
1244 num_threads=args.threads, num_slots=args.concurrent_events,
1245 ef_files=ef_files)
1246 log.info("Configuration loaded from database")
1247
1248 elif is_pickle:
1249
1250 log.info("Loading configuration from pickle file: %s", jobOptions)
1251 with open(jobOptions, 'rb') as f:
1252 acc = pickle.load(f)
1253 log.info("Configuration loaded from pickle")
1254
1255 elif is_json:
1256
1257 log.info("Loading configuration from JSON file: %s", jobOptions)
1258
1259 run_params = get_run_params(args).to_dict()
1260 acc = load_from_json(jobOptions, run_params,
1261 num_threads=args.threads, num_slots=args.concurrent_events,
1262 ef_files=ef_files)
1263 log.info("Configuration loaded from JSON")
1264
1265 else:
1266
1267
1268
1269
1270
1271 log.info("Loading CA configuration from: %s", jobOptions)
1272
1273
1274 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
1275 from AthenaConfiguration.MainServicesConfig import addMainSequences
1276 from TrigServices.TriggerUnixStandardSetup import commonServicesCfg
1277 from AthenaConfiguration.ComponentFactory import CompFactory
1278
1279 locked_flags = flags.clone()
1280 locked_flags.lock()
1281
1282
1283 cfg = ComponentAccumulator(CompFactory.AthSequencer("AthMasterSeq", Sequential=True))
1284 cfg.setAppProperty('ExtSvcCreates', False)
1285 cfg.setAppProperty("MessageSvcType", "TrigMessageSvc")
1286 cfg.setAppProperty("JobOptionsSvcType", "TrigConf::JobOptionsSvc")
1287
1288
1289 addMainSequences(locked_flags, cfg)
1290 cfg.merge(commonServicesCfg(locked_flags))
1291
1292
1293 cfg_func = AthHLT.getCACfg(jobOptions)
1294 cfg.merge(cfg_func(flags))
1295
1296
1297 if args.postcommand:
1298 log.info("Executing postcommand(s)")
1299 for cmd in args.postcommand:
1300 log.info(" %s", cmd)
1301 exec(cmd, globals(), {'flags': flags, 'cfg': cfg})
1302 args.postcommand = []
1303
1304
1305 fname = "HLTJobOptions"
1306 log.info("Dumping configuration to %s.pkl and %s.json", fname, fname)
1307 with open(f"{fname}.pkl", "wb") as f:
1308 cfg.store(f)
1309
1310 from TrigConfIO.JsonUtils import create_joboptions_json
1311 create_joboptions_json(f"{fname}.pkl", f"{fname}.json")
1312
1313
1314 if args.dump_config_exit:
1315 log.info("Configuration dumped to %s.json. Exiting...", fname)
1316 sys.exit(0)
1317
1318
1319 log.info("Loading configuration from %s.json via TrigConf::JobOptionsSvc", fname)
1320
1321 run_params = get_run_params(args).to_dict()
1322 acc = load_from_json(f"{fname}.json", run_params,
1323 num_threads=args.threads, num_slots=args.concurrent_events,
1324 ef_files=ef_files)
1325
1326 log.info("Configuration loaded with HLT online services")
1327
1328
1329 if args.postcommand:
1330 log.info("Executing postcommand(s)")
1331 for cmd in args.postcommand:
1332 log.info(" %s", cmd)
1333 exec(cmd, globals(), {'flags': flags, 'acc': acc})
1334
1335
1336 if args.dump_config or args.dump_config_exit:
1337 fname = "HLTJobOptions"
1338
1339 if is_database:
1340
1341 from TrigConfIO.HLTTriggerConfigAccess import HLTJobOptionsAccess
1342 log.info("Fetching configuration from database for dump...")
1343 jo_access = HLTJobOptionsAccess(dbalias=acc.db_server, smkey=acc.smk)
1344 props = jo_access.algorithms()
1345
1346 log.info("Dumping configuration to %s.json", fname)
1347 hlt_json = {'filetype': 'joboptions', 'properties': props}
1348 with open(f"{fname}.json", "w") as f:
1349 json.dump(hlt_json, f, indent=4, sort_keys=True, ensure_ascii=True)
1350
1351 elif is_json:
1352
1353 props = acc.properties
1354 if props:
1355 log.info("Dumping configuration to %s.json", fname)
1356 hlt_json = {'filetype': 'joboptions', 'properties': props}
1357 with open(f"{fname}.json", "w") as f:
1358 json.dump(hlt_json, f, indent=4, sort_keys=True, ensure_ascii=True)
1359 else:
1360 log.warning("No properties available to dump")
1361
1362 elif is_pickle:
1363
1364 app_props, msg_props, comp_props = acc.gatherProps()
1365 props = {"ApplicationMgr": app_props, "MessageSvc": msg_props}
1366 for comp, name, value in comp_props:
1367 props.setdefault(comp, {})[name] = value
1368
1369 log.info("Dumping configuration to %s.json", fname)
1370 hlt_json = {'filetype': 'joboptions', 'properties': props}
1371 with open(f"{fname}.json", "w") as f:
1372 json.dump(hlt_json, f, indent=4, sort_keys=True, ensure_ascii=True)
1373
1374
1375
1376
1377 if args.dump_config_exit:
1378 log.info("Configuration dumped. Exiting...")
1379 sys.exit(0)
1380
1381
1382 log.info("Starting Athena execution...")
1383
1384
1385
1386
1387 worker_dir = os.path.join(os.getcwd(), "athenaHLT_workers", "athenaHLT-01")
1388 if not os.path.exists(worker_dir):
1389 log.info("Creating worker directory: %s", worker_dir)
1390 os.makedirs(worker_dir, exist_ok=True)
1391
1392 if args.interactive:
1393 log.info("Interactive mode - call acc.run() to execute")
1394 import code
1395 code.interact(local={'acc': acc, 'flags': flags})
1396 else:
1397
1398 from AthenaCommon import ExitCodes
1399 exitcode = 0
1400 try:
1401
1402 sc = acc.run(args.number_of_events)
1403 if sc.isFailure():
1404 exitcode = ExitCodes.EXE_ALG_FAILURE
1405 except SystemExit as e:
1406 exitcode = ExitCodes.EXE_ALG_FAILURE if e.code == 1 else e.code
1407 except Exception:
1408 traceback.print_exc()
1409 exitcode = ExitCodes.UNKNOWN_EXCEPTION
1410
1411 log.info('Leaving with code %d: "%s"', exitcode, ExitCodes.what(exitcode))
1412 sys.exit(exitcode)
1413
1414