ATLAS Offline Software
athenaHLT.py
Go to the documentation of this file.
1 #!/bin/sh
2 # -*- mode: python -*-
3 #
4 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
5 #
6 # This is a script that is born as shell to setup the preloading and then
7 # resurrected as python script for the actual athenaHLT.py application.
8 #
9 """date"
10 
11 # defaults
12 export USETCMALLOC=1
13 export USEIMF=1
14 
15 # parse command line arguments
16 for a in ${@}
17 do
18  case "$a" in
19  --stdcmalloc) USETCMALLOC=0;;
20  --tcmalloc) USETCMALLOC=1;;
21  --stdcmath) USEIMF=0;;
22  --imf) USEIMF=1;;
23  --preloadlib*) export ATHENA_ADD_PRELOAD=${a#*=};;
24  --no-ers-signal-handlers) export TDAQ_ERS_NO_SIGNAL_HANDLERS=1;;
25  esac
26 done
27 
28 # Do the actual preloading via LD_PRELOAD
29 source `which athena_preload.sh `
30 
31 # Now resurrect ourselves as python script
32 python_path=`which python`
33 "exec" "$python_path" "-tt" "$0" "$@";
34 
35 """
36 
37 import sys
38 import os
39 import argparse
40 import ast
41 import collections.abc
42 from datetime import datetime as dt
43 
# Use single-threaded oracle client library to avoid extra
# threads when forking (see ATR-21890, ATDBOPS-115)
os.environ["CORAL_ORA_NO_OCI_THREADED"] = "1"

# Add possible paths for IS schema files to TDAQ_DB_PATH.
# DATAPATH is walked in reverse and every match is *prepended*, so the schema
# directories end up in the same relative order as their parents in DATAPATH.
for p in reversed(os.environ.get("DATAPATH", "").split(os.pathsep)):
    if p.rstrip('/').endswith('/share'):
        schema_dir = os.path.join(p, 'schema')
        current_db_path = os.environ.get("TDAQ_DB_PATH")
        if current_db_path is None:
            # TDAQ_DB_PATH not set at all: start the list instead of raising
            # KeyError while the script is still being imported.
            os.environ["TDAQ_DB_PATH"] = schema_dir
        else:
            os.environ["TDAQ_DB_PATH"] = schema_dir + os.pathsep + current_db_path
52 
53 from TrigCommon import AthHLT
54 from AthenaCommon.Logging import logging
55 log = logging.getLogger('athenaHLT')
56 
57 
def arg_sor_time(s):
    """Turn a start-of-run time argument into an OWLTime compatible string.

    Three kinds of input are recognised:
      * the literal ``'now'``      -> the current time, formatted;
      * a string of digits only    -> taken as nanoseconds since the epoch
                                      and formatted;
      * anything else              -> returned unchanged.
    """
    owl_format = '%d/%m/%y %H:%M:%S.%f'

    if s == 'now':
        moment = dt.now()
    elif s.isdigit():
        # nanoseconds -> seconds for datetime.fromtimestamp
        moment = dt.fromtimestamp(float(s) / 1e9)
    else:
        return s

    return moment.strftime(owl_format)
66 
def arg_detector_mask(s):
    """Convert detector mask to format expected by eformat

    s -- the string 'all', or the mask as a hexadecimal string (an optional
         '0x' prefix and either letter case are accepted by int(s, 16)).

    Returns a lower-case hexadecimal string without prefix, left-padded with
    zeros to at least 32 characters; 'all' yields 32 times 'f'.
    Raises ValueError if s is neither 'all' nor valid hexadecimal.
    """
    if s == 'all':
        return 'f' * 32
    # format(..., 'x') already yields lower-case digits without the '0x'
    # marker, so no marker stripping is needed afterwards.
    dmask = format(int(s, 16), 'x')
    return dmask.rjust(32, '0')  # (pad with 0s)
