ATLAS Offline Software
Loading...
Searching...
No Matches
athenaEF.py
Go to the documentation of this file.
1#!/bin/sh
2# -*- mode: python -*-
3#
4# Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
5#
6# athenaEF.py - A modified version of athenaHLT.py that runs the HLT configuration
7# directly without using HLTMPPy/HLTMPPU. It creates the configuration like
8# athenaHLT but executes it like athena.py does.
9#
10"""date"
11
12# defaults
13export USETCMALLOC=1
14export USEIMF=1
15
16# parse command line arguments
17for a in ${@}
18do
19 case "$a" in
20 --stdcmalloc) USETCMALLOC=0;;
21 --tcmalloc) USETCMALLOC=1;;
22 --stdcmath) USEIMF=0;;
23 --imf) USEIMF=1;;
24 --preloadlib*) export ATHENA_ADD_PRELOAD=${a#*=};;
25 --no-ers-signal-handlers) export TDAQ_ERS_NO_SIGNAL_HANDLERS=1;;
26 esac
27done
28
29# Do the actual preloading via LD_PRELOAD
30source `which athena_preload.sh `
31
32# Now resurrect ourselves as python script
33python_path=`which python`
34"exec" "$python_path" "-tt" "$0" "$@";
35
36"""
37
38import sys
39import os
40import argparse
41import json
42import pickle
43import traceback
44from datetime import datetime as dt
45
46from TrigConfStorage.TriggerCrestUtil import TriggerCrestUtil
47
48# Use single-threaded oracle client library to avoid extra
49# threads when forking (see ATR-21890, ATDBOPS-115)
50os.environ["CORAL_ORA_NO_OCI_THREADED"] = "1"
51
52from TrigCommon import AthHLT
53from AthenaCommon.Logging import logging
54log = logging.getLogger('athenaEF')
55
56
57# =============================================================================
58# Run Parameters Configuration
59# =============================================================================
60# Default values for run parameters used by prepareForStart.
61# These can be overridden by command-line arguments or fetched from IS.
62
64 """
65 Container for run parameters needed by HltEventLoopMgr::prepareForStart().
66
67 This class centralizes all run parameter defaults or their retrieval from IS.
68 """
69
70 # Default values
71 DEFAULT_RUN_NUMBER = 0
72 DEFAULT_LB_NUMBER = 0
73 DEFAULT_DETECTOR_MASK = 'f' * 32 # All detectors enabled
74 DEFAULT_SOR_TIME = None # Will use 'now' if not set
75 DEFAULT_SOLENOID_CURRENT = 7730.0 # (nominal)
76 DEFAULT_TOROIDS_CURRENT = 20400.0 # (nominal)
77 DEFAULT_BEAM_TYPE = 0
78 DEFAULT_BEAM_ENERGY = 0
79 DEFAULT_RUN_TYPE = "Physics"
80 DEFAULT_TRIGGER_TYPE = 0
81 DEFAULT_RECORDING_ENABLED = False
82
83 def __init__(self,
84 run_number=None,
85 lb_number=None,
86 detector_mask=None,
87 sor_time=None,
88 solenoid_current=None,
89 toroids_current=None,
90 beam_type=None,
91 beam_energy=None,
92 run_type=None,
93 trigger_type=None,
94 recording_enabled=None,
95 conditions_run=None,
96 T0_project_tag='',
97 stream='',
98 lumiblock=0):
99 """Initialize run parameters with defaults for any unspecified values."""
100 self.run_number = run_number if run_number is not None else self.DEFAULT_RUN_NUMBER
101 self.lb_number = lb_number if lb_number is not None else self.DEFAULT_LB_NUMBER
102 self.detector_mask = detector_mask if detector_mask is not None else self.DEFAULT_DETECTOR_MASK
103 self.sor_time = sor_time if sor_time is not None else dt.now().strftime('%d/%m/%y %H:%M:%S.%f')
104 self.solenoid_current = solenoid_current
105 self.toroids_current = toroids_current
106 self.beam_type = beam_type if beam_type is not None else self.DEFAULT_BEAM_TYPE
107 self.beam_energy = beam_energy if beam_energy is not None else self.DEFAULT_BEAM_ENERGY
108 self.run_type = run_type if run_type is not None else self.DEFAULT_RUN_TYPE
109 self.trigger_type = trigger_type if trigger_type is not None else self.DEFAULT_TRIGGER_TYPE
110 self.recording_enabled = recording_enabled if recording_enabled is not None else self.DEFAULT_RECORDING_ENABLED
111 self.conditions_run = conditions_run # Reference run for conditions lookup (None = use run_number)
112 self.T0_project_tag = T0_project_tag
113 self.stream = stream
114 self.lumiblock = lumiblock
115
116 def to_dict(self):
117 """Return run parameters as a dictionary for prepareForStart."""
118 return {
119 'run_number': self.run_number,
120 'lb_number': self.lb_number,
121 'detector_mask': self.detector_mask,
122 'sor_time': self.sor_time,
123 'solenoid_current': self.solenoid_current,
124 'toroids_current': self.toroids_current,
125 'beam_type': self.beam_type,
126 'beam_energy': self.beam_energy,
127 'run_type': self.run_type,
128 'trigger_type': self.trigger_type,
129 'recording_enabled': self.recording_enabled,
130 'conditions_run': self.conditions_run,
131 'T0_project_tag': self.T0_project_tag,
132 'stream': self.stream,
133 'lumiblock': self.lumiblock,
134 }
135
136 @classmethod
137 def from_args(cls, args):
138 """Create RunParams from argparse args, using defaults for unset values."""
139 return cls(
140 run_number=args.run_number,
141 lb_number=args.lb_number,
142 detector_mask=args.detector_mask,
143 sor_time=args.sor_time,
144 solenoid_current=getattr(args, 'solenoid_current', None),
145 toroids_current=getattr(args, 'toroids_current', None),
146 conditions_run=getattr(args, 'conditions_run', None),
147 T0_project_tag=getattr(args, 'T0_project_tag', ''),
148 stream=getattr(args, 'stream', ''),
149 lumiblock=getattr(args, 'lumiblock', 0),
150 )
151
    @classmethod
    def from_is(cls, partition=None, webdaq_base=None, strict=False,
                solenoid_current_override=None, toroids_current_override=None):
        """
        Create RunParams by reading from IS via the WEBDAQ REST API.

        This uses the webis_server REST API to fetch run parameters, avoiding
        direct dependencies on TDAQ libraries. The API endpoint is determined by:
        1. The webdaq_base parameter if provided
        2. The TDAQ_WEBDAQ_BASE environment variable

        The IS objects accessed are:
        - RunParams.RunParams: run_number, det_mask, timeSOR, trigger_type, etc.
        - Magnets.Magnets: SolenoidCurrent, ToroidsCurrent

        Args:
            partition: The partition name (default: from TDAQ_PARTITION env var)
            webdaq_base: Base URL for webis_server (default: from TDAQ_WEBDAQ_BASE env var)
            strict: If True, raise an exception if IS read fails (for --online-environment)
            solenoid_current_override: If provided, skip IS fetch for solenoid (command-line override)
            toroids_current_override: If provided, skip IS fetch for toroids (command-line override)

        Returns:
            RunParams instance with values from IS, or defaults if unavailable

        Raises:
            RuntimeError: If strict=True and IS read fails
        """
        # Local import: requests is only needed when actually reading from IS
        import requests

        # Determine the base URL
        if webdaq_base is None:
            webdaq_base = os.environ.get('TDAQ_WEBDAQ_BASE')

        if not webdaq_base:
            msg = "TDAQ_WEBDAQ_BASE not set, cannot read from IS"
            if strict:
                raise RuntimeError(msg + " (required for --online-environment)")
            log.warning(msg + ". Using defaults.")
            return cls()

        # Determine partition
        if partition is None:
            partition = os.environ.get('TDAQ_PARTITION', 'ATLAS')

        log.info("Reading run parameters from IS via WEBDAQ: %s (partition=%s)", webdaq_base, partition)

        # Accumulates keyword arguments for the final cls(**params) call
        params = {}

        # Fetch RunParams from IS
        # API: GET /info/current/{partition}/is/{server}/{server}.{name}?format=compact
        # Response format: [name, type, timestamp, data] - we need element [3]
        # NOTE: strict-mode RuntimeErrors raised inside this try block propagate
        # to the caller; they are not among the exception types caught below.
        try:
            url = f"{webdaq_base}/info/current/{partition}/is/RunParams/RunParams.RunParams?format=compact"
            log.debug("Fetching RunParams from: %s", url)

            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                response_data = response.json()
                log.debug("RunParams response from IS: %s", response_data)

                # Response is [name, type, timestamp, data]
                if isinstance(response_data, list) and len(response_data) >= 4:
                    runparams = response_data[3]
                else:
                    runparams = response_data

                log.debug("RunParams data: %s", runparams)

                # Map IS fields to our RunParams fields
                if 'run_number' in runparams:
                    params['run_number'] = int(runparams['run_number'])
                if 'lumiblock' in runparams:
                    params['lb_number'] = int(runparams['lumiblock'])
                if 'det_mask' in runparams:
                    params['detector_mask'] = runparams['det_mask']
                if 'timeSOR' in runparams:
                    sor_time = runparams['timeSOR']
                    # Ensure microseconds are present (TrigSORFromPtreeHelper expects format with .%f)
                    if '.' not in sor_time:
                        sor_time += '.000000'
                    params['sor_time'] = sor_time
                if 'beam_type' in runparams:
                    params['beam_type'] = int(runparams['beam_type'])
                if 'beam_energy' in runparams:
                    params['beam_energy'] = int(runparams['beam_energy'])
                if 'run_type' in runparams:
                    params['run_type'] = runparams['run_type']
                if 'trigger_type' in runparams:
                    params['trigger_type'] = int(runparams['trigger_type'])
                if 'recording_enabled' in runparams:
                    # IS may deliver the flag as a string or a native bool/int
                    params['recording_enabled'] = runparams['recording_enabled'] in ('1', 'true', 'True', True, 1)

                log.info("Got run parameters from IS: run=%s, lb=%s",
                         params.get('run_number'), params.get('lb_number'))
            else:
                msg = f"Failed to fetch RunParams from IS: HTTP {response.status_code}"
                if strict:
                    raise RuntimeError(msg + " (required for --online-environment)")
                log.warning(msg)

        except requests.exceptions.RequestException as e:
            msg = f"Error fetching RunParams from IS: {e}"
            if strict:
                raise RuntimeError(msg + " (required for --online-environment)")
            log.warning(msg)
        except (ValueError, KeyError) as e:
            msg = f"Error parsing RunParams from IS: {e}"
            if strict:
                raise RuntimeError(msg + " (required for --online-environment)")
            log.warning(msg)

        # Fetch Magnets from IS
        # In strict mode (online environment), magnets are required unless provided via command line
        # If command-line overrides are provided, use those instead of fetching from IS
        have_solenoid_override = solenoid_current_override is not None
        have_toroids_override = toroids_current_override is not None

        if have_solenoid_override:
            params['solenoid_current'] = solenoid_current_override
            log.info("Using solenoid_current=%.1f from command line override", solenoid_current_override)
        if have_toroids_override:
            params['toroids_current'] = toroids_current_override
            log.info("Using toroids_current=%.1f from command line override", toroids_current_override)

        # Only fetch from IS if we need at least one value
        if not (have_solenoid_override and have_toroids_override):
            try:
                url = f"{webdaq_base}/info/current/{partition}/is/Magnets/Magnets.Magnets?format=compact"
                log.debug("Fetching Magnets from: %s", url)

                response = requests.get(url, timeout=10)
                if response.status_code == 200:
                    response_data = response.json()
                    log.debug("Magnets response from IS: %s", response_data)

                    magnets = response_data[3] if isinstance(response_data, list) and len(response_data) >= 4 else response_data
                    log.debug("Magnets data: %s", magnets)

                    # Magnets structure: { "SolenoidCurrent": {"value": ..., "ts": ...},
                    #                      "ToroidsCurrent": {"value": ..., "ts": ...} }
                    if not have_solenoid_override:
                        params['solenoid_current'] = float(magnets['SolenoidCurrent']['value'])
                    if not have_toroids_override:
                        params['toroids_current'] = float(magnets['ToroidsCurrent']['value'])

                    log.info("Got magnet currents from IS: solenoid=%s, toroids=%s",
                             params.get('solenoid_current'), params.get('toroids_current'))
                elif strict:
                    raise RuntimeError(f"Magnets not available from IS: HTTP {response.status_code} "
                                       "(required for --online-environment, use --solenoid-current and --toroids-current to override)")
                else:
                    log.debug("Magnets not available from IS: HTTP %d", response.status_code)

            except requests.exceptions.RequestException as e:
                if strict:
                    raise RuntimeError(f"Error fetching Magnets from IS: {e} "
                                       "(required for --online-environment, use --solenoid-current and --toroids-current to override)")
                log.debug("Error fetching Magnets from IS: %s", e)
            except (ValueError, KeyError, TypeError) as e:
                if strict:
                    raise RuntimeError(f"Error parsing Magnets from IS: {e} "
                                       "(required for --online-environment, use --solenoid-current and --toroids-current to override)")
                log.debug("Error parsing Magnets from IS: %s", e)

        # In strict mode, verify we got at least run_number from IS
        if strict and 'run_number' not in params:
            raise RuntimeError("Failed to get run_number from IS (required for --online-environment)")

        return cls(**params)
