def _help_formatter(prog):
    # argparse instantiates the formatter class with the keyword ``prog``,
    # so the parameter name must remain ``prog``.
    return argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=32, width=100)


# Command-line parser. The automatic -h/--help is switched off here
# (add_help=False); a custom --help option is registered with the options.
parser = argparse.ArgumentParser(
    prog='athenaEF.py',
    usage='%(prog)s [OPTION]... -f FILE jobOptions',
    formatter_class=_help_formatter,
    add_help=False,
)
# Argument groups that are considered expert-only get appended to this list.
parser.expert_groups = []
978
979
# General options: job options source, threading, logging, pre/post commands.
general = parser.add_argument_group('Options')
general.add_argument(
    'jobOptions',
    nargs='?',
    help='job options: CA module (package.module:function), pickle file (.pkl), or JSON file (.json)',
)
general.add_argument(
    '--threads',
    metavar='N',
    type=int,
    default=1,
    help='number of threads',
)
general.add_argument(
    '--concurrent-events',
    metavar='N',
    type=int,
    help='number of concurrent events if different from --threads',
)
general.add_argument(
    '--log-level', '-l',
    metavar='LVL',
    type=arg_log_level,
    default='INFO,ERROR',
    help='OutputLevel of athena,POOL',
)
# -c / -C may be given several times; every occurrence is appended.
general.add_argument(
    '--precommand', '-c',
    metavar='CMD',
    action='append',
    default=[],
    help='Python commands executed before job options',
)
general.add_argument(
    '--postcommand', '-C',
    metavar='CMD',
    action='append',
    default=[],
    help='Python commands executed after job options',
)
general.add_argument(
    '--interactive', '-i',
    action='store_true',
    help='interactive mode',
)
# Help is handled by the MyHelp action; it optionally accepts the value "all".
general.add_argument(
    '--help', '-h',
    nargs='?',
    choices=['all'],
    action=MyHelp,
    help='show help',
)
991
# Input/output options: input files, output name and event range.
in_out = parser.add_argument_group('Input/Output')
# -f may be repeated; each file is appended (the value stays None if unused).
in_out.add_argument(
    '--file', '--filesInput', '-f',
    action='append',
    help='input RAW file',
)
in_out.add_argument(
    '--save-output', '-o',
    metavar='FILE',
    help='output file name',
)
in_out.add_argument(
    '--number-of-events', '--evtMax', '-n',
    metavar='N',
    type=int,
    default=-1,
    help='processes N events (default: -1, means all)',
)
in_out.add_argument(
    '--skip-events', '--skipEvents', '-k',
    metavar='N',
    type=int,
    default=0,
    help='skip N first events',
)
in_out.add_argument(
    '--loop-files',
    action='store_true',
    help='loop over input files if no more events',
)
998
999
# Performance/debugging switches. All of them are plain store_true flags, so
# they are registered from a table: (option strings, extra keyword arguments).
# NOTE(review): --tcmalloc and --imf combine store_true with default=True, so
# their parsed value is True whether or not the flag is given.
perf_debug = parser.add_argument_group('Performance and debugging')
for option_strings, extra in (
    (('--perfmon',), {'help': 'enable PerfMon'}),
    (('--tcmalloc',), {'default': True, 'help': 'use tcmalloc'}),
    (('--stdcmalloc',), {'help': 'use stdcmalloc'}),
    (('--stdcmath',), {'help': 'use stdcmath library'}),
    (('--imf',), {'default': True, 'help': 'use Intel math library'}),
    (('--show-includes', '-s'), {'help': 'show printout of included files'}),
):
    perf_debug.add_argument(*option_strings, action='store_true', **extra)