74 
def arg_ros2rob(s):
    """Return the ros2rob mapping given either inline or through a file.

    s is first tried as a file name: if it can be opened, the file content
    replaces s. The resulting text is then evaluated with arg_eval and must
    be a plain dict. On any failure an error is written to stderr and an
    empty dictionary is returned instead.
    """
    try:
        with open(s) as mapping_file:
            s = mapping_file.read()  # file exists: its content is the mapping text
        print("Reading ros2rob from a file")
    except IOError:
        # Not a readable file: treat s itself as the dictionary text
        pass

    try:
        mapping = arg_eval(s)
        if type(mapping) is dict:
            return mapping
        raise ValueError
    except Exception:
        complaints = (
            "ERROR! ros2rob cannot be evaluated as it is not a proper python dictionary: {}\n".format(s),
            "Using empty ros2rob dictionary instead\n",
        )
        for line in complaints:
            sys.stderr.write(line)
        return {}
93 
def arg_log_level(s):
    """Argument handler for log levels

    s -- one level name, or several separated by commas (e.g. 'INFO,ERROR').

    Returns the list of level names. If only a single level was given,
    'ERROR' is appended as the second entry.
    """
    lvls = s.split(',')
    if len(lvls) == 1:
        lvls.append('ERROR')
    return lvls
99 
def arg_eval(s):
    """Parse s as a python literal (list, dict, number, ...) for argparse.

    Uses ast.literal_eval, so only literal syntax is accepted; whatever that
    call raises for invalid input is propagated to the caller.
    """
    value = ast.literal_eval(s)
    return value
103 
def check_args(parser, args):
    """Verify that the parsed command line options are mutually consistent.

    Each rule is a (condition, message) pair; the conditions are evaluated
    lazily and in order, and parser.error(message) is called for every rule
    whose condition holds.
    """
    rules = (
        (lambda: not (args.jobOptions or args.use_database),
         "No job options file specified"),
        # Due to missing per-worker dirs this is not supported (ATR-19462)
        (lambda: args.perfmon and args.oh_monitoring and args.nprocs > 1,
         "--perfmon cannot be used with --oh-monitoring and --nprocs > 1"),
        (lambda: not (args.file or args.dump_config_exit),
         "--file is required unless using --dump-config-exit"),
    )

    for violated, message in rules:
        if violated():
            parser.error(message)
116 
def update_pcommands(args, cdict):
    """Append run/lumiblock overrides to the trigger precommands.

    For each of args.run_number and args.lb_number that is not None, one
    assignment statement is appended to cdict['trigger']['precommand'].
    The list is modified in place; nothing is returned.
    """
    overrides = (
        ('run_number', '_run_number=%d'),
        ('lb_number', '_lb_number=%d'),
    )
    for attribute, template in overrides:
        value = getattr(args, attribute)
        if value is None:
            continue
        cdict['trigger']['precommand'].append(template % value)
124 
def update_run_params(args):
    """Update run parameters from file/COOL

    Fills in, in place, whichever of args.run_number, args.lb_number,
    args.sor_time and args.detector_mask were not given on the command line:

      * run and lumiblock number are read from the first input file;
      * SOR time and detector mask are taken from the start-of-run parameters
        returned by AthHLT.get_sor_params for the run number.

    Exits with status 1 if start-of-run parameters are needed but
    AthHLT.get_sor_params returns None for the run.
    """

    # "Specified" means "is not None": 0 is a value like any other, so compare
    # against None rather than relying on truthiness.
    if (args.run_number is None) != (args.lb_number is None):
        log.error("Both or neither of the options -R (--run-number) and -L (--lb-number) have to be specified")

    if args.run_number is None and args.file:
        from eformat import EventStorage
        dr = EventStorage.pickDataReader(args.file[0])
        args.run_number = dr.runNumber()
        args.lb_number = dr.lumiblockNumber()

    sor_params = None
    if (args.sor_time is None or args.detector_mask is None) and args.run_number is not None:
        sor_params = AthHLT.get_sor_params(args.run_number)
        log.debug('SOR parameters: %s', sor_params)
        if sor_params is None:
            log.error("Run %d does not exist. If you want to use this run-number specify "
                      "remaining run parameters, e.g.: --sor-time=now --detector-mask=all", args.run_number)
            sys.exit(1)

    if args.sor_time is None and sor_params is not None:
        args.sor_time = arg_sor_time(str(sor_params['SORTime']))

    if args.detector_mask is None and sor_params is not None:
        dmask = sor_params['DetectorMask']
        if args.run_number < AthHLT.CondDB._run2:
            # hex() needs an integer, while arg_detector_mask parses a string:
            # presumably the mask of these older runs is stored as a number
            # and later ones as a hex string -- TODO confirm against AthHLT.
            dmask = hex(dmask)
        args.detector_mask = arg_detector_mask(dmask)
