ATLAS Offline Software
Loading...
Searching...
No Matches
athenaEF.py
Go to the documentation of this file.
#!/bin/sh
# -*- mode: python -*-
#
# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
#
# athenaEF.py - A modified version of athenaHLT.py that runs the HLT configuration
# directly without using HLTMPPy/HLTMPPU. It creates the configuration like
# athenaHLT but executes it like athena.py does.
#
# NOTE: this file is a sh/python polyglot. When executed by /bin/sh, the
# triple-quote below is parsed as the shell command `date` (the empty ""
# quotes concatenate away), so the shell code that follows runs first: it
# parses the malloc/math-library options, sets up LD_PRELOAD via
# athena_preload.sh, and then re-executes this same file under python.
# When the file is imported by python, the whole shell section is simply
# the module docstring and has no effect.
"""date"

# defaults
export USETCMALLOC=1
export USEIMF=1

# parse command line arguments
for a in ${@}
do
    case "$a" in
    --stdcmalloc)    USETCMALLOC=0;;
    --tcmalloc)      USETCMALLOC=1;;
    --stdcmath)      USEIMF=0;;
    --imf)           USEIMF=1;;
    --preloadlib*)   export ATHENA_ADD_PRELOAD=${a#*=};;
    --no-ers-signal-handlers) export TDAQ_ERS_NO_SIGNAL_HANDLERS=1;;
    esac
done

# Do the actual preloading via LD_PRELOAD
source `which athena_preload.sh `

# Now resurrect ourselves as python script
python_path=`which python`
"exec" "$python_path" "-tt" "$0" "$@";

"""
38import sys
39import os
40import argparse
41import json
42import pickle
43import traceback
44from datetime import datetime as dt
45
46from TrigConfStorage.TriggerCrestUtil import TriggerCrestUtil
47
48# Use single-threaded oracle client library to avoid extra
49# threads when forking (see ATR-21890, ATDBOPS-115)
50os.environ["CORAL_ORA_NO_OCI_THREADED"] = "1"
51
52from TrigCommon import AthHLT
53from AthenaCommon.Logging import logging
54log = logging.getLogger('athenaEF')
55
56
57# =============================================================================
58# Run Parameters Configuration
59# =============================================================================
60# Default values for run parameters used by prepareForStart.
61# These can be overridden by command-line arguments or fetched from IS.
62
    """
    Container for run parameters needed by HltEventLoopMgr::prepareForStart().

    This class centralizes all run parameter defaults or their retrieval from IS.
    """

    # Default values, substituted by __init__ when the corresponding
    # constructor argument is None.
    DEFAULT_RUN_NUMBER = 0
    DEFAULT_LB_NUMBER = 0
    DEFAULT_DETECTOR_MASK = 'f' * 32  # 32 hex chars, all bits set: all detectors enabled
    DEFAULT_SOR_TIME = None  # Will use 'now' if not set
    # NOTE(review): the two magnet-current defaults below are never applied by
    # __init__ (solenoid/toroids are passed through unchanged, possibly None).
    # Confirm whether that is intentional or whether they should be used as
    # fallbacks like the other DEFAULT_* values.
    DEFAULT_SOLENOID_CURRENT = 7730.0  # (nominal)
    DEFAULT_TOROIDS_CURRENT = 20400.0  # (nominal)
    DEFAULT_BEAM_TYPE = 0
    DEFAULT_BEAM_ENERGY = 0
    DEFAULT_RUN_TYPE = "Physics"
    DEFAULT_TRIGGER_TYPE = 0
    DEFAULT_RECORDING_ENABLED = False