1007
1008
# Conditions: run/lumiblock numbers, start-of-run time and detector mask.
conditions = parser.add_argument_group('Conditions')
conditions.add_argument(
    '--run-number', '-R',
    metavar='RUN',
    type=int,
    help='run number (if None, read from first event)',
)
conditions.add_argument(
    '--lb-number', '-L',
    metavar='LBN',
    type=int,
    help='lumiblock number (if None, read from first event)',
)
conditions.add_argument(
    '--conditions-run',
    metavar='RUN',
    type=int,
    default=None,
    help='reference run number for conditions lookup (use when IS run number has no COOL data)',
)
# The raw string is converted by arg_sor_time; accepted formats are listed
# in the help text below.
conditions.add_argument(
    '--sor-time',
    type=arg_sor_time,
    help=(
        'The Start Of Run time. Three formats are accepted: '
        '1) the string "now", for current time; '
        '2) the number of nanoseconds since epoch (e.g. 1386355338658000000 or int(time.time() * 1e9)); '
        '3) human-readable "20/11/18 17:40:42.3043". If not specified the sor-time is read from the conditions DB'
    ),
)
conditions.add_argument(
    '--detector-mask',
    metavar='MASK',
    type=arg_detector_mask,
    help='detector mask (if None, read from the conditions DB), use string "all" to enable all detectors',
)
1023
1024
# Database: trigger DB / CREST configuration source and configuration dumps.
database = parser.add_argument_group('Database')
database.add_argument(
    '--use-database', '-b',
    action='store_true',
    help='configure from trigger database using SMK',
)
database.add_argument(
    '--db-server',
    metavar='DB',
    default='TRIGGERDB_RUN3',
    help='DB server name (alias)',
)
# The three configuration keys share the same shape: optional integers.
for key_option, key_help in (
    ('--smk', 'Super Master Key'),
    ('--l1psk', 'L1 prescale key'),
    ('--hltpsk', 'HLT prescale key'),
):
    database.add_argument(key_option, type=int, default=None, help=key_help)
database.add_argument(
    '--use-crest',
    action='store_true',
    default=False,
    help='Use CREST for trigger configuration',
)
database.add_argument(
    '--crest-server',
    metavar='URL',
    default=None,
    help='CREST server URL (defaults to flags.Trigger.crestServer)',
)
database.add_argument(
    '--dump-config',
    action='store_true',
    help='Dump joboptions JSON file',
)
database.add_argument(
    '--dump-config-exit',
    action='store_true',
    help='Dump joboptions JSON file and exit',
)
1038
1039
# Magnet currents; both default to None when not given on the command line.
magnets = parser.add_argument_group('Magnets')
magnets.add_argument(
    '--solenoid-current',
    type=float,
    default=None,
    help='Solenoid current in Amperes (default: nominal current for offline running, required from IS online)',
)
magnets.add_argument(
    '--toroids-current',
    type=float,
    default=None,
    help='Toroids current in Amperes (default: nominal current for offline running, required from IS online)',
)
1045
1046
# Online running: partition and WEBDAQ endpoint used in the online environment.
online = parser.add_argument_group('Online')
online.add_argument(
    '--online-environment',
    action='store_true',
    help=(
        'Enable online environment: read run parameters from IS and trigger '
        'configuration keys (SMK, L1PSK, HLTPSK) from OKS via WEBDAQ REST API'
    ),
)
online.add_argument(
    '--partition',
    metavar='NAME',
    default=None,
    help='TDAQ partition name (defaults to TDAQ_PARTITION environment variable)',
)
online.add_argument(
    '--webdaq-base',
    metavar='URL',
    default=None,
    help='WEBDAQ base URL (defaults to TDAQ_WEBDAQ_BASE environment variable)',
)
1055
1056
# Online histogram publishing switch.
histogramming = parser.add_argument_group('Online Histogramming')
histogramming.add_argument(
    '--oh-monitoring', '-M',
    action='store_true',
    default=False,
    help='enable online histogram publishing via WebdaqHistSvc',
)
1060
1061
# The (currently empty) 'Expert' group is registered as an expert group.
expert = parser.add_argument_group('Expert')
parser.expert_groups.append(expert)

# parse_known_args keeps unrecognised arguments instead of failing on them;
# they are returned separately as ``unparsed_args``.
parsed = parser.parse_known_args()
args = parsed[0]
unparsed_args = parsed[1]
check_args(parser, args)
1066
1067
# Switch ROOT to batch mode before anything else uses it.
from ROOT import gROOT
gROOT.SetBatch()