154 
def update_trigconf_keys(args):
    """Update trigger configuration keys

    Any of args.smk, args.l1psk and args.hltpsk that is still None is filled
    in place from the keys returned by AthHLT.get_trigconf_keys for
    args.run_number / args.lb_number. Keys given on the command line are kept.

    Exits with status 1 if the lookup raises KeyError.
    """

    if args.smk is None or args.l1psk is None or args.hltpsk is None:
        try:
            log.info("Reading trigger configuration keys from COOL for run %s", args.run_number)
            trigconf = AthHLT.get_trigconf_keys(args.run_number, args.lb_number)
            if args.smk is None:
                args.smk = trigconf['SMK']
            if args.l1psk is None:
                args.l1psk = trigconf['LVL1PSK']
            if args.hltpsk is None:
                args.hltpsk = trigconf['HLTPSK']
        except KeyError:
            # %s rather than %d: the run number may still be None here, which
            # %d cannot format (same placeholder as the info message above).
            log.error("Cannot read trigger configuration keys from COOL for run %s", args.run_number)
            sys.exit(1)
171 
def update_nested_dict(d, u):
    """Update nested dictionary (https://stackoverflow.com/q/3232943)

    Recursively merges mapping u into dictionary d: a mapping value in u is
    merged into the value stored under the same key in d, any other value
    simply overwrites. d is modified in place and also returned.

    If d holds a non-mapping value (e.g. None) under a key for which u
    provides a mapping, that value is replaced by a new dictionary.
    """
    for k, v in u.items():
        if isinstance(v, collections.abc.Mapping):
            target = d.get(k)
            if not isinstance(target, collections.abc.Mapping):
                # Missing key or a scalar placeholder: nothing to merge into
                target = {}
            d[k] = update_nested_dict(target, v)
        else:
            d[k] = v
    return d
