ATLAS Offline Software
Loading...
Searching...
No Matches
Psc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3*/
4
12
13#include "TrigPSC/Psc.h"
14#include "TrigPSC/PscIssues.h"
15#include "TrigPSC/Utils.h"
18
19#include "eformat/ROBFragment.h"
20#include "ers/ers.h"
21
22#undef _POSIX_C_SOURCE
23#undef _XOPEN_SOURCE
24#include <Python.h>
25
26// Include files for Gaudi
27#include "GaudiKernel/Bootstrap.h"
28#include "GaudiKernel/IAppMgrUI.h"
29#include "GaudiKernel/IEventProcessor.h"
30#include "GaudiKernel/IProperty.h"
31#include "GaudiKernel/IService.h"
32#include "Gaudi/Interfaces/IOptionsSvc.h"
33#include "GaudiKernel/IAlgManager.h"
34#include "GaudiKernel/IAlgorithm.h"
35#include "GaudiKernel/ServiceHandle.h"
36#include "Gaudi/Property.h"
37#include "GaudiKernel/System.h"
38
39#include "TROOT.h"
40
41#include <sstream>
42#include <algorithm>
43#include <vector>
44#include <locale>
45#include <codecvt>
46
47#include <boost/property_tree/xml_parser.hpp>
48
49using namespace boost::property_tree;
50
51namespace
52{
54 std::string to_string(const ptree& p)
55 {
56 using T = ptree::key_type;
57 std::ostringstream oss;
58 xml_parser::write_xml(oss, p,
59 xml_parser::xml_writer_make_settings<T>(' ', 2));
60 return oss.str();
61 }
62}
63
64//--------------------------------------------------------------------------------
65// Destructor
66//--------------------------------------------------------------------------------
68{
69 if (m_pesaAppMgr) {
70 m_pesaAppMgr->release() ;
71 }
72}
73
74//--------------------------------------------------------------------------------
75// Configure transition
76//--------------------------------------------------------------------------------
77
79{
80 psc::Utils::ScopeTimer timer("Psc configuration");
81
82 ROOT::EnableThreadSafety();
83
84 ERS_DEBUG(1, "psc::Psc::configure ptree:\n" << to_string(config));
85 try
86 {
87 m_config = std::make_unique<Config>(config);
88 }
89 catch(const std::exception& e)
90 {
91 ERS_PSC_ERROR("Cannot configure. " << e.what());
92 return false;
93 }
94
95 // Print PSC configuration
96 ERS_LOG("---> Dump of config cache: \n" << m_config->dumpOptions() );
97 ERS_LOG("---> Pesa JobOptions file is = " << m_config->getOption("JOBOPTIONSPATH") );
98
99 // -----------------------------
100 // Create C++ ApplicationMgr
101 // -----------------------------
102 ERS_DEBUG(1,"---> Create Pesa Application Manager");
103 m_pesaAppMgr = Gaudi::createApplicationMgr();
104 ERS_DEBUG(1,"m_pesaAppMgr = " << m_pesaAppMgr);
105
106 if( !m_pesaAppMgr ) {
107 ERS_PSC_ERROR("Error while creating the ApplicationMgr");
108 return false;
109 }
110
111 // configure Pesa
112 ERS_DEBUG(1,"---> Configure Pesa Property Manager");
113 SmartIF<IProperty> propMgr ( m_pesaAppMgr );
114 if( !propMgr.isValid() ) {
115 ERS_PSC_ERROR("Error retrieving IProperty interface of ApplicationMgr");
116 return false;
117 }
118
119 // Set the JobOptionsSvcType
120 StatusCode sc;
121 sc = propMgr->setProperty( "JobOptionsSvcType", m_config->getOption("JOBOPTIONSSVCTYPE"));
122 if (sc.isFailure()) {
123 ERS_PSC_ERROR("Can not set Property = JobOptionsSvcType");
124 return false;
125 }
126
127 // Set the JobOptionsType
128 sc = propMgr->setProperty( "JobOptionsType", m_config->getOption("JOBOPTIONSTYPE"));
129 if (sc.isFailure()) {
130 ERS_PSC_ERROR("Can not set Property = JobOptionsType");
131 return false;
132 }
133
134 // Set JobOptionsPath
135 sc = propMgr->setProperty( "JobOptionsPath", m_config->getOption("JOBOPTIONSPATH") );
136 if (sc.isFailure()) {
137 ERS_PSC_ERROR("Can not set Property = JobOptionsPath");
138 return false;
139 }
140
141 // Select the MessageSvc Type
142 sc = propMgr->setProperty( "MessageSvcType", m_config->getOption("MESSAGESVCTYPE"));
143 if (sc.isFailure()) {
144 ERS_PSC_ERROR("Can not set Property = MessageSvcType");
145 return false;
146 }
147
148 // Need to setup the python interpreter if:
149 // 1) run from job options
150 // 2) run from DB but user specified a pre/postcommand
151 // 3) run from DB but user specified a logLevel via "-l" in athenaMT/PT
152 bool jobOptConfig, needPython;
153 if ( m_config->getOption("JOBOPTIONSTYPE") == "NONE" ) {
154 jobOptConfig = needPython = true;
155 }
156 else if ( m_config->getOption("JOBOPTIONSTYPE") == "DB" ||
157 m_config->getOption("JOBOPTIONSTYPE") == "FILE") {
158 jobOptConfig = needPython = false;
159 if ( (m_config->getOption("PRECOMMAND")!="") || (m_config->getOption("POSTCOMMAND")!="") ) {
160 needPython = true;
161 }
162 if (m_config->didUserSetLogLevel()) {
163 needPython = true;
164 }
165 }
166 else {
167 ERS_PSC_ERROR("Unsupported configuration method \"" << m_config->getOption("JOBOPTIONSTYPE") << "\"");
168 return false;
169 }
170
171
172 if ( jobOptConfig || needPython ) {
173 // ----------------------------
174 // Configuration with .py files
175 // ----------------------------
176
177 // Try to initialize the Python interpreter
178 if ( ! Py_IsInitialized() ) {
179 ERS_DEBUG(1,"Initializing Python interpreter");
180
181 PyConfig config;
182 PyConfig_InitPythonConfig (&config);
183 PyStatus status = PyConfig_SetBytesArgv (&config, System::argc(), System::argv());
184 if (PyStatus_Exception (status)) {
185 PyConfig_Clear (&config);
186 ERS_PSC_ERROR("Error: Python could not be initialized.");
187 return false;
188 }
189 status = Py_InitializeFromConfig (&config);
190 if (PyStatus_Exception (status)) {
191 PyConfig_Clear (&config);
192 ERS_PSC_ERROR("Error: Python could not be initialized.");
193 return false;
194 }
195
196 /*
197 * The GIL is initialized by Py_Initialize() since Python 3.7."
198 */
199
200 // check
201 if ( ! Py_IsInitialized() ) {
202 ERS_PSC_ERROR("Error: Python could not be initialized.");
203 return false;
204 }
205
206 }
207 else {
208 ERS_DEBUG(1,"Python interpreter already initialized");
209 }
210
211 // Copy Config.optmap to python module
212 // This is how we transfer the options to the python setup script
213 PyObject* pModule = PyImport_ImportModule("TrigPSC.PscConfig");
214 if ( pModule ) {
215 PyObject* optmap = PyObject_GetAttrString(pModule, "optmap");
216 if ( optmap ) {
217 std::map<std::string,std::string>::const_iterator iter;
218 for (iter=m_config->optmap.begin(); iter!=m_config->optmap.end(); ++iter) {
219 PyObject* v = PyUnicode_FromString(iter->second.c_str());
220 std::vector<char> writable(iter->first.size() + 1);
221 std::copy(iter->first.begin(), iter->first.end(), writable.begin());
222 PyMapping_SetItemString(optmap, &writable[0], v);
223 }
224 Py_DECREF(optmap);
225 }
226 else {
227 ERS_DEBUG(1, "Could not import TrigPSC.PscConfig.optmap");
228 }
229 Py_DECREF(pModule);
230 }
231 }
232
233 if ( jobOptConfig ) {
234 // Do the python setup (including user job options)
235 std::string pyBasicFile = m_config->getOption("PYTHONSETUPFILE") ;
236 if ( !psc::Utils::execFile(pyBasicFile) ) {
237 ERS_PSC_ERROR("Basic Python configuration failed.");
238 return false;
239 }
240 }
241
242 if ( !jobOptConfig ) {
243 // -----------------------------
244 // Configuration from database
245 // -----------------------------
246
247 // Run pre-command (you're on your own, no basic python setup)
248 std::string cmd = m_config->getOption("PRECOMMAND");
249 if ( cmd != "" ) {
250 ERS_LOG("Running pre-configure command '" << cmd << "'");
251 if ( !psc::Utils::execPython(cmd) ) {
252 ERS_PSC_ERROR("Pre-configure command failed.");
253 return false;
254 }
255 }
256
257 // Configure ApplicationMgr
258 ERS_DEBUG(1,"Configure ApplicationMgr from database.");
259 StatusCode sc = m_pesaAppMgr->configure();
260 if( sc.isFailure() ) {
261 ERS_PSC_ERROR("Error while configuring the ApplicationMgr");
262 return false;
263 }
264
265 // Do the basic python setup if postcommand or logLevel was changed
266 if ( needPython ) {
267 // only used in athenaHLT, but not in partition running
268 std::string pyBasicFile = m_config->getOption("PYTHONSETUPFILE", /*quiet*/true) ;
269 if ( !pyBasicFile.empty() ) {
270 if ( !psc::Utils::execFile(pyBasicFile) ) {
271 ERS_PSC_ERROR("Basic Python configuration failed.");
272 return false;
273 }
274 }
275 }
276 }
277
278 // Check if running interactively
279 // If yes, delay AppMgr::initialize() until connect
280 if ( needPython ) {
281 PyObject* pModule = PyImport_ImportModule("TrigPSC.PscConfig");
282 if ( pModule ) {
283 PyObject* pInteractive = PyObject_GetAttrString(pModule, "interactive");
284 if ( pInteractive && PyBool_Check(pInteractive) ) {
285 m_interactive = (pInteractive==Py_True);
286 ERS_DEBUG(1, "TrigPSC.PscConfig.interactive = " << m_interactive);
287 Py_DECREF(pInteractive);
288 }
289 else {
290 ERS_DEBUG(1, "Could not read TrigPSC.PscConfig.interactive");
291 }
292 Py_DECREF(pModule);
293 }
294 else {
295 ERS_DEBUG(1, "Could not import TrigPSC.PscConfig");
296 }
297 }
298
299 if ( m_interactive ) ERS_LOG("Running in interactive mode");
300
301 ERS_DEBUG(1,"Configured ApplicationMgr in state: " << m_pesaAppMgr->FSMState());
302
303 ServiceHandle<Gaudi::Interfaces::IOptionsSvc> jobOptionSvc("JobOptionsSvc","psc::Psc");
304 SmartIF<IProperty> jos_propif{&(*jobOptionSvc)};
305 if(m_config->didUserSetLogLevel())
306 jos_propif->setProperty("OutputLevel", m_config->getLogLevelAsNumStr()).ignore();
307
308 // Write Data Flow parameters in JobOptions catalogue
309
310 // map of dataflow parameters to store in the JobOptions Catalogue
311 auto props = std::map<std::string,std::string>{
312 {"DF_PartitionName", "DF_PARTITION_NAME"},
313 {"DF_ApplicationName", "DF_APPLICATIONNAME"},
314 {"DF_MachineName", "DF_MACHINE_NAME"},
315 {"DF_Ppid", "DF_PPID"},
316 {"DF_Pid", "DF_PID"},
317 {"DF_HostId", "DF_HOST_ID"},
318 {"DF_RandomSeed", "DF_RANDOM_SEED"},
319 {"DF_NumberOfWorkers", "NPROCS"}
320 };
321 if(!setDFProperties(props))
322 return false;
323
324 // Write list of configured ROB IDs into the JobOptions Catalogue
325 if ( m_config->enabled_robs.size() != 0 ) {
326 jobOptionSvc->set("DataFlowConfig.DF_Enabled_ROB_IDs",
327 Gaudi::Utils::toString<std::vector<uint32_t>>(m_config->enabled_robs));
328 ERS_DEBUG(1,"psc::Psc::configure: Wrote configuration for enabled ROBs in JobOptions Catalogue: "
329 <<" number of ROB IDs read from OKS = " << m_config->enabled_robs.size());
330 }
331
332 // Write list of configured Sub Det IDs into the JobOptions Catalogue
333 if ( m_config->enabled_SubDets.size() != 0 ) {
334 jobOptionSvc->set("DataFlowConfig.DF_Enabled_SubDet_IDs",
335 Gaudi::Utils::toString<std::vector<uint32_t>>(m_config->enabled_SubDets));
336 ERS_DEBUG(1,"psc::Psc::configure: Wrote configuration for enabled sub detectors in JobOptions Catalogue: "
337 <<" number of Sub Det IDs read from OKS = " << m_config->enabled_SubDets.size());
338 }
339
340 // Write the maximum HLT output size into the JobOptions Catalogue
341 if (std::string opt = m_config->getOption("MAXEVENTSIZEMB"); !opt.empty()) {
342 jobOptionSvc->set("DataFlowConfig.DF_MaxEventSizeMB", opt);
343 ERS_DEBUG(1,"psc::Psc::configure: Wrote DF_MaxEventSizeMB=" << opt << " in JobOptions Catalogue");
344 }
345
346 // Write configuration for HLT muon calibration infrastructure in JobOptions catalogue
347 if ( (m_config->getOption("MUONCALBUFFERNAME") != "NONE") && (m_config->getOption("MUONCALBUFFERNAME") != "") ) {
348 jobOptionSvc->set("MuonHltCalibrationConfig.MuonCalBufferName", m_config->getOption("MUONCALBUFFERNAME"));
349 jobOptionSvc->set("MuonHltCalibrationConfig.MuonCalBufferSize", m_config->getOption("MUONCALBUFFERSIZE"));
350
351 ERS_DEBUG(1,"psc::Psc::configure: Wrote configuration for HLT Muon Calibration in JobOptions Catalogue: "
352 <<" MuonCalBufferName = " << m_config->getOption("MUONCALBUFFERNAME")
353 <<" MuonCalBufferSize = " << m_config->getOption("MUONCALBUFFERSIZE") );
354 }
355
356 // Write configuration specific to athena (HltEventLoopMgr)
357 if(!setAthenaProperties()) return false;
358
359 if ( !jobOptConfig ) {
360 // Run post-command
361 std::string cmd = m_config->getOption("POSTCOMMAND");
362 if ( cmd != "" ) {
363 ERS_LOG("Running post-configure command '" << cmd << "'");
364 if ( !psc::Utils::execPython(cmd) ) {
365 ERS_PSC_ERROR("Post-configure command failed.");
366 return false;
367 }
368 }
369 }
370
371 if ( !m_interactive ) {
372 if ( !doAppMgrInit() ) return false;
373 }
374
375 return true;
376}
377
378
380{
381 // Initialize the application manager
382 StatusCode sc = m_pesaAppMgr->initialize();
383 ERS_DEBUG(1,"Initialize ApplicationMgr : " << m_pesaAppMgr->FSMState() <<
384 ", Status = " << sc.getCode());
385 if( sc.isFailure() ) {
386 ERS_PSC_ERROR("Error while initializing the ApplicationMgr");
387 return false;
388 }
389
390 // Handle to service locator
391 m_svcLoc = SmartIF<ISvcLocator>( m_pesaAppMgr );
392 if ( !m_svcLoc.isValid() ) {
393 ERS_PSC_ERROR("Error retrieving Service Locator:") ;
394 return false;
395 }
396
397 SmartIF<IProperty> propMgr ( m_pesaAppMgr );
398 if( !propMgr.isValid() ) {
399 ERS_PSC_ERROR("Error retrieving IProperty interface of ApplicationMgr");
400 return false;
401 }
402
403 // Find the basename for the EventLoopMgr
404 std::string value ;
405 sc = propMgr->getProperty( "EventLoop", value );
406 if( sc.isFailure() ) {
407 ERS_PSC_ERROR("Error while retrieving Property \'EventLoop\'.");
408 } else {
409 m_nameEventLoopMgr.assign(value, value.find_first_of("\"")+1,
410 value.find_last_of("\"")-value.find_first_of("\"")-1) ;
411 }
412
413 return true;
414}
415
416
417//--------------------------------------------------------------------------------
418// Connect transition
419//--------------------------------------------------------------------------------
420
421bool psc::Psc::connect (const ptree& /*args*/)
422{
423 psc::Utils::ScopeTimer timer("Psc connect");
424
425 if ( m_interactive ) {
426 if ( !doAppMgrInit() ) return false;
427 }
428 return true;
429}
430
431
432//--------------------------------------------------------------------------------
433// PrepareForRun transition
434//--------------------------------------------------------------------------------
435
437{
438 psc::Utils::ScopeTimer timer("Psc prepareForRun");
439
440 ERS_DEBUG(1, "psc::Psc::prepareForRun ptree:\n" << to_string(args));
441
442 try
443 {
444 // save run number
445 m_run_number = args.get<uint32_t>("RunParams.run_number");
446 }
447 catch(const ptree_bad_path & e)
448 {
449 ERS_PSC_ERROR("Bad ptree path: \"" << e.path<ptree::path_type>().dump()
450 << "\" - " << e.what())
451 return false;
452 }
453 catch(const ptree_bad_data & e)
454 {
455 ERS_PSC_ERROR("Bad ptree data: \"" << e.data<ptree::data_type>() << "\" - "
456 << e.what())
457 return false;
458 }
459
460 // Initializations needed for start()
461 if(!callOnEventLoopMgr<ITrigEventLoopMgr>([&args](ITrigEventLoopMgr* mgr) {return mgr->prepareForStart(args);},
462 "prepareForStart").isSuccess()) {
463 return false;
464 }
465
466 // Start ApplicationMgr (which starts all services including EventLoopMgr)
467 StatusCode sc = m_pesaAppMgr->start();
468 if( !sc.isSuccess() ) {
469 ERS_PSC_ERROR("Error executing ApplicationMgr::start() command ");
470 return false;
471 }
472
473 // bind args to prepareForRun
474 auto prep = [&args](ITrigEventLoopMgr* mgr) {
475 // FIXME: ITrigEventLookMgr::prepareForRun is declared NOT_THREAD_SAFE.
476 // Probably this method shoud also be NOT_THREAD_SAFE, but that's
477 // awkward because it implements a tdaq interface from hltinterface.
478 StatusCode ret ATLAS_THREAD_SAFE = mgr->prepareForRun (args);
479
480 // This dance is needed to prevent RV optimization.
481 // Otherwise, the optimizer loses the ATLAS_THREAD_SAFE attribute
482 // on RET before the thread-safety checker gets to see the code.
483 if (ret.isSuccess()) {
484 return StatusCode (StatusCode::SUCCESS);
485 }
486 return ret;
487 };
488 if(!callOnEventLoopMgr<ITrigEventLoopMgr>(prep, "prepareForRun").isSuccess()) {
489 return false;
490 }
491
492 if ( Py_IsInitialized() ) {
493 // After prepareForRun the HLTMPPU will fork the workers. Tell the Python
494 // interpreter this is happening to avoid deadlocks (ATR-23428).
495 ERS_DEBUG(1, "Pre-fork initialization of Python interpreter");
496 PyOS_BeforeFork();
497 }
498
499 return true;
500}
501
502
503//--------------------------------------------------------------------------------
504// StopRun transition
505//--------------------------------------------------------------------------------
506
507bool psc::Psc::stopRun (const ptree& /*args*/)
508{
509 psc::Utils::ScopeTimer timer("Psc stopRun");
510
511 if(!callOnEventLoopMgr<IService>(&IService::sysStop, "sysStop").isSuccess())
512 {
513 return false;
514 }
515
516 // Stop ApplicationMgr and all services.
517 // The EventLoopMgr is already stopped by above code but that's fine.
518 auto sc = m_pesaAppMgr->stop();
519 if ( !sc.isSuccess() ) {
520 ERS_PSC_ERROR("Error executing stop(void) for ApplicationMgr");
521 return false;
522 }
523
524 // Workers: store histograms at the end of stop as the children may not
525 // go through finalize with SkipFinalizeWorker=1.
526 if (m_workerID != 0) {
527 SmartIF<IService> histsvc = m_svcLoc->service("THistSvc", /*createIf=*/ false);
528 if (histsvc.isValid()) {
529 ERS_LOG("Finalize THistSvc to save histograms");
530 if (histsvc->finalize().isFailure()) {
531 ERS_PSC_ERROR("Error executing finalize for THistSvc");
532 }
533 }
534 }
535
536
537 return true;
538}
539
541{
542 if (m_pesaAppMgr==nullptr) return true; // already finalized
543
544 // Finalize the application manager
545 StatusCode sc = m_pesaAppMgr->finalize();
546 ERS_DEBUG(1,"Finalize ApplicationMgr: " << m_pesaAppMgr->FSMState()
547 << ". Status : " << sc.getCode());
548
549 if( sc.isFailure() ) {
550 ERS_PSC_ERROR("Error while finalizing the ApplicationMgr.");
551 return false;
552 }
553
554 // Terminate the application manager
555 sc = m_pesaAppMgr->terminate();
556 ERS_DEBUG(1,"Terminate ApplicationMgr: " << m_pesaAppMgr->FSMState()
557 << ". Status : " << sc.getCode());
558
559 if ( sc.isFailure() ) {
560 ERS_PSC_ERROR("Error while terminating the ApplicationMgr.");
561 return false;
562 }
563
564 // Make sure we get a new instance the next time
565 Gaudi::setInstance(static_cast<IAppMgrUI*>(nullptr));
566 m_pesaAppMgr = nullptr;
567
568 return true;
569}
570
571//--------------------------------------------------------------------------------
572// Disconnect transition
573//--------------------------------------------------------------------------------
574
575bool psc::Psc::disconnect (const ptree& /*args*/)
576{
577 psc::Utils::ScopeTimer timer("Psc disconnect");
578 return doAppMgrFinalize();
579}
580
581
582//--------------------------------------------------------------------------------
583// Unconfigure transition
584//--------------------------------------------------------------------------------
585
586bool psc::Psc::unconfigure (const ptree& /*args*/)
587{
588 psc::Utils::ScopeTimer timer("Psc unconfigure");
589 return true;
590}
591
592
593//--------------------------------------------------------------------------------
594// Publish statistics
595//--------------------------------------------------------------------------------
596
597bool psc::Psc::publishStatistics (const ptree& /*args*/)
598{
599 psc::Utils::ScopeTimer timer("Psc publishStatistics");
600 return true;
601}
602
603
604//--------------------------------------------------------------------------------
605// User command can be sent via:
606// partition: rc_sendcommand -p [PART] -n [APP] USER command args
607// But they are no longer supported because they don't propagate from mother to child
608//--------------------------------------------------------------------------------
610{
611 ERS_DEBUG(1, "psc::Psc::hltUserCommand ptree:\n" << to_string(args));
612
613 // Default if no action on command
614 return true;
615}
616
618{
619 ERS_LOG("psc::Psc::doEventLoop: start of doEventLoop()");
620 StatusCode sc;
621 try
622 {
623 // bind maxevt=-1 (meaning all events) to executeRun
624 auto exec = [](IEventProcessor * mgr)
625 {return mgr->executeRun(-1);};
626 sc = callOnEventLoopMgr<IEventProcessor>(exec, "executeRun");
627 }
628 catch(const ers::Issue& e)
629 {
630 ERS_PSC_ERROR("Caught an unexpected ers::Issue: '" << e.what() << "'");
631 sc = StatusCode::FAILURE;
632 }
633 catch(const std::exception& e)
634 {
635 ERS_PSC_ERROR("Caught an unexpected std::exception: '" << e.what() << "'");
636 sc = StatusCode::FAILURE;
637 }
638 catch(...)
639 {
640 ERS_PSC_ERROR("Caught an unknown exception");
641 sc = StatusCode::FAILURE;
642 }
643 if (sc.isFailure()) {
644 ERS_PSC_ERROR("psc::Psc::doEventLoop failed");
645 return false;
646 }
647 ERS_LOG("psc::Psc::doEventLoop: end of doEventLoop()");
648 return true;
649}
650
651bool psc::Psc::prepareWorker (const boost::property_tree::ptree& args)
652{
653 psc::Utils::ScopeTimer timer("Psc prepareWorker");
654
655 ROOT::EnableThreadSafety();
656 if ( Py_IsInitialized() ) {
657 ERS_DEBUG(1, "Post-fork initialization of Python interpreter");
658 PyOS_AfterFork_Child();
659
660 /* Release the Python GIL (which we inherited from the mother)
661 to avoid dead-locking on the first call to Python. Only relevant
662 if Python is initialized and Python-based algorithms are used. */
663 ERS_DEBUG(1, "Releasing Python GIL");
664 PyEval_SaveThread();
665 }
666
667 m_workerID = args.get<int>("workerId");
668
669 ERS_LOG("Individualizing DF properties");
670 m_config->prepareWorker(args);
671
672 if (!setDFProperties({{"DF_Pid", "DF_PID"},
673 {"DF_Ppid", "DF_PPID"},
674 {"DF_RandomSeed", "DF_RANDOM_SEED"},
675 {"DF_WorkerId", "DF_WORKER_ID"},
676 {"DF_NumberOfWorkers", "DF_NUMBER_OF_WORKERS"},
677 {"DF_ApplicationName", "DF_APPLICATIONNAME"}})
678 ) return false;
679
680 // bind args to hltUpdateAfterFork
681 auto upd = [&args](ITrigEventLoopMgr * mgr)
682 {return mgr->hltUpdateAfterFork(args);};
683 if(!callOnEventLoopMgr<ITrigEventLoopMgr>(upd, "hltUpdateAfterFork").isSuccess())
684 {
685 return false;
686 }
687
688 return true;
689}
690
691bool psc::Psc::finalizeWorker (const boost::property_tree::ptree& /*args*/)
692{
693 psc::Utils::ScopeTimer timer("Psc finalizeWorker");
694 return doAppMgrFinalize();
695}
696
697bool psc::Psc::setDFProperties(const std::map<std::string, std::string>& name_tr_table)
698{
699 ServiceHandle<Gaudi::Interfaces::IOptionsSvc> jobOptionSvc("JobOptionsSvc","psc::Psc");
700 for(const auto& prop : name_tr_table)
701 {
702 const auto& val = m_config->getOption(prop.second);
703 jobOptionSvc->set("DataFlowConfig."+prop.first, val);
704 ERS_DEBUG(0,"Wrote configuration for Data Flow in JobOptions Catalogue: "
705 << prop.first << " = " << val);
706 }
707
708 return true;
709}
710
712 // Use the IProperty interface of the ApplicationManager to find the EventLoopMgr name
713 SmartIF<IProperty> propMgr(m_pesaAppMgr);
714 if (!propMgr.isValid()) {
715 ERS_PSC_ERROR("Error retrieving IProperty interface of ApplicationMgr");
716 return false;
717 }
718 std::string eventLoopMgrName;
719 if (propMgr->getProperty("EventLoop", eventLoopMgrName).isFailure()) {
720 ERS_PSC_ERROR("Error while retrieving the property ApplicationMgr.EventLoop");
721 return false;
722 }
723
724 // Use the JobOptionsSvc to write athena-specific options in JobOptions Catalogue of EventLoopMgr
725 ServiceHandle<Gaudi::Interfaces::IOptionsSvc> jobOptionSvc("JobOptionsSvc","psc::Psc");
726
727 std::string opt = m_config->getOption("HARDTIMEOUT");
728 if (!opt.empty()) {
729 jobOptionSvc->set(eventLoopMgrName+".HardTimeout", opt);
730 }
731 else {
732 ERS_PSC_ERROR("Failed to get the HARDTIMEOUT property from the configuration tree");
733 return false;
734 }
735
736 opt = m_config->getOption("SOFTTIMEOUTFRACTION");
737 if (!opt.empty()) {
738 jobOptionSvc->set(eventLoopMgrName+".SoftTimeoutFraction", opt);
739 }
740 else {
741 ERS_PSC_ERROR("Failed to get the SOFTTIMEOUTFRACTION property from the configuration tree");
742 return false;
743 }
744
745 /* The names "EventDataSvc" and "AvalancheSchedulerSvc" are hard-coded below, because it is not possible
746 to retrieve them from HltEventLoopMgr properties before it is initialised. Here we need to set the properties
747 before the services are initialised." */
748
749 opt = m_config->getOption("NEVENTSLOTS");
750 if (!opt.empty()) {
751 jobOptionSvc->set("EventDataSvc.NSlots", opt);
752 }
753 else {
754 ERS_PSC_ERROR("Failed to get the NEVENTSLOTS property from the configuration tree");
755 return false;
756 }
757
758 opt = m_config->getOption("NTHREADS");
759 if (!opt.empty()) {
760 jobOptionSvc->set("AvalancheSchedulerSvc.ThreadPoolSize", opt);
761 }
762 else {
763 ERS_PSC_ERROR("Failed to get the NTHREADS property from the configuration tree");
764 return false;
765 }
766
767 return true;
768}
769
770
771template <typename T>
772StatusCode psc::Psc::callOnEventLoopMgr(std::function<StatusCode (T*)> func,
773 const std::string& name) const
774{
775 SmartIF<T> processingMgr{m_svcLoc->service(m_nameEventLoopMgr)};
776 if(!processingMgr) {
777 ERS_PSC_ERROR("Error retrieving EventLoopMgr = '" << m_nameEventLoopMgr << "'" );
778 return StatusCode::FAILURE;
779 }
780
781 // Call the given function of the EventLoopMgr
782 StatusCode sc = func(processingMgr); // processingMgr->func()
783 if(!sc.isSuccess())
784 {
785 ERS_PSC_ERROR("Error executing " << name << " for EventLoopMgr = '"
786 << m_nameEventLoopMgr << "'") ;
787 return sc;
788 }
789
790 return sc;
791}
static std::string to_string(const std::vector< T > &v)
Some helpers for the PSC.
_object PyObject
boost::property_tree::ptree ptree
static Double_t sc
ERS Issues for PSC.
#define ERS_PSC_ERROR(message)
Definition PscIssues.h:51
HLT Pesa Steering Controller.
Define macros for attributes used to control the static checker.
#define ATLAS_THREAD_SAFE
EventLoopMgr interface implemented by the HLT event loop manager.
bool setAthenaProperties()
Definition Psc.cxx:711
uint32_t m_run_number
(initial) run number to be used for this run
Definition Psc.h:134
bool setDFProperties(const std::map< std::string, std::string > &name_tr_table)
Definition Psc.cxx:697
virtual bool unconfigure(const boost::property_tree::ptree &args) override
Unconfigures the framework, releasing all acquired resources.
Definition Psc.cxx:586
int m_workerID
worker ID (0=mother)
Definition Psc.h:139
virtual ~Psc()
Virtual destructor.
Definition Psc.cxx:67
virtual bool hltUserCommand(const boost::property_tree::ptree &args) override
Calls the HLT framework to notify it that a user command has arrived.
Definition Psc.cxx:609
virtual bool configure(const boost::property_tree::ptree &config) override
Configures the framework.
Definition Psc.cxx:78
bool m_interactive
Running in interactive mode (athenaHLT)
Definition Psc.h:137
IAppMgrUI * m_pesaAppMgr
Application Manager.
Definition Psc.h:135
bool doAppMgrFinalize()
Finalize the application manager.
Definition Psc.cxx:540
bool doAppMgrInit()
Initialize the application manager.
Definition Psc.cxx:379
virtual bool prepareWorker(const boost::property_tree::ptree &args) override
Method which can be called for a worker to perform the necessary steps to set unique worker IDs and a...
Definition Psc.cxx:651
virtual bool disconnect(const boost::property_tree::ptree &args) override
Disconnects the framework.
Definition Psc.cxx:575
virtual bool publishStatistics(const boost::property_tree::ptree &args) override
Calls the HLT framework to publish statistics, after the run has finished.
Definition Psc.cxx:597
std::string m_nameEventLoopMgr
name of the event loop manager
Definition Psc.h:136
virtual bool prepareForRun(const boost::property_tree::ptree &args) override
prepares the HLT framework for a run
Definition Psc.cxx:436
virtual bool connect(const boost::property_tree::ptree &args) override
Connects the framework.
Definition Psc.cxx:421
virtual bool stopRun(const boost::property_tree::ptree &args) override
stops the HLT framework without re-configuring
Definition Psc.cxx:507
SmartIF< ISvcLocator > m_svcLoc
Service locator handle.
Definition Psc.h:141
StatusCode callOnEventLoopMgr(std::function< StatusCode(T *)> func, const std::string &name) const
Utility method to call a method on the event loop manager.
Definition Psc.cxx:772
virtual bool finalizeWorker(const boost::property_tree::ptree &args) override
Method which can be called for a worker to perform a cleanup before the worker gets killed.
Definition Psc.cxx:691
virtual bool doEventLoop() override
Starts the HLT event loop.
Definition Psc.cxx:617
std::unique_ptr< psc::Config > m_config
Config derived from ptree.
Definition Psc.h:138
bool execFile(const std::string &pyFileName)
Execute a python file (via include or import)
bool execPython(const std::string &pyCmd)
Execute a python command in the python interpreter.