322
323
def get_trigconf_keys_from_oks(partition=None, webdaq_base=None, strict=False):
    """
    Read trigger configuration keys (SMK, L1PSK, HLTPSK) and DB info from OKS via WEBDAQ REST API.

    This reads the keys from the partition's TriggerConfiguration object and its
    related L1TriggerConfiguration and TriggerDBConnection objects.

    OKS Structure:
    - Partition -> TriggerConfiguration -> L1TriggerConfiguration (Lvl1PrescaleKey)
    - Partition -> TriggerConfiguration -> TriggerDBConnection (SuperMasterKey)
    - Partition -> TriggerConfiguration -> HLTImplementationDB (hltPrescaleKey)

    Args:
        partition: The partition name (default: from TDAQ_PARTITION env var)
        webdaq_base: Base URL for webis_server (default: from TDAQ_WEBDAQ_BASE env var)
        strict: If True, raise an exception if OKS read fails (for --online-environment)

    Returns:
        dict with keys: SMK, L1PSK, HLTPSK, db_alias (values may be None if not found)

    Raises:
        RuntimeError: If strict=True and OKS read fails
    """
    # Local import: requests is only needed when actually reading from OKS
    import requests

    # Determine the base URL
    if webdaq_base is None:
        webdaq_base = os.environ.get('TDAQ_WEBDAQ_BASE')

    if not webdaq_base:
        msg = "TDAQ_WEBDAQ_BASE not set, cannot read from OKS"
        if strict:
            raise RuntimeError(msg + " (required for --online-environment)")
        log.warning(msg)
        return {'SMK': None, 'L1PSK': None, 'HLTPSK': None, 'db_alias': None}

    # Determine partition
    if partition is None:
        partition = os.environ.get('TDAQ_PARTITION', 'ATLAS')

    log.info("Reading trigger configuration keys from OKS via WEBDAQ: %s (partition=%s)",
             webdaq_base, partition)

    # Result skeleton; entries stay None if not found along the OKS chain
    result = {'SMK': None, 'L1PSK': None, 'HLTPSK': None, 'db_alias': None}

    def extract_oks_data(response_json):
        """
        Extract data from OKS compact format: [name, type, attributes, relationships]
        Returns tuple (attributes_dict, relationships_dict)
        """
        if isinstance(response_json, list) and len(response_json) >= 4:
            return response_json[2], response_json[3]  # attributes, relationships
        elif isinstance(response_json, list) and len(response_json) >= 3:
            return response_json[2], {}  # attributes only
        return response_json, {}  # fallback

    def get_ref_id(ref):
        """Extract object ID from a relationship reference."""
        if isinstance(ref, list) and len(ref) >= 2:
            return ref[0]  # [id, class] format
        elif isinstance(ref, dict) and 'id' in ref:
            return ref['id']
        elif isinstance(ref, str):
            return ref
        return None

    # OKS API: GET /info/current/{partition}/oks/{class}/{name}?format=compact
    # Response format: [name, type, attributes, relationships]
    # - attributes: dict of simple values (strings, ints, etc.)
    # - relationships: dict of references to other objects
    # NOTE: strict-mode RuntimeErrors raised inside this try block propagate
    # to the caller; they are not among the exception types caught below.
    try:
        url = f"{webdaq_base}/info/current/{partition}/oks/Partition/{partition}?format=compact"
        log.debug("Fetching Partition from OKS: %s", url)

        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            part_attrs, part_rels = extract_oks_data(response.json())
            log.debug("Partition attributes: %s", part_attrs)
            log.debug("Partition relationships: %s", part_rels)

            # Get TriggerConfiguration reference from relationships
            trig_conf_id = None
            if 'TriggerConfiguration' in part_rels:
                trig_conf_id = get_ref_id(part_rels['TriggerConfiguration'])

            if trig_conf_id:
                log.debug("TriggerConfiguration ID: %s", trig_conf_id)

                # Get TriggerConfiguration object
                url = f"{webdaq_base}/info/current/{partition}/oks/TriggerConfiguration/{trig_conf_id}?format=compact"
                response = requests.get(url, timeout=10)
                if response.status_code == 200:
                    trig_attrs, trig_rels = extract_oks_data(response.json())
                    log.debug("TriggerConfiguration attributes: %s", trig_attrs)
                    log.debug("TriggerConfiguration relationships: %s", trig_rels)

                    # Get L1TriggerConfiguration for L1PSK (relationship 'l1')
                    if 'l1' in trig_rels:
                        l1_id = get_ref_id(trig_rels['l1'])
                        if l1_id:
                            url = f"{webdaq_base}/info/current/{partition}/oks/L1TriggerConfiguration/{l1_id}?format=compact"
                            resp = requests.get(url, timeout=10)
                            if resp.status_code == 200:
                                l1_attrs, _ = extract_oks_data(resp.json())
                                log.debug("L1TriggerConfiguration attributes: %s", l1_attrs)
                                if 'Lvl1PrescaleKey' in l1_attrs:
                                    result['L1PSK'] = int(l1_attrs['Lvl1PrescaleKey'])
                                    log.info("Got L1PSK=%d from OKS", result['L1PSK'])

                    # Get TriggerDBConnection for SMK and db_alias (relationship 'TriggerDBConnection')
                    if 'TriggerDBConnection' in trig_rels:
                        db_id = get_ref_id(trig_rels['TriggerDBConnection'])
                        if db_id:
                            url = f"{webdaq_base}/info/current/{partition}/oks/TriggerDBConnection/{db_id}?format=compact"
                            resp = requests.get(url, timeout=10)
                            if resp.status_code == 200:
                                db_attrs, _ = extract_oks_data(resp.json())
                                log.debug("TriggerDBConnection attributes: %s", db_attrs)
                                if 'SuperMasterKey' in db_attrs:
                                    result['SMK'] = int(db_attrs['SuperMasterKey'])
                                    log.info("Got SMK=%d from OKS", result['SMK'])
                                if 'Alias' in db_attrs:
                                    result['db_alias'] = db_attrs['Alias']
                                    log.info("Got db_alias=%s from OKS", result['db_alias'])

                    # Get HLTImplementationDB for HLTPSK (relationship 'hlt')
                    if 'hlt' in trig_rels:
                        hlt_id = get_ref_id(trig_rels['hlt'])
                        if hlt_id:
                            url = f"{webdaq_base}/info/current/{partition}/oks/HLTImplementationDB/{hlt_id}?format=compact"
                            resp = requests.get(url, timeout=10)
                            if resp.status_code == 200:
                                hlt_attrs, _ = extract_oks_data(resp.json())
                                log.debug("HLTImplementationDB attributes: %s", hlt_attrs)
                                if 'hltPrescaleKey' in hlt_attrs:
                                    result['HLTPSK'] = int(hlt_attrs['hltPrescaleKey'])
                                    log.info("Got HLTPSK=%d from OKS", result['HLTPSK'])
        else:
            msg = f"Failed to fetch Partition from OKS: HTTP {response.status_code}"
            if strict:
                raise RuntimeError(msg + " (required for --online-environment)")
            log.warning(msg)

    except requests.exceptions.RequestException as e:
        msg = f"Error fetching trigger keys from OKS: {e}"
        if strict:
            raise RuntimeError(msg + " (required for --online-environment)")
        log.warning(msg)
    except (ValueError, KeyError, TypeError) as e:
        msg = f"Error parsing trigger keys from OKS: {e}"
        if strict:
            raise RuntimeError(msg + " (required for --online-environment)")
        log.warning(msg)

    # In strict mode, verify we got the required keys from OKS
    if strict:
        missing = [k for k in ['SMK', 'L1PSK', 'HLTPSK'] if result.get(k) is None]
        if missing:
            raise RuntimeError(f"Failed to get {', '.join(missing)} from OKS (required for --online-environment)")

    return result