180 
def HLTMPPy_cfgdict(args):
    """Assemble the configuration dictionary handed to HLTMPPy.

    The layout follows what the HLTMPPy runner expects, see
    https://gitlab.cern.ch/atlas-tdaq-software/HLTMPPU/blob/master/python/HLTMPPy/runner.py

    The result has the sections 'HLTMPPU', 'datasource', 'global',
    'monitoring' (only with args.oh_monitoring) and 'trigger', in that order.
    """
    have_input_files = bool(args.file)
    log_root = os.getcwd()

    # --- HLTMPPU section ---------------------------------------------------
    fork_dump_params = ["dumpFDs=1", "dumpThreads=1"] if args.debug_fork else None
    hltmppu = {
        'application_name': 'athenaHLT-%d' % os.getpid(),  # unique name required to avoid interference
        'extra_params': fork_dump_params,
        'interactive': args.interactive,
        'log_root': log_root,
        'log_name': ('' if args.unique_log_files else 'athenaHLT:'),
        'module': 'HLTMPPU',
        'num_forks': args.nprocs,
        'num_threads': args.threads,
        'num_slots': args.concurrent_events,
        'partition_name': args.partition,
        'hard_timeout': args.timeout,
        'soft_timeout_fraction': 0.95,
        'hltresultSizeMb': args.hltresult_size,
    }
    # Optional entries are only present when requested
    if args.debug:
        hltmppu['debug'] = args.debug
    if args.script_after_fork:
        hltmppu['scriptAfterFork'] = args.script_after_fork

    # --- datasource section --------------------------------------------------
    if have_input_files:
        datasource = {
            'module': 'dffileds',
            'dslibrary': 'DFDcmEmuBackend',
            'compressionFormat': 'ZLIB',
            'compressionLevel': 2,
            'file': args.file,
            'loopFiles': args.loop_files,
            'numEvents': args.number_of_events,
            'outFile': args.save_output,
            'preload': False,
            'extraL1Robs': args.extra_l1r_robs,
            'skipEvents': args.skip_events,
        }
    else:
        datasource = {'module': 'dcmds'}

    # --- global section ------------------------------------------------------
    global_section = {
        'date': args.sor_time,
        'detector_mask': args.detector_mask,
        'log_root': log_root,
        'options_file': None,
        'partition_name': args.partition,
        'ros2robs': args.ros2rob,
        'run_number': args.run_number,
        'save_options': None,
        'solenoid_current': 7730,
        'toroid_current': 20400,
        'schema_files': ['Larg.LArNoiseBurstCandidates.is.schema.xml'],
        'with_infrastructure': args.oh_monitoring,
    }
    if not have_input_files:
        # Without input file these entries get fixed values
        global_section.update(
            trigger_type='',
            detector_mask='f' * 32,
            beam_type=0,
            beam_energy=0,
            T0_project_tag='',
        )

    cdict = {
        'HLTMPPU': hltmppu,
        'datasource': datasource,
        'global': global_section,
    }

    # --- monitoring section (optional) ----------------------------------------
    if args.oh_monitoring:
        cdict['monitoring'] = {
            'module': 'monsvcis',
            'library': 'MonSvcInfoService',
            'ISInterval': 10,
            'ISRegex': '.*',
            'ISServer': '${TDAQ_IS_SERVER=DF}',
            'ISSlots': 1,
            'OHInterval': args.oh_interval,
            'OHInclude': '.*',
            'OHExclude': '',
            'OHServerName': 'HLT-Histogramming',
            'OHSlots': 5,
        }

    # --- trigger section -----------------------------------------------------
    trigger = {
        'library': ['TrigPSC'],
        'joType': args.joboptionsvc_type,
        'msgType': args.msgsvc_type,
    }
    if args.use_database:
        trigger.update({
            'module': 'DBPython',
            'pythonSetupFile': 'TrigPSC.TrigPSCPythonDbSetup',
            'db_alias': args.db_server,
            'coral_server': args.db_server,
            'use_coral': True,
            'SMK': args.smk,
            'l1PSK': args.l1psk,
            'HLTPSK': args.hltpsk,
            'l1BG': 0,
            'l1MenuConfig': 'DB',
            'precommand': args.precommand,
            'postcommand': args.postcommand,
            'logLevels': args.log_level,
        })
    else:
        trigger.update({
            'module': 'joboptions',
            'joFile': args.jobOptions,
            'SMK': None,
            'l1PSK': None,
            'l1BG': 0,
            'l1MenuConfig': 'DB',
            'precommand': args.precommand,
            'postcommand': args.postcommand,
            'logLevels': args.log_level,
        })
        # Python bootstrap depending on file type
        if args.jobOptions.endswith('.json'):
            trigger['pythonSetupFile'] = 'TrigPSC.TrigPSCPythonDbSetup'
        else:
            trigger['pythonSetupFile'] = 'TrigPSC.TrigPSCPythonCASetup'
    cdict['trigger'] = trigger

    return cdict
299 
300 
class MyHelp(argparse.Action):
    """Help action that hides the expert option groups unless asked for all.

    The parser is expected to carry an ``expert_groups`` list with the
    argument groups to hide. With the option value 'all' the complete help is
    printed, otherwise the help text of every action in those groups is
    suppressed first. The process always exits with status 0 afterwards.
    """
    def __call__(self, parser, namespace, values, option_string=None):
        show_everything = (values == 'all')

        for group in parser.expert_groups:
            for action in group._group_actions:
                if not show_everything:
                    action.help = argparse.SUPPRESS

        parser.print_help()
        if not show_everything:
            print('\nUse --help=all to show all (expert) options')
        sys.exit(0)