83 def __init__(self,
84 run_number=None,
85 lb_number=None,
86 detector_mask=None,
87 sor_time=None,
88 solenoid_current=None,
89 toroids_current=None,
90 beam_type=None,
91 beam_energy=None,
92 run_type=None,
93 trigger_type=None,
94 recording_enabled=None,
95 conditions_run=None):
96 """Initialize run parameters with defaults for any unspecified values."""
97 self.run_number = run_number if run_number is not None else self.DEFAULT_RUN_NUMBER
98 self.lb_number = lb_number if lb_number is not None else self.DEFAULT_LB_NUMBER
99 self.detector_mask = detector_mask if detector_mask is not None else self.DEFAULT_DETECTOR_MASK
100 self.sor_time = sor_time if sor_time is not None else dt.now().strftime('%d/%m/%y %H:%M:%S.%f')
101 self.solenoid_current = solenoid_current
102 self.toroids_current = toroids_current
103 self.beam_type = beam_type if beam_type is not None else self.DEFAULT_BEAM_TYPE
104 self.beam_energy = beam_energy if beam_energy is not None else self.DEFAULT_BEAM_ENERGY
105 self.run_type = run_type if run_type is not None else self.DEFAULT_RUN_TYPE
106 self.trigger_type = trigger_type if trigger_type is not None else self.DEFAULT_TRIGGER_TYPE
107 self.recording_enabled = recording_enabled if recording_enabled is not None else self.DEFAULT_RECORDING_ENABLED
108 self.conditions_run = conditions_run # Reference run for conditions lookup (None = use run_number)
109
110 def to_dict(self):
111 """Return run parameters as a dictionary for prepareForStart."""
112 return {
113 'run_number': self.run_number,
114 'lb_number': self.lb_number,
115 'detector_mask': self.detector_mask,
116 'sor_time': self.sor_time,
117 'solenoid_current': self.solenoid_current,
118 'toroids_current': self.toroids_current,
119 'beam_type': self.beam_type,
120 'beam_energy': self.beam_energy,
121 'run_type': self.run_type,
122 'trigger_type': self.trigger_type,
123 'recording_enabled': self.recording_enabled,
124 'conditions_run': self.conditions_run,
125 }
126
127 @classmethod
128 def from_args(cls, args):
129 """Create RunParams from argparse args, using defaults for unset values."""
130 return cls(
131 run_number=args.run_number,
132 lb_number=args.lb_number,
133 detector_mask=args.detector_mask,
134 sor_time=args.sor_time,
135 solenoid_current=getattr(args, 'solenoid_current', None),
136 toroids_current=getattr(args, 'toroids_current', None),
137 conditions_run=getattr(args, 'conditions_run', None),
138 )
139
    @classmethod
    def from_is(cls, partition=None, webdaq_base=None, strict=False,
                solenoid_current_override=None, toroids_current_override=None):
        """
        Create RunParams by reading from IS via the WEBDAQ REST API.

        This uses the webis_server REST API to fetch run parameters, avoiding
        direct dependencies on TDAQ libraries. The API endpoint is determined by:
        1. The webdaq_base parameter if provided
        2. The TDAQ_WEBDAQ_BASE environment variable

        The IS objects accessed are:
        - RunParams.RunParams: run_number, det_mask, timeSOR, trigger_type, etc.
        - Magnets.Magnets: SolenoidCurrent, ToroidsCurrent

        Args:
            partition: The partition name (default: from TDAQ_PARTITION env var)
            webdaq_base: Base URL for webis_server (default: from TDAQ_WEBDAQ_BASE env var)
            strict: If True, raise an exception if IS read fails (for --online-environment)
            solenoid_current_override: If provided, skip IS fetch for solenoid (command-line override)
            toroids_current_override: If provided, skip IS fetch for toroids (command-line override)

        Returns:
            RunParams instance with values from IS, or defaults if unavailable

        Raises:
            RuntimeError: If strict=True and IS read fails
        """
        # Local import: only needed when actually reading from IS
        import requests

        # Determine the base URL
        if webdaq_base is None:
            webdaq_base = os.environ.get('TDAQ_WEBDAQ_BASE')

        if not webdaq_base:
            msg = "TDAQ_WEBDAQ_BASE not set, cannot read from IS"
            if strict:
                raise RuntimeError(msg + " (required for --online-environment)")
            log.warning(msg + ". Using defaults.")
            return cls()

        # Determine partition
        if partition is None:
            partition = os.environ.get('TDAQ_PARTITION', 'ATLAS')

        log.info("Reading run parameters from IS via WEBDAQ: %s (partition=%s)", webdaq_base, partition)

        # Keyword arguments for the cls(...) call at the end; anything not
        # filled in here falls back to the class defaults.
        params = {}

        # Fetch RunParams from IS
        # API: GET /info/current/{partition}/is/{server}/{server}.{name}?format=compact
        # Response format: [name, type, timestamp, data] - we need element [3]
        try:
            url = f"{webdaq_base}/info/current/{partition}/is/RunParams/RunParams.RunParams?format=compact"
            log.debug("Fetching RunParams from: %s", url)

            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                response_data = response.json()
                log.debug("RunParams response from IS: %s", response_data)

                # Response is [name, type, timestamp, data]
                if isinstance(response_data, list) and len(response_data) >= 4:
                    runparams = response_data[3]
                else:
                    runparams = response_data

                log.debug("RunParams data: %s", runparams)

                # Map IS fields to our RunParams fields (only those present)
                if 'run_number' in runparams:
                    params['run_number'] = int(runparams['run_number'])
                if 'lumiblock' in runparams:
                    params['lb_number'] = int(runparams['lumiblock'])
                if 'det_mask' in runparams:
                    params['detector_mask'] = runparams['det_mask']
                if 'timeSOR' in runparams:
                    sor_time = runparams['timeSOR']
                    # Ensure microseconds are present (TrigSORFromPtreeHelper expects format with .%f)
                    if '.' not in sor_time:
                        sor_time += '.000000'
                    params['sor_time'] = sor_time
                if 'beam_type' in runparams:
                    params['beam_type'] = int(runparams['beam_type'])
                if 'beam_energy' in runparams:
                    params['beam_energy'] = int(runparams['beam_energy'])
                if 'run_type' in runparams:
                    params['run_type'] = runparams['run_type']
                if 'trigger_type' in runparams:
                    params['trigger_type'] = int(runparams['trigger_type'])
                if 'recording_enabled' in runparams:
                    # IS may deliver this as a string or a native bool/int
                    params['recording_enabled'] = runparams['recording_enabled'] in ('1', 'true', 'True', True, 1)

                log.info("Got run parameters from IS: run=%s, lb=%s",
                         params.get('run_number'), params.get('lb_number'))
            else:
                msg = f"Failed to fetch RunParams from IS: HTTP {response.status_code}"
                if strict:
                    raise RuntimeError(msg + " (required for --online-environment)")
                log.warning(msg)

        except requests.exceptions.RequestException as e:
            msg = f"Error fetching RunParams from IS: {e}"
            if strict:
                raise RuntimeError(msg + " (required for --online-environment)")
            log.warning(msg)
        except (ValueError, KeyError) as e:
            msg = f"Error parsing RunParams from IS: {e}"
            if strict:
                raise RuntimeError(msg + " (required for --online-environment)")
            log.warning(msg)

        # Fetch Magnets from IS
        # In strict mode (online environment), magnets are required unless provided via command line
        # If command-line overrides are provided, use those instead of fetching from IS
        have_solenoid_override = solenoid_current_override is not None
        have_toroids_override = toroids_current_override is not None

        if have_solenoid_override:
            params['solenoid_current'] = solenoid_current_override
            log.info("Using solenoid_current=%.1f from command line override", solenoid_current_override)
        if have_toroids_override:
            params['toroids_current'] = toroids_current_override
            log.info("Using toroids_current=%.1f from command line override", toroids_current_override)

        # Only fetch from IS if we need at least one value
        if not (have_solenoid_override and have_toroids_override):
            try:
                url = f"{webdaq_base}/info/current/{partition}/is/Magnets/Magnets.Magnets?format=compact"
                log.debug("Fetching Magnets from: %s", url)

                response = requests.get(url, timeout=10)
                if response.status_code == 200:
                    response_data = response.json()
                    log.debug("Magnets response from IS: %s", response_data)

                    magnets = response_data[3] if isinstance(response_data, list) and len(response_data) >= 4 else response_data
                    log.debug("Magnets data: %s", magnets)

                    # Magnets structure: { "SolenoidCurrent": {"value": ..., "ts": ...},
                    #                      "ToroidsCurrent": {"value": ..., "ts": ...} }
                    if not have_solenoid_override:
                        params['solenoid_current'] = float(magnets['SolenoidCurrent']['value'])
                    if not have_toroids_override:
                        params['toroids_current'] = float(magnets['ToroidsCurrent']['value'])

                    log.info("Got magnet currents from IS: solenoid=%s, toroids=%s",
                             params.get('solenoid_current'), params.get('toroids_current'))
                elif strict:
                    raise RuntimeError(f"Magnets not available from IS: HTTP {response.status_code} "
                                       "(required for --online-environment, use --solenoid-current and --toroids-current to override)")
                else:
                    log.debug("Magnets not available from IS: HTTP %d", response.status_code)

            except requests.exceptions.RequestException as e:
                if strict:
                    raise RuntimeError(f"Error fetching Magnets from IS: {e} "
                                       "(required for --online-environment, use --solenoid-current and --toroids-current to override)")
                log.debug("Error fetching Magnets from IS: %s", e)
            except (ValueError, KeyError, TypeError) as e:
                if strict:
                    raise RuntimeError(f"Error parsing Magnets from IS: {e} "
                                       "(required for --online-environment, use --solenoid-current and --toroids-current to override)")
                log.debug("Error parsing Magnets from IS: %s", e)

        # In strict mode, verify we got at least run_number from IS
        if strict and 'run_number' not in params:
            raise RuntimeError("Failed to get run_number from IS (required for --online-environment)")

        return cls(**params)