# Configure the Athena Python logger: the level name is the first entry of
# --log-level and is resolved to the numeric constant of the logging module.
import AthenaCommon.Logging
athena_log = AthenaCommon.Logging.log
level_name = args.log_level[0]
athena_log.setLevel(getattr(logging, level_name))
athena_log.setFormat("%(asctime)s Py:%(name)-31s %(levelname)7s %(message)s")

if args.show_includes:
    from AthenaCommon.Include import include
    include.setShowIncludes(True)

# Without an explicit (non-zero) --concurrent-events, use the thread count.
args.concurrent_events = args.concurrent_events or args.threads
1082
1083
from TrigPSC import PscConfig
from TrigPSC.PscDefaultFlags import defaultOnlineFlags

# Start from the default online flags; the sections below adjust them
# according to the command line.
flags = defaultOnlineFlags()

# -M / --oh-monitoring switches on the WebdaqHistSvc flag.
if args.oh_monitoring:
    flags.Trigger.Online.useOnlineWebdaqHistSvc = True
    log.info("Enabled WebdaqHistSvc for online histogram publishing")
1094
1095
# CREST setup. The server URL is synchronised in both directions: an explicit
# --crest-server overrides the flag, otherwise the flag value is copied back
# into ``args`` so later code can read it from there.
log.info("Using CREST for trigger configuration: %s", args.use_crest)
if args.use_crest:
    flags.Trigger.useCrest = True
    if not args.crest_server:
        args.crest_server = flags.Trigger.crestServer
    else:
        flags.Trigger.crestServer = args.crest_server
1103
# Update run parameters (helper defined elsewhere in this file).
update_run_params(args, flags)

if args.use_database:
    # forcePSK is set when the HLT prescale key was given explicitly or when
    # running in the online environment.
    hltpsk_given = args.hltpsk is not None
    PscConfig.forcePSK = hltpsk_given or args.online_environment

    # Update the trigger configuration keys (helper defined elsewhere).
    update_trigconf_keys(args, flags)
1113
1114
# Arguments argparse did not recognise are applied as flag settings, but only
# when job options are given, are not a JSON file, and the configuration does
# not come from the database.
if not args.use_database and args.jobOptions and not args.jobOptions.endswith('.json'):
    PscConfig.unparsedArguments = unparsed_args
    for flag_setting in unparsed_args:
        flags.fillFromString(flag_setting)

# Forward the interactive and dump-and-exit switches to PscConfig.
PscConfig.interactive = args.interactive
PscConfig.exitAfterDump = args.dump_config_exit
1122
1123
1124
1125
1126
1127
1128
# Optional reference run for the conditions lookup.
conditions_run = args.conditions_run
if conditions_run is not None:
    log.info("Using conditions from reference run %d (overriding run %s for IOV lookup)",
             conditions_run, args.run_number)
    flags.Input.ConditionsRunNumber = conditions_run

# Event range: only strictly positive values override the flag defaults.
max_events = args.number_of_events
if max_events > 0:
    flags.Exec.MaxEvents = max_events

skip_events = args.skip_events
if skip_events > 0:
    flags.Exec.SkipEvents = skip_events

# PerfMon follows the --perfmon switch directly.
flags.PerfMon.doFastMonMT = args.perfmon
1147
1148
1149
# Configure the EF byte-stream input from the command line.
flags.Trigger.Online.useEFByteStreamSvc = True
ef = flags.Trigger.Online.EFInterface

# --file uses action='append', so it is None when no file was given.
ef_files = args.file or []

ef.Files = ef_files
ef.LoopFiles = args.loop_files
ef.NumEvents = args.number_of_events
ef.SkipEvents = args.skip_events
ef.RunNumber = args.run_number
1158
1159
# Run the user-supplied --precommand snippets. Each one is executed against
# the module globals with a fresh local namespace exposing only ``flags``,
# so names assigned by one snippet are not visible to the next.
if args.precommand:
    log.info("Executing precommand(s)")
    for snippet in args.precommand:
        log.info(" %s", snippet)
        exec(snippet, globals(), dict(flags=flags))