485
486
def get_run_params(args=None, from_is=False, partition=None, webdaq_base=None, strict=False,
                   solenoid_current_override=None, toroids_current_override=None):
    """
    Get run parameters from the appropriate source.

    Single entry point for obtaining run parameters, so adding new sources
    (like WEBDAQ) only requires touching this dispatcher.

    Args:
        args: argparse Namespace with command-line arguments (optional)
        from_is: If True, try to read from WEBDAQ first
        partition: Partition name for IS access (defaults to TDAQ_PARTITION env var)
        webdaq_base: WEBDAQ base URL (defaults to TDAQ_WEBDAQ_BASE env var)
        strict: If True, raise an exception if IS read fails (for --online-environment)
        solenoid_current_override: Command-line override for solenoid current
        toroids_current_override: Command-line override for toroids current

    Returns:
        RunParams instance

    Raises:
        RuntimeError: If strict=True and IS read fails
    """
    # Dispatch with guard clauses: IS first, then command-line args, then pure defaults
    if from_is:
        return RunParams.from_is(partition=partition, webdaq_base=webdaq_base, strict=strict,
                                 solenoid_current_override=solenoid_current_override,
                                 toroids_current_override=toroids_current_override)
    if args is not None:
        return RunParams.from_args(args)
    return RunParams()
