# Command-line parser. The stock -h/--help is disabled (add_help=False)
# because a custom help action (MyHelp) is registered at the end of the
# 'Options' group instead.
parser = argparse.ArgumentParser(
    prog='athenaEF.py',
    usage='%(prog)s [OPTION]... -f FILE jobOptions',
    add_help=False,
    formatter_class=lambda prog: argparse.ArgumentDefaultsHelpFormatter(
        prog, max_help_position=32, width=100),
)
# Argument groups registered as "expert" are collected on the parser itself.
# Presumably MyHelp consults this list (e.g. for '--help all') - confirm.
parser.expert_groups = []

# --- General options -------------------------------------------------------
g = parser.add_argument_group('Options')
g.add_argument(
    'jobOptions',
    nargs='?',
    help='job options: CA module (package.module:function), pickle file (.pkl), or JSON file (.json)',
)
g.add_argument('--threads', type=int, default=1, metavar='N', help='number of threads')
g.add_argument(
    '--concurrent-events',
    type=int,
    metavar='N',
    help='number of concurrent events if different from --threads',
)
# The string default is passed through `type` by argparse, so the parsed value
# always has the form produced by arg_log_level.
g.add_argument(
    '--log-level', '-l',
    type=arg_log_level,
    default='INFO,ERROR',
    metavar='LVL',
    help='OutputLevel of athena,POOL',
)
g.add_argument(
    '--precommand', '-c',
    action='append',
    default=[],
    metavar='CMD',
    help='Python commands executed before job options',
)
g.add_argument(
    '--postcommand', '-C',
    action='append',
    default=[],
    metavar='CMD',
    help='Python commands executed after job options',
)
g.add_argument('--interactive', '-i', action='store_true', help='interactive mode')
# '--help' optionally accepts the single value 'all'.
g.add_argument('--help', '-h', action=MyHelp, nargs='?', choices=['all'], help='show help')
1027
# --- Input/Output ----------------------------------------------------------
g = parser.add_argument_group('Input/Output')
# '--file' may be repeated; the values accumulate in a list (None if never given).
g.add_argument('--file', '--filesInput', '-f', action='append', help='input RAW file')
g.add_argument('--save-output', '-o', metavar='FILE', help='output file name')
# The help text uses %(default)s itself. ArgumentDefaultsHelpFormatter only
# appends its own " (default: ...)" suffix when the help string does not
# already contain '%(default)', so the default is printed exactly once
# instead of "(default: -1, means all) (default: -1)".
g.add_argument('--number-of-events', '--evtMax', '-n', metavar='N', type=int, default=-1,
               help='processes N events (default: %(default)s, means all)')
g.add_argument('--skip-events', '--skipEvents', '-k', metavar='N', type=int, default=0,
               help='skip N first events')
g.add_argument('--loop-files', action='store_true',
               help='loop over input files if no more events')
g.add_argument('--efdf-interface-library', metavar='LIB', default='TrigDFEmulator',
               help='name of the EFDF interface shared library to load')

# --- Performance and debugging ---------------------------------------------
g = parser.add_argument_group('Performance and debugging')
g.add_argument('--perfmon', action='store_true', help='enable PerfMon')
# NOTE(review): 'store_true' combined with default=True means --tcmalloc and
# --imf are always True after parsing. Presumably the allocator / math library
# choice is acted upon outside this parser (e.g. by a launcher) - confirm.
g.add_argument('--tcmalloc', action='store_true', default=True, help='use tcmalloc')
g.add_argument('--stdcmalloc', action='store_true', help='use stdcmalloc')
g.add_argument('--stdcmath', action='store_true', help='use stdcmath library')
g.add_argument('--imf', action='store_true', default=True, help='use Intel math library')
g.add_argument('--show-includes', '-s', action='store_true',
               help='show printout of included files')