310
311
def get_trigconf_keys_from_oks(partition=None, webdaq_base=None, strict=False):
    """
    Read trigger configuration keys (SMK, L1PSK, HLTPSK) and DB info from OKS via WEBDAQ REST API.

    This reads the keys from the partition's TriggerConfiguration object and its
    related L1TriggerConfiguration and TriggerDBConnection objects.

    OKS Structure:
    - Partition -> TriggerConfiguration -> L1TriggerConfiguration (Lvl1PrescaleKey)
    - Partition -> TriggerConfiguration -> TriggerDBConnection (SuperMasterKey)
    - Partition -> TriggerConfiguration -> HLTImplementationDB (hltPrescaleKey)

    Args:
        partition: The partition name (default: from TDAQ_PARTITION env var)
        webdaq_base: Base URL for webis_server (default: from TDAQ_WEBDAQ_BASE env var)
        strict: If True, raise an exception if OKS read fails (for --online-environment)

    Returns:
        dict with keys: SMK, L1PSK, HLTPSK, db_alias (values may be None if not found)

    Raises:
        RuntimeError: If strict=True and OKS read fails
    """
    # Local import: only needed when actually reading from OKS
    import requests

    # Determine the base URL
    if webdaq_base is None:
        webdaq_base = os.environ.get('TDAQ_WEBDAQ_BASE')

    if not webdaq_base:
        msg = "TDAQ_WEBDAQ_BASE not set, cannot read from OKS"
        if strict:
            raise RuntimeError(msg + " (required for --online-environment)")
        log.warning(msg)
        return {'SMK': None, 'L1PSK': None, 'HLTPSK': None, 'db_alias': None}

    # Determine partition
    if partition is None:
        partition = os.environ.get('TDAQ_PARTITION', 'ATLAS')

    log.info("Reading trigger configuration keys from OKS via WEBDAQ: %s (partition=%s)",
             webdaq_base, partition)

    result = {'SMK': None, 'L1PSK': None, 'HLTPSK': None, 'db_alias': None}

    def extract_oks_data(response_json):
        """
        Extract data from OKS compact format: [name, type, attributes, relationships]
        Returns tuple (attributes_dict, relationships_dict)
        """
        if isinstance(response_json, list) and len(response_json) >= 4:
            return response_json[2], response_json[3]  # attributes, relationships
        elif isinstance(response_json, list) and len(response_json) >= 3:
            return response_json[2], {}  # attributes only
        return response_json, {}  # fallback

    def get_ref_id(ref):
        """Extract object ID from a relationship reference."""
        if isinstance(ref, list) and len(ref) >= 2:
            return ref[0]  # [id, class] format
        elif isinstance(ref, dict) and 'id' in ref:
            return ref['id']
        elif isinstance(ref, str):
            return ref
        return None

    # OKS API: GET /info/current/{partition}/oks/{class}/{name}?format=compact
    # Response format: [name, type, attributes, relationships]
    # - attributes: dict of simple values (strings, ints, etc.)
    # - relationships: dict of references to other objects
    try:
        url = f"{webdaq_base}/info/current/{partition}/oks/Partition/{partition}?format=compact"
        log.debug("Fetching Partition from OKS: %s", url)

        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            part_attrs, part_rels = extract_oks_data(response.json())
            log.debug("Partition attributes: %s", part_attrs)
            log.debug("Partition relationships: %s", part_rels)

            # Get TriggerConfiguration reference from relationships
            trig_conf_id = None
            if 'TriggerConfiguration' in part_rels:
                trig_conf_id = get_ref_id(part_rels['TriggerConfiguration'])

            if trig_conf_id:
                log.debug("TriggerConfiguration ID: %s", trig_conf_id)

                # Get TriggerConfiguration object
                url = f"{webdaq_base}/info/current/{partition}/oks/TriggerConfiguration/{trig_conf_id}?format=compact"
                response = requests.get(url, timeout=10)
                if response.status_code == 200:
                    trig_attrs, trig_rels = extract_oks_data(response.json())
                    log.debug("TriggerConfiguration attributes: %s", trig_attrs)
                    log.debug("TriggerConfiguration relationships: %s", trig_rels)

                    # Get L1TriggerConfiguration for L1PSK (relationship 'l1')
                    if 'l1' in trig_rels:
                        l1_id = get_ref_id(trig_rels['l1'])
                        if l1_id:
                            url = f"{webdaq_base}/info/current/{partition}/oks/L1TriggerConfiguration/{l1_id}?format=compact"
                            resp = requests.get(url, timeout=10)
                            if resp.status_code == 200:
                                l1_attrs, _ = extract_oks_data(resp.json())
                                log.debug("L1TriggerConfiguration attributes: %s", l1_attrs)
                                if 'Lvl1PrescaleKey' in l1_attrs:
                                    result['L1PSK'] = int(l1_attrs['Lvl1PrescaleKey'])
                                    log.info("Got L1PSK=%d from OKS", result['L1PSK'])

                    # Get TriggerDBConnection for SMK and db_alias (relationship 'TriggerDBConnection')
                    if 'TriggerDBConnection' in trig_rels:
                        db_id = get_ref_id(trig_rels['TriggerDBConnection'])
                        if db_id:
                            url = f"{webdaq_base}/info/current/{partition}/oks/TriggerDBConnection/{db_id}?format=compact"
                            resp = requests.get(url, timeout=10)
                            if resp.status_code == 200:
                                db_attrs, _ = extract_oks_data(resp.json())
                                log.debug("TriggerDBConnection attributes: %s", db_attrs)
                                if 'SuperMasterKey' in db_attrs:
                                    result['SMK'] = int(db_attrs['SuperMasterKey'])
                                    log.info("Got SMK=%d from OKS", result['SMK'])
                                if 'Alias' in db_attrs:
                                    result['db_alias'] = db_attrs['Alias']
                                    log.info("Got db_alias=%s from OKS", result['db_alias'])

                    # Get HLTImplementationDB for HLTPSK (relationship 'hlt')
                    if 'hlt' in trig_rels:
                        hlt_id = get_ref_id(trig_rels['hlt'])
                        if hlt_id:
                            url = f"{webdaq_base}/info/current/{partition}/oks/HLTImplementationDB/{hlt_id}?format=compact"
                            resp = requests.get(url, timeout=10)
                            if resp.status_code == 200:
                                hlt_attrs, _ = extract_oks_data(resp.json())
                                log.debug("HLTImplementationDB attributes: %s", hlt_attrs)
                                if 'hltPrescaleKey' in hlt_attrs:
                                    result['HLTPSK'] = int(hlt_attrs['hltPrescaleKey'])
                                    log.info("Got HLTPSK=%d from OKS", result['HLTPSK'])
        else:
            msg = f"Failed to fetch Partition from OKS: HTTP {response.status_code}"
            if strict:
                raise RuntimeError(msg + " (required for --online-environment)")
            log.warning(msg)

    except requests.exceptions.RequestException as e:
        msg = f"Error fetching trigger keys from OKS: {e}"
        if strict:
            raise RuntimeError(msg + " (required for --online-environment)")
        log.warning(msg)
    except (ValueError, KeyError, TypeError) as e:
        msg = f"Error parsing trigger keys from OKS: {e}"
        if strict:
            raise RuntimeError(msg + " (required for --online-environment)")
        log.warning(msg)

    # In strict mode, verify we got the required keys from OKS
    if strict:
        missing = [k for k in ['SMK', 'L1PSK', 'HLTPSK'] if result.get(k) is None]
        if missing:
            raise RuntimeError(f"Failed to get {', '.join(missing)} from OKS (required for --online-environment)")

    return result
473
474
def get_run_params(args=None, from_is=False, partition=None, webdaq_base=None, strict=False,
                   solenoid_current_override=None, toroids_current_override=None):
    """
    Get run parameters from the appropriate source.

    This is the main entry point for obtaining run parameters and the single
    place to modify when adding new sources (like WEBDAQ). Dispatch order:
    IS via WEBDAQ (from_is=True), then command-line args, then defaults.

    Args:
        args: argparse Namespace with command-line arguments (optional)
        from_is: If True, try to read from WEBDAQ first
        partition: Partition name for IS access (defaults to TDAQ_PARTITION env var)
        webdaq_base: WEBDAQ base URL (defaults to TDAQ_WEBDAQ_BASE env var)
        strict: If True, raise an exception if IS read fails (for --online-environment)
        solenoid_current_override: Command-line override for solenoid current
        toroids_current_override: Command-line override for toroids current

    Returns:
        RunParams instance

    Raises:
        RuntimeError: If strict=True and IS read fails
    """
    if from_is:
        return RunParams.from_is(
            partition=partition,
            webdaq_base=webdaq_base,
            strict=strict,
            solenoid_current_override=solenoid_current_override,
            toroids_current_override=toroids_current_override,
        )
    if args is not None:
        return RunParams.from_args(args)
    return RunParams()
506
507
    """
    Runner class that executes a Gaudi configuration from a JSON file or database.

    Uses TrigConf::JobOptionsSvc with TYPE="FILE" or TYPE="DB" to load the
    configuration. This is the same approach used by the PSC (Psc.cxx): it sets
    JobOptionsType and JobOptionsPath on the ApplicationMgr, and
    TrigConf::JobOptionsSvc handles both FILE and DB modes transparently.
    """