1165
1166
# Classify the configuration source. The database takes precedence; otherwise
# the job options file extension decides between pickle and JSON. When all
# three are False the job options are treated as a CA module further below.
# ``jobOptions`` is only bound when job options are given and the database is
# not used.
is_database = args.use_database
if is_database or not args.jobOptions:
    is_pickle = is_json = False
else:
    jobOptions = args.jobOptions
    is_pickle = jobOptions.endswith('.pkl')
    is_json = jobOptions.endswith('.json')
1175
# Build ``acc`` from one of four sources: trigger database (optionally via
# CREST), pickle file, JSON file, or a CA module that is first dumped to
# pickle/JSON and then loaded back from the JSON file.
if is_database:
    if args.use_crest:
        crest_connection = TriggerCrestUtil.getCrestConnection(args.db_server)
        db_alias = f"{args.crest_server}/{crest_connection}"
        load_message = "Loading configuration via CREST from %s with SMK %d"
    else:
        db_alias = args.db_server
        load_message = "Loading configuration from database %s with SMK %d"
    log.info(load_message, db_alias, args.smk)

    run_params = get_run_params(args).to_dict()
    acc = load_from_database(
        db_alias, args.smk, args.l1psk, args.hltpsk, run_params,
        num_threads=args.threads,
        num_slots=args.concurrent_events,
        ef_files=ef_files,
    )
    log.info("Configuration loaded from database")

elif is_pickle:
    log.info("Loading configuration from pickle file: %s", jobOptions)
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(jobOptions, 'rb') as pickle_file:
        acc = pickle.load(pickle_file)
    log.info("Configuration loaded from pickle")

elif is_json:
    log.info("Loading configuration from JSON file: %s", jobOptions)

    run_params = get_run_params(args).to_dict()
    acc = load_from_json(
        jobOptions, run_params,
        num_threads=args.threads,
        num_slots=args.concurrent_events,
        ef_files=ef_files,
    )
    log.info("Configuration loaded from JSON")

else:
    # CA module: build the ComponentAccumulator, dump it, reload from JSON.
    log.info("Loading CA configuration from: %s", jobOptions)

    from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
    from AthenaConfiguration.MainServicesConfig import addMainSequences
    from TrigServices.TriggerUnixStandardSetup import commonServicesCfg
    from AthenaConfiguration.ComponentFactory import CompFactory

    # The common pieces are configured from a locked copy of the flags.
    locked_flags = flags.clone()
    locked_flags.lock()

    master_sequence = CompFactory.AthSequencer("AthMasterSeq", Sequential=True)
    cfg = ComponentAccumulator(master_sequence)
    for app_property, app_value in (
        ('ExtSvcCreates', False),
        ("MessageSvcType", "TrigMessageSvc"),
        ("JobOptionsSvcType", "TrigConf::JobOptionsSvc"),
    ):
        cfg.setAppProperty(app_property, app_value)

    addMainSequences(locked_flags, cfg)
    cfg.merge(commonServicesCfg(locked_flags))

    # The job options function receives the original ``flags`` object, not
    # the locked clone.
    # NOTE(review): presumably so that it can still adjust flags - confirm.
    cfg_func = AthHLT.getCACfg(jobOptions)
    cfg.merge(cfg_func(flags))

    # Postcommands run here against ``cfg``; the list is cleared afterwards
    # so they are not executed a second time against ``acc``.
    if args.postcommand:
        log.info("Executing postcommand(s)")
        for snippet in args.postcommand:
            log.info(" %s", snippet)
            exec(snippet, globals(), dict(flags=flags, cfg=cfg))
        args.postcommand = []

    # Store the configuration as pickle and convert it to JSON.
    fname = "HLTJobOptions_EF"
    pickle_path = f"{fname}.pkl"
    json_path = f"{fname}.json"
    log.info("Dumping configuration to %s.pkl and %s.json", fname, fname)
    with open(pickle_path, "wb") as pickle_file:
        cfg.store(pickle_file)

    from TrigConfIO.JsonUtils import create_joboptions_json
    create_joboptions_json(pickle_path, json_path)

    if args.dump_config_exit:
        log.info("Configuration dumped to %s.json. Exiting...", fname)
        sys.exit(0)

    # Load the configuration back from the JSON file that was just written.
    log.info("Loading configuration from %s.json via TrigConf::JobOptionsSvc", fname)

    run_params = get_run_params(args).to_dict()
    acc = load_from_json(
        json_path, run_params,
        num_threads=args.threads,
        num_slots=args.concurrent_events,
        ef_files=ef_files,
    )

    log.info("Configuration loaded with HLT online services")