314 
315 
def main():
    """Parse the command line, build the HLTMPPy configuration and run it.

    Steps, in order: define and parse the options (unknown options are kept
    and later passed on as flag strings), check them for consistency, fill in
    missing run parameters and (for --use-database) trigger keys, build the
    configuration dictionary, apply --cfgdict overrides and pre/postcommand
    modifications, set the PscConfig attributes and flags, and finally call
    runHLTMPPy. Nothing is returned explicitly.
    """
    parser = argparse.ArgumentParser(
        prog='athenaHLT.py',
        formatter_class=lambda prog: argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=32, width=100),
        usage='%(prog)s [OPTION]... -f FILE jobOptions',
        add_help=False)
    parser.expert_groups = []   # Keep list of expert option groups

    ## Global options
    g = parser.add_argument_group('Options')
    g.add_argument('jobOptions', nargs='?', help='job options, CA module or JSON file')
    g.add_argument('--threads', metavar='N', type=int, default=1, help='number of threads')
    g.add_argument('--nprocs', metavar='N', type=int, default=1, help='number of children to fork')
    g.add_argument('--concurrent-events', metavar='N', type=int, help='number of concurrent events if different from --threads')
    g.add_argument('--log-level', '-l', metavar='LVL', type=arg_log_level, default='INFO,ERROR', help='OutputLevel of athena,POOL')
    g.add_argument('--precommand', '-c', metavar='CMD', action='append', default=[],
                   help='Python commands executed before job options or database configuration')
    g.add_argument('--postcommand', '-C', metavar='CMD', action='append', default=[],
                   help='Python commands executed after job options or database configuration')
    g.add_argument('--interactive', '-i', action='store_true', help='interactive mode')
    g.add_argument('--help', '-h', nargs='?', choices=['all'], action=MyHelp, help='show help')

    ## Input/Output
    g = parser.add_argument_group('Input/Output')
    g.add_argument('--file', '--filesInput', '-f', action='append', help='input RAW file')
    g.add_argument('--save-output', '-o', metavar='FILE', help='output file name')
    g.add_argument('--number-of-events', '--evtMax', '-n', metavar='N', type=int, default=-1, help='processes N events (<=0 means all)')
    g.add_argument('--skip-events', '--skipEvents', '-k', metavar='N', type=int, default=0, help='skip N first events')
    g.add_argument('--loop-files', action='store_true', help='loop over input files if no more events')

    ## Performance and debugging
    g = parser.add_argument_group('Performance and debugging')
    g.add_argument('--debug', '-d', nargs='?', const='child', choices=['parent','child'],
                   help='attach debugger (to child by default)')
    g.add_argument('--script-after-fork', metavar='CMD', help='Execute a command after forking. The command has to be in the'
                   ' form "whichProc:cmd" where whichProc is one of [mother,firstFork,allForks] and cmd can contain a format'
                   ' string {pid} which will be replaced with the PID of the corresponding process (mother or fork).')
    g.add_argument('--perfmon', action='store_true', help='enable PerfMon')
    # The allocator/math-library switches are acted upon by the shell preamble
    # of this script; they are declared here so that they are accepted and
    # listed in the help.
    g.add_argument('--tcmalloc', action='store_true', default=True, help='use tcmalloc')
    g.add_argument('--stdcmalloc', action='store_true', help='use stdcmalloc')
    g.add_argument('--stdcmath', action='store_true', help='use stdcmath library')
    g.add_argument('--imf', action='store_true', default=True, help='use Intel math library')
    g.add_argument('--show-includes', '-s', action='store_true', help='show printout of included files')
    # type=int so that a value given on the command line has the same type as
    # the default instead of arriving in the configuration as a string
    g.add_argument('--timeout', metavar='MSEC', type=int, default=60*60*1000, help='timeout in milliseconds')

    ## Database
    g = parser.add_argument_group('Database')
    g.add_argument('--use-database', '-b', action='store_true',
                   help='configure from trigger database, reading keys from COOL if not specified')
    g.add_argument('--db-server', metavar='DB', default='TRIGGERDB_RUN3', help='DB server name')
    g.add_argument('--smk', type=int, default=None, help='Super Master Key')
    g.add_argument('--l1psk', type=int, default=None, help='L1 prescale key')
    g.add_argument('--hltpsk', type=int, default=None, help='HLT prescale key')
    g.add_argument('--dump-config', action='store_true', help='Dump joboptions JSON file')
    g.add_argument('--dump-config-exit', action='store_true', help='Dump joboptions JSON file and exit')

    ## Online histogramming
    g = parser.add_argument_group('Online Histogramming')
    g.add_argument('--oh-monitoring', '-M', action='store_true',
                   help='enable OH monitoring')
    g.add_argument('--oh-interval', metavar='SEC', type=int, default=5,
                   help='seconds between histogram publications.')

    ## Conditions
    g = parser.add_argument_group('Conditions')
    g.add_argument('--run-number', '-R', metavar='RUN', type=int,
                   help='run number (if None, read from first event)')
    g.add_argument('--lb-number', '-L', metavar='LBN', type=int,
                   help='lumiblock number (if None, read from first event)')
    g.add_argument('--sor-time', type=arg_sor_time,
                   help='The Start Of Run time. Three formats are accepted: '
                   '1) the string "now", for current time; '
                   '2) the number of nanoseconds since epoch (e.g. 1386355338658000000 or int(time.time() * 1e9)); '
                   '3) human-readable "20/11/18 17:40:42.3043". If not specified the sor-time is read from COOL')
    g.add_argument('--detector-mask', metavar='MASK', type=arg_detector_mask,
                   help='detector mask (if None, read from COOL), use string "all" to enable all detectors')

    ## Expert options (hidden in the default help, see MyHelp)
    g = parser.add_argument_group('Expert')
    parser.expert_groups.append(g)
    g.add_argument('--joboptionsvc-type', metavar='TYPE', default='TrigConf::JobOptionsSvc', help='JobOptionsSvc type')
    g.add_argument('--msgsvc-type', metavar='TYPE', default='TrigMessageSvc', help='MessageSvc type')
    g.add_argument('--partition', '-p', metavar='NAME', default='athenaHLT', help='partition name')
    g.add_argument('--no-ers-signal-handlers', action='store_true', help='disable ERS signal handlers')
    g.add_argument('--preloadlib', metavar='LIB', help='preload an arbitrary library')
    g.add_argument('--unique-log-files', '-ul', action='store_true', help='add pid/timestamp to worker log files')
    g.add_argument('--debug-fork', action='store_true', help='Dump open files/threads during forking')
    g.add_argument('--hltresult-size', metavar='MB', type=int, default=10, help='Maximum HLT result size in MB')
    g.add_argument('--extra-l1r-robs', metavar='ROBS', type=arg_eval, default=[],
                   help='List of additional ROB IDs that are considered part of the L1 result and passed to the HLT')
    g.add_argument('--ros2rob', metavar='DICT', type=arg_ros2rob, default={},
                   help='Either a string in the form of python dictionary that contains ros-rob mappings '
                   'or a file path that contains such string. For example, /path/to/rosmap.txt or '
                   '{"ROS0":[0x11205,0x11206],"ROS1":[2120005,2120006]}')
    g.add_argument('--cfgdict', metavar='DICT', type=arg_eval, default={},
                   help='HLTMPPy config dictionary with additional options, e.g.: '
                   '--cfgdict \'{"global": {"log_root" : "/tmp"}}\'')

    # Options not known to this parser are kept and later used as flag strings
    (args, unparsed_args) = parser.parse_known_args()
    check_args(parser, args)

    # set default OutputLevels and file inclusion
    import AthenaCommon.Logging
    AthenaCommon.Logging.log.setLevel(getattr(logging, args.log_level[0]))
    AthenaCommon.Logging.log.setFormat("%(asctime)s  Py:%(name)-31s %(levelname)7s %(message)s")
    if args.show_includes:
        from AthenaCommon.Include import include
        include.setShowIncludes( True )

    # consistency checks for arguments
    if not args.concurrent_events:
        args.concurrent_events = args.threads

    # "<=0 means all" (see --number-of-events), so 0 loops forever as well
    if args.loop_files and args.number_of_events<=0:
        log.warning("Looping over files without specifying number of events will run forever!")

    # interactive mode only becomes active after python configuration/reload:
    if args.interactive and not (args.use_database or args.jobOptions.endswith('.json')):
        args.interactive = False

    # Update args and set athena flags
    from TrigPSC import PscConfig

    update_run_params(args)
    if args.use_database:
        # If HLTPSK was given on the command line, we ignore what is stored in COOL
        PscConfig.forcePSK = (args.hltpsk is not None)
        # We always read the keys corresponding to the run/LB in the first file
        update_trigconf_keys(args)

    # get HLTMPPY config dictionary
    cdict = HLTMPPy_cfgdict(args)

    # Apply any expert-level overrides
    update_nested_dict(cdict, args.cfgdict)

    # Modify pre/postcommands if necessary
    update_pcommands(args, cdict)

    # Extra Psc configuration
    from TrigPSC.PscDefaultFlags import defaultOnlineFlags
    flags = defaultOnlineFlags()

    # Fill flags from command line (if not running from DB/JSON):
    if not args.use_database and not args.jobOptions.endswith('.json'):
        PscConfig.unparsedArguments = unparsed_args
        for flag_arg in unparsed_args:
            flags.fillFromString(flag_arg)

    PscConfig.interactive = args.interactive
    PscConfig.exitAfterDump = args.dump_config_exit

    flags.PerfMon.doFastMonMT = args.perfmon
    flags.Trigger.Online.useOnlineTHistSvc = args.oh_monitoring

    # Run HLTMPPU
    from HLTMPPy.runner import runHLTMPPy
    runHLTMPPy(cdict)
