ATLAS Offline Software
AthMpEvtLoopMgr.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "AthMpEvtLoopMgr.h"
6 
10 #include "GaudiKernel/IIncidentSvc.h"
11 #include "GaudiKernel/IConversionSvc.h"
13 #include "GaudiKernel/Incident.h"
14 #include "GaudiKernel/ServiceHandle.h"
15 #include "GaudiKernel/IIoComponentMgr.h"
16 #include "GaudiKernel/IIoComponent.h"
17 #include "GaudiKernel/ConcurrencyFlags.h"
18 #include "StoreGate/StoreGateSvc.h"
19 
20 #include <sys/stat.h>
21 #include <sys/wait.h>
22 #include <errno.h>
23 #include <ctime>
24 #include <fcntl.h>
25 #include <sstream>
26 #include <fstream>
27 #include <cstdlib>
28 #include <string>
29 #include <time.h>
30 #include <chrono>
31 #include <algorithm>
32 #include <functional>
33 
34 #include <filesystem>
35 
37 {
38  int getPss(pid_t, unsigned long&, unsigned long&, unsigned long&, unsigned long&, bool verbose=false);
39 }
40 
42  , ISvcLocator* svcLocator)
43  : AthService(name,svcLocator)
44  , m_evtProcessor("AthenaEventLoopMgr", name)
45  , m_evtSelector(nullptr)
46  , m_nWorkers(0)
47  , m_workerTopDir("athenaMP_workers")
48  , m_outputReportName("AthenaMPOutputs")
49  , m_strategy("")
50  , m_isPileup(false)
51  , m_collectSubprocessLogs(false)
52  , m_tools(this)
53  , m_nChildProcesses(0)
54  , m_nPollingInterval(100) // 0.1 second
55  , m_nMemSamplingInterval(0) // no sampling by default
56  , m_nEventsBeforeFork(0)
57  , m_eventPrintoutInterval(1)
58  , m_execAtPreFork()
59  , m_masterPid(getpid())
60 {
61  declareProperty("NWorkers",m_nWorkers);
62  declareProperty("WorkerTopDir",m_workerTopDir);
63  declareProperty("OutputReportFile",m_outputReportName);
64  declareProperty("Strategy",m_strategy);
65  declareProperty("IsPileup",m_isPileup);
66  declareProperty("CollectSubprocessLogs",m_collectSubprocessLogs);
67  declareProperty("Tools",m_tools);
68  declareProperty("PollingInterval",m_nPollingInterval);
69  declareProperty("MemSamplingInterval",m_nMemSamplingInterval);
70  declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
71  declareProperty("EventPrintoutInterval",m_eventPrintoutInterval);
72  declareProperty("ExecAtPreFork", m_execAtPreFork);
73 }
74 
76 {
77 }
78 
80 {
81  ATH_MSG_DEBUG("in initialize() ... ");
82 
83  Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(m_nWorkers);
84 
85  SmartIF<IProperty> prpMgr(serviceLocator());
86  if(!prpMgr.isValid()) {
87  ATH_MSG_ERROR("Failed to get hold of the Property Manager");
88  return StatusCode::FAILURE;
89  }
90 
91  std::string evtSelName = prpMgr->getProperty("EvtSel").toString();
92  m_evtSelector = serviceLocator()->service(evtSelName);
93  ATH_CHECK(m_evtSelector.isValid());
94 
95  if(m_strategy=="EventService") {
96  // ES with non-zero events before forking makes no sense
97  if(m_nEventsBeforeFork!=0) {
98  ATH_MSG_ERROR("The EventService strategy cannot run with non-zero value for EventsBeforeFork");
99  return StatusCode::FAILURE;
100  }
101 
102  // We need to ignore SkipEvents in ES
103  if(updateSkipEvents(0).isFailure()) {
104  ATH_MSG_ERROR("Failed to set skipEvents=0 in Event Service");
105  return StatusCode::FAILURE;
106  }
107  }
108 
109  if(m_isPileup) {
110  m_evtProcessor = ServiceHandle<IEventProcessor>("PileUpEventLoopMgr",name());
111  ATH_MSG_INFO("ELM: The job running in pileup mode");
112  }
113  else {
114  ATH_MSG_INFO("ELM: The job running in non-pileup mode");
115  }
116 
117  ATH_CHECK(m_evtProcessor.retrieve());
118  if(!m_isPileup) {
119  SmartIF<IProperty> propertyServer(m_evtProcessor.get());
120  if(propertyServer) {
121  if(propertyServer->setProperty("EventPrintoutInterval",m_eventPrintoutInterval).isFailure()) {
122  ATH_MSG_WARNING("Could not set AthenaEventLoopMgr EventPrintoutInterval to " << m_eventPrintoutInterval);
123  }
124  if(propertyServer->setProperty("ExecAtPreFork",m_execAtPreFork).isFailure()) {
125  ATH_MSG_WARNING("Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
126  }
127  }
128  else {
129  ATH_MSG_WARNING("Could not cast AthenaEventLoopMgr to IProperty");
130  }
131  }
132  ATH_CHECK(m_tools.retrieve());
133 
134  return StatusCode::SUCCESS;
135 }
136 
138 {
139  return StatusCode::SUCCESS;
140 }
141 
143  void** ppvInterface)
144 {
145  if(IEventProcessor::interfaceID().versionMatch(riid)) {
146  *ppvInterface = (IEventProcessor*)this;
147  addRef();
148  return StatusCode::SUCCESS;
149  }
150  else {
151  // Interface is not directly available: try out a base class
152  return AthService::queryInterface(riid, ppvInterface);
153  }
154 }
155 
157 {
158  // Perhaps here we should return StatusCode::FAILURE, as this method should not be called directly
159  return m_evtProcessor->nextEvent(maxevt);
160 }
161 
163  // return an invalid context - method should not be called
164  return EventContext{};
165 }
166 
168 {
169  // Perhaps here we should return StatusCode::FAILURE, as this method should not be called directly
170  return m_evtProcessor->executeEvent(std::move(ctx));
171 }
172 
174 {
175  ATH_MSG_DEBUG("in executeRun()");
176 
177  // Generate random component of the Shared Memory and Shared Queue names
178  srand(time(NULL));
179  std::ostringstream randStream;
180  randStream << getpid() << '_' << AthenaInterprocess::randString();
181  ATH_MSG_INFO("Using random components for IPC object names: " << randStream.str());
182 
183  ServiceHandle<StoreGateSvc> pDetStore("DetectorStore",name());
184  ATH_CHECK(pDetStore.retrieve());
185 
186  // Create Shared Event queue if necessary and make it available to the tools
187  if(m_strategy=="SharedQueue"
188  || m_strategy=="RoundRobin") {
189  AthenaInterprocess::SharedQueue* evtQueue = new AthenaInterprocess::SharedQueue("AthenaMPEventQueue_"+randStream.str(),2000,sizeof(long));
190  if(pDetStore->record(evtQueue,"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
191  ATH_MSG_FATAL("Unable to record the pointer to the Shared Event queue into Detector Store");
192  delete evtQueue;
193  return StatusCode::FAILURE;
194  }
195  }
196 
197  // For the Event Service: create a queue for connecting EvtRangeProcessor in the master with EvtRangeScatterer subprocess
198  // The TokenProcessor master will be sending pid-s of failed processes to Token Scatterer
199  if(m_strategy=="EventService") {
200  AthenaInterprocess::SharedQueue* failedPidQueue = new AthenaInterprocess::SharedQueue("AthenaMPFailedPidQueue_"+randStream.str(),100,sizeof(pid_t));
201  if(pDetStore->record(failedPidQueue,"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
202  ATH_MSG_FATAL("Unable to record the pointer to the Failed PID queue into Detector Store");
203  delete failedPidQueue;
204  return StatusCode::FAILURE;
205  }
206  }
207 
208  // Prepare work directory for sub-processes
209  if(mkdir(m_workerTopDir.c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)!=0) {
210  switch(errno) {
211  case EEXIST:
212  {
213  // Top directory already exists, maybe a leftover from previous AthenaMP job in the same rundir
214  // Rename it with m_workerTopDir+"-bak-rand"
215 
216  srand((unsigned)time(0));
217  std::ostringstream randname;
218  randname << rand();
219  std::string backupDir = (m_workerTopDir.rfind('/')==(m_workerTopDir.size()-1)?m_workerTopDir.substr(0,m_workerTopDir.size()-1):m_workerTopDir)+std::string("-bak-")+randname.str();
220 
221  ATH_MSG_WARNING("The top directory " << m_workerTopDir << " already exists");
222  ATH_MSG_WARNING("The job will attempt to save it with the name " << backupDir << " and create new top directory from scratch");
223 
224  if(rename(m_workerTopDir.c_str(),backupDir.c_str())!=0) {
225  char buf[256];
226  strerror_r(errno, buf, sizeof(buf));
227  ATH_MSG_ERROR("Unable to make backup directory! " << buf);
228  return StatusCode::FAILURE;
229  }
230 
231  if(mkdir(m_workerTopDir.c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==0)
232  break;
233  }
234  /* FALLTHROUGH */
235  default:
236  {
237  char buf[256];
238  strerror_r(errno, buf, sizeof(buf));
239  ATH_MSG_ERROR("Unable to make top directory " << m_workerTopDir << " for children processes! " << buf);
240  return StatusCode::FAILURE;
241  }
242  }
243  }
244 
245  // When forking before 1st event, fire BeforeFork incident in non-pileup jobs
246  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc",name());
247  ATH_CHECK(incSvc.retrieve());
248 
249  if(m_nEventsBeforeFork==0 && !m_isPileup) {
250  incSvc->fireIncident(Incident(name(),"BeforeFork"));
251  }
252 
253  // Extract process file descriptors
254  std::shared_ptr<AthenaInterprocess::FdsRegistry> registry = extractFds();
255 
257  itLast = m_tools.end();
258 
259  // When using SharedWriter in conjunction with fork-after-N-events
260  // we have to make sure that mother process is a conversion service
261  // client so that events before forking workers are captured...
262 
263  SmartIF<IDataShare> dataShare{serviceLocator()->service("AthenaPoolCnvSvc")};
264  ATH_CHECK(dataShare.isValid());
265 
266  auto sharedWriterTool = m_tools["SharedWriterTool"];
267  const bool sharedWriterWithFAFE = (m_nEventsBeforeFork!=0 && sharedWriterTool);
268 
269  if(sharedWriterWithFAFE) {
270  (*sharedWriterTool)->useFdsRegistry(registry);
271  (*sharedWriterTool)->setRandString(randStream.str());
272 
273  int nChildren = (*sharedWriterTool)->makePool(maxevt,m_nWorkers,m_workerTopDir);
274  if(nChildren==-1) {
275  ATH_MSG_FATAL("makePool failed for " << (*sharedWriterTool)->name());
276  return StatusCode::FAILURE;
277  }
278  else {
279  m_nChildProcesses+=nChildren;
280  }
281 
282  // Execute the SharedWriterTool at this point
283  StatusCode mySc = (*sharedWriterTool)->exec();
284  if(!mySc.isSuccess()) {
285  ATH_MSG_FATAL("Cannot Execute SharedWriter Tool");
286  return StatusCode::FAILURE;
287  }
288 
289  // Make the mother process a client
290  if(!dataShare->makeClient(m_nWorkers+1).isSuccess()) {
291  ATH_MSG_FATAL("Cannot make mother process a client for Conversion Service");
292  return StatusCode::FAILURE;
293  }
294  }
295 
296  //
297  // Try processing requested number of events here
298  if(m_nEventsBeforeFork) {
299  // Take into account a corner case: m_nEventsBeforeFork > maxevt
300  int nEventsToProcess = ((maxevt>-1 && m_nEventsBeforeFork>maxevt)?maxevt:m_nEventsBeforeFork);
301  StatusCode scEvtProc = m_evtProcessor->nextEvent(nEventsToProcess);
302  if(!scEvtProc.isSuccess()) {
303  if(nEventsToProcess)
304  ATH_MSG_FATAL("Unable to process first " << nEventsToProcess << " events in the master");
305  else
306  ATH_MSG_FATAL("Unable to process first event in the master");
307  return scEvtProc;
308  }
309  }
310 
311  // Finalize I/O (close input files) by IoComponents
312  ServiceHandle<IIoComponentMgr> ioMgr("IoComponentMgr",name());
313  ATH_CHECK(ioMgr.retrieve());
314  ATH_CHECK(ioMgr->io_finalize());
315  ATH_MSG_DEBUG("Successfully finalized I/O before forking");
316 
317  // Flush stream buffers
318  fflush(NULL);
319 
320  // Make the mother process not client
321  if(sharedWriterWithFAFE && !dataShare->makeClient(0).isSuccess()) {
322  ATH_MSG_FATAL("Cannot make mother process not client for Conversion Service");
323  return StatusCode::FAILURE;
324  }
325 
326  int maxEvents(maxevt); // This can be modified after restart
327 
328  // Re-extract process file descriptors
329  registry = extractFds();
330 
331  // Make worker pools
332  it = m_tools.begin();
333  for(; it!=itLast; ++it) {
334  if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
335  (*it)->useFdsRegistry(registry);
336  (*it)->setRandString(randStream.str());
337  if(it==m_tools.begin()) {
338  incSvc->fireIncident(Incident(name(),"PreFork")); // Do it only once
339  }
340  int nChildren = (*it)->makePool(maxEvents,m_nWorkers,m_workerTopDir);
341  if(nChildren==-1) {
342  ATH_MSG_FATAL("makePool failed for " << (*it)->name());
343  return StatusCode::FAILURE;
344  }
345  else {
346  m_nChildProcesses+=nChildren;
347  }
348  }
349 
350  if(m_nChildProcesses==0) {
351  ATH_MSG_ERROR("No child processes were created");
352  return StatusCode::FAILURE;
353  }
354 
355  // Assign work to child processes
356  for(it=m_tools.begin(); it!=itLast; ++it) {
357  if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
358  if((*it)->exec().isFailure()) {
359  ATH_MSG_FATAL("Unable to submit work to the tool " << (*it)->name());
360  return StatusCode::FAILURE;
361  }
362  }
363 
364  StatusCode sc = wait();
365 
366  if(m_nMemSamplingInterval>0) {
367  ATH_MSG_INFO("*** *** Memory Usage *** ***");
368  ATH_MSG_INFO("*** MAX PSS " << (*std::max_element(m_samplesPss.cbegin(),m_samplesPss.cend()))/1024 << "MB");
369  ATH_MSG_INFO("*** MAX RSS " << (*std::max_element(m_samplesRss.cbegin(),m_samplesRss.cend()))/1024 << "MB");
370  ATH_MSG_INFO("*** MAX SIZE " << (*std::max_element(m_samplesSize.cbegin(),m_samplesSize.cend()))/1024 << "MB");
371  ATH_MSG_INFO("*** MAX SWAP " << (*std::max_element(m_samplesSwap.cbegin(),m_samplesSwap.cend()))/1024 << "MB");
372  ATH_MSG_INFO("*** *** Memory Usage *** ***");
373  }
374 
376  ATH_MSG_INFO("BEGIN collecting sub-process logs");
377  std::vector<std::string> logs;
378  for(it=m_tools.begin(); it!=itLast; ++it) {
379  (*it)->subProcessLogs(logs);
380  for(size_t i=0;i<logs.size();++i) {
381  std::cout << "\n File: " << logs[i] << "\n" << std::endl;
382  std::ifstream log;
383  log.open(logs[i].c_str(),std::ifstream::in);
384  std::string line;
385  while(!log.eof()) {
386  std::getline(log,line);
387  std::cout << line << std::endl;
388  }
389  log.close();
390  }
391  }
392  ATH_MSG_INFO("END collecting sub-process logs");
393  }
394 
395  if(sc.isSuccess())
396  return generateOutputReport();
397  else
398  return sc;
399 }
400 
402 {
403  return m_evtProcessor->stopRun();
404 }
405 
406 // !!! NB !!!
407 //
408 // Here we rely on the fact that if the master process finds that one of
409 // its sub-processes finished abnormally (either by a signal or with a non-zero exit code),
410 // it will stop waiting for the other sub-processes and proceed with its finalization.
411 // Once the master process exits, the remaining sub-processes will receive SIGHUP and exit too.
412 //
413 // We could also change this behavior and broadcast a termination signal to all remaining
414 // sub-processes once a problematic sub-process has been identified
415 //
417 {
418  ATH_MSG_INFO("Waiting for sub-processes");
420  itLast = m_tools.end();
421  pid_t pid(0);
422  bool all_ok(true);
423 
424  auto memMonTime = std::chrono::system_clock::now();
425 
426  while(m_nChildProcesses>0) {
427  for(it = m_tools.begin(); it!=itLast; ++it) {
428  if((*it)->wait_once(pid).isFailure()) {
429  all_ok = false;
430  ATH_MSG_ERROR("Failure in waiting or sub-process finished abnormally");
431  break;
432  }
433  else {
434  if(pid>0) m_nChildProcesses -= 1;
435  }
436  }
437  if(!all_ok) break;
438 
439  usleep(m_nPollingInterval*1000);
440 
441  if(m_nMemSamplingInterval>0) {
442  auto currTime = std::chrono::system_clock::now();
443  if(std::chrono::duration<double,std::ratio<1,1>>(currTime-memMonTime).count()>m_nMemSamplingInterval) {
444  unsigned long size(0);
445  unsigned long rss(0);
446  unsigned long pss(0);
447  unsigned long swap(0);
448 
449  if(athenaMP_MemHelper::getPss(getpid(), pss, swap, rss, size, msgLvl(MSG::DEBUG)))
450  ATH_MSG_WARNING("Unable to get memory sample");
451  else {
452  m_samplesRss.push_back(rss);
453  m_samplesPss.push_back(pss);
454  m_samplesSize.push_back(size);
455  m_samplesSwap.push_back(swap);
456  }
457  memMonTime=currTime;
458  }
459  }
460  }
461 
462  for(it=m_tools.begin(); it!=itLast; ++it)
463  (*it)->reportSubprocessStatuses();
464 
465  if(!all_ok) {
466  for(it=m_tools.begin(); it!=itLast; ++it)
467  (*it)->killChildren();
468  }
469 
470  return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
471 }
472 
474 {
475  // Loop over tools, collect their output reports and put them all together into a single file.
476  // If m_nEventsBeforeFork!=0 then take into account the outputs made by the master process too
477 
478  std::ofstream ofs;
479  ofs.open(m_outputReportName.c_str());
480  if(!ofs) {
481  ATH_MSG_ERROR("Unable to open AthenaMPOutputs for writing!");
482  return StatusCode::FAILURE;
483  }
484  else {
485  std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
486 
488  itLast = m_tools.end();
489  for(it=m_tools.begin(); it!=itLast; ++it)
490  allptrs.push_back((*it)->generateOutputReport());
491 
492  // First collect keys=file_names from all tools
493  std::set<std::string> allkeys;
494  for(size_t i=0; i<allptrs.size(); ++i) {
495  AthenaMP::AllWorkerOutputsIterator it_wos = allptrs[i]->begin(),
496  it_wosLast = allptrs[i]->end();
497  for(;it_wos!=it_wosLast;++it_wos)
498  allkeys.insert(it_wos->first);
499  }
500 
501  // Generate XML
502  ofs << "<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
503  ofs << "<athenaFileReport>" << std::endl;
504  std::set<std::string>::const_iterator keys_it = allkeys.begin(),
505  keys_itLast = allkeys.end();
506  for(;keys_it!=keys_itLast;++keys_it) {
507  ofs << " <Files OriginalName=\"" << (*keys_it) << "\">" << std::endl;
508  for(size_t i=0; i<allptrs.size(); ++i) {
509  AthenaMP::AllWorkerOutputsIterator it_wos = (allptrs[i])->find(*keys_it);
510  if(it_wos!=(allptrs[i])->end()) {
511  for(size_t ii=0; ii<it_wos->second.size(); ++ii) {
512  AthenaMP::WorkerOutput& outp = it_wos->second[ii];
513  if(ii==0 && m_nEventsBeforeFork>0) {
514  std::filesystem::path masterFile(std::filesystem::current_path());
515  masterFile /= std::filesystem::path(*keys_it);
516  if(std::filesystem::exists(masterFile) && std::filesystem::is_regular_file(masterFile))
517  ofs << " <File "
518  << "description=\"" << outp.description
519  << "\" mode=\"" << outp.access_mode
520  << "\" name=\"" << masterFile.string()
521  << "\" shared=\"" << (outp.shared?"True":"False")
522  << "\" technology=\"" << outp.technology
523  << "\"/>" << std::endl;
524  }
525  ofs << " <File "
526  << "description=\"" << outp.description
527  << "\" mode=\"" << outp.access_mode
528  << "\" name=\"" << outp.filename
529  << "\" shared=\"" << (outp.shared?"True":"False")
530  << "\" technology=\"" << outp.technology
531  << "\"/>" << std::endl;
532  }
533  }
534  }
535  ofs << " </Files>" << std::endl;
536  }
537  ofs << "</athenaFileReport>" << std::endl;
538  ofs.close();
539  }
540 
541  return StatusCode::SUCCESS;
542 }
543 
544 std::shared_ptr<AthenaInterprocess::FdsRegistry> AthMpEvtLoopMgr::extractFds()
545 {
546  ATH_MSG_DEBUG("Extracting file descriptors");
547  using namespace std::filesystem;
548  std::shared_ptr<AthenaInterprocess::FdsRegistry> registry(new AthenaInterprocess::FdsRegistry());
549 
550  // Extract file descriptors associated with the current process
551  // 1. Store only those regular files in the registry, which
552  // don't contain substrings from the "exclusion pattern" set
553  // 2. Skip also stdout and stderr
554 
555  std::vector<std::string> excludePatterns {
556  "/root/etc/plugins/"
557  ,"/root/cint/cint/"
558  ,"/root/include/"
559  ,"/var/tmp/"
560  ,"/var/lock/"
561  ,"/var/lib/"
562  ,"/bin/python/"
563  ,"/include/c++/"
564  ,".confdb2"
565  };
566 
567  path fdPath("/proc/self/fd");
568  for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
569  if(is_symlink(fdIt->path())) {
570  path realpath = read_symlink(fdIt->path());
571  int fd = atoi(fdIt->path().filename().string().c_str());
572 
573  if (fd==1 || fd==2) // Skip stdout and stderr
574  continue;
575 
576  if(exists(realpath)) {
577  if(is_regular_file(realpath)) {
578  // Check against the exclusion criteria
579  bool exclude(false);
580  for(size_t i=0;i<excludePatterns.size(); ++i) {
581  if(realpath.string().find(excludePatterns[i])!=std::string::npos) {
582  exclude = true;
583  break;
584  }
585  }
586  if(exclude) {
587  ATH_MSG_DEBUG(realpath.string() << " Excluded from the registry by the pattern");
588  }
589  else {
590  registry->push_back(AthenaInterprocess::FdsRegistryEntry(fd,realpath.string()));
591  }
592  }
593  else {
594  ATH_MSG_DEBUG(realpath.string() << " is not a regular file"); // TODO: deal with these?
595  }
596  } // File exists
597  }
598  else
599  ATH_MSG_WARNING("UNEXPECTED. " << fdIt->path().string() << " Not a symlink");
600  } // Directory iteration
601 
602  ATH_MSG_DEBUG("Fds Reistry created. Contents:");
603  for(size_t ii(0); ii<registry->size(); ++ii)
604  ATH_MSG_DEBUG((*registry)[ii].fd << " " << (*registry)[ii].name);
605 
606  return registry;
607 }
608 
610 {
611  SmartIF<IProperty> propertyServer(m_evtSelector);
612  if(!propertyServer) {
613  ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
614  return StatusCode::FAILURE;
615  }
616 
617  IntegerProperty skipEventsProperty("SkipEvents", skipEvents);
618  if(propertyServer->setProperty(skipEventsProperty).isFailure()) {
619  ATH_MSG_ERROR("Unable to update " << skipEventsProperty.name() << " property on the Event Selector");
620  return StatusCode::FAILURE;
621  }
622  ATH_MSG_INFO("Updated the SkipEvents property of the event selector. New value: " << skipEvents);
623 
624  return StatusCode::SUCCESS;
625 }
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
outp
std::ostream * outp
send output to here ...
Definition: hcg.cxx:73
python.Dso.registry
registry
Definition: Control/AthenaServices/python/Dso.py:159
AthMpEvtLoopMgr::finalize
virtual StatusCode finalize()
Definition: AthMpEvtLoopMgr.cxx:137
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthMpEvtLoopMgr::m_eventPrintoutInterval
unsigned int m_eventPrintoutInterval
Definition: AthMpEvtLoopMgr.h:52
AthMpEvtLoopMgr::m_isPileup
bool m_isPileup
Definition: AthMpEvtLoopMgr.h:45
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
checkFileSG.line
line
Definition: checkFileSG.py:75
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
find
std::string find(const std::string &s)
return a remapped string
Definition: hcg.cxx:135
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
AthMpEvtLoopMgr::~AthMpEvtLoopMgr
virtual ~AthMpEvtLoopMgr()
Definition: AthMpEvtLoopMgr.cxx:75
AthMpEvtLoopMgr::generateOutputReport
StatusCode generateOutputReport()
Definition: AthMpEvtLoopMgr.cxx:473
IDataShare.h
AthMpEvtLoopMgr::m_nPollingInterval
int m_nPollingInterval
Definition: AthMpEvtLoopMgr.h:49
AthMpEvtLoopMgr::m_samplesRss
std::vector< unsigned long > m_samplesRss
Definition: AthMpEvtLoopMgr.h:57
skel.it
it
Definition: skel.GENtoEVGEN.py:423
AthMpEvtLoopMgr::m_nChildProcesses
int m_nChildProcesses
Definition: AthMpEvtLoopMgr.h:48
AthCommonMsg< Service >::msgLvl
bool msgLvl(const MSG::Level lvl) const
Definition: AthCommonMsg.h:30
AthMpEvtLoopMgr::updateSkipEvents
StatusCode updateSkipEvents(int skipEvents)
Definition: AthMpEvtLoopMgr.cxx:609
exclude
std::set< std::string > exclude
list of directories to be excluded
Definition: hcg.cxx:95
AthMpEvtLoopMgr::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: AthMpEvtLoopMgr.h:51
AthMpEvtLoopMgr::m_tools
ToolHandleArray< IAthenaMPTool > m_tools
Definition: AthMpEvtLoopMgr.h:47
XMLtoHeader.count
count
Definition: XMLtoHeader.py:85
AthMpEvtLoopMgr::m_samplesSize
std::vector< unsigned long > m_samplesSize
Definition: AthMpEvtLoopMgr.h:59
AthMpEvtLoopMgr::m_outputReportName
std::string m_outputReportName
Definition: AthMpEvtLoopMgr.h:43
AthenaMP::AllWorkerOutputsIterator
AllWorkerOutputs::iterator AllWorkerOutputsIterator
Definition: IAthenaMPTool.h:26
AthMpEvtLoopMgr::m_nWorkers
int m_nWorkers
Definition: AthMpEvtLoopMgr.h:41
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:56
python.SCT_ByteStreamErrorsTestAlgConfig.maxEvents
maxEvents
Definition: SCT_ByteStreamErrorsTestAlgConfig.py:43
AthMpEvtLoopMgr::m_workerTopDir
std::string m_workerTopDir
Definition: AthMpEvtLoopMgr.h:42
AthMpEvtLoopMgr::m_samplesPss
std::vector< unsigned long > m_samplesPss
Definition: AthMpEvtLoopMgr.h:58
athenaMP_MemHelper::getPss
int getPss(pid_t, unsigned long &, unsigned long &, unsigned long &, unsigned long &, bool verbose=false)
AthMpEvtLoopMgr::createEventContext
EventContext createEventContext()
Definition: AthMpEvtLoopMgr.cxx:162
AthMpEvtLoopMgr::wait
StatusCode wait()
Definition: AthMpEvtLoopMgr.cxx:416
AthMpEvtLoopMgr::m_evtSelector
SmartIF< IService > m_evtSelector
Definition: AthMpEvtLoopMgr.h:40
python.setupRTTAlg.size
int size
Definition: setupRTTAlg.py:39
AthMpEvtLoopMgr.h
AthMpEvtLoopMgr::m_nMemSamplingInterval
int m_nMemSamplingInterval
Definition: AthMpEvtLoopMgr.h:50
python.handimod.now
now
Definition: handimod.py:675
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
LArG4FSStartPointFilter.rand
rand
Definition: LArG4FSStartPointFilter.py:80
lumiFormat.i
int i
Definition: lumiFormat.py:85
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthMpEvtLoopMgr::m_samplesSwap
std::vector< unsigned long > m_samplesSwap
Definition: AthMpEvtLoopMgr.h:60
AthService
Definition: AthService.h:32
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
Utilities.h
AthMpEvtLoopMgr::queryInterface
virtual StatusCode queryInterface(const InterfaceID &riid, void **ppvInterface)
Definition: AthMpEvtLoopMgr.cxx:142
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthMpEvtLoopMgr::m_strategy
std::string m_strategy
Definition: AthMpEvtLoopMgr.h:44
AthenaInterprocess::randString
std::string randString()
Definition: Control/AthenaInterprocess/AthenaInterprocess/Utilities.h:13
AthMpEvtLoopMgr::extractFds
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds()
Definition: AthMpEvtLoopMgr.cxx:544
AthenaInterprocess::FdsRegistry
std::vector< FdsRegistryEntry > FdsRegistry
Definition: FdsRegistry.h:22
athenaMP_MemHelper
Definition: AthMpEvtLoopMgr.cxx:37
WriteCalibToCool.swap
swap
Definition: WriteCalibToCool.py:94
AthMpEvtLoopMgr::AthMpEvtLoopMgr
AthMpEvtLoopMgr()
PixelAthHitMonAlgCfg.duration
duration
Definition: PixelAthHitMonAlgCfg.py:152
ReadFromCoolCompare.fd
fd
Definition: ReadFromCoolCompare.py:196
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
AthMpEvtLoopMgr::initialize
virtual StatusCode initialize()
Definition: AthMpEvtLoopMgr.cxx:79
AthMpEvtLoopMgr::stopRun
virtual StatusCode stopRun()
Definition: AthMpEvtLoopMgr.cxx:401
IAthenaMPTool.h
AthMpEvtLoopMgr::m_execAtPreFork
StringArrayProperty m_execAtPreFork
Definition: AthMpEvtLoopMgr.h:53
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
AthMpEvtLoopMgr::executeEvent
virtual StatusCode executeEvent(EventContext &&ctx)
Definition: AthMpEvtLoopMgr.cxx:167
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthMpEvtLoopMgr::executeRun
virtual StatusCode executeRun(int maxevt)
Definition: AthMpEvtLoopMgr.cxx:173
AthenaInterprocess::FdsRegistryEntry
Definition: FdsRegistry.h:13
python.TriggerHandler.verbose
verbose
Definition: TriggerHandler.py:297
DEBUG
#define DEBUG
Definition: page_access.h:11
AthenaMP::WorkerOutput
Definition: IAthenaMPTool.h:17
python.CaloCondTools.log
log
Definition: CaloCondTools.py:20
SharedQueue.h
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
AthMpEvtLoopMgr::nextEvent
virtual StatusCode nextEvent(int maxevt)
Definition: AthMpEvtLoopMgr.cxx:156
python.dummyaccess.exists
def exists(filename)
Definition: dummyaccess.py:9
StoreGateSvc.h
AthMpEvtLoopMgr::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthMpEvtLoopMgr.h:39
AthMpEvtLoopMgr::m_collectSubprocessLogs
bool m_collectSubprocessLogs
Definition: AthMpEvtLoopMgr.h:46
ServiceHandle< IEventProcessor >