1045
1046
# --- Conditions ------------------------------------------------------------
# Per the help texts, values left unset are read from the first event or from
# the conditions DB.
g = parser.add_argument_group('Conditions')
g.add_argument(
    '--run-number', '-R',
    type=int,
    metavar='RUN',
    help='run number (if None, read from first event)',
)
g.add_argument(
    '--lb-number', '-L',
    type=int,
    metavar='LBN',
    help='lumiblock number (if None, read from first event)',
)
g.add_argument(
    '--conditions-run',
    type=int,
    default=None,
    metavar='RUN',
    help='reference run number for conditions lookup (use when IS run number has no COOL data)',
)
g.add_argument(
    '--sor-time',
    type=arg_sor_time,
    help=(
        'The Start Of Run time. Three formats are accepted: '
        '1) the string "now", for current time; '
        '2) the number of nanoseconds since epoch (e.g. 1386355338658000000 or int(time.time() * 1e9)); '
        '3) human-readable "20/11/18 17:40:42.3043". If not specified the sor-time is read from the conditions DB'
    ),
)
g.add_argument(
    '--detector-mask',
    type=arg_detector_mask,
    metavar='MASK',
    help='detector mask (if None, read from the conditions DB), use string "all" to enable all detectors',
)

# --- Database --------------------------------------------------------------
g = parser.add_argument_group('Database')
g.add_argument(
    '--use-database', '-b',
    action='store_true',
    help='configure from trigger database using SMK',
)
g.add_argument('--db-server', default='TRIGGERDB_RUN3', metavar='DB', help='DB server name (alias)')
# Trigger configuration keys; all default to None when not given.
g.add_argument('--smk', default=None, type=int, help='Super Master Key')
g.add_argument('--l1psk', default=None, type=int, help='L1 prescale key')
g.add_argument('--hltpsk', default=None, type=int, help='HLT prescale key')
g.add_argument(
    '--use-crest',
    action='store_true',
    default=False,
    help='Use CREST for trigger configuration',
)
g.add_argument(
    '--crest-server',
    default=None,
    metavar='URL',
    help='CREST server URL (defaults to flags.Trigger.crestServer)',
)
g.add_argument('--dump-config', action='store_true', help='Dump joboptions JSON file')
g.add_argument('--dump-config-exit', action='store_true', help='Dump joboptions JSON file and exit')
1076
1077
# --- Magnets ---------------------------------------------------------------
g = parser.add_argument_group('Magnets')
g.add_argument(
    '--solenoid-current',
    default=None,
    type=float,
    help='Solenoid current in Amperes (default: nominal current for offline running, required from IS online)',
)
g.add_argument(
    '--toroids-current',
    default=None,
    type=float,
    help='Toroids current in Amperes (default: nominal current for offline running, required from IS online)',
)

# --- Online ----------------------------------------------------------------
g = parser.add_argument_group('Online')
g.add_argument(
    '--online-environment',
    action='store_true',
    help=(
        'Enable online environment: read run parameters from IS and trigger '
        'configuration keys (SMK, L1PSK, HLTPSK) from OKS via WEBDAQ REST API'
    ),
)
g.add_argument(
    '--partition',
    default=None,
    metavar='NAME',
    help='TDAQ partition name (defaults to TDAQ_PARTITION environment variable)',
)
g.add_argument(
    '--webdaq-base',
    default=None,
    metavar='URL',
    help='WEBDAQ base URL (defaults to TDAQ_WEBDAQ_BASE environment variable)',
)

# --- Online Histogramming --------------------------------------------------
g = parser.add_argument_group('Online Histogramming')
g.add_argument(
    '--oh-monitoring', '-M',
    action='store_true',
    default=False,
    help='enable online histogram publishing via WebdaqHistSvc',
)

# --- Expert ----------------------------------------------------------------
# The group is registered as an expert group on the parser; no arguments are
# added to it here.
g = parser.add_argument_group('Expert')
parser.expert_groups.append(g)

# parse_known_args() keeps unrecognised arguments in a separate list rather
# than rejecting them; that list is used further down to fill job flags.
args, unparsed_args = parser.parse_known_args()
check_args(parser, args)
1104
1105
# Put ROOT into batch mode before anything else uses it.
from ROOT import gROOT
gROOT.SetBatch()

# Configure the Athena Python logger. The first element of --log-level is
# used as the name of a `logging` level constant.
import AthenaCommon.Logging
AthenaCommon.Logging.log.setLevel(
    getattr(logging, args.log_level[0])
)
AthenaCommon.Logging.log.setFormat(
    "%(asctime)s Py:%(name)-31s %(levelname)7s %(message)s"
)

