ATLAS Offline Software
Psc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
13 #include "TrigPSC/Psc.h"
14 #include "TrigPSC/PscIssues.h"
15 #include "TrigPSC/Utils.h"
18 
19 #include "eformat/ROBFragment.h"
20 #include "ers/ers.h"
21 
22 #undef _POSIX_C_SOURCE
23 #undef _XOPEN_SOURCE
24 #include <Python.h>
25 
26 // Include files for Gaudi
27 #include "GaudiKernel/Bootstrap.h"
28 #include "GaudiKernel/IAppMgrUI.h"
29 #include "GaudiKernel/IEventProcessor.h"
30 #include "GaudiKernel/IProperty.h"
31 #include "GaudiKernel/IService.h"
32 #include "Gaudi/Interfaces/IOptionsSvc.h"
33 #include "GaudiKernel/IAlgManager.h"
34 #include "GaudiKernel/IAlgorithm.h"
35 #include "GaudiKernel/ServiceHandle.h"
36 #include "Gaudi/Property.h"
37 #include "GaudiKernel/System.h"
38 
39 #include "TROOT.h"
40 
41 #include <sstream>
42 #include <algorithm>
43 #include <vector>
44 #include <locale>
45 #include <codecvt>
46 
47 #include <boost/property_tree/xml_parser.hpp>
48 
49 using namespace boost::property_tree;
50 
51 namespace
52 {
54  std::string to_string(const ptree& p)
55  {
56  using T = ptree::key_type;
57  std::ostringstream oss;
58  xml_parser::write_xml(oss, p,
59  xml_parser::xml_writer_make_settings<T>(' ', 2));
60  return oss.str();
61  }
62 }
63 
64 //--------------------------------------------------------------------------------
65 // Destructor
66 //--------------------------------------------------------------------------------
68 {
69  if (m_pesaAppMgr) {
70  m_pesaAppMgr->release() ;
71  }
72 }
73 
74 //--------------------------------------------------------------------------------
75 // Configure transition
76 //--------------------------------------------------------------------------------
77 
79 {
80  psc::Utils::ScopeTimer timer("Psc configuration");
81 
82  ROOT::EnableThreadSafety();
83 
84  ERS_DEBUG(1, "psc::Psc::configure ptree:\n" << to_string(config));
85  try
86  {
87  m_config = std::make_unique<Config>(config);
88  }
89  catch(const std::exception& e)
90  {
91  ERS_PSC_ERROR("Cannot configure. " << e.what());
92  return false;
93  }
94 
95  // Print PSC configuration
96  ERS_LOG("---> Dump of config cache: \n" << m_config->dumpOptions() );
97  ERS_LOG("---> Pesa JobOptions file is = " << m_config->getOption("JOBOPTIONSPATH") );
98 
99  // -----------------------------
100  // Create C++ ApplicationMgr
101  // -----------------------------
102  ERS_DEBUG(1,"---> Create Pesa Application Manager");
103  m_pesaAppMgr = Gaudi::createApplicationMgr();
104  ERS_DEBUG(1,"m_pesaAppMgr = " << m_pesaAppMgr);
105 
106  if( !m_pesaAppMgr ) {
107  ERS_PSC_ERROR("Error while creating the ApplicationMgr");
108  return false;
109  }
110 
111  // configure Pesa
112  ERS_DEBUG(1,"---> Configure Pesa Property Manager");
113  SmartIF<IProperty> propMgr ( m_pesaAppMgr );
114  if( !propMgr.isValid() ) {
115  ERS_PSC_ERROR("Error retrieving IProperty interface of ApplicationMgr");
116  return false;
117  }
118 
119  // Set the JobOptionsSvcType
120  StatusCode sc;
121  sc = propMgr->setProperty( "JobOptionsSvcType", m_config->getOption("JOBOPTIONSSVCTYPE"));
122  if (sc.isFailure()) {
123  ERS_PSC_ERROR("Can not set Property = JobOptionsSvcType");
124  return false;
125  }
126 
127  // Set the JobOptionsType
128  sc = propMgr->setProperty( "JobOptionsType", m_config->getOption("JOBOPTIONSTYPE"));
129  if (sc.isFailure()) {
130  ERS_PSC_ERROR("Can not set Property = JobOptionsType");
131  return false;
132  }
133 
134  // Set JobOptionsPath
135  sc = propMgr->setProperty( "JobOptionsPath", m_config->getOption("JOBOPTIONSPATH") );
136  if (sc.isFailure()) {
137  ERS_PSC_ERROR("Can not set Property = JobOptionsPath");
138  return false;
139  }
140 
141  // Select the MessageSvc Type
142  sc = propMgr->setProperty( "MessageSvcType", m_config->getOption("MESSAGESVCTYPE"));
143  if (sc.isFailure()) {
144  ERS_PSC_ERROR("Can not set Property = MessageSvcType");
145  return false;
146  }
147 
148  // Need to setup the python interpreter if:
149  // 1) run from job options
150  // 2) run from DB but user specified a pre/postcommand
151  // 3) run from DB but user specified a logLevel via "-l" in athenaMT/PT
152  bool jobOptConfig, needPython;
153  if ( m_config->getOption("JOBOPTIONSTYPE") == "NONE" ) {
154  jobOptConfig = needPython = true;
155  }
156  else if ( m_config->getOption("JOBOPTIONSTYPE") == "DB" ||
157  m_config->getOption("JOBOPTIONSTYPE") == "FILE") {
158  jobOptConfig = needPython = false;
159  if ( (m_config->getOption("PRECOMMAND")!="") || (m_config->getOption("POSTCOMMAND")!="") ) {
160  needPython = true;
161  }
162  if (m_config->didUserSetLogLevel()) {
163  needPython = true;
164  }
165  }
166  else {
167  ERS_PSC_ERROR("Unsupported configuration method \"" << m_config->getOption("JOBOPTIONSTYPE") << "\"");
168  return false;
169  }
170 
171 
172  if ( jobOptConfig || needPython ) {
173  // ----------------------------
174  // Configuration with .py files
175  // ----------------------------
176 
177  // Try to initialize the Python interpreter
178  if ( ! Py_IsInitialized() ) {
179  ERS_DEBUG(1,"Initializing Python interpreter");
180 
181  PyConfig config;
182  PyConfig_InitPythonConfig (&config);
183  PyStatus status = PyConfig_SetBytesArgv (&config, System::argc(), System::argv());
184  if (PyStatus_Exception (status)) {
185  PyConfig_Clear (&config);
186  ERS_PSC_ERROR("Error: Python could not be initialized.");
187  return false;
188  }
189  status = Py_InitializeFromConfig (&config);
190  if (PyStatus_Exception (status)) {
191  PyConfig_Clear (&config);
192  ERS_PSC_ERROR("Error: Python could not be initialized.");
193  return false;
194  }
195 
196  /*
197  * The GIL is initialized by Py_Initialize() since Python 3.7."
198  */
199 
200  // check
201  if ( ! Py_IsInitialized() ) {
202  ERS_PSC_ERROR("Error: Python could not be initialized.");
203  return false;
204  }
205 
206  }
207  else {
208  ERS_DEBUG(1,"Python interpreter already initialized");
209  }
210 
211  // Copy Config.optmap to python module
212  // This is how we transfer the options to the python setup script
213  PyObject* pModule = PyImport_ImportModule("TrigPSC.PscConfig");
214  if ( pModule ) {
215  PyObject* optmap = PyObject_GetAttrString(pModule, "optmap");
216  if ( optmap ) {
217  std::map<std::string,std::string>::const_iterator iter;
218  for (iter=m_config->optmap.begin(); iter!=m_config->optmap.end(); ++iter) {
219  PyObject* v = PyUnicode_FromString(iter->second.c_str());
220  std::vector<char> writable(iter->first.size() + 1);
221  std::copy(iter->first.begin(), iter->first.end(), writable.begin());
222  PyMapping_SetItemString(optmap, &writable[0], v);
223  }
224  Py_DECREF(optmap);
225  }
226  else {
227  ERS_DEBUG(1, "Could not import TrigPSC.PscConfig.optmap");
228  }
229  Py_DECREF(pModule);
230  }
231  }
232 
233  if ( jobOptConfig ) {
234  // Do the python setup (including user job options)
235  std::string pyBasicFile = m_config->getOption("PYTHONSETUPFILE") ;
236  if ( !psc::Utils::execFile(pyBasicFile) ) {
237  ERS_PSC_ERROR("Basic Python configuration failed.");
238  return false;
239  }
240  }
241 
242  if ( !jobOptConfig ) {
243  // -----------------------------
244  // Configuration from database
245  // -----------------------------
246 
247  // Run pre-command (you're on your own, no basic python setup)
248  std::string cmd = m_config->getOption("PRECOMMAND");
249  if ( cmd != "" ) {
250  ERS_LOG("Running pre-configure command '" << cmd << "'");
251  if ( !psc::Utils::execPython(cmd) ) {
252  ERS_PSC_ERROR("Pre-configure command failed.");
253  return false;
254  }
255  }
256 
257  // Configure ApplicationMgr
258  ERS_DEBUG(1,"Configure ApplicationMgr from database.");
259  StatusCode sc = m_pesaAppMgr->configure();
260  if( sc.isFailure() ) {
261  ERS_PSC_ERROR("Error while configuring the ApplicationMgr");
262  return false;
263  }
264 
265  // Do the basic python setup if postcommand or logLevel was changed
266  if ( needPython ) {
267  // only used in athenaHLT, but not in partition running
268  std::string pyBasicFile = m_config->getOption("PYTHONSETUPFILE", /*quiet*/true) ;
269  if ( !pyBasicFile.empty() ) {
270  if ( !psc::Utils::execFile(pyBasicFile) ) {
271  ERS_PSC_ERROR("Basic Python configuration failed.");
272  return false;
273  }
274  }
275  }
276  }
277 
278  // Check if running interactively
279  // If yes, delay AppMgr::initialize() until connect
280  if ( needPython ) {
281  PyObject* pModule = PyImport_ImportModule("TrigPSC.PscConfig");
282  if ( pModule ) {
283  PyObject* pInteractive = PyObject_GetAttrString(pModule, "interactive");
284  if ( pInteractive && PyBool_Check(pInteractive) ) {
285  m_interactive = (pInteractive==Py_True);
286  ERS_DEBUG(1, "TrigPSC.PscConfig.interactive = " << m_interactive);
287  Py_DECREF(pInteractive);
288  }
289  else {
290  ERS_DEBUG(1, "Could not read TrigPSC.PscConfig.interactive");
291  }
292  Py_DECREF(pModule);
293  }
294  else {
295  ERS_DEBUG(1, "Could not import TrigPSC.PscConfig");
296  }
297  }
298 
299  if ( m_interactive ) ERS_LOG("Running in interactive mode");
300 
301  ERS_DEBUG(1,"Configured ApplicationMgr in state: " << m_pesaAppMgr->FSMState());
302 
303  ServiceHandle<Gaudi::Interfaces::IOptionsSvc> jobOptionSvc("JobOptionsSvc","psc::Psc");
304  SmartIF<IProperty> jos_propif{&(*jobOptionSvc)};
305  if(m_config->didUserSetLogLevel())
306  jos_propif->setProperty("OutputLevel", m_config->getLogLevelAsNumStr()).ignore();
307 
308  // Write Data Flow parameters in JobOptions catalogue
309 
310  // map of dataflow parameters to store in the JobOptions Catalogue
311  auto props = std::map<std::string,std::string>{
312  {"DF_PartitionName", "DF_PARTITION_NAME"},
313  {"DF_ApplicationName", "DF_APPLICATIONNAME"},
314  {"DF_MachineName", "DF_MACHINE_NAME"},
315  {"DF_Ppid", "DF_PPID"},
316  {"DF_Pid", "DF_PID"},
317  {"DF_HostId", "DF_HOST_ID"},
318  {"DF_RandomSeed", "DF_RANDOM_SEED"},
319  {"DF_NumberOfWorkers", "NPROCS"}
320  };
321  if(!setDFProperties(props))
322  return false;
323 
324  // Write list of configured ROB IDs into the JobOptions Catalogue
325  if ( m_config->enabled_robs.size() != 0 ) {
326  jobOptionSvc->set("DataFlowConfig.DF_Enabled_ROB_IDs",
327  Gaudi::Utils::toString<std::vector<uint32_t>>(m_config->enabled_robs));
328  ERS_DEBUG(1,"psc::Psc::configure: Wrote configuration for enabled ROBs in JobOptions Catalogue: "
329  <<" number of ROB IDs read from OKS = " << m_config->enabled_robs.size());
330  }
331 
332  // Write list of configured Sub Det IDs into the JobOptions Catalogue
333  if ( m_config->enabled_SubDets.size() != 0 ) {
334  jobOptionSvc->set("DataFlowConfig.DF_Enabled_SubDet_IDs",
335  Gaudi::Utils::toString<std::vector<uint32_t>>(m_config->enabled_SubDets));
336  ERS_DEBUG(1,"psc::Psc::configure: Wrote configuration for enabled sub detectors in JobOptions Catalogue: "
337  <<" number of Sub Det IDs read from OKS = " << m_config->enabled_SubDets.size());
338  }
339 
340  // Write the maximum HLT output size into the JobOptions Catalogue
341  if (std::string opt = m_config->getOption("MAXEVENTSIZEMB"); !opt.empty()) {
342  jobOptionSvc->set("DataFlowConfig.DF_MaxEventSizeMB", opt);
343  ERS_DEBUG(1,"psc::Psc::configure: Wrote DF_MaxEventSizeMB=" << opt << " in JobOptions Catalogue");
344  }
345 
346  // Write configuration for HLT muon calibration infrastructure in JobOptions catalogue
347  if ( (m_config->getOption("MUONCALBUFFERNAME") != "NONE") && (m_config->getOption("MUONCALBUFFERNAME") != "") ) {
348  jobOptionSvc->set("MuonHltCalibrationConfig.MuonCalBufferName", m_config->getOption("MUONCALBUFFERNAME"));
349  jobOptionSvc->set("MuonHltCalibrationConfig.MuonCalBufferSize", m_config->getOption("MUONCALBUFFERSIZE"));
350 
351  ERS_DEBUG(1,"psc::Psc::configure: Wrote configuration for HLT Muon Calibration in JobOptions Catalogue: "
352  <<" MuonCalBufferName = " << m_config->getOption("MUONCALBUFFERNAME")
353  <<" MuonCalBufferSize = " << m_config->getOption("MUONCALBUFFERSIZE") );
354  }
355 
356  // Write configuration specific to athena (HltEventLoopMgr)
357  if(!setAthenaProperties()) return false;
358 
359  if ( !jobOptConfig ) {
360  // Run post-command
361  std::string cmd = m_config->getOption("POSTCOMMAND");
362  if ( cmd != "" ) {
363  ERS_LOG("Running post-configure command '" << cmd << "'");
364  if ( !psc::Utils::execPython(cmd) ) {
365  ERS_PSC_ERROR("Post-configure command failed.");
366  return false;
367  }
368  }
369  }
370 
371  if ( !m_interactive ) {
372  if ( !doAppMgrInit() ) return false;
373  }
374 
375  return true;
376 }
377 
378 
380 {
381  // Initialize the application manager
382  StatusCode sc = m_pesaAppMgr->initialize();
383  ERS_DEBUG(1,"Initialize ApplicationMgr : " << m_pesaAppMgr->FSMState() <<
384  ", Status = " << sc.getCode());
385  if( sc.isFailure() ) {
386  ERS_PSC_ERROR("Error while initializing the ApplicationMgr");
387  return false;
388  }
389 
390  // Handle to service locator
391  m_svcLoc = SmartIF<ISvcLocator>( m_pesaAppMgr );
392  if ( !m_svcLoc.isValid() ) {
393  ERS_PSC_ERROR("Error retrieving Service Locator:") ;
394  return false;
395  }
396 
397  SmartIF<IProperty> propMgr ( m_pesaAppMgr );
398  if( !propMgr.isValid() ) {
399  ERS_PSC_ERROR("Error retrieving IProperty interface of ApplicationMgr");
400  return false;
401  }
402 
403  // Find the basename for the EventLoopMgr
404  std::string value ;
405  sc = propMgr->getProperty( "EventLoop", value );
406  if( sc.isFailure() ) {
407  ERS_PSC_ERROR("Error while retrieving Property \'EventLoop\'.");
408  } else {
409  m_nameEventLoopMgr.assign(value, value.find_first_of("\"")+1,
410  value.find_last_of("\"")-value.find_first_of("\"")-1) ;
411  }
412 
413  return true;
414 }
415 
416 
417 //--------------------------------------------------------------------------------
418 // Connect transition
419 //--------------------------------------------------------------------------------
420 
421 bool psc::Psc::connect (const ptree& /*args*/)
422 {
423  psc::Utils::ScopeTimer timer("Psc connect");
424 
425  if ( m_interactive ) {
426  if ( !doAppMgrInit() ) return false;
427  }
428  return true;
429 }
430 
431 
432 //--------------------------------------------------------------------------------
433 // PrepareForRun transition
434 //--------------------------------------------------------------------------------
435 
437 {
438  psc::Utils::ScopeTimer timer("Psc prepareForRun");
439 
440  ERS_DEBUG(1, "psc::Psc::prepareForRun ptree:\n" << to_string(args));
441 
442  try
443  {
444  // save run number
445  m_run_number = args.get<uint32_t>("RunParams.run_number");
446  }
447  catch(const ptree_bad_path & e)
448  {
449  ERS_PSC_ERROR("Bad ptree path: \"" << e.path<ptree::path_type>().dump()
450  << "\" - " << e.what())
451  return false;
452  }
453  catch(const ptree_bad_data & e)
454  {
455  ERS_PSC_ERROR("Bad ptree data: \"" << e.data<ptree::data_type>() << "\" - "
456  << e.what())
457  return false;
458  }
459 
460  // Initializations needed for start()
461  if(!callOnEventLoopMgr<ITrigEventLoopMgr>([&args](ITrigEventLoopMgr* mgr) {return mgr->prepareForStart(args);},
462  "prepareForStart").isSuccess()) {
463  return false;
464  }
465 
466  // Start ApplicationMgr (which starts all services including EventLoopMgr)
467  StatusCode sc = m_pesaAppMgr->start();
468  if( !sc.isSuccess() ) {
469  ERS_PSC_ERROR("Error executing ApplicationMgr::start() command ");
470  return false;
471  }
472 
473  // bind args to prepareForRun
474  auto prep = [&args](ITrigEventLoopMgr* mgr) {
475  // FIXME: ITrigEventLookMgr::prepareForRun is declared NOT_THREAD_SAFE.
476  // Probably this method shoud also be NOT_THREAD_SAFE, but that's
477  // awkward because it implements a tdaq interface from hltinterface.
478  StatusCode ret ATLAS_THREAD_SAFE = mgr->prepareForRun (args);
479 
480  // This dance is needed to prevent RV optimization.
481  // Otherwise, the optimizer loses the ATLAS_THREAD_SAFE attribute
482  // on RET before the thread-safety checker gets to see the code.
483  if (ret.isSuccess()) {
484  return StatusCode (StatusCode::SUCCESS);
485  }
486  return ret;
487  };
488  if(!callOnEventLoopMgr<ITrigEventLoopMgr>(prep, "prepareForRun").isSuccess()) {
489  return false;
490  }
491 
492  if ( Py_IsInitialized() ) {
493  // After prepareForRun the HLTMPPU will fork the workers. Tell the Python
494  // interpreter this is happening to avoid deadlocks (ATR-23428).
495  ERS_DEBUG(1, "Pre-fork initialization of Python interpreter");
496  PyOS_BeforeFork();
497  }
498 
499  return true;
500 }
501 
502 
503 //--------------------------------------------------------------------------------
504 // StopRun transition
505 //--------------------------------------------------------------------------------
506 
507 bool psc::Psc::stopRun (const ptree& /*args*/)
508 {
509  psc::Utils::ScopeTimer timer("Psc stopRun");
510 
511  if(!callOnEventLoopMgr<IService>(&IService::sysStop, "sysStop").isSuccess())
512  {
513  return false;
514  }
515 
516  // Stop ApplicationMgr and all services.
517  // The EventLoopMgr is already stopped by above code but that's fine.
518  auto sc = m_pesaAppMgr->stop();
519  if ( !sc.isSuccess() ) {
520  ERS_PSC_ERROR("Error executing stop(void) for ApplicationMgr");
521  return false;
522  }
523 
524  // Workers: store histograms at the end of stop as the children may not
525  // go through finalize with SkipFinalizeWorker=1.
526  if (m_workerID != 0) {
527  SmartIF<IService> histsvc = m_svcLoc->service("THistSvc", /*createIf=*/ false);
528  if (histsvc.isValid()) {
529  ERS_LOG("Finalize THistSvc to save histograms");
530  if (histsvc->finalize().isFailure()) {
531  ERS_PSC_ERROR("Error executing finalize for THistSvc");
532  }
533  }
534  }
535 
536 
537  return true;
538 }
539 
541 {
542  if (m_pesaAppMgr==nullptr) return true; // already finalized
543 
544  // Finalize the application manager
545  StatusCode sc = m_pesaAppMgr->finalize();
546  ERS_DEBUG(1,"Finalize ApplicationMgr: " << m_pesaAppMgr->FSMState()
547  << ". Status : " << sc.getCode());
548 
549  if( sc.isFailure() ) {
550  ERS_PSC_ERROR("Error while finalizing the ApplicationMgr.");
551  return false;
552  }
553 
554  // Terminate the application manager
555  sc = m_pesaAppMgr->terminate();
556  ERS_DEBUG(1,"Terminate ApplicationMgr: " << m_pesaAppMgr->FSMState()
557  << ". Status : " << sc.getCode());
558 
559  if ( sc.isFailure() ) {
560  ERS_PSC_ERROR("Error while terminating the ApplicationMgr.");
561  return false;
562  }
563 
564  // Make sure we get a new instance the next time
565  Gaudi::setInstance(static_cast<IAppMgrUI*>(nullptr));
566  m_pesaAppMgr = nullptr;
567 
568  return true;
569 }
570 
571 //--------------------------------------------------------------------------------
572 // Disconnect transition
573 //--------------------------------------------------------------------------------
574 
575 bool psc::Psc::disconnect (const ptree& /*args*/)
576 {
577  psc::Utils::ScopeTimer timer("Psc disconnect");
578  return doAppMgrFinalize();
579 }
580 
581 
582 //--------------------------------------------------------------------------------
583 // Unconfigure transition
584 //--------------------------------------------------------------------------------
585 
586 bool psc::Psc::unconfigure (const ptree& /*args*/)
587 {
588  psc::Utils::ScopeTimer timer("Psc unconfigure");
589  return true;
590 }
591 
592 
593 //--------------------------------------------------------------------------------
594 // Publish statistics
595 //--------------------------------------------------------------------------------
596 
597 bool psc::Psc::publishStatistics (const ptree& /*args*/)
598 {
599  psc::Utils::ScopeTimer timer("Psc publishStatistics");
600  return true;
601 }
602 
603 
604 //--------------------------------------------------------------------------------
605 // User command can be sent via:
606 // partition: rc_sendcommand -p [PART] -n [APP] USER command args
607 // But they are no longer supported because they don't propagate from mother to child
608 //--------------------------------------------------------------------------------
610 {
611  ERS_DEBUG(1, "psc::Psc::hltUserCommand ptree:\n" << to_string(args));
612 
613  // Default if no action on command
614  return true;
615 }
616 
618 {
619  ERS_LOG("psc::Psc::doEventLoop: start of doEventLoop()");
620  StatusCode sc;
621  try
622  {
623  // bind maxevt=-1 (meaning all events) to executeRun
624  auto exec = [](IEventProcessor * mgr)
625  {return mgr->executeRun(-1);};
626  sc = callOnEventLoopMgr<IEventProcessor>(exec, "executeRun");
627  }
628  catch(const ers::Issue& e)
629  {
630  ERS_PSC_ERROR("Caught an unexpected ers::Issue: '" << e.what() << "'");
631  sc = StatusCode::FAILURE;
632  }
633  catch(const std::exception& e)
634  {
635  ERS_PSC_ERROR("Caught an unexpected std::exception: '" << e.what() << "'");
636  sc = StatusCode::FAILURE;
637  }
638  catch(...)
639  {
640  ERS_PSC_ERROR("Caught an unknown exception");
641  sc = StatusCode::FAILURE;
642  }
643  if (sc.isFailure()) {
644  ERS_PSC_ERROR("psc::Psc::doEventLoop failed");
645  return false;
646  }
647  ERS_LOG("psc::Psc::doEventLoop: end of doEventLoop()");
648  return true;
649 }
650 
652 {
653  psc::Utils::ScopeTimer timer("Psc prepareWorker");
654 
655  ROOT::EnableThreadSafety();
656  if ( Py_IsInitialized() ) {
657  ERS_DEBUG(1, "Post-fork initialization of Python interpreter");
658  PyOS_AfterFork_Child();
659 
660  /* Release the Python GIL (which we inherited from the mother)
661  to avoid dead-locking on the first call to Python. Only relevant
662  if Python is initialized and Python-based algorithms are used. */
663  ERS_DEBUG(1, "Releasing Python GIL");
664  PyEval_SaveThread();
665  }
666 
667  m_workerID = args.get<int>("workerId");
668 
669  ERS_LOG("Individualizing DF properties");
670  m_config->prepareWorker(args);
671 
672  if (!setDFProperties({{"DF_Pid", "DF_PID"},
673  {"DF_Ppid", "DF_PPID"},
674  {"DF_RandomSeed", "DF_RANDOM_SEED"},
675  {"DF_WorkerId", "DF_WORKER_ID"},
676  {"DF_NumberOfWorkers", "DF_NUMBER_OF_WORKERS"},
677  {"DF_ApplicationName", "DF_APPLICATIONNAME"}})
678  ) return false;
679 
680  // bind args to hltUpdateAfterFork
681  auto upd = [&args](ITrigEventLoopMgr * mgr)
682  {return mgr->hltUpdateAfterFork(args);};
683  if(!callOnEventLoopMgr<ITrigEventLoopMgr>(upd, "hltUpdateAfterFork").isSuccess())
684  {
685  return false;
686  }
687 
688  return true;
689 }
690 
692 {
693  psc::Utils::ScopeTimer timer("Psc finalizeWorker");
694  return doAppMgrFinalize();
695 }
696 
697 bool psc::Psc::setDFProperties(const std::map<std::string, std::string>& name_tr_table)
698 {
699  ServiceHandle<Gaudi::Interfaces::IOptionsSvc> jobOptionSvc("JobOptionsSvc","psc::Psc");
700  for(const auto& prop : name_tr_table)
701  {
702  const auto& val = m_config->getOption(prop.second);
703  jobOptionSvc->set("DataFlowConfig."+prop.first, val);
704  ERS_DEBUG(0,"Wrote configuration for Data Flow in JobOptions Catalogue: "
705  << prop.first << " = " << val);
706  }
707 
708  return true;
709 }
710 
712  // Use the IProperty interface of the ApplicationManager to find the EventLoopMgr name
713  SmartIF<IProperty> propMgr(m_pesaAppMgr);
714  if (!propMgr.isValid()) {
715  ERS_PSC_ERROR("Error retrieving IProperty interface of ApplicationMgr");
716  return false;
717  }
718  std::string eventLoopMgrName;
719  if (propMgr->getProperty("EventLoop", eventLoopMgrName).isFailure()) {
720  ERS_PSC_ERROR("Error while retrieving the property ApplicationMgr.EventLoop");
721  return false;
722  }
723 
724  // Use the JobOptionsSvc to write athena-specific options in JobOptions Catalogue of EventLoopMgr
725  ServiceHandle<Gaudi::Interfaces::IOptionsSvc> jobOptionSvc("JobOptionsSvc","psc::Psc");
726 
727  std::string opt = m_config->getOption("HARDTIMEOUT");
728  if (!opt.empty()) {
729  jobOptionSvc->set(eventLoopMgrName+".HardTimeout", opt);
730  }
731  else {
732  ERS_PSC_ERROR("Failed to get the HARDTIMEOUT property from the configuration tree");
733  return false;
734  }
735 
736  opt = m_config->getOption("SOFTTIMEOUTFRACTION");
737  if (!opt.empty()) {
738  jobOptionSvc->set(eventLoopMgrName+".SoftTimeoutFraction", opt);
739  }
740  else {
741  ERS_PSC_ERROR("Failed to get the SOFTTIMEOUTFRACTION property from the configuration tree");
742  return false;
743  }
744 
745  /* The names "EventDataSvc" and "AvalancheSchedulerSvc" are hard-coded below, because it is not possible
746  to retrieve them from HltEventLoopMgr properties before it is initialised. Here we need to set the properties
747  before the services are initialised." */
748 
749  opt = m_config->getOption("NEVENTSLOTS");
750  if (!opt.empty()) {
751  jobOptionSvc->set("EventDataSvc.NSlots", opt);
752  }
753  else {
754  ERS_PSC_ERROR("Failed to get the NEVENTSLOTS property from the configuration tree");
755  return false;
756  }
757 
758  opt = m_config->getOption("NTHREADS");
759  if (!opt.empty()) {
760  jobOptionSvc->set("AvalancheSchedulerSvc.ThreadPoolSize", opt);
761  }
762  else {
763  ERS_PSC_ERROR("Failed to get the NTHREADS property from the configuration tree");
764  return false;
765  }
766 
767  return true;
768 }
769 
770 
771 template <typename T>
773  const std::string& name) const
774 {
775  SmartIF<T> processingMgr{m_svcLoc->service(m_nameEventLoopMgr)};
776  if(!processingMgr) {
777  ERS_PSC_ERROR("Error retrieving EventLoopMgr = '" << m_nameEventLoopMgr << "'" );
778  return StatusCode::FAILURE;
779  }
780 
781  // Call the given function of the EventLoopMgr
782  StatusCode sc = func(processingMgr); // processingMgr->func()
783  if(!sc.isSuccess())
784  {
785  ERS_PSC_ERROR("Error executing " << name << " for EventLoopMgr = '"
786  << m_nameEventLoopMgr << "'") ;
787  return sc;
788  }
789 
790  return sc;
791 }
PscIssues.h
ERS Issues for PSC.
AllowedVariables::e
e
Definition: AsgElectronSelectorTool.cxx:37
make_coralServer_rep.opt
opt
Definition: make_coralServer_rep.py:19
xAOD::uint32_t
setEventNumber uint32_t
Definition: EventInfo_v1.cxx:127
psc::Psc::unconfigure
virtual bool unconfigure(const boost::property_tree::ptree &args) override
Unconfigures the framework, releasing all acquired resources.
Definition: Psc.cxx:586
psc::Psc::doEventLoop
virtual bool doEventLoop() override
Starts the HLT event loop.
Definition: Psc.cxx:617
Issue
Configuration Issue
Definition: PscIssues.h:31
rerun_display.cmd
string cmd
Definition: rerun_display.py:67
psc::Psc::stopRun
virtual bool stopRun(const boost::property_tree::ptree &args) override
stops the HLT framework without re-configuring
Definition: Psc.cxx:507
python.base_data.config
config
Definition: base_data.py:21
psc::Psc::connect
virtual bool connect(const boost::property_tree::ptree &args) override
Connects the framework.
Definition: Psc.cxx:421
athena.value
value
Definition: athena.py:124
psc::Psc::callOnEventLoopMgr
StatusCode callOnEventLoopMgr(std::function< StatusCode(T *)> func, const std::string &name) const
Utility method to call a method on the event loop manager.
Definition: Psc.cxx:772
psc::Psc::setDFProperties
bool setDFProperties(const std::map< std::string, std::string > &name_tr_table)
Definition: Psc.cxx:697
ITrigEventLoopMgr.h
psc::Psc::doAppMgrFinalize
bool doAppMgrFinalize()
Finalize the application manager.
Definition: Psc.cxx:540
config
Definition: PhysicsAnalysis/AnalysisCommon/AssociationUtils/python/config.py:1
python.utils.AtlRunQueryTimer.timer
def timer(name, disabled=False)
Definition: AtlRunQueryTimer.py:86
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
BchCleanup.mgr
mgr
Definition: BchCleanup.py:294
ERS_PSC_ERROR
#define ERS_PSC_ERROR(message)
Definition: PscIssues.h:51
psc::Psc::prepareForRun
virtual bool prepareForRun(const boost::property_tree::ptree &args) override
prepares the HLT framework for a run
Definition: Psc.cxx:436
ITrigEventLoopMgr
EventLoopMgr interface implemented by the HLT event loop manager.
Definition: ITrigEventLoopMgr.h:19
python.utils.AtlRunQueryDQUtils.p
p
Definition: AtlRunQueryDQUtils.py:210
Amg::toString
std::string toString(const Translation3D &translation, int precision=4)
GeoPrimitvesToStringConverter.
Definition: GeoPrimitivesToStringConverter.h:40
Psc.h
HLT Pesa Steering Controller.
LArCellNtuple.argv
argv
Definition: LArCellNtuple.py:152
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
python.TrigInDetValidation_AODtoTrkNtuple_CA.histsvc
histsvc
TrigInDetMonitoring part ################################.
Definition: TrigInDetValidation_AODtoTrkNtuple_CA.py:55
psc::Psc::publishStatistics
virtual bool publishStatistics(const boost::property_tree::ptree &args) override
Calls the HLT framework to publish statistics, after the run has finished.
Definition: Psc.cxx:597
psc::Psc::configure
virtual bool configure(const boost::property_tree::ptree &config) override
Configures the framework.
Definition: Psc.cxx:78
calibdata.exception
exception
Definition: calibdata.py:496
DQHistogramMergeRegExp.argc
argc
Definition: DQHistogramMergeRegExp.py:20
psc::Utils::execPython
bool execPython(const std::string &pyCmd)
Execute a python command in the python interpreter.
Definition: HLT/Trigger/TrigControl/TrigPSC/src/Utils.cxx:24
psc::Psc::disconnect
virtual bool disconnect(const boost::property_tree::ptree &args) override
Disconnects the framework.
Definition: Psc.cxx:575
ptree
boost::property_tree::ptree ptree
Definition: JsonFileLoader.cxx:16
psc::Psc::setAthenaProperties
bool setAthenaProperties()
Definition: Psc.cxx:711
psc::Psc::prepareWorker
virtual bool prepareWorker(const boost::property_tree::ptree &args) override
Method which can be called for a worker to perform the necessary steps to set unique worker IDs and a...
Definition: Psc.cxx:651
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
ActsTrk::to_string
std::string to_string(const DetectorType &type)
Definition: GeometryDefs.h:34
python.PyAthena.v
v
Definition: PyAthena.py:154
psc::Psc::doAppMgrInit
bool doAppMgrInit()
Initialize the application manager.
Definition: Psc.cxx:379
psc::Psc::finalizeWorker
virtual bool finalizeWorker(const boost::property_tree::ptree &args) override
Method which can be called for a worker to perform a cleanup before the worker gets killed.
Definition: Psc.cxx:691
psc::Psc::hltUserCommand
virtual bool hltUserCommand(const boost::property_tree::ptree &args) override
Calls the HLT framework to notify it that a user command has arrived.
Definition: Psc.cxx:609
Pythia8_RapidityOrderMPI.val
val
Definition: Pythia8_RapidityOrderMPI.py:14
psc::Utils::ScopeTimer
Very simple timer class.
Definition: HLT/Trigger/TrigControl/TrigPSC/TrigPSC/Utils.h:56
python.PscConfig.optmap
dictionary optmap
string:string map equivalent to TrigPsc/Config.h (filled in TrigPsc.cxx)
Definition: PscConfig.py:20
merge.status
status
Definition: merge.py:17
calibdata.copy
bool copy
Definition: calibdata.py:27
ATLAS_THREAD_SAFE
#define ATLAS_THREAD_SAFE
Definition: checker_macros.h:211
Utils.h
Some helpers for the PSC.
checker_macros.h
Define macros for attributes used to control the static checker.
psc::Utils::execFile
bool execFile(const std::string &pyFileName)
Execute a python file (via include or import)
Definition: HLT/Trigger/TrigControl/TrigPSC/src/Utils.cxx:54
PyObject
_object PyObject
Definition: IPyComponent.h:26
psc::Psc::~Psc
virtual ~Psc()
Virtual destructor.
Definition: Psc.cxx:67
python.CaloScaleNoiseConfig.args
args
Definition: CaloScaleNoiseConfig.py:80
TSU::T
unsigned long long T
Definition: L1TopoDataTypes.h:35
ServiceHandle< Gaudi::Interfaces::IOptionsSvc >