518
519
521 """
522 Runner class that executes Gaudi configuration from JSON file or database.
523 Uses TrigConf::JobOptionsSvc with TYPE="FILE" or TYPE="DB" to load configuration.
524 Same approach used by PSC (Psc.cxx) - it sets JobOptionsType and
525 JobOptionsPath on the ApplicationMgr, and TrigConf::JobOptionsSvc handles both
526 FILE and DB modes transparently.
527 """
528 def __init__(self, job_options_type, job_options_path, run_params=None,
529 properties=None, db_server=None, smk=None,
530 num_threads=1, num_slots=1, ef_files=None):
531 """
532 Args:
533 job_options_type: "FILE" or "DB"
534 job_options_path: JSON file path (for FILE) or DB connection string (for DB)
535 run_params: Run parameters dict for prepareForStart
536 properties: Pre-loaded properties dict (optional, for FILE mode)
537 db_server: DB server alias (for store() in DB mode)
538 smk: Super Master Key (for store() in DB mode)
539 num_threads: Number of threads for AvalancheSchedulerSvc.ThreadPoolSize
540 num_slots: Number of event slots for EventDataSvc.NSlots
541 ef_files: List of input files for EFInterfaceSvc
542 """
543 self.job_options_type = job_options_type
544 self.job_options_path = job_options_path
545 self.run_params = run_params or {}
546 self.properties = properties
547 self.db_server = db_server # For store() in DB mode
548 self.smk = smk # For store() in DB mode
549 self.num_threads = num_threads
550 self.num_slots = num_slots
551 self.ef_files = ef_files or [] # Input files for EFInterfaceSvc
552 self._app = None
553
554 @classmethod
555 def from_json(cls, json_file, run_params=None, properties=None,
556 num_threads=1, num_slots=1, ef_files=None):
557 """Create runner for JSON file (TYPE=FILE)"""
558 return cls("FILE", os.path.abspath(json_file), run_params, properties,
559 num_threads=num_threads, num_slots=num_slots, ef_files=ef_files)
560
561 @classmethod
562 def from_database(cls, db_server, smk, l1psk=None, hltpsk=None, run_params=None,
563 num_threads=1, num_slots=1, ef_files=None):
564 """Create runner for database (TYPE=DB)"""
565 # Build the DB connection string: server=X;smkey=Y;lvl1key=Z;hltkey=W
566 db_path = f"server={db_server};smkey={smk}"
567 if l1psk is not None:
568 db_path += f";lvl1key={l1psk}"
569 if hltpsk is not None:
570 db_path += f";hltkey={hltpsk}"
571 return cls("DB", db_path, run_params, db_server=db_server, smk=smk,
572 num_threads=num_threads, num_slots=num_slots, ef_files=ef_files)
573
574 def run(self, maxEvents=None):
575 """
576 This follows the same pattern as PSC (Psc.cxx):
577 1. Create ApplicationMgr via BootstrapHelper
578 2. Set JobOptionsSvcType, JobOptionsType, JobOptionsPath
579 3. configure() -> initialize() -> prepareForStart() -> start() ->
580 hltUpdateAfterFork() -> run() -> stop() -> finalize() -> terminate()
581 """
582 from Gaudi.Main import BootstrapHelper
583
584 # For FILE mode, load properties from JSON if not already provided
585 if self.job_options_type == "FILE" and self.properties is None:
586 with open(self.job_options_path, 'r') as f:
587 jocat = json.load(f)
588 self.properties = jocat.get('properties', {})
589
590 bsh = BootstrapHelper()
591 app = bsh.createApplicationMgr()
592 self._app = app
593
594 # For FILE mode, set ApplicationMgr properties from JSON before configure
595 if self.job_options_type == "FILE" and self.properties:
596 app_props = self.properties.get('ApplicationMgr', {})
597 for k, v in app_props.items():
598 if k not in ('JobOptionsSvcType', 'JobOptionsType', 'JobOptionsPath'):
599 log.debug("Setting ApplicationMgr.%s = %s", k, v)
600 app.setProperty(k, str(v) if not isinstance(v, str) else v)
601
602 # Set JobOptionsSvc properties like PSC does in Psc.cxx
603 log.info("Configuring TrigConf::JobOptionsSvc with TYPE=%s, PATH=%s",
605 app.setProperty("JobOptionsSvcType", "TrigConf::JobOptionsSvc")
606 app.setProperty("JobOptionsType", self.job_options_type)
607 app.setProperty("JobOptionsPath", self.job_options_path)
608
609 # Configure the application - TrigConf::JobOptionsSvc will load from FILE or DB
610 app.configure()
611
612 # Override EvtMax AFTER configure() only if explicitly specified by user
613 # Otherwise use whatever value is in the DB
614 if maxEvents is not None:
615 log.info("Setting EvtMax=%d (overriding DB value)", maxEvents)
616 app.setProperty('EvtMax', str(maxEvents))
617
618 # All property overrides below use iProperty and must be done after configure()
619 # but before initialize() - this is the same pattern as PSC (Psc.cxx)
620 from GaudiPython.Bindings import iProperty
621
622 # Override EFInterfaceSvc.NumEvents if user specified it (overrides DB value)
623 if maxEvents is not None:
624 log.info("Setting EFInterfaceSvc.NumEvents=%d (overriding DB value)", maxEvents)
625 iProperty("EFInterfaceSvc").NumEvents = maxEvents
626
627 # Set threading configuration
628 log.info("Setting threading: ThreadPoolSize=%d, NSlots=%d", self.num_threads, self.num_slots)
629 iProperty("AvalancheSchedulerSvc").ThreadPoolSize = self.num_threads
630 iProperty("EventDataSvc").NSlots = self.num_slots
631
632 # Set input files and metadata for EFInterfaceSvc (overrides what's in DB/JSON config)
633 if self.ef_files:
634 log.info("Setting EFInterfaceSvc.Files = %s", self.ef_files)
635 iProperty("EFInterfaceSvc").Files = self.ef_files
636 iProperty("EFInterfaceSvc").T0ProjectTag = self.run_params.get('T0_project_tag', '')
637 iProperty("EFInterfaceSvc").BeamType = self.run_params.get('beam_type', 0)
638 iProperty("EFInterfaceSvc").BeamEnergy = self.run_params.get('beam_energy', 0)
639 iProperty("EFInterfaceSvc").TriggerType = self.run_params.get('trigger_type', 0)
640 iProperty("EFInterfaceSvc").Stream = self.run_params.get('stream', '')
641 iProperty("EFInterfaceSvc").Lumiblock = self.run_params.get('lumiblock', 0)
642 iProperty("EFInterfaceSvc").DetMask = self.run_params.get('detector_mask', '')
643
644 # If HLT PSK is set on command line, read it from DB instead of COOL (ATR-25974)
645 # This is the same logic as TrigPSCPythonDbSetup.py
646 from TrigPSC import PscConfig
647 if PscConfig.forcePSK:
648 log.info("PscConfig.forcePSK is set - configuring HLTPrescaleCondAlg to read from DB instead of COOL")
649 iProperty("HLTPrescaleCondAlg").Source = "DB"
650
651 # Set forceRunNumber on HltEventLoopMgr if conditions_run is specified
652 # This overrides the run number used for IOV lookup in conditions loading
653 conditions_run = self.run_params.get('conditions_run')
654 if conditions_run is not None:
655 log.info("Setting HltEventLoopMgr.forceRunNumber=%d for conditions lookup", conditions_run)
656 iProperty("HltEventLoopMgr").forceRunNumber = conditions_run
657
658 # Initialize
659 sc = app.initialize()
660 if not sc.isSuccess():
661 log.error("Failed to initialize AppMgr")
662 return sc
663
664 # Initialize TrigServicesHelper for lifecycle calls (prepareForStart, prepareForRun, hltUpdateAfterFork)
665 try:
666 from TrigServices.TrigServicesHelper import TrigServicesHelper
667 helper = TrigServicesHelper()
668 except ImportError as e:
669 log.error("TrigServicesHelper not available: %s", e)
670 log.error("Cannot proceed without TrigServicesHelper - required for HLTEventLoopMgr lifecycle")
671 raise RuntimeError("TrigServicesHelper not available") from e
672
673 # Call prepareForStart to set up ByteStreamMetadata (like PSC does)
674 try:
675 run_number = self.run_params['run_number']
676 det_mask = self.run_params['detector_mask']
677 sor_time = self.run_params['sor_time']
678 solenoid_current = self.run_params['solenoid_current']
679 toroids_current = self.run_params['toroids_current']
680
681 log.info("Calling prepareForStart with run=%d, det_mask=0x%s, sor_time=%s",
682 run_number, det_mask, sor_time)
683
684 success = helper.prepareForStart(
685 run_number=run_number,
686 det_mask=det_mask,
687 sor_time=sor_time,
688 solenoid_current=solenoid_current,
689 toroids_current=toroids_current
690 )
691 if not success:
692 log.error("prepareForStart failed")
693 raise RuntimeError("prepareForStart failed")
694 log.info("prepareForStart completed successfully")
695 except Exception as e:
696 log.error("Error calling prepareForStart: %s", e)
697 traceback.print_exc()
698 raise
699
700 # Start
701 sc = app.start()
702 if not sc.isSuccess():
703 log.error("Failed to start AppMgr")
704 return sc
705
706 # prepareForRun initializes COOL folder helper - must be called after start()
707 # which fires the start incident
708 try:
709 log.info("Calling prepareForRun to initialize COOL folder helper")
710 success = helper.prepareForRun()
711 if not success:
712 log.error("prepareForRun failed")
713 raise RuntimeError("prepareForRun failed")
714 log.info("prepareForRun completed successfully")
715 except Exception as e:
716 log.error("Error calling prepareForRun: %s", e)
717 traceback.print_exc()
718 raise
719
720 # hltUpdateAfterFork initializes the scheduler (like PSC does after fork)
721 # worker_id=1 for single-worker, non-forked mode
722 try:
723 log.info("Calling hltUpdateAfterFork to initialize scheduler (worker_id=1)")
724 success = helper.hltUpdateAfterFork(worker_id=1)
725 if not success:
726 log.error("hltUpdateAfterFork failed")
727 raise RuntimeError("hltUpdateAfterFork failed")
728 log.info("hltUpdateAfterFork completed successfully")
729 except Exception as e:
730 log.error("Error calling hltUpdateAfterFork: %s", e)
731 traceback.print_exc()
732 raise
733
734 # Run the event loop
735 # Note: Python signal handlers won't work during C++ execution.
736 nevt = maxEvents if maxEvents is not None else -1
737 sc = app.run(nevt)
738
739 if not sc.isSuccess():
740 log.error("Failure running application")
741 return sc
742
743 # Stop
744 sc = app.stop()
745 if not sc.isSuccess():
746 log.error("Failed to stop AppMgr")
747 return sc
748
749 # Finalize
750 sc = app.finalize()
751 if not sc.isSuccess():
752 log.error("Failed to finalize AppMgr")
753 return sc
754
755 # Terminate
756 sc = app.terminate()
757 return sc
758
759
def load_from_json(json_file, run_params=None, num_threads=1, num_slots=1, ef_files=None):
    """
    Load configuration from a Gaudi joboptions JSON file.

    Returns a ConfigRunner with a run() method that executes the configuration
    using TrigConf::JobOptionsSvc with TYPE="FILE".

    Raises:
        ValueError: if the file's 'filetype' field is not 'joboptions'.
    """
    with open(json_file, 'r') as fh:
        jocat = json.load(fh)

    # Reject anything that is not a joboptions catalogue
    if jocat.get('filetype') != 'joboptions':
        raise ValueError(f"Invalid JSON file type: {jocat.get('filetype')}, expected 'joboptions'")

    return ConfigRunner.from_json(json_file, run_params, jocat.get('properties', {}),
                                  num_threads=num_threads,
                                  num_slots=num_slots,
                                  ef_files=ef_files)
778
779
def load_from_database(db_server, smk, l1psk=None, hltpsk=None, run_params=None,
                       num_threads=1, num_slots=1, ef_files=None):
    """
    Load configuration from trigger database using the Super Master Key (SMK).

    Returns a ConfigRunner that uses TrigConf::JobOptionsSvc with TYPE="DB"
    to load configuration directly from the database, same as athenaHLT.
    """
    log.info("Loading job options from database %s with SMK %d", db_server, smk)
    # Scheduler/input settings are forwarded unchanged to the runner
    runner_opts = dict(num_threads=num_threads, num_slots=num_slots, ef_files=ef_files)
    return ConfigRunner.from_database(db_server, smk, l1psk, hltpsk, run_params,
                                      **runner_opts)
793
794
795
def arg_sor_time(s) -> str:
    """Convert possible SOR time arguments to an OWLTime compatible string"""
    owl_fmt = '%d/%m/%y %H:%M:%S.%f'
    if s == 'now':
        # Use the current wall-clock time
        return dt.now().strftime(owl_fmt)
    if s.isdigit():
        # Interpret as nanoseconds since epoch
        return dt.fromtimestamp(float(s) / 1e9).strftime(owl_fmt)
    # Assume the string is already in OWLTime format; pass through unchanged
    return s
804
806 """Convert detector mask to format expected by eformat"""
807 if s=='all':
808 return RunParams.DEFAULT_DETECTOR_MASK
809 dmask = hex(int(s,16)) # Normalize input to hex-string
810 dmask = dmask.lower().replace('0x', '').replace('l', '') # remove markers
811 return '0' * (32 - len(dmask)) + dmask # (pad with 0s)
812
814 """Argument handler for log levels"""
815 lvls = s.split(',')
816 if len(lvls)==1: lvls.append('ERROR')
817 return lvls
818
819
def check_args(parser, args):
    """Consistency check of command line arguments (same as athenaHLT.py)"""

    # A configuration source is mandatory: job options or the trigger DB
    have_config = args.jobOptions or args.use_database
    if not have_config:
        parser.error("No job options file specified")

    # The file-based emulator needs an input file unless we only dump the config
    needs_file = (args.efdf_interface_library == 'TrigDFEmulator'
                  and not args.dump_config_exit)
    if needs_file and not args.file:
        parser.error("--file is required unless using --dump-config-exit or online efdf-interface-library")

    # CREST only makes sense when configuring from the database
    if args.use_crest and not args.use_database:
        parser.error("--use-crest requires --use-database")
831
832
def update_run_params(args, flags):
    """Update run parameters from IS, file, or conditions DB.

    Mutates ``args`` in place, filling run_number, lb_number, sor_time,
    detector_mask, magnet currents and file metadata from (in order of
    precedence) the command line, the Information Service (online mode),
    the input RAW file, and finally the conditions DB / built-in defaults.

    Args:
        args:  parsed argparse namespace (modified in place)
        flags: athena configuration flags (currently unused here; kept for
               signature symmetry with update_trigconf_keys)
    """

    # If --online-environment is specified, try to read from Information Service first
    if getattr(args, 'online_environment', False):
        log.info("Reading run parameters from Information Service via WEBDAQ")
        # Pass command-line magnet values as overrides (if provided)
        # strict=True ensures we fail if IS read fails, rather than falling back to defaults
        # But if user provided magnet values on command line, those take precedence over IS
        solenoid_override = getattr(args, 'solenoid_current', None)
        toroids_override = getattr(args, 'toroids_current', None)

        run_params = get_run_params(from_is=True,
                                    partition=getattr(args, 'partition', None),
                                    webdaq_base=getattr(args, 'webdaq_base', None),
                                    strict=True,
                                    solenoid_current_override=solenoid_override,
                                    toroids_current_override=toroids_override)
        # Update args with values from IS (if not already set on command line)
        if args.run_number is None and run_params.run_number is not None:
            args.run_number = run_params.run_number
            log.info("Using run_number=%d from IS", args.run_number)
        if args.lb_number is None and run_params.lb_number is not None:
            args.lb_number = run_params.lb_number
            log.info("Using lb_number=%d from IS", args.lb_number)
        if args.sor_time is None and run_params.sor_time is not None:
            args.sor_time = run_params.sor_time
            log.info("Using sor_time=%s from IS", args.sor_time)
        if args.detector_mask is None and run_params.detector_mask is not None:
            args.detector_mask = run_params.detector_mask
            log.info("Using detector_mask=%s from IS", args.detector_mask)
        # Update magnet currents from IS (run_params already has command-line overrides if provided)
        args.solenoid_current = run_params.solenoid_current
        args.toroids_current = run_params.toroids_current

    # Run and LB number must be given together (or both derived elsewhere)
    if (args.run_number is not None and args.lb_number is None) or (args.run_number is None and args.lb_number is not None):
        log.error("Both or neither of the options -R (--run-number) and -L (--lb-number) have to be specified")

    # Read metadata from input file (like HLTMPPy/runner.py getRunParamsFromFile)
    if args.file:
        from eformat import EventStorage
        dr = EventStorage.pickDataReader(args.file[0])
        if args.run_number is None:
            args.run_number = dr.runNumber()
            args.lb_number = dr.lumiblockNumber()
        args.T0_project_tag = dr.projectTag()
        args.beam_type = dr.beamType()
        args.beam_energy = dr.beamEnergy()
        args.trigger_type = dr.triggerType()
        args.stream = dr.stream()
        args.lumiblock = dr.lumiblockNumber()
        args.file_detector_mask = "{:032x}".format(dr.detectorMask())
    else:
        # No input file: fall back to whatever was provided (or neutral defaults)
        args.T0_project_tag = getattr(args, 'T0_project_tag', '')
        args.beam_type = getattr(args, 'beam_type', 0)
        args.beam_energy = getattr(args, 'beam_energy', 0)
        args.trigger_type = getattr(args, 'trigger_type', 0)
        args.stream = getattr(args, 'stream', '')
        args.lumiblock = getattr(args, 'lumiblock', 0)
        args.file_detector_mask = getattr(args, 'file_detector_mask', '00000000000000000000000000000000')

    # Fetch SOR time / detector mask from the conditions DB if still missing
    sor_params = None
    if (args.sor_time is None or args.detector_mask is None) and args.run_number is not None:
        sor_params = AthHLT.get_sor_params(args.run_number)
        log.debug('SOR parameters: %s', sor_params)
        if sor_params is None:
            log.error("Run %d does not exist. If you want to use this run-number specify "
                      "remaining run parameters, e.g.: --sor-time=now --detector-mask=all", args.run_number)
            sys.exit(1)

    if args.sor_time is None and sor_params is not None:
        args.sor_time = arg_sor_time(str(sor_params['SORTime']))

    if args.detector_mask is None and sor_params is not None:
        dmask = sor_params['DetectorMask']
        # Run-1 masks are stored as integers; convert to hex-string for arg_detector_mask
        if args.run_number < AthHLT.CondDB._run2:
            dmask = hex(dmask)
        args.detector_mask = arg_detector_mask(dmask)

    # A dummy run number is sufficient when only dumping the configuration
    if args.dump_config_exit and not args.run_number:
        args.run_number = 0

    # Apply defaults for magnet currents if not set (offline mode only)
    # In online mode, magnets must come from IS or command line (handled above)
    if getattr(args, 'solenoid_current', None) is None:
        args.solenoid_current = RunParams.DEFAULT_SOLENOID_CURRENT
        log.debug("Using default solenoid_current=%.1f", args.solenoid_current)
    if getattr(args, 'toroids_current', None) is None:
        args.toroids_current = RunParams.DEFAULT_TOROIDS_CURRENT
        log.debug("Using default toroids_current=%.1f", args.toroids_current)
923
924
def update_trigconf_keys(args, flags):
    """Update trigger configuration keys from OKS, COOL, or CREST.

    Mutates ``args.smk``, ``args.l1psk`` and ``args.hltpsk`` in place.

    Priority order:
    1. Command-line arguments (always take precedence)
    2. OKS via WEBDAQ (if --online-environment is set)
    3. CREST (if --use-crest is set)
    4. COOL (default)
    """

    if args.smk is None or args.l1psk is None or args.hltpsk is None:
        trigconf = None

        # Try OKS first if --online-environment is set
        if getattr(args, 'online_environment', False):
            log.info("Reading trigger configuration keys from OKS (online environment)")
            # strict=True ensures we fail if OKS read fails, rather than falling back to COOL
            oks_keys = get_trigconf_keys_from_oks(
                partition=getattr(args, 'partition', None),
                webdaq_base=getattr(args, 'webdaq_base', None),
                strict=True
            )
            log.info("Retrieved trigger keys from OKS: %s", oks_keys)

            # With strict=True, we're guaranteed to have all keys or an exception was raised
            trigconf = {
                'SMK': oks_keys.get('SMK'),
                'LVL1PSK': oks_keys.get('L1PSK'),
                'HLTPSK': oks_keys.get('HLTPSK')
            }
            # Also update db_server if provided by OKS and not set on command line
            if oks_keys.get('db_alias') and args.db_server == 'TRIGGERDB_RUN3':
                args.db_server = oks_keys['db_alias']
                log.info("Using db_server=%s from OKS", args.db_server)

        # Fall back to CREST or COOL only if NOT in online-environment mode
        if trigconf is None:
            if args.use_crest:
                crest_server = args.crest_server or flags.Trigger.crestServer
                log.info("Reading trigger configuration keys from CREST for run %s", args.run_number)
                trigconf = AthHLT.get_trigconf_keys_crest(args.run_number, args.lb_number, crest_server)
                log.info("Retrieved trigger keys from CREST: %s", trigconf)
            else:
                log.info("Reading trigger configuration keys from COOL for run %s", args.run_number)
                trigconf = AthHLT.get_trigconf_keys(args.run_number, args.lb_number)
                log.info("Retrieved trigger keys from COOL: %s", trigconf)

        try:
            # Command-line values win; otherwise take the DB/OKS values
            if args.smk is None:
                args.smk = trigconf['SMK']
                log.debug("Using SMK=%d from conditions DB/OKS", args.smk)
            else:
                log.debug("Using SMK=%d from command line (ignoring DB/OKS value %s)", args.smk, trigconf.get('SMK'))
            if args.l1psk is None:
                args.l1psk = trigconf['LVL1PSK']
                log.debug("Using L1PSK=%d from conditions DB/OKS", args.l1psk)
            else:
                log.debug("Using L1PSK=%d from command line (ignoring DB/OKS value %s)", args.l1psk, trigconf.get('LVL1PSK'))
            if args.hltpsk is None:
                args.hltpsk = trigconf['HLTPSK']
                log.debug("Using HLTPSK=%d from conditions DB/OKS", args.hltpsk)
            else:
                log.debug("Using HLTPSK=%d from command line (ignoring DB/OKS value %s)", args.hltpsk, trigconf.get('HLTPSK'))
        except KeyError:
            log.error("Cannot read trigger configuration keys from the conditions database for run %d", args.run_number)
            sys.exit(1)
    else:
        log.info("Using trigger configuration keys from command line: SMK=%d, L1PSK=%d, HLTPSK=%d",
                 args.smk, args.l1psk, args.hltpsk)
994
995
class MyHelp(argparse.Action):
    """Custom help action that hides expert option groups unless --help=all is given"""

    def __call__(self, parser, namespace, values, option_string=None):
        show_all = (values == 'all')

        # Hide expert options from the printed help unless full help was requested
        if not show_all:
            for group in parser.expert_groups:
                for action in group._group_actions:
                    action.help = argparse.SUPPRESS

        parser.print_help()
        if not show_all:
            print('\nUse --help=all to show all (expert) options')
        sys.exit(0)
1009
1010
def main():
    """Command-line entry point of athenaEF.

    Parses the command line, determines run parameters and trigger keys,
    builds the HLT configuration from one of four sources (trigger DB,
    pickle file, joboptions JSON, or a CA module) and then runs the
    application directly, athena.py-style, without HLTMPPy/HLTMPPU.
    """
    parser = argparse.ArgumentParser(prog='athenaEF.py', formatter_class=
                                     lambda prog : argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=32, width=100),
                                     usage = '%(prog)s [OPTION]... -f FILE jobOptions',
                                     add_help=False)
    parser.expert_groups = []  # Keep list of expert option groups

    g = parser.add_argument_group('Options')
    g.add_argument('jobOptions', nargs='?', help='job options: CA module (package.module:function), pickle file (.pkl), or JSON file (.json)')
    g.add_argument('--threads', metavar='N', type=int, default=1, help='number of threads')
    g.add_argument('--concurrent-events', metavar='N', type=int, help='number of concurrent events if different from --threads')
    g.add_argument('--log-level', '-l', metavar='LVL', type=arg_log_level, default='INFO,ERROR', help='OutputLevel of athena,POOL')
    g.add_argument('--precommand', '-c', metavar='CMD', action='append', default=[],
                   help='Python commands executed before job options')
    g.add_argument('--postcommand', '-C', metavar='CMD', action='append', default=[],
                   help='Python commands executed after job options')
    g.add_argument('--interactive', '-i', action='store_true', help='interactive mode')
    g.add_argument('--help', '-h', nargs='?', choices=['all'], action=MyHelp, help='show help')

    g = parser.add_argument_group('Input/Output')
    g.add_argument('--file', '--filesInput', '-f', action='append', help='input RAW file')
    g.add_argument('--save-output', '-o', metavar='FILE', help='output file name')
    g.add_argument('--number-of-events', '--evtMax', '-n', metavar='N', type=int, default=-1, help='processes N events (default: -1, means all)')
    g.add_argument('--skip-events', '--skipEvents', '-k', metavar='N', type=int, default=0, help='skip N first events')
    g.add_argument('--loop-files', action='store_true', help='loop over input files if no more events')
    g.add_argument('--efdf-interface-library', metavar='LIB', default='TrigDFEmulator',
                   help='name of the EFDF interface shared library to load')

    g = parser.add_argument_group('Performance and debugging')
    g.add_argument('--perfmon', action='store_true', help='enable PerfMon')
    g.add_argument('--tcmalloc', action='store_true', default=True, help='use tcmalloc')
    g.add_argument('--stdcmalloc', action='store_true', help='use stdcmalloc')
    g.add_argument('--stdcmath', action='store_true', help='use stdcmath library')
    g.add_argument('--imf', action='store_true', default=True, help='use Intel math library')
    g.add_argument('--show-includes', '-s', action='store_true', help='show printout of included files')

    g = parser.add_argument_group('Conditions')
    g.add_argument('--run-number', '-R', metavar='RUN', type=int,
                   help='run number (if None, read from first event)')
    g.add_argument('--lb-number', '-L', metavar='LBN', type=int,
                   help='lumiblock number (if None, read from first event)')
    g.add_argument('--conditions-run', metavar='RUN', type=int, default=None,
                   help='reference run number for conditions lookup (use when IS run number has no COOL data)')
    g.add_argument('--sor-time', type=arg_sor_time,
                   help='The Start Of Run time. Three formats are accepted: '
                   '1) the string "now", for current time; '
                   '2) the number of nanoseconds since epoch (e.g. 1386355338658000000 or int(time.time() * 1e9)); '
                   '3) human-readable "20/11/18 17:40:42.3043". If not specified the sor-time is read from the conditions DB')
    g.add_argument('--detector-mask', metavar='MASK', type=arg_detector_mask,
                   help='detector mask (if None, read from the conditions DB), use string "all" to enable all detectors')

    g = parser.add_argument_group('Database')
    g.add_argument('--use-database', '-b', action='store_true',
                   help='configure from trigger database using SMK')
    g.add_argument('--db-server', metavar='DB', default='TRIGGERDB_RUN3', help='DB server name (alias)')
    g.add_argument('--smk', type=int, default=None, help='Super Master Key')
    g.add_argument('--l1psk', type=int, default=None, help='L1 prescale key')
    g.add_argument('--hltpsk', type=int, default=None, help='HLT prescale key')
    g.add_argument('--use-crest', action='store_true', default=False,
                   help='Use CREST for trigger configuration')
    g.add_argument('--crest-server', metavar='URL', default=None,
                   help='CREST server URL (defaults to flags.Trigger.crestServer)')
    g.add_argument('--dump-config', action='store_true', help='Dump joboptions JSON file')
    g.add_argument('--dump-config-exit', action='store_true', help='Dump joboptions JSON file and exit')

    g = parser.add_argument_group('Magnets')
    g.add_argument('--solenoid-current', type=float, default=None,
                   help='Solenoid current in Amperes (default: nominal current for offline running, required from IS online)')
    g.add_argument('--toroids-current', type=float, default=None,
                   help='Toroids current in Amperes (default: nominal current for offline running, required from IS online)')

    g = parser.add_argument_group('Online')
    g.add_argument('--online-environment', action='store_true',
                   help='Enable online environment: read run parameters from IS and trigger '
                   'configuration keys (SMK, L1PSK, HLTPSK) from OKS via WEBDAQ REST API')
    g.add_argument('--partition', metavar='NAME', default=None,
                   help='TDAQ partition name (defaults to TDAQ_PARTITION environment variable)')
    g.add_argument('--webdaq-base', metavar='URL', default=None,
                   help='WEBDAQ base URL (defaults to TDAQ_WEBDAQ_BASE environment variable)')

    g = parser.add_argument_group('Online Histogramming')
    g.add_argument('--oh-monitoring', '-M', action='store_true', default=False,
                   help='enable online histogram publishing via WebdaqHistSvc')

    g = parser.add_argument_group('Expert')
    parser.expert_groups.append(g)
    (args, unparsed_args) = parser.parse_known_args()
    check_args(parser, args)

    # set ROOT to batch mode (ATR-21890)
    from PyUtils.Helpers import ROOTSetup
    ROOTSetup(batch=True)

    # Enable ROOT thread safety
    import ROOT
    ROOT.ROOT.EnableThreadSafety()

    # set default OutputLevels and file inclusion
    import AthenaCommon.Logging
    AthenaCommon.Logging.log.setLevel(getattr(logging, args.log_level[0]))
    AthenaCommon.Logging.log.setFormat("%(asctime)s Py:%(name)-31s %(levelname)7s %(message)s")
    if args.show_includes:
        from AthenaCommon.Include import include
        include.setShowIncludes( True )

    # consistency checks for arguments
    if not args.concurrent_events:
        args.concurrent_events = args.threads

    # Update args and set athena flags
    from TrigPSC import PscConfig
    from TrigPSC.PscDefaultFlags import defaultOnlineFlags

    # Get flags with online defaults (same as athenaHLT)
    flags = defaultOnlineFlags()

    # Enable WebdaqHistSvc for online histogram publishing if requested
    if args.oh_monitoring:
        flags.Trigger.Online.useOnlineWebdaqHistSvc = True
        log.info("Enabled WebdaqHistSvc for online histogram publishing")

    # CREST configuration (same as athenaHLT)
    log.info("Using CREST for trigger configuration: %s", args.use_crest)
    if args.use_crest:
        flags.Trigger.useCrest = True
        if args.crest_server:
            flags.Trigger.crestServer = args.crest_server
        else:
            args.crest_server = flags.Trigger.crestServer

    update_run_params(args, flags)

    if args.use_database:
        # If HLTPSK was given on the command line OR from OKS (--online-environment),
        # we ignore what is stored in COOL and use the specified key directly from the DB.
        # This is needed because COOL may point to a different HLTPSK for the forced run number.
        PscConfig.forcePSK = (args.hltpsk is not None) or args.online_environment
        # Read trigger config keys from COOL/OKS if not specified
        update_trigconf_keys(args, flags)

    # Fill flags from command line (if not running from DB/JSON)
    if not args.use_database and args.jobOptions and not args.jobOptions.endswith('.json'):
        PscConfig.unparsedArguments = unparsed_args
        for flag_arg in unparsed_args:
            flags.fillFromString(flag_arg)

    PscConfig.interactive = args.interactive
    PscConfig.exitAfterDump = args.dump_config_exit

    # NOTE: Do NOT set flags.Input.Files here!
    # Like athenaHLT, we keep Input.Files=[] during configuration to ensure the
    # configuration is portable and doesn't depend on specific input file metadata.
    # Input files are passed to EFInterface for runtime use only.

    # Set conditions run number override (for test partitions with fake run numbers)
    if args.conditions_run is not None:
        log.info("Using conditions from reference run %d (overriding run %s for IOV lookup)",
                 args.conditions_run, args.run_number)
        flags.Input.ConditionsRunNumber = args.conditions_run

    # Set number of events
    if args.number_of_events > 0:
        flags.Exec.MaxEvents = args.number_of_events

    # Set skip events
    if args.skip_events > 0:
        flags.Exec.SkipEvents = args.skip_events

    # NOTE: Do NOT set flags.Concurrency.NumThreads or NumConcurrentEvents here.
    # Threading is set at runtime via iProperty after configure() - see ConfigRunner.run()

    # Enable PerfMon if requested
    flags.PerfMon.doFastMonMT = args.perfmon

    # Configure EF ByteStream services (mandatory to run without HLTMPPU)
    # This provides the data flow interface that would normally come from HLTMPPU
    flags.Trigger.Online.useEFByteStreamSvc = True
    ef = flags.Trigger.Online.EFInterface
    ef_files = args.file if args.file else []
    ef.Files = ef_files
    ef.OutputFileName = f"athenaEF_{args.save_output}" if args.save_output else ""
    ef.LoopFiles = args.loop_files
    ef.NumEvents = args.number_of_events
    ef.SkipEvents = args.skip_events
    ef.RunNumber = args.run_number
    ef.T0ProjectTag = args.T0_project_tag
    ef.BeamType = args.beam_type
    ef.BeamEnergy = args.beam_energy
    ef.TriggerType = args.trigger_type
    ef.Stream = args.stream
    ef.Lumiblock = args.lumiblock
    ef.DetMask = args.file_detector_mask
    ef.LibraryName = args.efdf_interface_library

    # Execute precommands
    if args.precommand:
        log.info("Executing precommand(s)")
        for cmd in args.precommand:
            log.info(" %s", cmd)
            exec(cmd, globals(), {'flags': flags})

    # Determine input type
    is_database = args.use_database
    is_pickle = False
    is_json = False

    if not is_database and args.jobOptions:
        jobOptions = args.jobOptions
        is_pickle = jobOptions.endswith('.pkl')
        is_json = jobOptions.endswith('.json')

    if is_database:
        # Load configuration from trigger database
        # Handle CREST vs standard DB access (same as athenaHLT)
        if args.use_crest:
            crestconn = TriggerCrestUtil.getCrestConnection(args.db_server)
            db_alias = f"{args.crest_server}/{crestconn}"
            log.info("Loading configuration via CREST from %s with SMK %d", db_alias, args.smk)
        else:
            db_alias = args.db_server
            log.info("Loading configuration from database %s with SMK %d", db_alias, args.smk)

        # Get run parameters for prepareForStart
        run_params = get_run_params(args).to_dict()
        acc = load_from_database(db_alias, args.smk, args.l1psk, args.hltpsk, run_params,
                                 num_threads=args.threads, num_slots=args.concurrent_events,
                                 ef_files=ef_files)
        log.info("Configuration loaded from database")

    elif is_pickle:
        # Load ComponentAccumulator from pickle file
        log.info("Loading configuration from pickle file: %s", jobOptions)
        with open(jobOptions, 'rb') as f:
            acc = pickle.load(f)
        log.info("Configuration loaded from pickle")

    elif is_json:
        # Load configuration from JSON file
        log.info("Loading configuration from JSON file: %s", jobOptions)
        # Get run parameters for prepareForStart
        run_params = get_run_params(args).to_dict()
        acc = load_from_json(jobOptions, run_params,
                             num_threads=args.threads, num_slots=args.concurrent_events,
                             ef_files=ef_files)
        log.info("Configuration loaded from JSON")

    else:
        # Load from CA module - follow the same pattern as athenaHLT/TrigPSCPythonCASetup:
        # 1. Build the full configuration with services
        # 2. Dump to JSON file
        # 3. Use TrigConf::JobOptionsSvc to load from JSON
        # This preserves the ability to use the same JobOptionsSvc as athenaHLT
        log.info("Loading CA configuration from: %s", jobOptions)

        # Clone and lock flags for services configuration (as done in TrigPSCPythonCASetup)
        from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
        from AthenaConfiguration.MainServicesConfig import addMainSequences
        from TrigServices.TriggerUnixStandardSetup import commonServicesCfg
        from AthenaConfiguration.ComponentFactory import CompFactory

        locked_flags = flags.clone()
        locked_flags.lock()

        # Create base CA with framework services (like TrigPSCPythonCASetup)
        cfg = ComponentAccumulator(CompFactory.AthSequencer("AthMasterSeq", Sequential=True))
        cfg.setAppProperty('ExtSvcCreates', False)
        cfg.setAppProperty("MessageSvcType", "TrigMessageSvc")
        cfg.setAppProperty("JobOptionsSvcType", "TrigConf::JobOptionsSvc")

        # Add main sequences and common services (includes TrigServicesCfg)
        addMainSequences(locked_flags, cfg)
        cfg.merge(commonServicesCfg(locked_flags))

        # Now merge user CA config (with unlocked flags, as in TrigPSCPythonCASetup)
        cfg_func = AthHLT.getCACfg(jobOptions)
        cfg.merge(cfg_func(flags))

        # Execute postcommands before dumping (like TrigPSCPythonCASetup)
        if args.postcommand:
            log.info("Executing postcommand(s)")
            for cmd in args.postcommand:
                log.info(" %s", cmd)
                exec(cmd, globals(), {'flags': flags, 'cfg': cfg})
            args.postcommand = []  # Clear so we don't run them again later

        # Dump configuration to JSON (like TrigPSCPythonCASetup)
        fname = "HLTJobOptions"
        log.info("Dumping configuration to %s.pkl and %s.json", fname, fname)
        with open(f"{fname}.pkl", "wb") as f:
            cfg.store(f)

        from TrigConfIO.JsonUtils import create_joboptions_json
        create_joboptions_json(f"{fname}.pkl", f"{fname}.json")

        # Check for dump-and-exit
        if args.dump_config_exit:
            log.info("Configuration dumped to %s.json. Exiting...", fname)
            sys.exit(0)

        # Now load the JSON using JsonConfigRunner with TrigConf::JobOptionsSvc
        log.info("Loading configuration from %s.json via TrigConf::JobOptionsSvc", fname)
        # Get run parameters for prepareForStart
        run_params = get_run_params(args).to_dict()
        acc = load_from_json(f"{fname}.json", run_params,
                             num_threads=args.threads, num_slots=args.concurrent_events,
                             ef_files=ef_files)

    log.info("Configuration loaded with HLT online services")

    # Execute postcommands
    if args.postcommand:
        log.info("Executing postcommand(s)")
        for cmd in args.postcommand:
            log.info(" %s", cmd)
            exec(cmd, globals(), {'flags': flags, 'acc': acc})

    # Dump configuration if requested
    if args.dump_config or args.dump_config_exit:
        fname = "HLTJobOptions"

        if is_database:
            # For DB mode, fetch properties via Python API
            from TrigConfIO.HLTTriggerConfigAccess import HLTJobOptionsAccess
            log.info("Fetching configuration from database for dump...")
            jo_access = HLTJobOptionsAccess(dbalias=acc.db_server, smkey=acc.smk)
            props = jo_access.algorithms()

            log.info("Dumping configuration to %s.json", fname)
            hlt_json = {'filetype': 'joboptions', 'properties': props}
            with open(f"{fname}.json", "w") as f:
                json.dump(hlt_json, f, indent=4, sort_keys=True, ensure_ascii=True)

        elif is_json:
            # For JSON mode, properties were already loaded
            props = acc.properties
            if props:
                log.info("Dumping configuration to %s.json", fname)
                hlt_json = {'filetype': 'joboptions', 'properties': props}
                with open(f"{fname}.json", "w") as f:
                    json.dump(hlt_json, f, indent=4, sort_keys=True, ensure_ascii=True)
            else:
                log.warning("No properties available to dump")

        elif is_pickle:
            # For pickle-loaded ComponentAccumulator, gather properties
            app_props, msg_props, comp_props = acc.gatherProps()
            props = {"ApplicationMgr": app_props, "MessageSvc": msg_props}
            for comp, name, value in comp_props:
                props.setdefault(comp, {})[name] = value

            log.info("Dumping configuration to %s.json", fname)
            hlt_json = {'filetype': 'joboptions', 'properties': props}
            with open(f"{fname}.json", "w") as f:
                json.dump(hlt_json, f, indent=4, sort_keys=True, ensure_ascii=True)

        # Note: For CA module, dumping is already handled earlier
        # before converting to ConfigRunner

        if args.dump_config_exit:
            log.info("Configuration dumped. Exiting...")
            sys.exit(0)

    # Run the application directly (like athena.py does)
    log.info("Starting Athena execution...")

    # Create worker directory structure that HLT services expect
    # (normally created by HLTMPPU/PSC). Worker ID 1 means single-worker, non-forked mode
    # and must match what we pass to hltUpdateAfterFork(worker_id=1) in ConfigRunner.run()
    worker_dir = os.path.join(os.getcwd(), "athenaHLT_workers", "athenaHLT-01")
    if not os.path.exists(worker_dir):
        log.info("Creating worker directory: %s", worker_dir)
        os.makedirs(worker_dir, exist_ok=True)

    if args.interactive:
        log.info("Interactive mode - call acc.run() to execute")
        import code
        code.interact(local={'acc': acc, 'flags': flags})
    else:
        # Run the application
        from AthenaCommon import ExitCodes
        exitcode = 0
        try:
            # Pass maxEvents if explicitly set (including -1 for all events)
            sc = acc.run(args.number_of_events)
            if sc.isFailure():
                exitcode = ExitCodes.EXE_ALG_FAILURE
        except SystemExit as e:
            exitcode = ExitCodes.EXE_ALG_FAILURE if e.code == 1 else e.code
        except Exception:
            traceback.print_exc()
            exitcode = ExitCodes.UNKNOWN_EXCEPTION

        log.info('Leaving with code %d: "%s"', exitcode, ExitCodes.what(exitcode))
        sys.exit(exitcode)