# Optionally print every file pulled in through include().
if args.show_includes:
    from AthenaCommon.Include import include
    include.setShowIncludes(True)

# Fall back to the thread count when --concurrent-events is unset (or zero).
args.concurrent_events = args.concurrent_events or args.threads
1120
1121
from TrigPSC import PscConfig
from TrigPSC.PscDefaultFlags import defaultOnlineFlags

# Start from the default online flags and adjust them from the command line.
flags = defaultOnlineFlags()

# Online histogram publishing.
if args.oh_monitoring:
    flags.Trigger.Online.useOnlineWebdaqHistSvc = True
    log.info("Enabled WebdaqHistSvc for online histogram publishing")

# CREST: an explicit --crest-server overrides the flag value; otherwise the
# flag value is copied back onto args so both always agree.
log.info("Using CREST for trigger configuration: %s", args.use_crest)
if args.use_crest:
    flags.Trigger.useCrest = True
    if not args.crest_server:
        args.crest_server = flags.Trigger.crestServer
    else:
        flags.Trigger.crestServer = args.crest_server

update_run_params(args, flags)

if args.use_database:
    # Force the prescale key when an HLT PSK was given explicitly or when
    # running in the online environment.
    PscConfig.forcePSK = args.online_environment if args.hltpsk is None else True
    update_trigconf_keys(args, flags)

# Arguments argparse did not recognise are applied to the flags, but only when
# the configuration is built from job options that are not a JSON file.
fill_flags_from_cli = (
    not args.use_database
    and bool(args.jobOptions)
    and not args.jobOptions.endswith('.json')
)
if fill_flags_from_cli:
    PscConfig.unparsedArguments = unparsed_args
    for flag_arg in unparsed_args:
        flags.fillFromString(flag_arg)

PscConfig.interactive = args.interactive
PscConfig.exitAfterDump = args.dump_config_exit

# Optional reference run used for the conditions lookup.
if args.conditions_run is not None:
    log.info(
        "Using conditions from reference run %d (overriding run %s for IOV lookup)",
        args.conditions_run,
        args.run_number,
    )
    flags.Input.ConditionsRunNumber = args.conditions_run
1171
1172
# Event limits are only forwarded to the Exec flags when they are positive;
# otherwise the flag values are left untouched.
if args.number_of_events > 0:
    flags.Exec.MaxEvents = args.number_of_events
if args.skip_events > 0:
    flags.Exec.SkipEvents = args.skip_events

flags.PerfMon.doFastMonMT = args.perfmon

# Event input goes through the EF byte-stream service; its interface flags
# are filled from the command-line values.
flags.Trigger.Online.useEFByteStreamSvc = True
ef = flags.Trigger.Online.EFInterface

# '--file' yields None when never given; normalise to an empty list.
ef_files = args.file or []

ef.Files = ef_files
ef.LoopFiles = args.loop_files
ef.NumEvents = args.number_of_events
ef.SkipEvents = args.skip_events
ef.RunNumber = args.run_number
# NOTE(review): the attributes read below (T0_project_tag, beam_type,
# beam_energy, trigger_type, stream, lumiblock, file_detector_mask) are not
# options declared by the argument parser in this part of the file; presumably
# they are attached to `args` elsewhere (e.g. by update_run_params) - confirm.
ef.T0ProjectTag = args.T0_project_tag
ef.BeamType = args.beam_type
ef.BeamEnergy = args.beam_energy
ef.TriggerType = args.trigger_type
ef.Stream = args.stream
ef.Lumiblock = args.lumiblock
ef.DetMask = args.file_detector_mask
ef.LibraryName = args.efdf_interface_library
1204
1205
# Run the user-supplied -c/--precommand snippets, in the order given.
# Each snippet gets the module globals plus a *fresh* local namespace that
# exposes only `flags`, so names assigned by one snippet are not visible to
# the next one.
if args.precommand:
    log.info("Executing precommand(s)")
    for cmd in args.precommand:
        log.info("  %s", cmd)
        exec(cmd, globals(), {'flags': flags})
