ATLAS Offline Software
Loading...
Searching...
No Matches
Psc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
12
13#include "TrigPSC/Psc.h"
14#include "TrigPSC/PscIssues.h"
15#include "TrigPSC/Utils.h"
18
19#include "eformat/ROBFragment.h"
20#include "ers/ers.h"
21
22#undef _POSIX_C_SOURCE
23#undef _XOPEN_SOURCE
24#include <Python.h>
25
26// Include files for Gaudi
27#include "GaudiKernel/Bootstrap.h"
28#include "GaudiKernel/IAppMgrUI.h"
29#include "GaudiKernel/IEventProcessor.h"
30#include "GaudiKernel/IProperty.h"
31#include "GaudiKernel/IService.h"
32#include "Gaudi/Interfaces/IOptionsSvc.h"
33#include "GaudiKernel/IAlgManager.h"
34#include "GaudiKernel/IAlgorithm.h"
35#include "GaudiKernel/ServiceHandle.h"
36#include "Gaudi/Property.h"
37#include "GaudiKernel/System.h"
38
39#include "TROOT.h"
40
41#include <sstream>
42#include <algorithm>
43#include <vector>
44#include <locale>
45#include <codecvt>
46
47#include <boost/property_tree/xml_parser.hpp>
48
49using namespace boost::property_tree;
50
51namespace
52{
54 std::string to_string(const ptree& p)
55 {
56 using T = ptree::key_type;
57 std::ostringstream oss;
58 xml_parser::write_xml(oss, p,
59 xml_parser::xml_writer_make_settings<T>(' ', 2));
60 return oss.str();
61 }
62}
63
64//--------------------------------------------------------------------------------
65// Destructor
66//--------------------------------------------------------------------------------
68{
69 if (m_pesaAppMgr) {
70 m_pesaAppMgr->release() ;
71 }
72}
73
74//--------------------------------------------------------------------------------
75// Configure transition
76//--------------------------------------------------------------------------------
77
78bool psc::Psc::configure(const ptree& config)
79{
80 psc::Utils::ScopeTimer timer("Psc configuration");
81
82 ROOT::EnableThreadSafety();
83
84 ERS_DEBUG(1, "psc::Psc::configure ptree:\n" << to_string(config));
85 try
86 {
87 m_config = std::make_unique<Config>(config);
88 }
89 catch(const std::exception& e)
90 {
91 ERS_PSC_ERROR("Cannot configure. " << e.what());
92 return false;
93 }
94
95 // Print PSC configuration
96 ERS_LOG("---> Dump of config cache: \n" << m_config->dumpOptions() );
97 ERS_LOG("---> Pesa JobOptions file is = " << m_config->getOption("JOBOPTIONSPATH") );
98
99 // -----------------------------
100 // Create C++ ApplicationMgr
101 // -----------------------------
102 ERS_DEBUG(1,"---> Create Pesa Application Manager");
103 m_pesaAppMgr = Gaudi::createApplicationMgr();
104 ERS_DEBUG(1,"m_pesaAppMgr = " << m_pesaAppMgr);
105
106 if( !m_pesaAppMgr ) {
107 ERS_PSC_ERROR("Error while creating the ApplicationMgr");
108 return false;
109 }
110
111 // configure Pesa
112 ERS_DEBUG(1,"---> Configure Pesa Property Manager");
113 SmartIF<IProperty> propMgr ( m_pesaAppMgr );
114 if( !propMgr.isValid() ) {
115 ERS_PSC_ERROR("Error retrieving IProperty interface of ApplicationMgr");
116 return false;
117 }
118
119 // Set the JobOptionsSvcType
120 StatusCode sc;
121 sc = propMgr->setProperty( "JobOptionsSvcType", m_config->getOption("JOBOPTIONSSVCTYPE"));
122 if (sc.isFailure()) {
123 ERS_PSC_ERROR("Can not set Property = JobOptionsSvcType");
124 return false;
125 }
126
127 // Set the JobOptionsType
128 sc = propMgr->setProperty( "JobOptionsType", m_config->getOption("JOBOPTIONSTYPE"));
129 if (sc.isFailure()) {
130 ERS_PSC_ERROR("Can not set Property = JobOptionsType");
131 return false;
132 }
133
134 // Set JobOptionsPath
135 sc = propMgr->setProperty( "JobOptionsPath", m_config->getOption("JOBOPTIONSPATH") );
136 if (sc.isFailure()) {
137 ERS_PSC_ERROR("Can not set Property = JobOptionsPath");
138 return false;
139 }
140
141 // Select the MessageSvc Type
142 sc = propMgr->setProperty( "MessageSvcType", m_config->getOption("MESSAGESVCTYPE"));
143 if (sc.isFailure()) {
144 ERS_PSC_ERROR("Can not set Property = MessageSvcType");
145 return false;
146 }
147
148 // Need to setup the python interpreter if:
149 // 1) run from job options
150 // 2) run from DB but user specified a pre/postcommand
151 // 3) run from DB but user specified a logLevel via "-l" in athenaMT/PT
152 bool jobOptConfig, needPython;
153 if ( m_config->getOption("JOBOPTIONSTYPE") == "NONE" ) {
154 jobOptConfig = needPython = true;
155 }
156 else if ( m_config->getOption("JOBOPTIONSTYPE") == "DB" ||
157 m_config->getOption("JOBOPTIONSTYPE") == "FILE") {
158 jobOptConfig = needPython = false;
159 if ( (m_config->getOption("PRECOMMAND")!="") || (m_config->getOption("POSTCOMMAND")!="") ) {
160 needPython = true;
161 }
162 if (m_config->didUserSetLogLevel()) {
163 needPython = true;
164 }
165 }
166 else {
167 ERS_PSC_ERROR("Unsupported configuration method \"" << m_config->getOption("JOBOPTIONSTYPE") << "\"");
168 return false;
169 }
170
171
172 if ( jobOptConfig || needPython ) {
173 // ----------------------------
174 // Configuration with .py files
175 // ----------------------------
176
177 // Try to initialize the Python interpreter
178 if ( ! Py_IsInitialized() ) {
179 ERS_DEBUG(1,"Initializing Python interpreter");
180
181 PyConfig thisConfig;
182 PyConfig_InitPythonConfig (&thisConfig);
183 PyStatus status = PyConfig_SetBytesArgv (&thisConfig, System::argc(), System::argv());
184 if (PyStatus_Exception (status)) {
185 PyConfig_Clear (&thisConfig);
186 ERS_PSC_ERROR("Error: Python could not be initialized.");
187 return false;
188 }
189 status = Py_InitializeFromConfig (&thisConfig);
190 if (PyStatus_Exception (status)) {
191 PyConfig_Clear (&thisConfig);
192 ERS_PSC_ERROR("Error: Python could not be initialized.");
193 return false;
194 }
195
196 /*
197 * The GIL is initialized by Py_Initialize() since Python 3.7."
198 */
199
200 // check
201 if ( ! Py_IsInitialized() ) {
202 ERS_PSC_ERROR("Error: Python could not be initialized.");
203 return false;
204 }
205
206 }
207 else {
208 ERS_DEBUG(1,"Python interpreter already initialized");
209 }
210
211 // Copy Config.optmap to python module
212 // This is how we transfer the options to the python setup script
213 PyObject* pModule = PyImport_ImportModule("TrigPSC.PscConfig");
214 if ( pModule ) {
215 PyObject* optmap = PyObject_GetAttrString(pModule, "optmap");
216 if ( optmap ) {
217 std::map<std::string,std::string>::const_iterator iter;
218 for (iter=m_config->optmap.begin(); iter!=m_config->optmap.end(); ++iter) {
219 PyObject* v = PyUnicode_FromString(iter->second.c_str());
220 std::vector<char> writable(iter->first.size() + 1);
221 std::copy(iter->first.begin(), iter->first.end(), writable.begin());
222 PyMapping_SetItemString(optmap, &writable[0], v);
223 }
224 Py_DECREF(optmap);
225 }
226 else {
227 ERS_DEBUG(1, "Could not import TrigPSC.PscConfig.optmap");
228 }
229 Py_DECREF(pModule);
230 }
231 }
232
233 if ( jobOptConfig ) {
234 // Do the python setup (including user job options)
235 std::string pyBasicFile = m_config->getOption("PYTHONSETUPFILE") ;
236 if ( !psc::Utils::execFile(pyBasicFile) ) {
237 ERS_PSC_ERROR("Basic Python configuration failed.");
238 return false;
239 }
240 }
241
242 if ( !jobOptConfig ) {
243 // -----------------------------
244 // Configuration from database
245 // -----------------------------
246
247 // Run pre-command (you're on your own, no basic python setup)
248 std::string cmd = m_config->getOption("PRECOMMAND");
249 if ( cmd != "" ) {
250 ERS_LOG("Running pre-configure command '" << cmd << "'");
251 if ( !psc::Utils::execPython(cmd) ) {
252 ERS_PSC_ERROR("Pre-configure command failed.");
253 return false;
254 }
255 }
256
257 // Configure ApplicationMgr
258 ERS_DEBUG(1,"Configure ApplicationMgr from database.");
259 StatusCode sc = m_pesaAppMgr->configure();
260 if( sc.isFailure() ) {
261 ERS_PSC_ERROR("Error while configuring the ApplicationMgr");
262 return false;
263 }
264
265 // Do the basic python setup if postcommand or logLevel was changed
266 if ( needPython ) {
267 // only used in athenaHLT, but not in partition running
268 std::string pyBasicFile = m_config->getOption("PYTHONSETUPFILE", /*quiet*/true) ;
269 if ( !pyBasicFile.empty() ) {
270 if ( !psc::Utils::execFile(pyBasicFile) ) {
271 ERS_PSC_ERROR("Basic Python configuration failed.");
272 return false;
273 }
274 }
275 }
276 }
277
278 // Check if running interactively
279 // If yes, delay AppMgr::initialize() until connect
280 if ( needPython ) {
281 PyObject* pModule = PyImport_ImportModule("TrigPSC.PscConfig");
282 if ( pModule ) {
283 PyObject* pInteractive = PyObject_GetAttrString(pModule, "interactive");
284 if ( pInteractive && PyBool_Check(pInteractive) ) {
285 m_interactive = (pInteractive==Py_True);
286 ERS_DEBUG(1, "TrigPSC.PscConfig.interactive = " << m_interactive);
287 Py_DECREF(pInteractive);
288 }
289 else {
290 ERS_DEBUG(1, "Could not read TrigPSC.PscConfig.interactive");
291 }
292 Py_DECREF(pModule);
293 }
294 else {
295 ERS_DEBUG(1, "Could not import TrigPSC.PscConfig");
296 }
297 }
298
299 if ( m_interactive ) ERS_LOG("Running in interactive mode");
300
301 ERS_DEBUG(1,"Configured ApplicationMgr in state: " << m_pesaAppMgr->FSMState());
302
303 ServiceHandle<Gaudi::Interfaces::IOptionsSvc> jobOptionSvc("JobOptionsSvc","psc::Psc");
304 SmartIF<IProperty> jos_propif{&(*jobOptionSvc)};
305 if(m_config->didUserSetLogLevel())
306 jos_propif->setProperty("OutputLevel", m_config->getLogLevelAsNumStr()).ignore();
307
308 // Write Data Flow parameters in JobOptions catalogue
309
310 // map of dataflow parameters to store in the JobOptions Catalogue
311 auto props = std::map<std::string,std::string>{
312 {"DF_PartitionName", "DF_PARTITION_NAME"},
313 {"DF_ApplicationName", "DF_APPLICATIONNAME"},
314 {"DF_MachineName", "DF_MACHINE_NAME"},
315 {"DF_Ppid", "DF_PPID"},
316 {"DF_Pid", "DF_PID"},
317 {"DF_HostId", "DF_HOST_ID"},
318 {"DF_RandomSeed", "DF_RANDOM_SEED"},
319 {"DF_NumberOfWorkers", "NPROCS"}
320 };
321 if(!setDFProperties(props))
322 return false;
323
324 // Write list of configured ROB IDs into the JobOptions Catalogue
325 if ( m_config->enabled_robs.size() != 0 ) {
326 jobOptionSvc->set("DataFlowConfig.DF_Enabled_ROB_IDs",
327 Gaudi::Utils::toString<std::vector<uint32_t>>(m_config->enabled_robs));
328 ERS_DEBUG(1,"psc::Psc::configure: Wrote configuration for enabled ROBs in JobOptions Catalogue: "
329 <<" number of ROB IDs read from OKS = " << m_config->enabled_robs.size());
330 }
331
332 // Write list of configured Sub Det IDs into the JobOptions Catalogue
333 if ( m_config->enabled_SubDets.size() != 0 ) {
334 jobOptionSvc->set("DataFlowConfig.DF_Enabled_SubDet_IDs",
335 Gaudi::Utils::toString<std::vector<uint32_t>>(m_config->enabled_SubDets));
336 ERS_DEBUG(1,"psc::Psc::configure: Wrote configuration for enabled sub detectors in JobOptions Catalogue: "
337 <<" number of Sub Det IDs read from OKS = " << m_config->enabled_SubDets.size());
338 }
339
340 // Write the maximum HLT output size into the JobOptions Catalogue
341 if (std::string opt = m_config->getOption("MAXEVENTSIZEMB"); !opt.empty()) {
342 jobOptionSvc->set("DataFlowConfig.DF_MaxEventSizeMB", opt);
343 ERS_DEBUG(1,"psc::Psc::configure: Wrote DF_MaxEventSizeMB=" << opt << " in JobOptions Catalogue");
344 }
345
346 // Write configuration for HLT muon calibration infrastructure in JobOptions catalogue
347 if ( (m_config->getOption("MUONCALBUFFERNAME") != "NONE") && (m_config->getOption("MUONCALBUFFERNAME") != "") ) {
348 jobOptionSvc->set("MuonHltCalibrationConfig.MuonCalBufferName", m_config->getOption("MUONCALBUFFERNAME"));
349 jobOptionSvc->set("MuonHltCalibrationConfig.MuonCalBufferSize", m_config->getOption("MUONCALBUFFERSIZE"));
350
351 ERS_DEBUG(1,"psc::Psc::configure: Wrote configuration for HLT Muon Calibration in JobOptions Catalogue: "
352 <<" MuonCalBufferName = " << m_config->getOption("MUONCALBUFFERNAME")
353 <<" MuonCalBufferSize = " << m_config->getOption("MUONCALBUFFERSIZE") );
354 }
355
356 // Write configuration specific to athena (HltEventLoopMgr)
357 if(!setAthenaProperties()) return false;
358
359 if ( !jobOptConfig ) {
360 // Run post-command
361 std::string cmd = m_config->getOption("POSTCOMMAND");
362 if ( cmd != "" ) {
363 ERS_LOG("Running post-configure command '" << cmd << "'");
364 if ( !psc::Utils::execPython(cmd) ) {
365 ERS_PSC_ERROR("Post-configure command failed.");
366 return false;
367 }
368 }
369 }
370
371 if ( !m_interactive ) {
372 if ( !doAppMgrInit() ) return false;
373 }
374
375 return true;
376}
377
378
380{
381 // Initialize the application manager
382 StatusCode sc = m_pesaAppMgr->initialize();
383 ERS_DEBUG(1,"Initialize ApplicationMgr : " << m_pesaAppMgr->FSMState() <<
384 ", Status = " << sc.getCode());
385 if( sc.isFailure() ) {
386 ERS_PSC_ERROR("Error while initializing the ApplicationMgr");
387 return false;
388 }
389
390 // Handle to service locator
391 m_svcLoc = SmartIF<ISvcLocator>( m_pesaAppMgr );
392 if ( !m_svcLoc.isValid() ) {
393 ERS_PSC_ERROR("Error retrieving Service Locator:") ;
394 return false;
395 }
396
397 SmartIF<IProperty> propMgr ( m_pesaAppMgr );
398 if( !propMgr.isValid() ) {
399 ERS_PSC_ERROR("Error retrieving IProperty interface of ApplicationMgr");
400 return false;
401 }
402
403 // Find the basename for the EventLoopMgr
404 std::string value ;
405 sc = propMgr->getProperty( "EventLoop", value );
406 if( sc.isFailure() ) {
407 ERS_PSC_ERROR("Error while retrieving Property \'EventLoop\'.");
408 } else {
409 m_nameEventLoopMgr.assign(value, value.find_first_of("\"")+1,
410 value.find_last_of("\"")-value.find_first_of("\"")-1) ;
411 }
412
413 return true;
414}
415
416
417//--------------------------------------------------------------------------------
418// Connect transition
419//--------------------------------------------------------------------------------
420
421bool psc::Psc::connect (const ptree& /*args*/)
422{
423 psc::Utils::ScopeTimer timer("Psc connect");
424
425 if ( m_interactive ) {
426 if ( !doAppMgrInit() ) return false;
427 }
428 return true;
429}
430
431
432//--------------------------------------------------------------------------------
433// PrepareForRun transition
434//--------------------------------------------------------------------------------
435
437{
438 psc::Utils::ScopeTimer timer("Psc prepareForRun");
439
440 ERS_DEBUG(1, "psc::Psc::prepareForRun ptree:\n" << to_string(args));
441
442 try
443 {
444 // save run number
445 m_run_number = args.get<uint32_t>("RunParams.run_number");
446 }
447 catch(const ptree_bad_path & e)
448 {
449 ERS_PSC_ERROR("Bad ptree path: \"" << e.path<ptree::path_type>().dump()
450 << "\" - " << e.what())
451 return false;
452 }
453 catch(const ptree_bad_data & e)
454 {
455 ERS_PSC_ERROR("Bad ptree data: \"" << e.data<ptree::data_type>() << "\" - "
456 << e.what())
457 return false;
458 }
459
460 // Initializations needed for start()
461 if(!callOnEventLoopMgr<ITrigEventLoopMgr>([&args](ITrigEventLoopMgr* mgr) {return mgr->prepareForStart(args);},
462 "prepareForStart").isSuccess()) {
463 return false;
464 }
465
466 // Start ApplicationMgr (which starts all services including EventLoopMgr)
467 StatusCode sc = m_pesaAppMgr->start();
468 if( !sc.isSuccess() ) {
469 ERS_PSC_ERROR("Error executing ApplicationMgr::start() command ");
470 return false;
471 }
472
473 // bind args to prepareForRun
474 auto prep = [&args](ITrigEventLoopMgr* mgr) {
475 // FIXME: ITrigEventLookMgr::prepareForRun is declared NOT_THREAD_SAFE.
476 // Probably this method shoud also be NOT_THREAD_SAFE, but that's
477 // awkward because it implements a tdaq interface from hltinterface.
478 StatusCode ret ATLAS_THREAD_SAFE = mgr->prepareForRun (args);
479
480 // This dance is needed to prevent RV optimization.
481 // Otherwise, the optimizer loses the ATLAS_THREAD_SAFE attribute
482 // on RET before the thread-safety checker gets to see the code.
483 if (ret.isSuccess()) {
484 return StatusCode (StatusCode::SUCCESS);
485 }
486 return ret;
487 };
488 if(!callOnEventLoopMgr<ITrigEventLoopMgr>(prep, "prepareForRun").isSuccess()) {
489 return false;
490 }
491
492 if ( Py_IsInitialized() ) {
493 // After prepareForRun the HLTMPPU will fork the workers. Tell the Python
494 // interpreter this is happening to avoid deadlocks (ATR-23428).
495 ERS_DEBUG(1, "Pre-fork initialization of Python interpreter");
496 PyOS_BeforeFork();
497 }
498
499 return true;
500}
501
502
503//--------------------------------------------------------------------------------
504// StopRun transition
505//--------------------------------------------------------------------------------
506
507bool psc::Psc::stopRun (const ptree& /*args*/)
508{
509 psc::Utils::ScopeTimer timer("Psc stopRun");
510
511 if ( Py_IsInitialized() && m_workerID==0 ) {
512 // We are back in the mother process after fork. Fixes deadlock with Python 3.13 (ATR-32627).
513 ERS_DEBUG(1, "After-fork initialization of Python interpreter");
514 PyOS_AfterFork_Parent();
515 }
516
517 if(!callOnEventLoopMgr<IService>(&IService::sysStop, "sysStop").isSuccess())
518 {
519 return false;
520 }
521
522 // Stop ApplicationMgr and all services.
523 // The EventLoopMgr is already stopped by above code but that's fine.
524 auto sc = m_pesaAppMgr->stop();
525 if ( !sc.isSuccess() ) {
526 ERS_PSC_ERROR("Error executing stop(void) for ApplicationMgr");
527 return false;
528 }
529
530 // Workers: store histograms at the end of stop as the children may not
531 // go through finalize with SkipFinalizeWorker=1.
532 if (m_workerID != 0) {
533 SmartIF<IService> histsvc = m_svcLoc->service("THistSvc", /*createIf=*/ false);
534 if (histsvc.isValid()) {
535 ERS_LOG("Finalize THistSvc to save histograms");
536 if (histsvc->finalize().isFailure()) {
537 ERS_PSC_ERROR("Error executing finalize for THistSvc");
538 }
539 }
540 }
541
542
543 return true;
544}
545
547{
548 if (m_pesaAppMgr==nullptr) return true; // already finalized
549
550 // Finalize the application manager
551 StatusCode sc = m_pesaAppMgr->finalize();
552 ERS_DEBUG(1,"Finalize ApplicationMgr: " << m_pesaAppMgr->FSMState()
553 << ". Status : " << sc.getCode());
554
555 if( sc.isFailure() ) {
556 ERS_PSC_ERROR("Error while finalizing the ApplicationMgr.");
557 return false;
558 }
559
560 // Terminate the application manager
561 sc = m_pesaAppMgr->terminate();
562 ERS_DEBUG(1,"Terminate ApplicationMgr: " << m_pesaAppMgr->FSMState()
563 << ". Status : " << sc.getCode());
564
565 if ( sc.isFailure() ) {
566 ERS_PSC_ERROR("Error while terminating the ApplicationMgr.");
567 return false;
568 }
569
570 // Make sure we get a new instance the next time
571 Gaudi::setInstance(static_cast<IAppMgrUI*>(nullptr));
572 m_pesaAppMgr = nullptr;
573
574 return true;
575}
576
577//--------------------------------------------------------------------------------
578// Disconnect transition
579//--------------------------------------------------------------------------------
580
581bool psc::Psc::disconnect (const ptree& /*args*/)
582{
583 psc::Utils::ScopeTimer timer("Psc disconnect");
584 return doAppMgrFinalize();
585}
586
587
588//--------------------------------------------------------------------------------
589// Unconfigure transition
590//--------------------------------------------------------------------------------
591
592bool psc::Psc::unconfigure (const ptree& /*args*/)
593{
594 psc::Utils::ScopeTimer timer("Psc unconfigure");
595 return true;
596}
597
598
599//--------------------------------------------------------------------------------
600// Publish statistics
601//--------------------------------------------------------------------------------
602
603bool psc::Psc::publishStatistics (const ptree& /*args*/)
604{
605 psc::Utils::ScopeTimer timer("Psc publishStatistics");
606 return true;
607}
608
609
610//--------------------------------------------------------------------------------
611// User command can be sent via:
612// partition: rc_sendcommand -p [PART] -n [APP] USER command args
613// But they are no longer supported because they don't propagate from mother to child
614//--------------------------------------------------------------------------------
616{
617 ERS_DEBUG(1, "psc::Psc::hltUserCommand ptree:\n" << to_string(args));
618
619 // Default if no action on command
620 return true;
621}
622
624{
625 ERS_LOG("psc::Psc::doEventLoop: start of doEventLoop()");
626 StatusCode sc;
627 try
628 {
629 // bind maxevt=-1 (meaning all events) to executeRun
630 auto exec = [](IEventProcessor * mgr)
631 {return mgr->executeRun(-1);};
632 sc = callOnEventLoopMgr<IEventProcessor>(exec, "executeRun");
633 }
634 catch(const ers::Issue& e)
635 {
636 ERS_PSC_ERROR("Caught an unexpected ers::Issue: '" << e.what() << "'");
637 sc = StatusCode::FAILURE;
638 }
639 catch(const std::exception& e)
640 {
641 ERS_PSC_ERROR("Caught an unexpected std::exception: '" << e.what() << "'");
642 sc = StatusCode::FAILURE;
643 }
644 catch(...)
645 {
646 ERS_PSC_ERROR("Caught an unknown exception");
647 sc = StatusCode::FAILURE;
648 }
649 if (sc.isFailure()) {
650 ERS_PSC_ERROR("psc::Psc::doEventLoop failed");
651 return false;
652 }
653 ERS_LOG("psc::Psc::doEventLoop: end of doEventLoop()");
654 return true;
655}
656
657bool psc::Psc::prepareWorker (const boost::property_tree::ptree& args)
658{
659 psc::Utils::ScopeTimer timer("Psc prepareWorker");
660
661 ROOT::EnableThreadSafety();
662 if ( Py_IsInitialized() ) {
663 ERS_DEBUG(1, "Post-fork initialization of Python interpreter");
664 PyOS_AfterFork_Child();
665
666 /* Release the Python GIL (which we inherited from the mother)
667 to avoid dead-locking on the first call to Python. Only relevant
668 if Python is initialized and Python-based algorithms are used. */
669 ERS_DEBUG(1, "Releasing Python GIL");
670 PyEval_SaveThread();
671 }
672
673 m_workerID = args.get<int>("workerId");
674
675 ERS_LOG("Individualizing DF properties");
676 m_config->prepareWorker(args);
677
678 if (!setDFProperties({{"DF_Pid", "DF_PID"},
679 {"DF_Ppid", "DF_PPID"},
680 {"DF_RandomSeed", "DF_RANDOM_SEED"},
681 {"DF_WorkerId", "DF_WORKER_ID"},
682 {"DF_NumberOfWorkers", "DF_NUMBER_OF_WORKERS"},
683 {"DF_ApplicationName", "DF_APPLICATIONNAME"}})
684 ) return false;
685
686 // bind args to hltUpdateAfterFork
687 auto upd = [&args](ITrigEventLoopMgr * mgr)
688 {return mgr->hltUpdateAfterFork(args);};
689 if(!callOnEventLoopMgr<ITrigEventLoopMgr>(upd, "hltUpdateAfterFork").isSuccess())
690 {
691 return false;
692 }
693
694 return true;
695}
696
697bool psc::Psc::finalizeWorker (const boost::property_tree::ptree& /*args*/)
698{
699 psc::Utils::ScopeTimer timer("Psc finalizeWorker");
700 return doAppMgrFinalize();
701}
702
703bool psc::Psc::setDFProperties(const std::map<std::string, std::string>& name_tr_table)
704{
705 ServiceHandle<Gaudi::Interfaces::IOptionsSvc> jobOptionSvc("JobOptionsSvc","psc::Psc");
706 for(const auto& prop : name_tr_table)
707 {
708 const auto& val = m_config->getOption(prop.second);
709 jobOptionSvc->set("DataFlowConfig."+prop.first, val);
710 ERS_DEBUG(0,"Wrote configuration for Data Flow in JobOptions Catalogue: "
711 << prop.first << " = " << val);
712 }
713
714 return true;
715}
716
718 // Use the IProperty interface of the ApplicationManager to find the EventLoopMgr name
719 SmartIF<IProperty> propMgr(m_pesaAppMgr);
720 if (!propMgr.isValid()) {
721 ERS_PSC_ERROR("Error retrieving IProperty interface of ApplicationMgr");
722 return false;
723 }
724 std::string eventLoopMgrName;
725 if (propMgr->getProperty("EventLoop", eventLoopMgrName).isFailure()) {
726 ERS_PSC_ERROR("Error while retrieving the property ApplicationMgr.EventLoop");
727 return false;
728 }
729
730 // Use the JobOptionsSvc to write athena-specific options in JobOptions Catalogue of EventLoopMgr
731 ServiceHandle<Gaudi::Interfaces::IOptionsSvc> jobOptionSvc("JobOptionsSvc","psc::Psc");
732
733 std::string opt = m_config->getOption("HARDTIMEOUT");
734 if (!opt.empty()) {
735 jobOptionSvc->set(eventLoopMgrName+".HardTimeout", opt);
736 }
737 else {
738 ERS_PSC_ERROR("Failed to get the HARDTIMEOUT property from the configuration tree");
739 return false;
740 }
741
742 opt = m_config->getOption("SOFTTIMEOUTFRACTION");
743 if (!opt.empty()) {
744 jobOptionSvc->set(eventLoopMgrName+".SoftTimeoutFraction", opt);
745 }
746 else {
747 ERS_PSC_ERROR("Failed to get the SOFTTIMEOUTFRACTION property from the configuration tree");
748 return false;
749 }
750
751 /* The names "EventDataSvc" and "AvalancheSchedulerSvc" are hard-coded below, because it is not possible
752 to retrieve them from HltEventLoopMgr properties before it is initialised. Here we need to set the properties
753 before the services are initialised." */
754
755 opt = m_config->getOption("NEVENTSLOTS");
756 if (!opt.empty()) {
757 jobOptionSvc->set("EventDataSvc.NSlots", opt);
758 }
759 else {
760 ERS_PSC_ERROR("Failed to get the NEVENTSLOTS property from the configuration tree");
761 return false;
762 }
763
764 opt = m_config->getOption("NTHREADS");
765 if (!opt.empty()) {
766 jobOptionSvc->set("AvalancheSchedulerSvc.ThreadPoolSize", opt);
767 }
768 else {
769 ERS_PSC_ERROR("Failed to get the NTHREADS property from the configuration tree");
770 return false;
771 }
772
773 return true;
774}
775
776
777template <typename T>
778StatusCode psc::Psc::callOnEventLoopMgr(std::function<StatusCode (T*)> func,
779 const std::string& name) const
780{
781 SmartIF<T> processingMgr{m_svcLoc->service(m_nameEventLoopMgr)};
782 if(!processingMgr) {
783 ERS_PSC_ERROR("Error retrieving EventLoopMgr = '" << m_nameEventLoopMgr << "'" );
784 return StatusCode::FAILURE;
785 }
786
787 // Call the given function of the EventLoopMgr
788 StatusCode sc = func(processingMgr); // processingMgr->func()
789 if(!sc.isSuccess())
790 {
791 ERS_PSC_ERROR("Error executing " << name << " for EventLoopMgr = '"
792 << m_nameEventLoopMgr << "'") ;
793 return sc;
794 }
795
796 return sc;
797}
static std::string to_string(const std::vector< T > &v)
Some helpers for the PSC.
_object PyObject
boost::property_tree::ptree ptree
static Double_t sc
ERS Issues for PSC.
#define ERS_PSC_ERROR(message)
Definition PscIssues.h:51
HLT Pesa Steering Controller.
Define macros for attributes used to control the static checker.
#define ATLAS_THREAD_SAFE
EventLoopMgr interface implemented by the HLT event loop manager.
bool setAthenaProperties()
Definition Psc.cxx:717
uint32_t m_run_number
(initial) run number to be used for this run
Definition Psc.h:134
bool setDFProperties(const std::map< std::string, std::string > &name_tr_table)
Definition Psc.cxx:703
virtual bool unconfigure(const boost::property_tree::ptree &args) override
Unconfigures the framework, releasing all acquired resources.
Definition Psc.cxx:592
int m_workerID
worker ID (0=mother)
Definition Psc.h:139
virtual ~Psc()
Virtual desctuctor.
Definition Psc.cxx:67
virtual bool hltUserCommand(const boost::property_tree::ptree &args) override
Calls the HLT framework to notify it that a user command has arrived.
Definition Psc.cxx:615
virtual bool configure(const boost::property_tree::ptree &config) override
Configures the framework.
Definition Psc.cxx:78
bool m_interactive
Running in interactive mode (athenaHLT).
Definition Psc.h:137
IAppMgrUI * m_pesaAppMgr
Application Manager.
Definition Psc.h:135
bool doAppMgrFinalize()
Finalize the application manager.
Definition Psc.cxx:546
bool doAppMgrInit()
Initialize the application manager.
Definition Psc.cxx:379
virtual bool prepareWorker(const boost::property_tree::ptree &args) override
Method which can be called for a worker to perform the necessary steps to set unique worker IDs and a...
Definition Psc.cxx:657
virtual bool disconnect(const boost::property_tree::ptree &args) override
Disconnects the framework.
Definition Psc.cxx:581
virtual bool publishStatistics(const boost::property_tree::ptree &args) override
Calls the HLT framework to publish statistics, after the run has finished.
Definition Psc.cxx:603
std::string m_nameEventLoopMgr
name of the event loop manager
Definition Psc.h:136
virtual bool prepareForRun(const boost::property_tree::ptree &args) override
prepares the HLT framework for a run
Definition Psc.cxx:436
virtual bool connect(const boost::property_tree::ptree &args) override
Connects the framework.
Definition Psc.cxx:421
virtual bool stopRun(const boost::property_tree::ptree &args) override
stops the HLT framework without re-configuring
Definition Psc.cxx:507
SmartIF< ISvcLocator > m_svcLoc
Service locator handle.
Definition Psc.h:141
StatusCode callOnEventLoopMgr(std::function< StatusCode(T *)> func, const std::string &name) const
Utility method to call a method on the event loop manager.
Definition Psc.cxx:778
virtual bool finalizeWorker(const boost::property_tree::ptree &args) override
Method which can be called for a worker to perform a cleanup before the worker gets killed.
Definition Psc.cxx:697
virtual bool doEventLoop() override
Starts the HLT event loop.
Definition Psc.cxx:623
std::unique_ptr< psc::Config > m_config
Config derived from ptree.
Definition Psc.h:138
bool execFile(const std::string &pyFileName)
Execute a python file (via include or import).
bool execPython(const std::string &pyCmd)
Execute a python command in the python interpreter.