472 
473 
# Script entry point. Note that this is a substring test, so it holds for any
# module name that contains "__main__", not only for the exact name.
# main() has no return statement, so sys.exit() receives None on a normal run.
if "__main__" in __name__:
    sys.exit(main())
athenaHLT.MyHelp
Definition: athenaHLT.py:301
replace
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition: hcg.cxx:307
vtune_athena.format
format
Definition: vtune_athena.py:14
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
athenaHLT.MyHelp.__call__
def __call__(self, parser, namespace, values, option_string=None)
Definition: athenaHLT.py:303
athenaHLT.arg_sor_time
def arg_sor_time(s)
The following arg_* methods are used as custom types in argparse.
Definition: athenaHLT.py:60
athenaHLT.main
def main()
Definition: athenaHLT.py:316
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
athenaHLT.update_nested_dict
def update_nested_dict(d, u)
Definition: athenaHLT.py:172
athenaHLT.update_trigconf_keys
def update_trigconf_keys(args)
Definition: athenaHLT.py:155
athenaHLT.arg_log_level
def arg_log_level(s)
Definition: athenaHLT.py:94
athenaHLT.arg_eval
def arg_eval(s)
Definition: athenaHLT.py:100
athenaHLT.update_run_params
def update_run_params(args)
Definition: athenaHLT.py:125
athenaHLT.arg_detector_mask
def arg_detector_mask(s)
Definition: athenaHLT.py:67
athenaHLT.arg_ros2rob
def arg_ros2rob(s)
Definition: athenaHLT.py:75
athenaHLT.update_pcommands
def update_pcommands(args, cdict)
Definition: athenaHLT.py:117
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
athenaHLT.HLTMPPy_cfgdict
def HLTMPPy_cfgdict(args)
Definition: athenaHLT.py:181
Trk::open
@ open
Definition: BinningType.h:40
python.PscDefaultFlags.defaultOnlineFlags
def defaultOnlineFlags()
Definition: PscDefaultFlags.py:28
athenaHLT.check_args
def check_args(parser, args)
Definition: athenaHLT.py:104
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
str
Definition: BTagTrackIpAccessor.cxx:11
WriteBchToCool.update
update
Definition: WriteBchToCool.py:67
readCCLHist.float
float
Definition: readCCLHist.py:83
Trk::split
@ split
Definition: LayerMaterialProperties.h:38