1211
1212
# Work out where the configuration comes from. The database takes precedence;
# otherwise the job options file extension selects pickle or JSON, and
# anything else is treated as a CA module.
is_database = args.use_database
is_pickle = False
is_json = False

if args.jobOptions and not is_database:
    jobOptions = args.jobOptions
    is_pickle = jobOptions.endswith('.pkl')
    is_json = jobOptions.endswith('.json')

if is_database:
    # --- Trigger database (optionally through CREST) -----------------------
    if not args.use_crest:
        db_alias = args.db_server
        log.info("Loading configuration from database %s with SMK %d", db_alias, args.smk)
    else:
        crestconn = TriggerCrestUtil.getCrestConnection(args.db_server)
        db_alias = f"{args.crest_server}/{crestconn}"
        log.info("Loading configuration via CREST from %s with SMK %d", db_alias, args.smk)

    run_params = get_run_params(args).to_dict()
    acc = load_from_database(
        db_alias, args.smk, args.l1psk, args.hltpsk, run_params,
        num_threads=args.threads,
        num_slots=args.concurrent_events,
        ef_files=ef_files,
    )
    log.info("Configuration loaded from database")

elif is_pickle:
    # --- Pickled configuration ---------------------------------------------
    # NOTE: unpickling can execute arbitrary code; only use trusted files.
    log.info("Loading configuration from pickle file: %s", jobOptions)
    with open(jobOptions, 'rb') as f:
        acc = pickle.load(f)
    log.info("Configuration loaded from pickle")

elif is_json:
    # --- JSON job options --------------------------------------------------
    log.info("Loading configuration from JSON file: %s", jobOptions)

    run_params = get_run_params(args).to_dict()
    acc = load_from_json(
        jobOptions, run_params,
        num_threads=args.threads,
        num_slots=args.concurrent_events,
        ef_files=ef_files,
    )
    log.info("Configuration loaded from JSON")

else:
    # --- CA module ---------------------------------------------------------
    # Build the ComponentAccumulator, dump it to pickle + JSON, then load the
    # JSON back exactly as in the JSON case above.
    log.info("Loading CA configuration from: %s", jobOptions)

    from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
    from AthenaConfiguration.ComponentFactory import CompFactory
    from AthenaConfiguration.MainServicesConfig import addMainSequences
    from TrigServices.TriggerUnixStandardSetup import commonServicesCfg

    # The framework pieces are configured from a locked copy of the flags.
    locked_flags = flags.clone()
    locked_flags.lock()

    cfg = ComponentAccumulator(CompFactory.AthSequencer("AthMasterSeq", Sequential=True))
    for prop_name, prop_value in (
        ('ExtSvcCreates', False),
        ("MessageSvcType", "TrigMessageSvc"),
        ("JobOptionsSvcType", "TrigConf::JobOptionsSvc"),
    ):
        cfg.setAppProperty(prop_name, prop_value)

    addMainSequences(locked_flags, cfg)
    cfg.merge(commonServicesCfg(locked_flags))

    # The user configuration function receives the original (not the locked)
    # flags object.
    cfg_func = AthHLT.getCACfg(jobOptions)
    cfg.merge(cfg_func(flags))

    # Postcommands run here against `cfg`; the list is then emptied so they
    # are not executed a second time later on.
    if args.postcommand:
        log.info("Executing postcommand(s)")
        for cmd in args.postcommand:
            log.info("  %s", cmd)
            exec(cmd, globals(), {'flags': flags, 'cfg': cfg})
        args.postcommand = []

    fname = "HLTJobOptions_EF"
    pkl_name = f"{fname}.pkl"
    json_name = f"{fname}.json"
    log.info("Dumping configuration to %s.pkl and %s.json", fname, fname)
    with open(pkl_name, "wb") as f:
        cfg.store(f)

    from TrigConfIO.JsonUtils import create_joboptions_json
    create_joboptions_json(pkl_name, json_name)

    if args.dump_config_exit:
        log.info("Configuration dumped to %s.json. Exiting...", fname)
        sys.exit(0)

    log.info("Loading configuration from %s.json via TrigConf::JobOptionsSvc", fname)

    run_params = get_run_params(args).to_dict()
    acc = load_from_json(
        json_name, run_params,
        num_threads=args.threads,
        num_slots=args.concurrent_events,
        ef_files=ef_files,
    )

    log.info("Configuration loaded with HLT online services")