516 def __init__(self, job_options_type, job_options_path, run_params=None,
517 properties=None, db_server=None, smk=None,
518 num_threads=1, num_slots=1, ef_files=None):
519 """
520 Args:
521 job_options_type: "FILE" or "DB"
522 job_options_path: JSON file path (for FILE) or DB connection string (for DB)
523 run_params: Run parameters dict for prepareForStart
524 properties: Pre-loaded properties dict (optional, for FILE mode)
525 db_server: DB server alias (for store() in DB mode)
526 smk: Super Master Key (for store() in DB mode)
527 num_threads: Number of threads for AvalancheSchedulerSvc.ThreadPoolSize
528 num_slots: Number of event slots for EventDataSvc.NSlots
529 ef_files: List of input files for EFInterfaceSvc
530 """
531 self.job_options_type = job_options_type
532 self.job_options_path = job_options_path
533 self.run_params = run_params or {}
534 self.properties = properties
535 self.db_server = db_server # For store() in DB mode
536 self.smk = smk # For store() in DB mode
537 self.num_threads = num_threads
538 self.num_slots = num_slots
539 self.ef_files = ef_files or [] # Input files for EFInterfaceSvc
540 self._app = None
541
542 @classmethod
543 def from_json(cls, json_file, run_params=None, properties=None,
544 num_threads=1, num_slots=1, ef_files=None):
545 """Create runner for JSON file (TYPE=FILE)"""
546 return cls("FILE", os.path.abspath(json_file), run_params, properties,
547 num_threads=num_threads, num_slots=num_slots, ef_files=ef_files)
548
549 @classmethod
550 def from_database(cls, db_server, smk, l1psk=None, hltpsk=None, run_params=None,
551 num_threads=1, num_slots=1, ef_files=None):
552 """Create runner for database (TYPE=DB)"""
553 # Build the DB connection string: server=X;smkey=Y;lvl1key=Z;hltkey=W
554 db_path = f"server={db_server};smkey={smk}"
555 if l1psk is not None:
556 db_path += f";lvl1key={l1psk}"
557 if hltpsk is not None:
558 db_path += f";hltkey={hltpsk}"
559 return cls("DB", db_path, run_params, db_server=db_server, smk=smk,
560 num_threads=num_threads, num_slots=num_slots, ef_files=ef_files)
561
562 def run(self, maxEvents=None):
563 """
564 This follows the same pattern as PSC (Psc.cxx):
565 1. Create ApplicationMgr via BootstrapHelper
566 2. Set JobOptionsSvcType, JobOptionsType, JobOptionsPath
567 3. configure() -> initialize() -> prepareForStart() -> start() ->
568 hltUpdateAfterFork() -> run() -> stop() -> finalize() -> terminate()
569 """
570 from Gaudi.Main import BootstrapHelper
571
572 # For FILE mode, load properties from JSON if not already provided
573 if self.job_options_type == "FILE" and self.properties is None:
574 with open(self.job_options_path, 'r') as f:
575 jocat = json.load(f)
576 self.properties = jocat.get('properties', {})
577
578 bsh = BootstrapHelper()
579 app = bsh.createApplicationMgr()
580 self._app = app
581
582 # For FILE mode, set ApplicationMgr properties from JSON before configure
583 if self.job_options_type == "FILE" and self.properties:
584 app_props = self.properties.get('ApplicationMgr', {})
585 for k, v in app_props.items():
586 if k not in ('JobOptionsSvcType', 'JobOptionsType', 'JobOptionsPath'):
587 log.debug("Setting ApplicationMgr.%s = %s", k, v)
588 app.setProperty(k, str(v) if not isinstance(v, str) else v)
589
590 # Set JobOptionsSvc properties like PSC does in Psc.cxx
591 log.info("Configuring TrigConf::JobOptionsSvc with TYPE=%s, PATH=%s",
593 app.setProperty("JobOptionsSvcType", "TrigConf::JobOptionsSvc")
594 app.setProperty("JobOptionsType", self.job_options_type)
595 app.setProperty("JobOptionsPath", self.job_options_path)
596
597 # Configure the application - TrigConf::JobOptionsSvc will load from FILE or DB
598 app.configure()
599
600 # Override EvtMax AFTER configure() only if explicitly specified by user
601 # Otherwise use whatever value is in the DB
602 if maxEvents is not None:
603 log.info("Setting EvtMax=%d (overriding DB value)", maxEvents)
604 app.setProperty('EvtMax', str(maxEvents))
605
606 # All property overrides below use iProperty and must be done after configure()
607 # but before initialize() - this is the same pattern as PSC (Psc.cxx)
608 from GaudiPython.Bindings import iProperty
609
610 # Override EFInterfaceSvc.NumEvents if user specified it (overrides DB value)
611 if maxEvents is not None:
612 log.info("Setting EFInterfaceSvc.NumEvents=%d (overriding DB value)", maxEvents)
613 iProperty("EFInterfaceSvc").NumEvents = maxEvents
614
615 # Set threading configuration
616 log.info("Setting threading: ThreadPoolSize=%d, NSlots=%d", self.num_threads, self.num_slots)
617 iProperty("AvalancheSchedulerSvc").ThreadPoolSize = self.num_threads
618 iProperty("EventDataSvc").NSlots = self.num_slots
619
620 # Set input files for EFInterfaceSvc (overrides what's in DB/JSON config)
621 if self.ef_files:
622 log.info("Setting EFInterfaceSvc.Files = %s", self.ef_files)
623 iProperty("EFInterfaceSvc").Files = self.ef_files
624
625 # If HLT PSK is set on command line, read it from DB instead of COOL (ATR-25974)
626 # This is the same logic as TrigPSCPythonDbSetup.py
627 from TrigPSC import PscConfig
628 if PscConfig.forcePSK:
629 log.info("PscConfig.forcePSK is set - configuring HLTPrescaleCondAlg to read from DB instead of COOL")
630 iProperty("HLTPrescaleCondAlg").Source = "DB"
631
632 # Set forceRunNumber on HltEventLoopMgr if conditions_run is specified
633 # This overrides the run number used for IOV lookup in conditions loading
634 conditions_run = self.run_params.get('conditions_run')
635 if conditions_run is not None:
636 log.info("Setting HltEventLoopMgr.forceRunNumber=%d for conditions lookup", conditions_run)
637 iProperty("HltEventLoopMgr").forceRunNumber = conditions_run
638
639 # Initialize
640 sc = app.initialize()
641 if not sc.isSuccess():
642 log.error("Failed to initialize AppMgr")
643 return sc
644
645 # Initialize TrigServicesHelper for lifecycle calls (prepareForStart, prepareForRun, hltUpdateAfterFork)
646 try:
647 from TrigServices.TrigServicesHelper import TrigServicesHelper
648 helper = TrigServicesHelper()
649 except ImportError as e:
650 log.error("TrigServicesHelper not available: %s", e)
651 log.error("Cannot proceed without TrigServicesHelper - required for HLTEventLoopMgr lifecycle")
652 raise RuntimeError("TrigServicesHelper not available") from e
653
654 # Call prepareForStart to set up ByteStreamMetadata (like PSC does)
655 try:
656 run_number = self.run_params['run_number']
657 det_mask = self.run_params['detector_mask']
658 sor_time = self.run_params['sor_time']
659 solenoid_current = self.run_params['solenoid_current']
660 toroids_current = self.run_params['toroids_current']
661
662 log.info("Calling prepareForStart with run=%d, det_mask=0x%s, sor_time=%s",
663 run_number, det_mask, sor_time)
664
665 success = helper.prepareForStart(
666 run_number=run_number,
667 det_mask=det_mask,
668 sor_time=sor_time,
669 solenoid_current=solenoid_current,
670 toroids_current=toroids_current
671 )
672 if not success:
673 log.error("prepareForStart failed")
674 raise RuntimeError("prepareForStart failed")
675 log.info("prepareForStart completed successfully")
676 except Exception as e:
677 log.error("Error calling prepareForStart: %s", e)
678 traceback.print_exc()
679 raise
680
681 # Start
682 sc = app.start()
683 if not sc.isSuccess():
684 log.error("Failed to start AppMgr")
685 return sc
686
687 # prepareForRun initializes COOL folder helper - must be called after start()
688 # which fires the start incident
689 try:
690 log.info("Calling prepareForRun to initialize COOL folder helper")
691 success = helper.prepareForRun()
692 if not success:
693 log.error("prepareForRun failed")
694 raise RuntimeError("prepareForRun failed")
695 log.info("prepareForRun completed successfully")
696 except Exception as e:
697 log.error("Error calling prepareForRun: %s", e)
698 traceback.print_exc()
699 raise
700
701 # hltUpdateAfterFork initializes the scheduler (like PSC does after fork)
702 # worker_id=1 for single-worker, non-forked mode
703 try:
704 log.info("Calling hltUpdateAfterFork to initialize scheduler (worker_id=1)")
705 success = helper.hltUpdateAfterFork(worker_id=1)
706 if not success:
707 log.error("hltUpdateAfterFork failed")
708 raise RuntimeError("hltUpdateAfterFork failed")
709 log.info("hltUpdateAfterFork completed successfully")
710 except Exception as e:
711 log.error("Error calling hltUpdateAfterFork: %s", e)
712 traceback.print_exc()
713 raise
714
715 # Run the event loop
716 # Note: Python signal handlers won't work during C++ execution.
717 nevt = maxEvents if maxEvents is not None else -1
718 sc = app.run(nevt)
719
720 if not sc.isSuccess():
721 log.error("Failure running application")
722 return sc
723
724 # Stop
725 sc = app.stop()
726 if not sc.isSuccess():
727 log.error("Failed to stop AppMgr")
728 return sc
729
730 # Finalize
731 sc = app.finalize()
732 if not sc.isSuccess():
733 log.error("Failed to finalize AppMgr")
734 return sc
735
736 # Terminate
737 sc = app.terminate()
738 return sc
739
740
def load_from_json(json_file, run_params=None, num_threads=1, num_slots=1, ef_files=None):
    """
    Load configuration from a Gaudi joboptions JSON file.

    Returns a ConfigRunner with a run() method that executes the configuration
    using TrigConf::JobOptionsSvc with TYPE="FILE".

    Raises:
        ValueError: if the JSON file is not of filetype 'joboptions'
    """
    with open(json_file, 'r') as fh:
        jocat = json.load(fh)

    # Reject anything that is not a Gaudi joboptions dump
    if jocat.get('filetype') != 'joboptions':
        raise ValueError(f"Invalid JSON file type: {jocat.get('filetype')}, expected 'joboptions'")

    return ConfigRunner.from_json(json_file, run_params, jocat.get('properties', {}),
                                  num_threads=num_threads,
                                  num_slots=num_slots,
                                  ef_files=ef_files)
759
760
def load_from_database(db_server, smk, l1psk=None, hltpsk=None, run_params=None,
                       num_threads=1, num_slots=1, ef_files=None):
    """
    Load configuration from the trigger database using the Super Master Key (SMK).

    Returns a ConfigRunner that uses TrigConf::JobOptionsSvc with TYPE="DB" to
    load the configuration directly from the database, same as athenaHLT.
    """
    log.info("Loading job options from database %s with SMK %d", db_server, smk)
    return ConfigRunner.from_database(
        db_server, smk, l1psk, hltpsk, run_params,
        num_threads=num_threads,
        num_slots=num_slots,
        ef_files=ef_files,
    )
774
775
776
def arg_sor_time(s) -> str:
    """Convert possible SOR time arguments to an OWLTime compatible string"""
    fmt = '%d/%m/%y %H:%M:%S.%f'
    if s == 'now':
        return dt.now().strftime(fmt)
    if s.isdigit():
        # Numeric input is interpreted as nanoseconds since the epoch
        return dt.fromtimestamp(float(s) / 1e9).strftime(fmt)
    # Anything else is assumed to already be a formatted time string
    return s
785
787 """Convert detector mask to format expected by eformat"""
788 if s=='all':
789 return RunParams.DEFAULT_DETECTOR_MASK
790 dmask = hex(int(s,16)) # Normalize input to hex-string
791 dmask = dmask.lower().replace('0x', '').replace('l', '') # remove markers
792 return '0' * (32 - len(dmask)) + dmask # (pad with 0s)
793
def arg_log_level(s):
    """Argument handler for log levels"""
    # ('def' line restored: it was dropped in the rendered source)
    lvls = s.split(',')
    if len(lvls) == 1:
        # When only one level is given, default the second one to ERROR
        lvls.append('ERROR')
    return lvls
799
800
def check_args(parser, args):
    """Consistency check of command line arguments (same as athenaHLT.py)"""

    # A configuration source is mandatory: job options file or trigger DB
    if not (args.jobOptions or args.use_database):
        parser.error("No job options file specified")

    # Input files are mandatory unless we only dump the configuration
    if not (args.file or args.dump_config_exit):
        parser.error("--file is required unless using --dump-config-exit")

    # CREST conditions access only makes sense together with DB configuration
    if args.use_crest and not args.use_database:
        parser.error("--use-crest requires --use-database")
812
813
def update_run_params(args, flags):
    """Update run parameters (run/lb number, SOR time, detector mask,
    magnet currents) on *args* from IS, the input file, or the conditions DB.

    Mutates *args* in place. Exits with code 1 if a run number is given
    but no SOR parameters exist for it in the conditions DB.
    """

    # If --online-environment is specified, try to read from Information Service first
    if getattr(args, 'online_environment', False):
        log.info("Reading run parameters from Information Service via WEBDAQ")
        # Pass command-line magnet values as overrides (if provided)
        # strict=True ensures we fail if IS read fails, rather than falling back to defaults
        # But if user provided magnet values on command line, those take precedence over IS
        solenoid_override = getattr(args, 'solenoid_current', None)
        toroids_override = getattr(args, 'toroids_current', None)

        run_params = get_run_params(from_is=True,
                                    partition=getattr(args, 'partition', None),
                                    webdaq_base=getattr(args, 'webdaq_base', None),
                                    strict=True,
                                    solenoid_current_override=solenoid_override,
                                    toroids_current_override=toroids_override)
        # Update args with values from IS (if not already set on command line)
        if args.run_number is None and run_params.run_number is not None:
            args.run_number = run_params.run_number
            log.info("Using run_number=%d from IS", args.run_number)
        if args.lb_number is None and run_params.lb_number is not None:
            args.lb_number = run_params.lb_number
            log.info("Using lb_number=%d from IS", args.lb_number)
        if args.sor_time is None and run_params.sor_time is not None:
            args.sor_time = run_params.sor_time
            log.info("Using sor_time=%s from IS", args.sor_time)
        if args.detector_mask is None and run_params.detector_mask is not None:
            args.detector_mask = run_params.detector_mask
            log.info("Using detector_mask=%s from IS", args.detector_mask)
        # Update magnet currents from IS (run_params already has command-line overrides if provided)
        args.solenoid_current = run_params.solenoid_current
        args.toroids_current = run_params.toroids_current

    # NOTE(review): this branch only logs an error and execution continues with
    # the inconsistent run/lb combination — confirm whether a hard exit
    # (as athenaHLT's parser.error would do) was intended here.
    if (args.run_number is not None and args.lb_number is None) or (args.run_number is None and args.lb_number is not None):
        log.error("Both or neither of the options -R (--run-number) and -L (--lb-number) have to be specified")

    # No run number anywhere yet: read run/lb from the first input RAW file
    if args.run_number is None and args.file:
        from eformat import EventStorage
        dr = EventStorage.pickDataReader(args.file[0])
        args.run_number = dr.runNumber()
        args.lb_number = dr.lumiblockNumber()

    sor_params = None
    if (args.sor_time is None or args.detector_mask is None) and args.run_number is not None:
        sor_params = AthHLT.get_sor_params(args.run_number)
        log.debug('SOR parameters: %s', sor_params)
        if sor_params is None:
            log.error("Run %d does not exist. If you want to use this run-number specify "
                      "remaining run parameters, e.g.: --sor-time=now --detector-mask=all", args.run_number)
            sys.exit(1)

    if args.sor_time is None and sor_params is not None:
        args.sor_time = arg_sor_time(str(sor_params['SORTime']))

    if args.detector_mask is None and sor_params is not None:
        dmask = sor_params['DetectorMask']
        # Pre-Run2 DetectorMask is an integer; convert to a hex string
        # before normalizing with arg_detector_mask
        if args.run_number < AthHLT.CondDB._run2:
            dmask = hex(dmask)
        args.detector_mask = arg_detector_mask(dmask)

    # Apply defaults for magnet currents if not set (offline mode only)
    # In online mode, magnets must come from IS or command line (handled above)
    if getattr(args, 'solenoid_current', None) is None:
        args.solenoid_current = RunParams.DEFAULT_SOLENOID_CURRENT
        log.debug("Using default solenoid_current=%.1f", args.solenoid_current)
    if getattr(args, 'toroids_current', None) is None:
        args.toroids_current = RunParams.DEFAULT_TOROIDS_CURRENT
        log.debug("Using default toroids_current=%.1f", args.toroids_current)
884
885
def update_trigconf_keys(args, flags):
    """Update trigger configuration keys from OKS, COOL, or CREST.

    Priority order:
      1. Command-line arguments (always take precedence)
      2. OKS via WEBDAQ (if --online-environment is set)
      3. CREST (if --use-crest is set)
      4. COOL (default)

    Mutates args.smk / args.l1psk / args.hltpsk (and possibly
    args.db_server) in place; exits with code 1 if keys cannot be read.

    NOTE(review): the `oks_keys = get_trigconf_keys_from_oks(` call line
    was lost in the code listing extraction; it is restored here from the
    file's own symbol index (get_trigconf_keys_from_oks at line 312) and
    the orphaned keyword arguments that remained in the listing.
    """

    if args.smk is None or args.l1psk is None or args.hltpsk is None:
        trigconf = None

        # Try OKS first if --online-environment is set
        if getattr(args, 'online_environment', False):
            log.info("Reading trigger configuration keys from OKS (online environment)")
            # strict=True ensures we fail if OKS read fails, rather than falling back to COOL
            oks_keys = get_trigconf_keys_from_oks(
                partition=getattr(args, 'partition', None),
                webdaq_base=getattr(args, 'webdaq_base', None),
                strict=True
            )
            log.info("Retrieved trigger keys from OKS: %s", oks_keys)

            # With strict=True, we're guaranteed to have all keys or an exception was raised
            trigconf = {
                'SMK': oks_keys.get('SMK'),
                'LVL1PSK': oks_keys.get('L1PSK'),
                'HLTPSK': oks_keys.get('HLTPSK')
            }
            # Also update db_server if provided by OKS and not set on command line
            if oks_keys.get('db_alias') and args.db_server == 'TRIGGERDB_RUN3':
                args.db_server = oks_keys['db_alias']
                log.info("Using db_server=%s from OKS", args.db_server)

        # Fall back to CREST or COOL only if NOT in online-environment mode
        if trigconf is None:
            if args.use_crest:
                crest_server = args.crest_server or flags.Trigger.crestServer
                log.info("Reading trigger configuration keys from CREST for run %s", args.run_number)
                trigconf = AthHLT.get_trigconf_keys_crest(args.run_number, args.lb_number, crest_server)
                log.info("Retrieved trigger keys from CREST: %s", trigconf)
            else:
                log.info("Reading trigger configuration keys from COOL for run %s", args.run_number)
                trigconf = AthHLT.get_trigconf_keys(args.run_number, args.lb_number)
                log.info("Retrieved trigger keys from COOL: %s", trigconf)

        try:
            if args.smk is None:
                args.smk = trigconf['SMK']
                log.debug("Using SMK=%d from conditions DB/OKS", args.smk)
            else:
                log.debug("Using SMK=%d from command line (ignoring DB/OKS value %s)", args.smk, trigconf.get('SMK'))
            if args.l1psk is None:
                args.l1psk = trigconf['LVL1PSK']
                log.debug("Using L1PSK=%d from conditions DB/OKS", args.l1psk)
            else:
                log.debug("Using L1PSK=%d from command line (ignoring DB/OKS value %s)", args.l1psk, trigconf.get('LVL1PSK'))
            if args.hltpsk is None:
                args.hltpsk = trigconf['HLTPSK']
                log.debug("Using HLTPSK=%d from conditions DB/OKS", args.hltpsk)
            else:
                log.debug("Using HLTPSK=%d from command line (ignoring DB/OKS value %s)", args.hltpsk, trigconf.get('HLTPSK'))
        except KeyError:
            log.error("Cannot read trigger configuration keys from the conditions database for run %d", args.run_number)
            sys.exit(1)
    else:
        log.info("Using trigger configuration keys from command line: SMK=%d, L1PSK=%d, HLTPSK=%d",
                 args.smk, args.l1psk, args.hltpsk)
955
956
class MyHelp(argparse.Action):
    """Help action that hides expert option groups unless '--help=all' is given."""

    def __call__(self, parser, namespace, values, option_string=None):
        show_expert = (values == 'all')

        if not show_expert:
            # Suppress every action belonging to an expert group
            hidden = (a for grp in parser.expert_groups for a in grp._group_actions)
            for action in hidden:
                action.help = argparse.SUPPRESS

        parser.print_help()
        if not show_expert:
            print('\nUse --help=all to show all (expert) options')
        sys.exit(0)
970
971
def main():
    """Parse the command line, build the HLT configuration (from a CA
    module, a pickle file, a JSON file, or the trigger database) and run
    it directly, athena.py-style, without HLTMPPU.

    Returns only via sys.exit() with an ExitCodes-compatible exit code.
    """
    parser = argparse.ArgumentParser(prog='athenaEF.py', formatter_class=
                                     lambda prog : argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=32, width=100),
                                     usage = '%(prog)s [OPTION]... -f FILE jobOptions',
                                     add_help=False)
    parser.expert_groups = []   # Keep list of expert option groups


    g = parser.add_argument_group('Options')
    g.add_argument('jobOptions', nargs='?', help='job options: CA module (package.module:function), pickle file (.pkl), or JSON file (.json)')
    g.add_argument('--threads', metavar='N', type=int, default=1, help='number of threads')
    g.add_argument('--concurrent-events', metavar='N', type=int, help='number of concurrent events if different from --threads')
    g.add_argument('--log-level', '-l', metavar='LVL', type=arg_log_level, default='INFO,ERROR', help='OutputLevel of athena,POOL')
    g.add_argument('--precommand', '-c', metavar='CMD', action='append', default=[],
                   help='Python commands executed before job options')
    g.add_argument('--postcommand', '-C', metavar='CMD', action='append', default=[],
                   help='Python commands executed after job options')
    g.add_argument('--interactive', '-i', action='store_true', help='interactive mode')
    g.add_argument('--help', '-h', nargs='?', choices=['all'], action=MyHelp, help='show help')

    g = parser.add_argument_group('Input/Output')
    g.add_argument('--file', '--filesInput', '-f', action='append', help='input RAW file')
    g.add_argument('--save-output', '-o', metavar='FILE', help='output file name')
    g.add_argument('--number-of-events', '--evtMax', '-n', metavar='N', type=int, default=-1, help='processes N events (default: -1, means all)')
    g.add_argument('--skip-events', '--skipEvents', '-k', metavar='N', type=int, default=0, help='skip N first events')
    g.add_argument('--loop-files', action='store_true', help='loop over input files if no more events')


    g = parser.add_argument_group('Performance and debugging')
    g.add_argument('--perfmon', action='store_true', help='enable PerfMon')
    g.add_argument('--tcmalloc', action='store_true', default=True, help='use tcmalloc')
    g.add_argument('--stdcmalloc', action='store_true', help='use stdcmalloc')
    g.add_argument('--stdcmath', action='store_true', help='use stdcmath library')
    g.add_argument('--imf', action='store_true', default=True, help='use Intel math library')
    g.add_argument('--show-includes', '-s', action='store_true', help='show printout of included files')


    g = parser.add_argument_group('Conditions')
    g.add_argument('--run-number', '-R', metavar='RUN', type=int,
                   help='run number (if None, read from first event)')
    g.add_argument('--lb-number', '-L', metavar='LBN', type=int,
                   help='lumiblock number (if None, read from first event)')
    g.add_argument('--conditions-run', metavar='RUN', type=int, default=None,
                   help='reference run number for conditions lookup (use when IS run number has no COOL data)')
    g.add_argument('--sor-time', type=arg_sor_time,
                   help='The Start Of Run time. Three formats are accepted: '
                   '1) the string "now", for current time; '
                   '2) the number of nanoseconds since epoch (e.g. 1386355338658000000 or int(time.time() * 1e9)); '
                   '3) human-readable "20/11/18 17:40:42.3043". If not specified the sor-time is read from the conditions DB')
    g.add_argument('--detector-mask', metavar='MASK', type=arg_detector_mask,
                   help='detector mask (if None, read from the conditions DB), use string "all" to enable all detectors')


    g = parser.add_argument_group('Database')
    g.add_argument('--use-database', '-b', action='store_true',
                   help='configure from trigger database using SMK')
    g.add_argument('--db-server', metavar='DB', default='TRIGGERDB_RUN3', help='DB server name (alias)')
    g.add_argument('--smk', type=int, default=None, help='Super Master Key')
    g.add_argument('--l1psk', type=int, default=None, help='L1 prescale key')
    g.add_argument('--hltpsk', type=int, default=None, help='HLT prescale key')
    g.add_argument('--use-crest', action='store_true', default=False,
                   help='Use CREST for trigger configuration')
    g.add_argument('--crest-server', metavar='URL', default=None,
                   help='CREST server URL (defaults to flags.Trigger.crestServer)')
    g.add_argument('--dump-config', action='store_true', help='Dump joboptions JSON file')
    g.add_argument('--dump-config-exit', action='store_true', help='Dump joboptions JSON file and exit')


    g = parser.add_argument_group('Magnets')
    g.add_argument('--solenoid-current', type=float, default=None,
                   help='Solenoid current in Amperes (default: nominal current for offline running, required from IS online)')
    g.add_argument('--toroids-current', type=float, default=None,
                   help='Toroids current in Amperes (default: nominal current for offline running, required from IS online)')


    g = parser.add_argument_group('Online')
    g.add_argument('--online-environment', action='store_true',
                   help='Enable online environment: read run parameters from IS and trigger '
                   'configuration keys (SMK, L1PSK, HLTPSK) from OKS via WEBDAQ REST API')
    g.add_argument('--partition', metavar='NAME', default=None,
                   help='TDAQ partition name (defaults to TDAQ_PARTITION environment variable)')
    g.add_argument('--webdaq-base', metavar='URL', default=None,
                   help='WEBDAQ base URL (defaults to TDAQ_WEBDAQ_BASE environment variable)')


    g = parser.add_argument_group('Online Histogramming')
    g.add_argument('--oh-monitoring', '-M', action='store_true', default=False,
                   help='enable online histogram publishing via WebdaqHistSvc')


    g = parser.add_argument_group('Expert')
    parser.expert_groups.append(g)
    (args, unparsed_args) = parser.parse_known_args()
    check_args(parser, args)

    # set ROOT to batch mode (ATR-21890)
    from ROOT import gROOT
    gROOT.SetBatch()

    # set default OutputLevels and file inclusion
    import AthenaCommon.Logging
    AthenaCommon.Logging.log.setLevel(getattr(logging, args.log_level[0]))
    AthenaCommon.Logging.log.setFormat("%(asctime)s Py:%(name)-31s %(levelname)7s %(message)s")
    if args.show_includes:
        from AthenaCommon.Include import include
        include.setShowIncludes( True )

    # consistency checks for arguments
    if not args.concurrent_events:
        args.concurrent_events = args.threads

    # Update args and set athena flags
    from TrigPSC import PscConfig
    from TrigPSC.PscDefaultFlags import defaultOnlineFlags

    # Get flags with online defaults (same as athenaHLT)
    flags = defaultOnlineFlags()

    # Enable WebdaqHistSvc for online histogram publishing if requested
    if args.oh_monitoring:
        flags.Trigger.Online.useOnlineWebdaqHistSvc = True
        log.info("Enabled WebdaqHistSvc for online histogram publishing")

    # CREST configuration (same as athenaHLT)
    log.info("Using CREST for trigger configuration: %s", args.use_crest)
    if args.use_crest:
        flags.Trigger.useCrest = True
        if args.crest_server:
            flags.Trigger.crestServer = args.crest_server
        else:
            args.crest_server = flags.Trigger.crestServer

    update_run_params(args, flags)

    if args.use_database:
        # If HLTPSK was given on the command line OR from OKS (--online-environment),
        # we ignore what is stored in COOL and use the specified key directly from the DB.
        # This is needed because COOL may point to a different HLTPSK for the forced run number.
        PscConfig.forcePSK = (args.hltpsk is not None) or args.online_environment
        # Read trigger config keys from COOL/OKS if not specified
        update_trigconf_keys(args, flags)

    # Fill flags from command line (if not running from DB/JSON)
    if not args.use_database and args.jobOptions and not args.jobOptions.endswith('.json'):
        PscConfig.unparsedArguments = unparsed_args
        for flag_arg in unparsed_args:
            flags.fillFromString(flag_arg)

    PscConfig.interactive = args.interactive
    PscConfig.exitAfterDump = args.dump_config_exit

    # NOTE: Do NOT set flags.Input.Files here!
    # Like athenaHLT, we keep Input.Files=[] during configuration to ensure the
    # configuration is portable and doesn't depend on specific input file metadata.
    # Input files are passed to EFInterface for runtime use only.

    # Set conditions run number override (for test partitions with fake run numbers)
    if args.conditions_run is not None:
        log.info("Using conditions from reference run %d (overriding run %s for IOV lookup)",
                 args.conditions_run, args.run_number)
        flags.Input.ConditionsRunNumber = args.conditions_run

    # Set number of events
    if args.number_of_events > 0:
        flags.Exec.MaxEvents = args.number_of_events

    # Set skip events
    if args.skip_events > 0:
        flags.Exec.SkipEvents = args.skip_events

    # NOTE: Do NOT set flags.Concurrency.NumThreads or NumConcurrentEvents here.
    # Threading is set at runtime via iProperty after configure() - see ConfigRunner.run()

    # Enable PerfMon if requested
    flags.PerfMon.doFastMonMT = args.perfmon

    # Configure EF ByteStream services (mandatory to run without HLTMPPU)
    # This provides the data flow interface that would normally come from HLTMPPU
    flags.Trigger.Online.useEFByteStreamSvc = True
    ef = flags.Trigger.Online.EFInterface
    ef_files = args.file if args.file else []
    ef.Files = ef_files
    ef.LoopFiles = args.loop_files
    ef.NumEvents = args.number_of_events
    ef.SkipEvents = args.skip_events
    ef.RunNumber = args.run_number

    # Execute precommands
    if args.precommand:
        log.info("Executing precommand(s)")
        for cmd in args.precommand:
            log.info(" %s", cmd)
            exec(cmd, globals(), {'flags': flags})

    # Determine input type
    is_database = args.use_database
    is_pickle = False
    is_json = False

    if not is_database and args.jobOptions:
        jobOptions = args.jobOptions
        is_pickle = jobOptions.endswith('.pkl')
        is_json = jobOptions.endswith('.json')

    if is_database:
        # Load configuration from trigger database
        # Handle CREST vs standard DB access (same as athenaHLT)
        if args.use_crest:
            crestconn = TriggerCrestUtil.getCrestConnection(args.db_server)
            db_alias = f"{args.crest_server}/{crestconn}"
            log.info("Loading configuration via CREST from %s with SMK %d", db_alias, args.smk)
        else:
            db_alias = args.db_server
            log.info("Loading configuration from database %s with SMK %d", db_alias, args.smk)

        # Get run parameters for prepareForStart
        run_params = get_run_params(args).to_dict()
        acc = load_from_database(db_alias, args.smk, args.l1psk, args.hltpsk, run_params,
                                 num_threads=args.threads, num_slots=args.concurrent_events,
                                 ef_files=ef_files)
        log.info("Configuration loaded from database")

    elif is_pickle:
        # Load ComponentAccumulator from pickle file
        log.info("Loading configuration from pickle file: %s", jobOptions)
        with open(jobOptions, 'rb') as f:
            acc = pickle.load(f)
        log.info("Configuration loaded from pickle")

    elif is_json:
        # Load configuration from JSON file
        log.info("Loading configuration from JSON file: %s", jobOptions)
        # Get run parameters for prepareForStart
        run_params = get_run_params(args).to_dict()
        acc = load_from_json(jobOptions, run_params,
                             num_threads=args.threads, num_slots=args.concurrent_events,
                             ef_files=ef_files)
        log.info("Configuration loaded from JSON")

    else:
        # Load from CA module - follow the same pattern as athenaHLT/TrigPSCPythonCASetup:
        # 1. Build the full configuration with services
        # 2. Dump to JSON file
        # 3. Use TrigConf::JobOptionsSvc to load from JSON
        # This preserves the ability to use the same JobOptionsSvc as athenaHLT
        log.info("Loading CA configuration from: %s", jobOptions)

        # Clone and lock flags for services configuration (as done in TrigPSCPythonCASetup)
        from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
        from AthenaConfiguration.MainServicesConfig import addMainSequences
        from TrigServices.TriggerUnixStandardSetup import commonServicesCfg
        from AthenaConfiguration.ComponentFactory import CompFactory

        locked_flags = flags.clone()
        locked_flags.lock()

        # Create base CA with framework services (like TrigPSCPythonCASetup)
        cfg = ComponentAccumulator(CompFactory.AthSequencer("AthMasterSeq", Sequential=True))
        cfg.setAppProperty('ExtSvcCreates', False)
        cfg.setAppProperty("MessageSvcType", "TrigMessageSvc")
        cfg.setAppProperty("JobOptionsSvcType", "TrigConf::JobOptionsSvc")

        # Add main sequences and common services (includes TrigServicesCfg)
        addMainSequences(locked_flags, cfg)
        cfg.merge(commonServicesCfg(locked_flags))

        # Now merge user CA config (with unlocked flags, as in TrigPSCPythonCASetup)
        cfg_func = AthHLT.getCACfg(jobOptions)
        cfg.merge(cfg_func(flags))

        # Execute postcommands before dumping (like TrigPSCPythonCASetup)
        if args.postcommand:
            log.info("Executing postcommand(s)")
            for cmd in args.postcommand:
                log.info(" %s", cmd)
                exec(cmd, globals(), {'flags': flags, 'cfg': cfg})
            args.postcommand = []  # Clear so we don't run them again later

        # Dump configuration to JSON (like TrigPSCPythonCASetup)
        fname = "HLTJobOptions_EF"
        log.info("Dumping configuration to %s.pkl and %s.json", fname, fname)
        with open(f"{fname}.pkl", "wb") as f:
            cfg.store(f)

        from TrigConfIO.JsonUtils import create_joboptions_json
        create_joboptions_json(f"{fname}.pkl", f"{fname}.json")

        # Check for dump-and-exit
        if args.dump_config_exit:
            log.info("Configuration dumped to %s.json. Exiting...", fname)
            sys.exit(0)

        # Now load the JSON using JsonConfigRunner with TrigConf::JobOptionsSvc
        log.info("Loading configuration from %s.json via TrigConf::JobOptionsSvc", fname)
        # Get run parameters for prepareForStart
        run_params = get_run_params(args).to_dict()
        acc = load_from_json(f"{fname}.json", run_params,
                             num_threads=args.threads, num_slots=args.concurrent_events,
                             ef_files=ef_files)

    log.info("Configuration loaded with HLT online services")

    # Execute postcommands
    if args.postcommand:
        log.info("Executing postcommand(s)")
        for cmd in args.postcommand:
            log.info(" %s", cmd)
            exec(cmd, globals(), {'flags': flags, 'acc': acc})

    # Dump configuration if requested
    if args.dump_config or args.dump_config_exit:
        fname = "HLTJobOptions_EF"

        if is_database:
            # For DB mode, fetch properties via Python API
            from TrigConfIO.HLTTriggerConfigAccess import HLTJobOptionsAccess
            log.info("Fetching configuration from database for dump...")
            jo_access = HLTJobOptionsAccess(dbalias=acc.db_server, smkey=acc.smk)
            props = jo_access.algorithms()

            log.info("Dumping configuration to %s.json", fname)
            hlt_json = {'filetype': 'joboptions', 'properties': props}
            with open(f"{fname}.json", "w") as f:
                json.dump(hlt_json, f, indent=4, sort_keys=True, ensure_ascii=True)

        elif is_json:
            # For JSON mode, properties were already loaded
            props = acc.properties
            if props:
                log.info("Dumping configuration to %s.json", fname)
                hlt_json = {'filetype': 'joboptions', 'properties': props}
                with open(f"{fname}.json", "w") as f:
                    json.dump(hlt_json, f, indent=4, sort_keys=True, ensure_ascii=True)
            else:
                log.warning("No properties available to dump")

        elif is_pickle:
            # For pickle-loaded ComponentAccumulator, gather properties
            app_props, msg_props, comp_props = acc.gatherProps()
            props = {"ApplicationMgr": app_props, "MessageSvc": msg_props}
            for comp, name, value in comp_props:
                props.setdefault(comp, {})[name] = value

            log.info("Dumping configuration to %s.json", fname)
            hlt_json = {'filetype': 'joboptions', 'properties': props}
            with open(f"{fname}.json", "w") as f:
                json.dump(hlt_json, f, indent=4, sort_keys=True, ensure_ascii=True)

        # Note: For CA module, dumping is already handled earlier
        # before converting to ConfigRunner

        if args.dump_config_exit:
            log.info("Configuration dumped. Exiting...")
            sys.exit(0)

    # Run the application directly (like athena.py does)
    log.info("Starting Athena execution...")

    # Create worker directory structure that HLT services expect
    # (normally created by HLTMPPU/PSC). Worker ID 1 means single-worker, non-forked mode
    # and must match what we pass to hltUpdateAfterFork(worker_id=1) in ConfigRunner.run()
    worker_dir = os.path.join(os.getcwd(), "athenaHLT_workers", "athenaHLT-01")
    if not os.path.exists(worker_dir):
        log.info("Creating worker directory: %s", worker_dir)
        os.makedirs(worker_dir, exist_ok=True)

    if args.interactive:
        log.info("Interactive mode - call acc.run() to execute")
        import code
        code.interact(local={'acc': acc, 'flags': flags})
    else:
        # Run the application
        from AthenaCommon import ExitCodes
        exitcode = 0
        try:
            # Pass maxEvents if explicitly set (including -1 for all events)
            sc = acc.run(args.number_of_events)
            if sc.isFailure():
                exitcode = ExitCodes.EXE_ALG_FAILURE
        except SystemExit as e:
            # Map a plain exit(1) onto the standard algorithm-failure code
            exitcode = ExitCodes.EXE_ALG_FAILURE if e.code == 1 else e.code
        except Exception:
            traceback.print_exc()
            exitcode = ExitCodes.UNKNOWN_EXCEPTION

        log.info('Leaving with code %d: "%s"', exitcode, ExitCodes.what(exitcode))
        sys.exit(exitcode)