1273
1274
# Run the user-supplied --postcommand snippets against the loaded
# configuration. Each one gets a fresh local namespace exposing ``flags``
# and ``acc``; the module globals serve as the global namespace.
if args.postcommand:
    log.info("Executing postcommand(s)")
    for snippet in args.postcommand:
        log.info(" %s", snippet)
        exec(snippet, globals(), dict(flags=flags, acc=acc))
1280
1281
# Optional dump of the job options to HLTJobOptions_EF.json. The properties
# are collected per configuration source and written once at the end; for a
# CA module (none of the three kinds below) nothing is written here.
if args.dump_config or args.dump_config_exit:
    fname = "HLTJobOptions_EF"
    props = None
    write_dump = False

    if is_database:
        from TrigConfIO.HLTTriggerConfigAccess import HLTJobOptionsAccess
        log.info("Fetching configuration from database for dump...")
        props = HLTJobOptionsAccess(dbalias=acc.db_server, smkey=acc.smk).algorithms()
        write_dump = True

    elif is_json:
        # An empty property set is reported instead of being dumped.
        props = acc.properties
        write_dump = bool(props)
        if not write_dump:
            log.warning("No properties available to dump")

    elif is_pickle:
        # Merge application, message service and per-component properties
        # into a single mapping keyed by component name.
        app_props, msg_props, comp_props = acc.gatherProps()
        props = {"ApplicationMgr": app_props, "MessageSvc": msg_props}
        for component, prop_name, prop_value in comp_props:
            props.setdefault(component, {})[prop_name] = prop_value
        write_dump = True

    if write_dump:
        log.info("Dumping configuration to %s.json", fname)
        document = {'filetype': 'joboptions', 'properties': props}
        with open(f"{fname}.json", "w") as json_file:
            json.dump(document, json_file, indent=4, sort_keys=True, ensure_ascii=True)

    if args.dump_config_exit:
        log.info("Configuration dumped. Exiting...")
        sys.exit(0)
1326
1327
# Execute the job: either hand control to an interactive prompt or run the
# loaded configuration and leave with an exit code derived from the outcome.
log.info("Starting Athena execution...")

# Make sure the worker directory exists below the current working directory.
# The existence check only decides whether the creation is logged;
# exist_ok=True keeps makedirs safe if the directory appears in between.
worker_dir = os.path.join(os.getcwd(), "athenaHLT_workers", "athenaHLT-01")
if not os.path.exists(worker_dir):
    log.info("Creating worker directory: %s", worker_dir)
    os.makedirs(worker_dir, exist_ok=True)

if args.interactive:
    # Nothing is run automatically; the prompt exposes ``acc`` and ``flags``.
    log.info("Interactive mode - call acc.run() to execute")
    import code
    code.interact(local={'acc': acc, 'flags': flags})
else:
    from AthenaCommon import ExitCodes
    exitcode = 0
    try:
        sc = acc.run(args.number_of_events)
        if sc.isFailure():
            exitcode = ExitCodes.EXE_ALG_FAILURE
    except SystemExit as exc:
        if exc.code is None:
            # A bare sys.exit() carries code None, which means success.
            # Map it to 0 so the '%d' log format and ExitCodes.what() below
            # receive an integer.
            exitcode = 0
        elif exc.code == 1:
            exitcode = ExitCodes.EXE_ALG_FAILURE
        else:
            # Any other code is passed through unchanged.
            exitcode = exc.code
    except Exception:
        traceback.print_exc()
        exitcode = ExitCodes.UNKNOWN_EXCEPTION

    log.info('Leaving with code %d: "%s"', exitcode, ExitCodes.what(exitcode))
    sys.exit(exitcode)
1359
1360