1319
1320
# Run the user-supplied -C/--postcommand snippets against the loaded
# configuration. Each snippet gets the module globals plus a fresh local
# namespace exposing `flags` and `acc`. The list is empty here when the
# snippets were already executed while building a CA configuration.
if args.postcommand:
    log.info("Executing postcommand(s)")
    for cmd in args.postcommand:
        log.info("  %s", cmd)
        exec(cmd, globals(), {'flags': flags, 'acc': acc})
1326
1327
# Optional dump of the job options to HLTJobOptions_EF.json. The properties
# are collected according to the configuration source and written in one
# place. When none of the three sources below applies (CA module), nothing is
# written here.
if args.dump_config or args.dump_config_exit:
    fname = "HLTJobOptions_EF"
    write_json = False

    if is_database:
        # Fetch the properties from the database again.
        from TrigConfIO.HLTTriggerConfigAccess import HLTJobOptionsAccess
        log.info("Fetching configuration from database for dump...")
        jo_access = HLTJobOptionsAccess(dbalias=acc.db_server, smkey=acc.smk)
        props = jo_access.algorithms()
        write_json = True

    elif is_json:
        # Properties come straight from the loaded object; skip if empty.
        props = acc.properties
        if props:
            write_json = True
        else:
            log.warning("No properties available to dump")

    elif is_pickle:
        # Regroup the flat (component, name, value) triples per component.
        app_props, msg_props, comp_props = acc.gatherProps()
        props = {"ApplicationMgr": app_props, "MessageSvc": msg_props}
        for comp, name, value in comp_props:
            props.setdefault(comp, {})[name] = value
        write_json = True

    if write_json:
        log.info("Dumping configuration to %s.json", fname)
        hlt_json = {'filetype': 'joboptions', 'properties': props}
        with open(f"{fname}.json", "w") as f:
            json.dump(hlt_json, f, indent=4, sort_keys=True, ensure_ascii=True)

    if args.dump_config_exit:
        log.info("Configuration dumped. Exiting...")
        sys.exit(0)
1372
1373
log.info("Starting Athena execution...")

# Make sure the worker directory exists below the current working directory.
worker_dir = os.path.join(os.getcwd(), "athenaHLT_workers", "athenaHLT-01")
if not os.path.exists(worker_dir):
    log.info("Creating worker directory: %s", worker_dir)
    os.makedirs(worker_dir, exist_ok=True)

if args.interactive:
    # Hand control to an interactive console; nothing is run automatically.
    log.info("Interactive mode - call acc.run() to execute")
    import code
    code.interact(local={'acc': acc, 'flags': flags})
else:
    # Batch mode: run the job and translate the outcome into an exit code.
    from AthenaCommon import ExitCodes
    exitcode = 0
    try:
        sc = acc.run(args.number_of_events)
        if sc.isFailure():
            exitcode = ExitCodes.EXE_ALG_FAILURE
    except SystemExit as e:
        # SystemExit.code is not necessarily an integer: it is None for a bare
        # sys.exit() (success) and may be an arbitrary object such as an error
        # message (which the interpreter would report with exit status 1).
        # Normalise it so the '%d' log format and sys.exit() below always
        # receive an integer.
        if e.code is None:
            exitcode = 0
        elif isinstance(e.code, int):
            exitcode = ExitCodes.EXE_ALG_FAILURE if e.code == 1 else e.code
        else:
            # Equivalent to exit status 1, which is mapped to EXE_ALG_FAILURE
            # above; keep the message visible in the log.
            log.error("%s", e.code)
            exitcode = ExitCodes.EXE_ALG_FAILURE
    except Exception:
        traceback.print_exc()
        exitcode = ExitCodes.UNKNOWN_EXCEPTION

    log.info('Leaving with code %d: "%s"', exitcode, ExitCodes.what(exitcode))
    sys.exit(exitcode)
1405
1406