1359
1360
1361if "__main__" in __name__:
1362 sys.exit(main())
void print(char *figname, TCanvas *c1)
Helper class to call ITrigEventLoopMgr methods from Python.
__init__(self, job_options_type, job_options_path, run_params=None, properties=None, db_server=None, smk=None, num_threads=1, num_slots=1, ef_files=None)
Definition athenaEF.py:518
from_json(cls, json_file, run_params=None, properties=None, num_threads=1, num_slots=1, ef_files=None)
Definition athenaEF.py:544
from_database(cls, db_server, smk, l1psk=None, hltpsk=None, run_params=None, num_threads=1, num_slots=1, ef_files=None)
Definition athenaEF.py:551
run(self, maxEvents=None)
Definition athenaEF.py:562
__call__(self, parser, namespace, values, option_string=None)
Definition athenaEF.py:959
from_is(cls, partition=None, webdaq_base=None, strict=False, solenoid_current_override=None, toroids_current_override=None)
Definition athenaEF.py:142
__init__(self, run_number=None, lb_number=None, detector_mask=None, sor_time=None, solenoid_current=None, toroids_current=None, beam_type=None, beam_energy=None, run_type=None, trigger_type=None, recording_enabled=None, conditions_run=None)
Definition athenaEF.py:95
from_args(cls, args)
Definition athenaEF.py:128
bool DEFAULT_RECORDING_ENABLED
Definition athenaEF.py:81
str DEFAULT_DETECTOR_MASK
Definition athenaEF.py:73
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition hcg.cxx:310
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition hcg.cxx:130
load_from_database(db_server, smk, l1psk=None, hltpsk=None, run_params=None, num_threads=1, num_slots=1, ef_files=None)
Definition athenaEF.py:762
arg_detector_mask(s)
Definition athenaEF.py:786
get_trigconf_keys_from_oks(partition=None, webdaq_base=None, strict=False)
Definition athenaEF.py:312
str arg_sor_time(s)
The following arg_* methods are used as custom types in argparse.
Definition athenaEF.py:779
arg_log_level(s)
Definition athenaEF.py:794
update_run_params(args, flags)
Definition athenaEF.py:814
check_args(parser, args)
Definition athenaEF.py:801
update_trigconf_keys(args, flags)
Definition athenaEF.py:886
get_run_params(args=None, from_is=False, partition=None, webdaq_base=None, strict=False, solenoid_current_override=None, toroids_current_override=None)
Definition athenaEF.py:476
load_from_json(json_file, run_params=None, num_threads=1, num_slots=1, ef_files=None)
Definition athenaEF.py:741