ATLAS Offline Software
SharedHiveEvtQueueConsumer.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 #include "copy_file_icc_hack.h"
9 
13 #include "CxxUtils/xmalloc.h"
14 #include "GaudiKernel/IEvtSelector.h"
15 #include "GaudiKernel/IIoComponentMgr.h"
16 #include "GaudiKernel/IFileMgr.h"
17 #include "GaudiKernel/IChronoStatSvc.h"
18 #include "GaudiKernel/ISvcLocator.h"
19 #include "GaudiKernel/IIncidentSvc.h"
20 #include "GaudiKernel/IConversionSvc.h"
21 
22 #include <sys/stat.h>
23 #include <sstream>
24 #include <fstream>
25 #include <unistd.h>
26 #include <stdio.h>
27 #include <stdint.h>
28 #include <stdexcept>
29 #include <cmath> // For pow
30 
31 #include <signal.h>
32 
34  std::atomic<bool> sig_done = false;
35  void pauseForDebug(int /*sig*/) {
36  // std::cout << "Continuing after receiving signal "
37  // << sig << std::endl;
38  sig_done = true;
39  }
40 }
41 
43  , const std::string& name
44  , const IInterface* parent)
46  , m_chronoStatSvc("ChronoStatSvc", name)
47 {
48  m_subprocDirPrefix = "worker_";
49 }
50 
51 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
52 
54 {
55 }
56 
57 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
58 
60 {
61  ATH_MSG_DEBUG("In initialize");
62 
64 
65  m_evtSelSeek = serviceLocator()->service(m_evtSelName);
66  ATH_CHECK( m_evtSelSeek.isValid() );
67  ATH_CHECK( evtSelector()->createContext (m_evtContext) );
68 
69  ATH_CHECK(m_chronoStatSvc.retrieve());
70 
71  if(m_useSharedWriter) {
72  m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolSharedIOCnvSvc"));
73  if(!m_dataShare) {
74  ATH_MSG_ERROR("Error retrieving AthenaPoolSharedIOCnvSvc");
75  return StatusCode::FAILURE;
76  }
77  }
78 
79  return StatusCode::SUCCESS;
80 }
81 
82 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
83 
86 {
87  if (m_evtContext) {
88  ATH_CHECK( evtSelector()->releaseContext (m_evtContext) );
89  m_evtContext = nullptr;
90  }
91 
92  return StatusCode::SUCCESS;
93 }
94 
95 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
96 
97 int
98 SharedHiveEvtQueueConsumer::makePool(int, int nprocs, const std::string& topdir)
99 {
100  ATH_MSG_DEBUG("In makePool " << getpid());
101 
102  if(nprocs==0 || nprocs<-1) {
103  ATH_MSG_ERROR("Invalid value for the nprocs parameter: ");
104  return -1;
105  }
106 
107  if(topdir.empty()) {
108  ATH_MSG_ERROR("Empty name for the top directory!");
109  return -1;
110  }
111 
112  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
113  m_subprocTopDir = topdir;
114 
115  // Get the shared event queue
116  ATH_MSG_DEBUG("Event queue name " << "AthenaMPEventQueue_" << m_randStr);
117  StatusCode sc = detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr);
118  if(sc.isFailure()) {
119  ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Event Queue");
120  return -1;
121  }
122 
123 
124  // Create rank queue and fill it
125  m_sharedRankQueue = std::make_unique<AthenaInterprocess::SharedQueue>("SharedHiveEvtQueueConsumer_RankQueue_"+m_randStr,m_nprocs,sizeof(int));
126  for(int i=0; i<m_nprocs; ++i)
127  if(!m_sharedRankQueue->send_basic<int>(i)) {
128  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
129  return -1;
130  }
131 
132  // Create the process group and map_async bootstrap
134  ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
135  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
136  return -1;
137  ATH_MSG_INFO("Workers bootstrapped");
138 
139  return m_nprocs;
140 }
141 
142 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
143 
144 StatusCode
146 {
147  ATH_MSG_DEBUG("In exec " << getpid());
148 
149  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
150  return StatusCode::FAILURE;
151  ATH_MSG_INFO("Workers started processing events");
152 
153  return StatusCode::SUCCESS;
154 }
155 
156 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
157 
158 StatusCode
159 SharedHiveEvtQueueConsumer::wait_once(pid_t& pid)
160 {
161  StatusCode sc = AthenaMPToolBase::wait_once(pid);
163  if(sc.isFailure()) {
164  // We are to stop waiting. Pull all available ProcessResults from the queue
165  // Don't serialize finalizations
166  do {
167  presult = m_processGroup->pullOneResult();
168  if(presult && (unsigned)(presult->output.size)>sizeof(int))
169  decodeProcessResult(presult,false);
170  if(presult) free(presult->output.data);
171  delete presult;
172  } while(presult);
173  } else {
174  // Pull one result and decode it if necessary
175  presult = m_processGroup->pullOneResult();
176  int res(0);
177  if(presult && (unsigned)(presult->output.size)>sizeof(int))
178  res = decodeProcessResult(presult,true);
179  if(presult) free(presult->output.data);
180  delete presult;
181  if(res) return StatusCode::FAILURE;
182  }
183  return sc;
184 }
185 
186 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
187 
188 void
190 {
191  ATH_MSG_INFO("Statuses of event processors");
192  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
193  for(size_t i=0; i<statuses.size(); ++i) {
194  // Get the number of events processed by this worker
195  std::map<pid_t,int>::const_iterator it = m_nProcessedEvents.find(statuses[i].pid);
196  msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
197  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
198  << ". Number of events processed: ";
199  if(it==m_nProcessedEvents.end())
200  msg(MSG::INFO) << "N/A" << endmsg;
201  else
202  msg(MSG::INFO) << it->second << endmsg;
203  }
204 }
205 
206 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
207 
208 void
210 {
211  filenames.clear();
212  for(int i=0; i<m_nprocs; ++i) {
213  std::ostringstream workerIndex;
214  workerIndex << i;
215  std::filesystem::path worker_rundir(m_subprocTopDir);
216  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
217  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
218  }
219 }
220 
221 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
222 
223 std::unique_ptr<AthenaInterprocess::ScheduledWork>
225 {
226  if (m_debug) {
227  ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
228  sigset_t mask, oldmask;
229 
231 
232  sigemptyset (&mask);
233  sigaddset (&mask, SIGUSR1);
234 
235  sigprocmask (SIG_BLOCK, &mask, &oldmask);
237  sigsuspend (&oldmask);
238  sigprocmask (SIG_UNBLOCK, &mask, NULL);
239  }
240 
241  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
242  outwork->data = CxxUtils::xmalloc(sizeof(int));
243  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
244  outwork->size = sizeof(int);
245 
246  // ...
247  // (possible) TODO: extend outwork with some error message, which will be eventually
248  // reported in the master proces
249  // ...
250 
251  // ________________________ Get IncidentSvc and fire PostFork ________________________
252  SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service("IncidentSvc"));
253  if (!p_incidentSvc) {
254  ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
255  return outwork;
256  }
257  p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
258 
259 
260  // ________________________ Get RankID ________________________
261  //
262  if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
263  ATH_MSG_ERROR("Unable to get rank ID!");
264  return outwork;
265  }
266  std::ostringstream workindex;
267  workindex<<m_rankId;
268 
269  // ________________________ Worker dir: mkdir ________________________
270  std::filesystem::path worker_rundir(m_subprocTopDir);
271  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
272  // TODO: this "worker_" can be made configurable too
273 
274  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
275  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
276  return outwork;
277  }
278 
279  // ________________________ Redirect logs ________________________
280  if(redirectLog(worker_rundir.string()))
281  return outwork;
282 
283  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
284 
285  // ________________________ Update Io Registry ____________________________
286  if(updateIoReg(worker_rundir.string()))
287  return outwork;
288 
289  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
290 
291  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
292  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
293  if(std::filesystem::is_regular_file("SimParams.db"))
294  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
295  if(std::filesystem::is_regular_file("DigitParams.db"))
296  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
297  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
298  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
299 
300  // _______________________ Handle saved PFC (if any) ______________________
301  if(handleSavedPfc(abs_worker_rundir))
302  return outwork;
303 
304  // ________________________ reopen descriptors ____________________________
305  if(reopenFds())
306  return outwork;
307 
308  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
309 
310  // ________________________ Make Shared Writer Client ________________________
311 
313  SmartIF<IProperty> propertyServer(m_dataShare);
314  if (!propertyServer || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
315  ATH_MSG_ERROR("Could not change AthenaPoolSharedIOCnvSvc MakeClient Property");
316  return outwork;
317  } else {
318  ATH_MSG_DEBUG("Successfully made the conversion service a share client");
319  }
320  }
321 
322  // ________________________ I/O reinit ________________________
323  ATH_CHECK( m_ioMgr->io_reinitialize(), outwork );
324 
325  // ________________________ Event selector restart ________________________
326  SmartIF<IService> evtSelSvc(m_evtSelector);
327  ATH_CHECK( evtSelSvc.isValid(), outwork );
328  ATH_CHECK( evtSelSvc->start(), outwork );
329 
330  // ________________________ Worker dir: chdir ________________________
331  if(chdir(worker_rundir.string().c_str())==-1) {
332  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
333  return outwork;
334  }
335 
336  // ___________________ Fire UpdateAfterFork incident _________________
337  p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
338 
339  // Declare success and return
340  *(int*)(outwork->data) = 0;
341  return outwork;
342 }
343 
344 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
345 
346 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedHiveEvtQueueConsumer::exec_func()
347 {
348  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
349 
350  bool all_ok(true);
351 
352  if (!initHive().isSuccess()) {
353  ATH_MSG_FATAL("unable to initialize Hive");
354  all_ok = false;
355  }
356 
357  // Get the value of SkipEvent
358  int skipEvents(0);
359  SmartIF<IProperty> propertyServer(m_evtSelector);
360  if(propertyServer==0) {
361  ATH_MSG_ERROR("Unable to cast event selector to IProperty");
362  all_ok = false;
363  }
364  else {
365  std::string propertyName("SkipEvents");
366  IntegerProperty skipEventsProp(std::move(propertyName),skipEvents);
367  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
368  ATH_MSG_INFO("Event Selector does not have SkipEvents property");
369  }
370  else {
371  skipEvents = skipEventsProp.value();
372  }
373  }
374 
375  IHybridProcessorHelper* hybridHelper = dynamic_cast<IHybridProcessorHelper*>(m_evtProcessor.get());
376  if(!hybridHelper) {
377  ATH_MSG_FATAL("Failed to acquire IHybridProcessorHelper interface");
378  all_ok = false;
379  return std::unique_ptr<AthenaInterprocess::ScheduledWork>();
380  }
381  // Reset the application return code.
382  hybridHelper->resetAppReturnCode();
383 
384  int finishedEvts =0;
385  int createdEvts =0;
386  long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
387  long evtnumAndChunk(0);
388 // unsigned evtCounter(0);
389  int evtnum(0), chunkSize(1);
390 
391  ATH_MSG_INFO("Starting loop on events");
392 
393  StatusCode sc(StatusCode::SUCCESS);
394 
395  while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
396  ATH_MSG_DEBUG("Event queue is empty");
397  usleep(1000);
398  }
399  bool loop_ended = (evtnumAndChunk<0);
400  if(!loop_ended) {
401  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
402  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
403  evtnum = evtnumAndChunk & intmask;
404  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
405  hybridHelper->setCurrentEventNum(++evtnum);
406  }
407 
408  bool no_more_events = false;
409 
410  while(!loop_ended) {
411  ATH_MSG_DEBUG(" -> createdEvts: " << createdEvts);
412 
413  if(!hybridHelper->terminateLoop() // No scheduled loop termination
414  && !no_more_events // We are not yet done getting events
415  && m_schedulerSvc->freeSlots()>0) { // There are still free slots in the scheduler
416  ATH_MSG_DEBUG("createdEvts: " << createdEvts << ", freeslots: " << m_schedulerSvc->freeSlots());
417 
418  auto ctx = m_evtProcessor->createEventContext();
419  if(!ctx.valid()) {
420  sc = StatusCode::FAILURE;
421  }
422  else {
423  sc = m_evtProcessor->executeEvent(std::move(ctx));
424  }
425 
426  if (sc.isFailure()) {
427  ATH_MSG_ERROR("Terminating event processing loop due to errors");
428  loop_ended = true;
429  }
430  else {
431  ++createdEvts;
432  if(--chunkSize==0) {
433  // Fetch next chunk
434  while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
435  ATH_MSG_DEBUG("Event queue is empty");
436  usleep(1000);
437  }
438  if(evtnumAndChunk<0) {
439  no_more_events = true;
440  evtnumAndChunk *= -1;
441  ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
442  }
443  else {
444  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
445  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
446  evtnum = evtnumAndChunk & intmask;
447  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
448  }
449  }
450  // Advance to the next event
451  if(!no_more_events) {
452  hybridHelper->setCurrentEventNum(++evtnum);
453  }
454  }
455  }
456  else {
457  // all the events were created but not all finished or the slots were
458  // all busy: the scheduler should finish its job
459  ATH_MSG_DEBUG("Draining the scheduler");
460 
461  // Pull out of the scheduler the finished events
462  int ir = hybridHelper->drainScheduler(finishedEvts,true);
463  if(ir < 0) {
464  // some sort of error draining scheduler;
465  loop_ended = true;
466  sc = StatusCode::FAILURE;
467  }
468  else if(ir == 0) {
469  // no more events in scheduler
470  if(no_more_events) {
471  // We are done
472  loop_ended = true;
473  sc = StatusCode::SUCCESS;
474  }
475  }
476  else {
477  // keep going!
478  }
479  }
480  } // end main loop on finished events
481 
482  if(all_ok) {
483  if(m_evtProcessor->executeRun(0).isFailure()) {
484  ATH_MSG_ERROR("Could not finalize the Run");
485  all_ok=false;
486  } else {
487  if(m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+skipEvents).isFailure()) {
488  ATH_MSG_DEBUG("Seek past maxevt to " << evtnumAndChunk+skipEvents << " returned failure. As expected...");
489  }
490  }
491  }
492 
493  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
494 
495  // Return value: "ERRCODE|Func_Flag|NEvt"
496  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
497  void* outdata = CxxUtils::xmalloc(outsize);
498  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
500  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
501  memcpy((char*)outdata+sizeof(int)+sizeof(func),&createdEvts,sizeof(int));
502 
503  outwork->data = outdata;
504  outwork->size = outsize;
505  // ...
506  // (possible) TODO: extend outwork with some error message, which will be eventually
507  // reported in the master proces
508  // ...
509  return outwork;
510 }
511 
512 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
513 
514 std::unique_ptr<AthenaInterprocess::ScheduledWork>
516 {
517  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
518 
519  bool all_ok(true);
520 
521  if(m_appMgr->stop().isFailure()) {
522  ATH_MSG_ERROR("Unable to stop AppMgr");
523  all_ok=false;
524  } else {
525  if(m_appMgr->finalize().isFailure()) {
526  std::cerr << "Unable to finalize AppMgr" << std::endl;
527  all_ok=false;
528  }
529  }
530 
531  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
532 
533  // Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
534  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
535  void* outdata = CxxUtils::xmalloc(outsize);
536  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
538  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
539  int nEvt = -1;
540  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
541 
542  outwork->data = outdata;
543  outwork->size = outsize;
544 
545  return outwork;
546 }
547 
548 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
549 
550 int
551 SharedHiveEvtQueueConsumer::decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize)
552 {
553  if(!presult) return 0;
555  ATH_MSG_DEBUG("Decoding the output of PID=" << presult->pid << " with the size=" << output.size);
556  if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)) return 0;
557 
559  memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
560  if(func==AthenaMPToolBase::FUNC_EXEC) {
561  // Store the number of processed events
562  int nevt(0);
563  memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
564  m_nProcessedEvents[presult->pid]=nevt;
565  ATH_MSG_DEBUG("PID=" << presult->pid << " processed " << nevt << " events");
566 
567  if(doFinalize) {
568  // Add PID to the finalization queue
569  m_finQueue.push(presult->pid);
570  ATH_MSG_DEBUG("Added PID=" << presult->pid << " to the finalization queue");
571 
572  // If this is the only element in the queue then start its finalization
573  // Otherwise it has to wait its turn until all previous processes have been finalized
574  if(m_finQueue.size()==1) {
575  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,presult->pid)
576  || m_processGroup->map_async(0,0,presult->pid)) {
577  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << presult->pid);
578  return 1;
579  } else {
580  ATH_MSG_DEBUG("Scheduled finalization of PID=" << presult->pid);
581  }
582  }
583  }
584  } else if(doFinalize && func==AthenaMPToolBase::FUNC_FIN) {
585  ATH_MSG_DEBUG("Finished finalization of PID=" << presult->pid);
586  pid_t pid = m_finQueue.front();
587  if(pid==presult->pid) {
588  // pid received as expected. Remove it from the queue
589  m_finQueue.pop();
590  ATH_MSG_DEBUG("PID=" << presult->pid << " removed from the queue");
591  // Schedule finalization of the next processe in the queue
592  if(m_finQueue.size()) {
593  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())
594  || m_processGroup->map_async(0,0,m_finQueue.front())) {
595  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
596  return 1;
597  } else {
598  ATH_MSG_DEBUG("Scheduled finalization of PID=" << m_finQueue.front());
599  }
600  }
601  } else {
602  // Error: unexpected pid received from presult
603  ATH_MSG_ERROR("Finalized PID=" << presult->pid << " while PID=" << pid << " was expected");
604  return 1;
605  }
606  }
607 
608  return 0;
609 }
610 
611 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
612 
615 
616  if (m_evtProcessor.release().isFailure()) {
617  ATH_MSG_INFO("could not release old EventProcessor ");
618  }
619 
620  ISvcManager* pISM(dynamic_cast<ISvcManager*>(serviceLocator().get()));
621  if (pISM == 0) {
622  ATH_MSG_ERROR("initHive: Could not get SvcManager");
623  } else {
624  if (pISM->removeService(m_evtProcessor.name()).isFailure()) {
625  ATH_MSG_ERROR("initHive: could not remove " << m_evtProcessor.name()
626  << " from SvcManager");
627  }
628  }
629 
630  m_evtProcessor = ServiceHandle<IEventProcessor>("AthenaMtesEventLoopMgr",name());
631 
632  if (m_evtProcessor.retrieve().isFailure()) {
633  ATH_MSG_ERROR("could not setup " << m_evtProcessor.typeAndName());
634  return StatusCode::FAILURE;
635  }
636 
637  m_schedulerSvc = serviceLocator()->service("AvalancheSchedulerSvc");
638 
639  // m_whiteboard = serviceLocator()->service(m_whiteboardName);
640  // if( !m_whiteboard.isValid() ) {
641  // ATH_MSG_FATAL( "Error retrieving " << m_whiteboardName
642  // << " interface IHiveWhiteBoard." );
643  // return StatusCode::FAILURE;
644  // }
645 
646  // m_schedulerSvc = serviceLocator()->service(m_schedulerName);
647  // if ( !m_schedulerSvc.isValid()){
648  // ATH_MSG_FATAL( "Error retrieving SchedulerSvc interface ISchedulerSvc." );
649  // return StatusCode::FAILURE;
650  // }
651  // // Setup algorithm resource pool
652  // m_algResourcePool = serviceLocator()->service("AlgResourcePool");
653  // if( !m_algResourcePool.isValid() ) {
654  // ATH_MSG_FATAL ("Error retrieving AlgResourcePool");
655  // return StatusCode::FAILURE;
656  // }
657 
658  // sc = m_eventStore.retrieve();
659  // if( !sc.isSuccess() ) {
660  // ATH_MSG_FATAL("Error retrieving pointer to StoreGateSvc");
661  // return sc;
662  // }
663 
664 
665  return StatusCode::SUCCESS;
666 
667 }
668 
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
IHybridProcessorHelper::setCurrentEventNum
virtual void setCurrentEventNum(int num)=0
AthenaInterprocess::ProcessGroup::pullOneResult
ProcessResult * pullOneResult()
Definition: ProcessGroup.cxx:177
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:15
SharedHiveEvtQueueConsumer::SharedHiveEvtQueueConsumer
SharedHiveEvtQueueConsumer(const std::string &type, const std::string &name, const IInterface *parent)
Definition: SharedHiveEvtQueueConsumer.cxx:42
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
SharedHiveEvtQueueConsumer::m_schedulerSvc
SmartIF< IScheduler > m_schedulerSvc
Definition: SharedHiveEvtQueueConsumer.h:83
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:91
SharedHiveEvtQueueConsumer::~SharedHiveEvtQueueConsumer
virtual ~SharedHiveEvtQueueConsumer() override
Definition: SharedHiveEvtQueueConsumer.cxx:53
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
IDataShare.h
AthenaInterprocess::UpdateAfterFork
Definition: Incidents.h:22
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:101
AthenaMPToolBase::m_nprocs
int m_nprocs
Number of workers spawned by the master process.
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:70
SharedHiveEvtQueueConsumer::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedHiveEvtQueueConsumer.cxx:209
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:333
skel.it
it
Definition: skel.GENtoEVGEN.py:407
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Name of the event selector.
Definition: AthenaMPToolBase.h:89
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
ProcessGroup.h
AthenaInterprocess::ProcessResult::pid
pid_t pid
Definition: ProcessGroup.h:23
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
python.utils.AtlRunQueryLookup.mask
string mask
Definition: AtlRunQueryLookup.py:459
sigaddset
#define sigaddset(x, y)
Definition: SealSignal.h:84
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:69
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:55
SharedHiveEvtQueueConsumer::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:515
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
SharedHiveEvtQueueConsumer_d
Definition: SharedHiveEvtQueueConsumer.cxx:33
SharedHiveEvtQueueConsumer::m_nProcessedEvents
std::map< pid_t, int > m_nProcessedEvents
Definition: SharedHiveEvtQueueConsumer.h:80
IHybridProcessorHelper
Helper interface for implementing hybrid MP+MT. Used by the Hybrid Shared Event Queue Consumer MP too...
Definition: IHybridProcessorHelper.h:16
sigset_t
int sigset_t
Definition: SealSignal.h:80
SharedHiveEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedHiveEvtQueueConsumer.h:77
SharedHiveEvtQueueConsumer::m_evtSelSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Definition: SharedHiveEvtQueueConsumer.h:74
copy_file_icc_hack.h
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:98
AthenaInterprocess::ProcessResult::output
ScheduledWork output
Definition: ProcessGroup.h:24
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:95
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
SharedHiveEvtQueueConsumer::m_sharedRankQueue
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
Definition: SharedHiveEvtQueueConsumer.h:78
lumiFormat.i
int i
Definition: lumiFormat.py:85
SharedHiveEvtQueueConsumer::m_dataShare
SmartIF< IDataShare > m_dataShare
Definition: SharedHiveEvtQueueConsumer.h:73
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:67
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
Incidents.h
SharedHiveEvtQueueConsumer_d::pauseForDebug
void pauseForDebug(int)
Definition: SharedHiveEvtQueueConsumer.cxx:35
res
std::pair< std::vector< unsigned int >, bool > res
Definition: JetGroupProductTest.cxx:11
SharedHiveEvtQueueConsumer::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:346
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
AthenaMPToolBase::evtSelector
IEvtSelector * evtSelector()
Definition: AthenaMPToolBase.h:83
test_pyathena.parent
parent
Definition: test_pyathena.py:15
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:322
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:48
SharedHiveEvtQueueConsumer.h
merge.output
output
Definition: merge.py:16
SharedHiveEvtQueueConsumer::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:224
python.PyKernel.detStore
detStore
Definition: PyKernel.py:41
SharedHiveEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedHiveEvtQueueConsumer.h:75
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:269
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
SharedHiveEvtQueueConsumer::finalize
virtual StatusCode finalize() override
Definition: SharedHiveEvtQueueConsumer.cxx:85
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:67
SharedHiveEvtQueueConsumer_d::sig_done
std::atomic< bool > sig_done
Definition: SharedHiveEvtQueueConsumer.cxx:34
grepfile.filenames
list filenames
Definition: grepfile.py:34
ir
int ir
counter of the current depth
Definition: fastadd.cxx:49
SharedHiveEvtQueueConsumer::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: SharedHiveEvtQueueConsumer.cxx:189
SharedHiveEvtQueueConsumer::initHive
StatusCode initHive()
Definition: SharedHiveEvtQueueConsumer.cxx:614
SharedHiveEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedHiveEvtQueueConsumer.h:70
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
python.Constants.INFO
int INFO
Definition: Control/AthenaCommon/python/Constants.py:15
IHybridProcessorHelper.h
SharedHiveEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedHiveEvtQueueConsumer.h:72
IHybridProcessorHelper::drainScheduler
virtual int drainScheduler(int &finishedEvts, bool report)=0
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
IHybridProcessorHelper::resetAppReturnCode
virtual void resetAppReturnCode()=0
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
SharedHiveEvtQueueConsumer::m_finQueue
std::queue< pid_t > m_finQueue
Definition: SharedHiveEvtQueueConsumer.h:81
IEvtSelectorSeek.h
Extension to IEvtSelector to allow for seeking.
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
For ex. "worker__".
Definition: AthenaMPToolBase.h:88
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Top run directory for subprocesses.
Definition: AthenaMPToolBase.h:87
CxxUtils::xmalloc
void * xmalloc(size_t size)
Trapping version of malloc.
Definition: xmalloc.cxx:31
pow
constexpr int pow(int base, int exp) noexcept
Definition: ap_fixedTest.cxx:15
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:396
SharedHiveEvtQueueConsumer::initialize
virtual StatusCode initialize() override
Definition: SharedHiveEvtQueueConsumer.cxx:59
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
SharedHiveEvtQueueConsumer::m_useSharedWriter
Gaudi::Property< bool > m_useSharedWriter
Definition: SharedHiveEvtQueueConsumer.h:65
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:68
xmalloc.h
Trapping version of malloc.
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:340
SharedHiveEvtQueueConsumer::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedHiveEvtQueueConsumer.h:61
sigemptyset
#define sigemptyset(x)
Definition: SealSignal.h:82
IHybridProcessorHelper::terminateLoop
virtual bool terminateLoop()=0
ServiceHandle< IEventProcessor >
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:94