1413
1414
1415if "__main__" in __name__:
1416 sys.exit(main())
void print(char *figname, TCanvas *c1)
Helper class to call ITrigEventLoopMgr methods from Python.
__init__(self, job_options_type, job_options_path, run_params=None, properties=None, db_server=None, smk=None, num_threads=1, num_slots=1, ef_files=None)
Definition athenaEF.py:530
from_json(cls, json_file, run_params=None, properties=None, num_threads=1, num_slots=1, ef_files=None)
Definition athenaEF.py:556
from_database(cls, db_server, smk, l1psk=None, hltpsk=None, run_params=None, num_threads=1, num_slots=1, ef_files=None)
Definition athenaEF.py:563
run(self, maxEvents=None)
Definition athenaEF.py:574
__call__(self, parser, namespace, values, option_string=None)
Definition athenaEF.py:998
from_is(cls, partition=None, webdaq_base=None, strict=False, solenoid_current_override=None, toroids_current_override=None)
Definition athenaEF.py:154
from_args(cls, args)
Definition athenaEF.py:137
bool DEFAULT_RECORDING_ENABLED
Definition athenaEF.py:81
str DEFAULT_DETECTOR_MASK
Definition athenaEF.py:73
__init__(self, run_number=None, lb_number=None, detector_mask=None, sor_time=None, solenoid_current=None, toroids_current=None, beam_type=None, beam_energy=None, run_type=None, trigger_type=None, recording_enabled=None, conditions_run=None, T0_project_tag='', stream='', lumiblock=0)
Definition athenaEF.py:98
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition hcg.cxx:312
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition hcg.cxx:132
load_from_database(db_server, smk, l1psk=None, hltpsk=None, run_params=None, num_threads=1, num_slots=1, ef_files=None)
Definition athenaEF.py:781
arg_detector_mask(s)
Definition athenaEF.py:805
get_trigconf_keys_from_oks(partition=None, webdaq_base=None, strict=False)
Definition athenaEF.py:324
str arg_sor_time(s)
The following arg_* methods are used as custom types in argparse.
Definition athenaEF.py:798
arg_log_level(s)
Definition athenaEF.py:813
update_run_params(args, flags)
Definition athenaEF.py:833
check_args(parser, args)
Definition athenaEF.py:820
update_trigconf_keys(args, flags)
Definition athenaEF.py:925
get_run_params(args=None, from_is=False, partition=None, webdaq_base=None, strict=False, solenoid_current_override=None, toroids_current_override=None)
Definition athenaEF.py:488
load_from_json(json_file, run_params=None, num_threads=1, num_slots=1, ef_files=None)
Definition athenaEF.py:760