ATLAS Offline Software
SharedHiveEvtQueueConsumer.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 #include "copy_file_icc_hack.h"
9 
13 #include "GaudiKernel/IEvtSelector.h"
14 #include "GaudiKernel/IIoComponentMgr.h"
15 #include "GaudiKernel/IFileMgr.h"
16 #include "GaudiKernel/IChronoStatSvc.h"
17 #include "GaudiKernel/ISvcLocator.h"
18 #include "GaudiKernel/IIncidentSvc.h"
19 #include "GaudiKernel/IConversionSvc.h"
20 
21 #include <sys/stat.h>
22 #include <sstream>
23 #include <fstream>
24 #include <unistd.h>
25 #include <stdio.h>
26 #include <stdint.h>
27 #include <stdexcept>
28 #include <cmath> // For pow
29 
30 #include <signal.h>
31 
// Debug-mode helpers living in namespace SharedHiveEvtQueueConsumer_d (the
// namespace-opening line is not visible in this listing).
// sig_done: flag raised by pauseForDebug(); std::atomic so the write from the
// callback does not race with the code that later observes it.
33  std::atomic<bool> sig_done = false;
// pauseForDebug: signal-handler-shaped callback (the int argument is unused).
// NOTE(review): presumably installed for SIGUSR1 by bootstrap_func() when m_debug
// is set -- the registration line is not visible here, TODO confirm. It only
// raises sig_done so the waiting worker can continue.
34  void pauseForDebug(int /*sig*/) {
35  // std::cout << "Continuing after receiving signal "
36  // << sig << std::endl;
37  sig_done = true;
38  }
39 }
40 
// Constructor (the signature and base-class initializer lines are not visible in
// this listing). Pointer members start null and m_rankId starts at -1; they are
// filled in later: m_evtSelSeek/m_evtContext in initialize(), the shared queues in
// makePool(), and the rank in bootstrap_func().
42  , const std::string& name
43  , const IInterface* parent)
45  , m_rankId(-1)
46  , m_chronoStatSvc("ChronoStatSvc", name)
47  , m_evtSelSeek(0)
48  , m_evtContext(0)
49  , m_sharedEventQueue(0)
50  , m_sharedRankQueue(0)
51 {
52  declareInterface<IAthenaMPTool>(this);
53 
// Worker run directories are named <topdir>/<m_subprocDirPrefix><rank>.
54  m_subprocDirPrefix = "worker_";
55 }
56 
57 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
58 
60 {
// Intentionally empty: the rank queue and the event-selector context are
// released in finalize().
61 }
62 
63 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
64 
66 {
67  ATH_MSG_DEBUG("In initialize");
68 
// NOTE(review): `sc` is declared on a line not visible in this listing
// (presumably the result of AthenaMPToolBase::initialize() -- TODO confirm).
70  if(!sc.isSuccess())
71  return sc;
72 
// Fetch the event selector through its IEvtSelectorSeek interface; exec_func()
// uses it to seek past the events handled by this job.
73  sc = serviceLocator()->service(m_evtSelName,m_evtSelSeek);
74  if(sc.isFailure() || m_evtSelSeek==0) {
75  ATH_MSG_ERROR("Error retrieving IEvtSelectorSeek");
76  return StatusCode::FAILURE;
77  }
// Selector context used for that seek; released again in finalize().
78  ATH_CHECK( evtSelector()->createContext (m_evtContext) );
79 
80  ATH_CHECK(m_chronoStatSvc.retrieve());
81 
// AthenaPoolCnvSvc viewed as IDataShare. It is mandatory only when the shared
// writer is enabled; otherwise a missing service is tolerated and m_dataShare
// may stay null.
82  IConversionSvc* cnvSvc = 0;
83  sc = serviceLocator()->service("AthenaPoolCnvSvc",cnvSvc);
84  m_dataShare = dynamic_cast<IDataShare*>(cnvSvc);
85  if(sc.isFailure() || m_dataShare==0) {
86  if(m_useSharedWriter) {
87  ATH_MSG_ERROR("Error retrieving AthenaPoolCnvSvc " << cnvSvc);
88  return StatusCode::FAILURE;
89  }
90  }
91 
92  return StatusCode::SUCCESS;
93 }
94 
95 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
96 
99 {
// Release the event-selector context created in initialize(), if any.
100  if (m_evtContext) {
101  ATH_CHECK( evtSelector()->releaseContext (m_evtContext) );
102  m_evtContext = nullptr;
103  }
104 
// NOTE(review): m_sharedRankQueue is not reset to nullptr after the delete, so a
// second finalize() would double-delete it. Also, if releaseContext() fails,
// ATH_CHECK presumably returns early and this delete is skipped. Consider
// nulling the pointer (or owning it with std::unique_ptr).
105  delete m_sharedRankQueue;
106  return StatusCode::SUCCESS;
107 }
108 
109 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
110 
// makePool: set up the worker pool in the master process.
//  - first int parameter: unused.
//  - nprocs: number of workers; -1 means "one per online CPU"
//    (sysconf(_SC_NPROCESSORS_ONLN)); 0 and values below -1 are rejected.
//  - topdir: parent directory for the per-worker run directories (must be non-empty).
// Returns the number of workers on success, -1 on any error.
111 int
112 SharedHiveEvtQueueConsumer::makePool(int, int nprocs, const std::string& topdir)
113 {
114  ATH_MSG_DEBUG("In makePool " << getpid());
115 
// NOTE(review): this message does not print the offending nprocs value.
116  if(nprocs==0 || nprocs<-1) {
117  ATH_MSG_ERROR("Invalid value for the nprocs parameter: ");
118  return -1;
119  }
120 
121  if(topdir.empty()) {
122  ATH_MSG_ERROR("Empty name for the top directory!");
123  return -1;
124  }
125 
126  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
127  m_subprocTopDir = topdir;
128 
129  // Get the shared event queue
// The queue is looked up in the DetectorStore under "AthenaMPEventQueue_<m_randStr>".
130  ATH_MSG_DEBUG("Event queue name " << "AthenaMPEventQueue_" << m_randStr);
131  StatusCode sc = detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr);
132  if(sc.isFailure()) {
133  ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Event Queue");
134  return -1;
135  }
136 
137 
138  // Create rank queue and fill it
// One int per worker (0..m_nprocs-1) is queued; bootstrap_func() uses this
// to assign each worker its rank.
// NOTE(review): a second makePool() call would overwrite (leak) the previous queue.
139  m_sharedRankQueue = new AthenaInterprocess::SharedQueue("SharedHiveEvtQueueConsumer_RankQueue_"+m_randStr,m_nprocs,sizeof(int));
140  for(int i=0; i<m_nprocs; ++i)
141  if(!m_sharedRankQueue->send_basic<int>(i)) {
142  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
143  return -1;
144  }
145 
146  // Create the process group and map_async bootstrap
// (the line creating the process group is not visible in this listing)
148  ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
// mapAsyncFlag() returns non-zero on failure.
149  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
150  return -1;
151  ATH_MSG_INFO("Workers bootstraped");
152 
153  return m_nprocs;
154 }
155 
156 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
157 
// exec: schedule FUNC_EXEC (exec_func) on the workers. mapAsyncFlag() is called
// without a PID here, presumably targeting all workers -- TODO confirm.
// (The signature line is not visible in this listing.)
158 StatusCode
160 {
161  ATH_MSG_DEBUG("In exec " << getpid());
162 
163  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
164  return StatusCode::FAILURE;
165  ATH_MSG_INFO("Workers started processing events");
166 
167  return StatusCode::SUCCESS;
168 }
169 
170 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
171 
// wait_once: wait via the base class, then consume worker results.
// Results whose payload is no larger than one int are freed without decoding
// (bootstrap_func() returns only an int error code).
//  - Base wait failed: drain every pending result, decoding without scheduling
//    finalizations, and return the base status.
//  - Otherwise: pull at most one result and decode it with finalization
//    scheduling; return FAILURE if decoding reports an error.
// NOTE(review): `presult` is declared on a line not visible in this listing.
172 StatusCode
173 SharedHiveEvtQueueConsumer::wait_once(pid_t& pid)
174 {
175  StatusCode sc = AthenaMPToolBase::wait_once(pid);
177  if(sc.isFailure()) {
178  // We are to stop waiting. Pull all available ProcessResults from the queue
179  // Don't serialize finalizations
180  do {
181  presult = m_processGroup->pullOneResult();
182  if(presult && (unsigned)(presult->output.size)>sizeof(int))
183  decodeProcessResult(presult,false);
184  if(presult) free(presult->output.data);
185  delete presult;
// NOTE(review): the loop condition reads `presult` after it has been deleted.
// Only the pointer value is tested, but using an invalid pointer value is
// implementation-defined; consider capturing a bool before the delete.
186  } while(presult);
187  } else {
188  // Pull one result and decode it if necessary
189  presult = m_processGroup->pullOneResult();
190  int res(0);
191  if(presult && (unsigned)(presult->output.size)>sizeof(int))
192  res = decodeProcessResult(presult,true);
193  if(presult) free(presult->output.data);
194  delete presult;
195  if(res) return StatusCode::FAILURE;
196  }
197  return sc;
198 }
199 
200 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
201 
// reportSubprocessStatuses: log one INFO line per worker with its PID, its
// status (FAILURE for a non-zero exit code, SUCCESS otherwise) and the event
// count recorded in m_nProcessedEvents ("N/A" if none was recorded).
// (The signature line is not visible in this listing.)
202 void
204 {
205  ATH_MSG_INFO("Statuses of event processors");
206  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
207  for(size_t i=0; i<statuses.size(); ++i) {
208  // Get the number of events processed by this worker
209  std::map<pid_t,int>::const_iterator it = m_nProcessedEvents.find(statuses[i].pid);
210  msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
211  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
212  << ". Number of events processed: ";
213  if(it==m_nProcessedEvents.end())
214  msg(MSG::INFO) << "N/A" << endmsg;
215  else
216  msg(MSG::INFO) << it->second << endmsg;
217  }
218 }
219 
220 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
221 
// subProcessLogs: replace the contents of `filenames` (parameter declared on a
// line not visible here) with one path per worker rank 0..m_nprocs-1, of the form
// <m_subprocTopDir>/<m_subprocDirPrefix><rank>/AthenaMP.log.
222 void
224 {
225  filenames.clear();
226  for(int i=0; i<m_nprocs; ++i) {
227  std::ostringstream workerIndex;
228  workerIndex << i;
229  std::filesystem::path worker_rundir(m_subprocTopDir);
230  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
231  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
232  }
233 }
234 
235 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
236 
// bootstrap_func: per-worker setup, run in each forked worker.
// Returns a ScheduledWork holding a single int error code. The code is
// pre-set to 1 (failure) and only switched to 0 after every step succeeds,
// so any early `return outwork;` reports failure.
// (The signature line is not visible in this listing.)
237 std::unique_ptr<AthenaInterprocess::ScheduledWork>
239 {
// Debug mode: block SIGUSR1, then sigsuspend() with the previous mask until it
// arrives. NOTE(review): lines 244 and 250 are not visible here; presumably they
// install pauseForDebug and loop on sig_done -- TODO confirm.
240  if (m_debug) {
241  ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
242  sigset_t mask, oldmask;
243 
245 
246  sigemptyset (&mask);
247  sigaddset (&mask, SIGUSR1);
248 
249  sigprocmask (SIG_BLOCK, &mask, &oldmask);
251  sigsuspend (&oldmask);
252  sigprocmask (SIG_UNBLOCK, &mask, NULL);
253  }
254 
255  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
256  outwork->data = malloc(sizeof(int));
257  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
258  outwork->size = sizeof(int);
259 
260  // ...
261  // (possible) TODO: extend outwork with some error message, which will be eventually
262  // reported in the master process
263  // ...
264 
265  // ________________________ Get IncidentSvc and fire PostFork ________________________
266  IIncidentSvc* p_incidentSvc(0);
267  if(!serviceLocator()->service("IncidentSvc", p_incidentSvc).isSuccess()) {
268  ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
269  return outwork;
270  }
271  p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
272 
273 
274  // ________________________ Get RankID ________________________
275  //
// NOTE(review): the condition line (276) is not visible here; presumably it
// takes one rank from m_sharedRankQueue (filled by makePool()) into m_rankId.
277  ATH_MSG_ERROR("Unable to get rank ID!");
278  return outwork;
279  }
280  std::ostringstream workindex;
281  workindex<<m_rankId;
282 
283  // ________________________ Worker dir: mkdir ________________________
284  std::filesystem::path worker_rundir(m_subprocTopDir);
285  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
286  // TODO: this "worker_" can be made configurable too
287 
// Mode 0755. mkdir() also fails if the directory already exists.
288  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
289  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
290  return outwork;
291  }
292 
293  // ________________________ Redirect logs ________________________
// The AthenaMPToolBase helpers used below return non-zero on failure.
294  if(redirectLog(worker_rundir.string()))
295  return outwork;
296 
297  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
298 
299  // ________________________ Update Io Registry ____________________________
300  if(updateIoReg(worker_rundir.string()))
301  return outwork;
302 
303  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
304 
305  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
// Copy these files from the current directory into the worker dir, when present.
306  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
307  if(std::filesystem::is_regular_file("SimParams.db"))
308  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
309  if(std::filesystem::is_regular_file("DigitParams.db"))
310  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
311  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
312  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
313 
314  // _______________________ Handle saved PFC (if any) ______________________
315  if(handleSavedPfc(abs_worker_rundir))
316  return outwork;
317 
318  // ________________________ reopen descriptors ____________________________
319  if(reopenFds())
320  return outwork;
321 
322  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
323 
324  // ________________________ Make Shared Writer Client ________________________
325 
// NOTE(review): the guarding `if` of this section (line 326) is not visible here.
// Body: make AthenaPoolCnvSvc a streaming-tool client with id m_rankId + 1.
327  IProperty* propertyServer = dynamic_cast<IProperty*>(m_dataShare);
328  if (propertyServer==0 || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
329  ATH_MSG_ERROR("Could not change AthenaPoolCnvSvc MakeClient Property");
330  return outwork;
331  } else {
332  ATH_MSG_DEBUG("Successfully made the conversion service a share client");
333  }
334  }
335 
336  // ________________________ I/O reinit ________________________
337  if(!m_ioMgr->io_reinitialize().isSuccess()) {
338  ATH_MSG_ERROR("Failed to reinitialize I/O");
339  return outwork;
340  } else {
341  ATH_MSG_DEBUG("Successfully reinitialized I/O");
342  }
343 
344  // ________________________ Event selector restart ________________________
345  IService* evtSelSvc = dynamic_cast<IService*>(m_evtSelector);
346  if(!evtSelSvc) {
347  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
348  return outwork;
349  }
350  if(!evtSelSvc->start().isSuccess()) {
351  ATH_MSG_ERROR("Failed to restart the event selector");
352  return outwork;
353  } else {
354  ATH_MSG_DEBUG("Successfully restarted the event selector");
355  }
356 
357  // ________________________ Worker dir: chdir ________________________
358  if(chdir(worker_rundir.string().c_str())==-1) {
359  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
360  return outwork;
361  }
362 
363  // ___________________ Fire UpdateAfterFork incident _________________
364  p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
365 
366  // Declare success and return
367  *(int*)(outwork->data) = 0;
368  return outwork;
369 }
370 
371 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
372 
// exec_func: the worker's event loop (hybrid MP+MT via AthenaMtesEventLoopMgr).
// Values read from m_sharedEventQueue are longs:
//  - non-negative: low sizeof(int) bytes hold the first event number, the
//    higher bits hold the chunk size;
//  - negative: end of input; its magnitude is logged as the job's total event count.
// Returns a ScheduledWork laid out as "ERRCODE|Func_Flag|NEvt". NEvt is
// createdEvts, i.e. the events handed to the processor. Returns an empty
// unique_ptr if the IHybridProcessorHelper interface is unavailable.
373 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedHiveEvtQueueConsumer::exec_func()
374 {
375  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
376 
377  bool all_ok(true);
378 
// NOTE(review): on failure only all_ok is cleared and execution continues; the
// loop below then dereferences m_schedulerSvc, which initHive() may not have set.
379  if (!initHive().isSuccess()) {
380  ATH_MSG_FATAL("unable to initialize Hive");
381  all_ok = false;
382  }
383 
384  // Get the value of SkipEvent
// SkipEvents is only used for the final seek after the loop.
385  int skipEvents(0);
386  IProperty* propertyServer = dynamic_cast<IProperty*>(m_evtSelector);
387  if(propertyServer==0) {
388  ATH_MSG_ERROR("Unable to cast event selector to IProperty");
389  all_ok = false;
390  }
391  else {
392  std::string propertyName("SkipEvents");
393  IntegerProperty skipEventsProp(propertyName,skipEvents);
394  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
395  ATH_MSG_INFO("Event Selector does not have SkipEvents property");
396  }
397  else {
398  skipEvents = skipEventsProp.value();
399  }
400  }
401 
402  IHybridProcessorHelper* hybridHelper = dynamic_cast<IHybridProcessorHelper*>(m_evtProcessor.get());
403  if(!hybridHelper) {
404  ATH_MSG_FATAL("Failed to acquire IHybridProcessorHelper interface");
// NOTE(review): this assignment is dead (we return immediately). The empty
// ScheduledWork carries no error code -- confirm the caller handles a null result.
405  all_ok = false;
406  return std::unique_ptr<AthenaInterprocess::ScheduledWork>();
407  }
408  // Reset the application return code.
409  hybridHelper->resetAppReturnCode();
410 
411  int finishedEvts =0;
412  int createdEvts =0;
413  long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
414  long evtnumAndChunk(0);
415 // unsigned evtCounter(0);
416  int evtnum(0), chunkSize(1);
417 
418  ATH_MSG_INFO("Starting loop on events");
419 
420  StatusCode sc(StatusCode::SUCCESS);
421 
// Poll (1 ms sleep) until the first value is available.
422  while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
423  ATH_MSG_DEBUG("Event queue is empty");
424  usleep(1000);
425  }
// NOTE(review): unlike the in-loop fetch below, a negative first value is not
// negated, so the final seek is then called with a negative argument.
426  bool loop_ended = (evtnumAndChunk<0);
427  if(!loop_ended) {
428  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
429  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
430  evtnum = evtnumAndChunk & intmask;
431  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
// The helper is given the received event number + 1.
432  hybridHelper->setCurrentEventNum(++evtnum);
433  }
434 
435  bool no_more_events = false;
436 
// Main loop: submit events while slots are free and input remains; otherwise
// drain finished events from the scheduler.
// NOTE(review): if terminateLoop() turns true while no_more_events is still
// false, once drainScheduler() returns 0 nothing sets loop_ended -- confirm
// the helper guarantees termination in that case.
437  while(!loop_ended) {
438  ATH_MSG_DEBUG(" -> createdEvts: " << createdEvts);
439 
440  if(!hybridHelper->terminateLoop() // No scheduled loop termination
441  && !no_more_events // We are not yet done getting events
442  && m_schedulerSvc->freeSlots()>0) { // There are still free slots in the scheduler
443  ATH_MSG_DEBUG("createdEvts: " << createdEvts << ", freeslots: " << m_schedulerSvc->freeSlots());
444 
445  auto ctx = m_evtProcessor->createEventContext();
446  if(!ctx.valid()) {
447  sc = StatusCode::FAILURE;
448  }
449  else {
450  sc = m_evtProcessor->executeEvent(std::move(ctx));
451  }
452 
453  if (sc.isFailure()) {
454  ATH_MSG_ERROR("Terminating event processing loop due to errors");
455  loop_ended = true;
456  }
457  else {
458  ++createdEvts;
459  if(--chunkSize==0) {
460  // Fetch next chunk
461  while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
462  ATH_MSG_DEBUG("Event queue is empty");
463  usleep(1000);
464  }
465  if(evtnumAndChunk<0) {
466  no_more_events = true;
467  evtnumAndChunk *= -1;
468  ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
469  }
470  else {
471  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
472  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
473  evtnum = evtnumAndChunk & intmask;
474  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
475  }
476  }
477  // Advance to the next event
478  if(!no_more_events) {
479  hybridHelper->setCurrentEventNum(++evtnum);
480  }
481  }
482  }
483  else {
484  // all the events were created but not all finished or the slots were
485  // all busy: the scheduler should finish its job
486  ATH_MSG_DEBUG("Draining the scheduler");
487 
488  // Pull out of the scheduler the finished events
489  int ir = hybridHelper->drainScheduler(finishedEvts,true);
490  if(ir < 0) {
491  // some sort of error draining scheduler;
492  loop_ended = true;
493  sc = StatusCode::FAILURE;
494  }
495  else if(ir == 0) {
496  // no more events in scheduler
497  if(no_more_events) {
498  // We are done
499  loop_ended = true;
500  sc = StatusCode::SUCCESS;
501  }
502  }
503  else {
504  // keep going!
505  }
506  }
507  } // end main loop on finished events
508 
// NOTE(review): `sc` from the loop is never folded into all_ok, so an event-loop
// failure is still reported with error code 0 -- confirm this is intended.
509  if(all_ok) {
510  if(m_evtProcessor->executeRun(0).isFailure()) {
511  ATH_MSG_ERROR("Could not finalize the Run");
512  all_ok=false;
513  } else {
// Seek the selector past the job's total event count (plus SkipEvents);
// failure is expected and only logged at DEBUG level.
514  if(m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+skipEvents).isFailure()) {
515  ATH_MSG_DEBUG("Seek past maxevt to " << evtnumAndChunk+skipEvents << " returned failure. As expected...");
516  }
517  }
518  }
519 
520  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
521 
522  // Return value: "ERRCODE|Func_Flag|NEvt"
// (`func` is declared on line 526, not visible in this listing; decodeProcessResult()
// expects FUNC_EXEC for this payload.)
523  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
524  void* outdata = malloc(outsize);
525  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
527  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
528  memcpy((char*)outdata+sizeof(int)+sizeof(func),&createdEvts,sizeof(int));
529 
530  outwork->data = outdata;
531  outwork->size = outsize;
532  // ...
533  // (possible) TODO: extend outwork with some error message, which will be eventually
534  // reported in the master process
535  // ...
536  return outwork;
537 }
538 
539 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
540 
// fin_func: stop and then finalize the worker's application manager.
// Returns a "ERRCODE|Func_Flag|NEvt" payload with NEvt = -1. `func` is
// declared on a line not visible in this listing; decodeProcessResult() expects
// FUNC_FIN here.
// (The signature line is not visible in this listing.)
541 std::unique_ptr<AthenaInterprocess::ScheduledWork>
543 {
544  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
545 
546  bool all_ok(true);
547 
548  if(m_appMgr->stop().isFailure()) {
549  ATH_MSG_ERROR("Unable to stop AppMgr");
550  all_ok=false;
551  } else {
// NOTE(review): std::cerr rather than ATH_MSG_ERROR, presumably because the
// message service may already be finalized at this point -- TODO confirm.
552  if(m_appMgr->finalize().isFailure()) {
553  std::cerr << "Unable to finalize AppMgr" << std::endl;
554  all_ok=false;
555  }
556  }
557 
558  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
559 
560  // Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
561  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
562  void* outdata = malloc(outsize);
563  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
565  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
566  int nEvt = -1;
567  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
568 
569  outwork->data = outdata;
570  outwork->size = outsize;
571 
572  return outwork;
573 }
574 
575 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
576 
// decodeProcessResult: interpret a worker's "ERRCODE|Func_Flag|NEvt" payload.
// Payloads of any other size, and a null presult, are ignored (return 0).
//  - FUNC_EXEC: record NEvt for the PID. When doFinalize is set, append the PID
//    to m_finQueue and schedule its FUNC_FIN if it is the only entry, so that
//    worker finalizations run one at a time.
//  - FUNC_FIN (doFinalize only): pop the expected PID and schedule the next one.
// Returns 1 on a scheduling error or an unexpected PID, 0 otherwise.
// NOTE(review): `output` and `func` are declared on lines not visible here.
577 int
578 SharedHiveEvtQueueConsumer::decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize)
579 {
580  if(!presult) return 0;
582  ATH_MSG_DEBUG("Decoding the output of PID=" << presult->pid << " with the size=" << output.size);
583  if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)) return 0;
584 
586  memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
587  if(func==AthenaMPToolBase::FUNC_EXEC) {
588  // Store the number of processed events
589  int nevt(0);
590  memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
591  m_nProcessedEvents[presult->pid]=nevt;
592  ATH_MSG_DEBUG("PID=" << presult->pid << " processed " << nevt << " events");
593 
594  if(doFinalize) {
595  // Add PID to the finalization queue
596  m_finQueue.push(presult->pid);
597  ATH_MSG_DEBUG("Added PID=" << presult->pid << " to the finalization queue");
598 
599  // If this is the only element in the queue then start its finalization
600  // Otherwise it has to wait its turn until all previous processes have been finalized
601  if(m_finQueue.size()==1) {
602  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,presult->pid)
603  || m_processGroup->map_async(0,0,presult->pid)) {
604  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << presult->pid);
605  return 1;
606  } else {
607  ATH_MSG_DEBUG("Scheduled finalization of PID=" << presult->pid);
608  }
609  }
610  }
611  } else if(doFinalize && func==AthenaMPToolBase::FUNC_FIN) {
612  ATH_MSG_DEBUG("Finished finalization of PID=" << presult->pid);
// NOTE(review): front() on an empty std::queue is undefined behaviour. A FUNC_FIN
// result arriving with m_finQueue empty should be reported as an error rather
// than reaching this call -- consider an empty() check.
613  pid_t pid = m_finQueue.front();
614  if(pid==presult->pid) {
615  // pid received as expected. Remove it from the queue
616  m_finQueue.pop();
617  ATH_MSG_DEBUG("PID=" << presult->pid << " removed from the queue");
618  // Schedule finalization of the next process in the queue
619  if(m_finQueue.size()) {
620  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())
621  || m_processGroup->map_async(0,0,m_finQueue.front())) {
622  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
623  return 1;
624  } else {
625  ATH_MSG_DEBUG("Scheduled finalization of PID=" << m_finQueue.front());
626  }
627  }
628  } else {
629  // Error: unexpected pid received from presult
630  ATH_MSG_ERROR("Finalized PID=" << presult->pid << " while PID=" << pid << " was expected");
631  return 1;
632  }
633  }
634 
635  return 0;
636 }
637 
638 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
639 
642 
// initHive (signature lines not visible in this listing): in the worker, replace
// the configured event processor with AthenaMtesEventLoopMgr and look up
// AvalancheSchedulerSvc into m_schedulerSvc.
// Release the current handle and remove the old service from the service manager;
// failures of these two steps are only logged.
643  if (m_evtProcessor.release().isFailure()) {
644  ATH_MSG_INFO("could not release old EventProcessor ");
645  }
646 
647  ISvcManager* pISM(dynamic_cast<ISvcManager*>(serviceLocator().get()));
648  if (pISM == 0) {
649  ATH_MSG_ERROR("initHive: Could not get SvcManager");
650  } else {
651  if (pISM->removeService(m_evtProcessor.name()).isFailure()) {
652  ATH_MSG_ERROR("initHive: could not remove " << m_evtProcessor.name()
653  << " from SvcManager");
654  }
655  }
656 
// Only a failure to retrieve the new event processor makes initHive fail.
657  m_evtProcessor = ServiceHandle<IEventProcessor>("AthenaMtesEventLoopMgr",name());
658 
659  if (m_evtProcessor.retrieve().isFailure()) {
660  ATH_MSG_ERROR("could not setup " << m_evtProcessor.typeAndName());
661  return StatusCode::FAILURE;
662  }
663 
// NOTE(review): m_schedulerSvc is not validated here, yet exec_func() calls
// m_schedulerSvc->freeSlots() unconditionally. Consider checking isValid()
// (as the commented-out code below did).
664  m_schedulerSvc = serviceLocator()->service("AvalancheSchedulerSvc");
665 
// Legacy setup kept commented out for reference (whiteboard, configurable
// scheduler name, AlgResourcePool, event store).
666  // m_whiteboard = serviceLocator()->service(m_whiteboardName);
667  // if( !m_whiteboard.isValid() ) {
668  // ATH_MSG_FATAL( "Error retrieving " << m_whiteboardName
669  // << " interface IHiveWhiteBoard." );
670  // return StatusCode::FAILURE;
671  // }
672 
673  // m_schedulerSvc = serviceLocator()->service(m_schedulerName);
674  // if ( !m_schedulerSvc.isValid()){
675  // ATH_MSG_FATAL( "Error retrieving SchedulerSvc interface ISchedulerSvc." );
676  // return StatusCode::FAILURE;
677  // }
678  // // Setup algorithm resource pool
679  // m_algResourcePool = serviceLocator()->service("AlgResourcePool");
680  // if( !m_algResourcePool.isValid() ) {
681  // ATH_MSG_FATAL ("Error retrieving AlgResourcePool");
682  // return StatusCode::FAILURE;
683  // }
684 
685  // sc = m_eventStore.retrieve();
686  // if( !sc.isSuccess() ) {
687  // ATH_MSG_FATAL("Error retrieving pointer to StoreGateSvc");
688  // return sc;
689  // }
690 
691 
692  return StatusCode::SUCCESS;
693 
694 }
695 
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
IHybridProcessorHelper::setCurrentEventNum
virtual void setCurrentEventNum(int num)=0
AthenaInterprocess::ProcessGroup::pullOneResult
ProcessResult * pullOneResult()
Definition: ProcessGroup.cxx:177
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:126
AthenaMPToolBase::m_evtSelector
IEvtSelector * m_evtSelector
Definition: AthenaMPToolBase.h:94
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
SharedHiveEvtQueueConsumer::m_schedulerSvc
SmartIF< IScheduler > m_schedulerSvc
Definition: SharedHiveEvtQueueConsumer.h:83
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:88
SharedHiveEvtQueueConsumer::~SharedHiveEvtQueueConsumer
virtual ~SharedHiveEvtQueueConsumer() override
Definition: SharedHiveEvtQueueConsumer.cxx:59
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
IDataShare.h
AthenaInterprocess::UpdateAfterFork
Definition: Incidents.h:22
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:83
conifer::pow
constexpr int pow(int x)
Definition: conifer.h:20
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:68
SharedHiveEvtQueueConsumer::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedHiveEvtQueueConsumer.cxx:223
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:32
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:362
skel.it
it
Definition: skel.GENtoEVGEN.py:423
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Definition: AthenaMPToolBase.h:86
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
ProcessGroup.h
AthenaInterprocess::ProcessResult::pid
pid_t pid
Definition: ProcessGroup.h:23
SharedHiveEvtQueueConsumer::m_evtSelSeek
IEvtSelectorSeek * m_evtSelSeek
Definition: SharedHiveEvtQueueConsumer.h:74
SharedHiveEvtQueueConsumer::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: SharedHiveEvtQueueConsumer.h:78
AthExHiveOpts.chunkSize
chunkSize
Definition: AthExHiveOpts.py:101
python.utils.AtlRunQueryLookup.mask
string mask
Definition: AtlRunQueryLookup.py:460
sigaddset
#define sigaddset(x, y)
Definition: SealSignal.h:84
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:67
AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore
const ServiceHandle< StoreGateSvc > & detStore() const
The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:95
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:56
SharedHiveEvtQueueConsumer::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:542
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
SharedHiveEvtQueueConsumer_d
Definition: SharedHiveEvtQueueConsumer.cxx:32
IDataShare
Abstract interface for sharing data.
Definition: IDataShare.h:28
SharedHiveEvtQueueConsumer::m_nProcessedEvents
std::map< pid_t, int > m_nProcessedEvents
Definition: SharedHiveEvtQueueConsumer.h:80
IHybridProcessorHelper
Helper interface for implementing hybrid MP+MT. Used by the Hybrid Shared Event Queue Consumer MP too...
Definition: IHybridProcessorHelper.h:15
sigset_t
int sigset_t
Definition: SealSignal.h:80
SharedHiveEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedHiveEvtQueueConsumer.h:77
IEvtSelectorSeek::seek
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
copy_file_icc_hack.h
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaInterprocess::ProcessResult::output
ScheduledWork output
Definition: ProcessGroup.h:24
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:91
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
lumiFormat.i
int i
Definition: lumiFormat.py:92
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:65
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
Incidents.h
SharedHiveEvtQueueConsumer_d::pauseForDebug
void pauseForDebug(int)
Definition: SharedHiveEvtQueueConsumer.cxx:34
res
std::pair< std::vector< unsigned int >, bool > res
Definition: JetGroupProductTest.cxx:14
SharedHiveEvtQueueConsumer::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:373
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
AthenaMPToolBase::evtSelector
IEvtSelector * evtSelector()
Definition: AthenaMPToolBase.h:81
test_pyathena.parent
parent
Definition: test_pyathena.py:15
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:129
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:341
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:55
SharedHiveEvtQueueConsumer.h
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
merge.output
output
Definition: merge.py:17
SharedHiveEvtQueueConsumer::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:238
SharedHiveEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedHiveEvtQueueConsumer.h:75
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:64
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:93
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:278
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
SharedHiveEvtQueueConsumer::finalize
virtual StatusCode finalize() override
Definition: SharedHiveEvtQueueConsumer.cxx:98
SharedHiveEvtQueueConsumer_d::sig_done
std::atomic< bool > sig_done
Definition: SharedHiveEvtQueueConsumer.cxx:33
grepfile.filenames
list filenames
Definition: grepfile.py:34
ir
int ir
counter of the current depth
Definition: fastadd.cxx:49
SharedHiveEvtQueueConsumer::SharedHiveEvtQueueConsumer
SharedHiveEvtQueueConsumer()
SharedHiveEvtQueueConsumer::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: SharedHiveEvtQueueConsumer.cxx:203
SharedHiveEvtQueueConsumer::initHive
StatusCode initHive()
Definition: SharedHiveEvtQueueConsumer.cxx:641
SharedHiveEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedHiveEvtQueueConsumer.h:70
IHybridProcessorHelper.h
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
SharedHiveEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedHiveEvtQueueConsumer.h:72
IHybridProcessorHelper::drainScheduler
virtual int drainScheduler(int &finishedEvts, bool report)=0
AthCommonMsg< AlgTool >::msg
MsgStream & msg() const
Definition: AthCommonMsg.h:24
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
IHybridProcessorHelper::resetAppReturnCode
virtual void resetAppReturnCode()=0
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
SharedHiveEvtQueueConsumer::m_finQueue
std::queue< pid_t > m_finQueue
Definition: SharedHiveEvtQueueConsumer.h:81
IEvtSelectorSeek.h
Extension to IEvtSelector to allow for seeking.
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:84
AthenaInterprocess::SharedQueue::send_basic
bool send_basic(T)
Definition: SharedQueue.h:93
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:425
SharedHiveEvtQueueConsumer::initialize
virtual StatusCode initialize() override
Definition: SharedHiveEvtQueueConsumer.cxx:65
SharedHiveEvtQueueConsumer::m_useSharedWriter
Gaudi::Property< bool > m_useSharedWriter
Definition: SharedHiveEvtQueueConsumer.h:65
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:66
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:369
SharedHiveEvtQueueConsumer::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedHiveEvtQueueConsumer.h:61
sigemptyset
#define sigemptyset(x)
Definition: SealSignal.h:82
SharedHiveEvtQueueConsumer::m_dataShare
IDataShare * m_dataShare
Definition: SharedHiveEvtQueueConsumer.h:73
IHybridProcessorHelper::terminateLoop
virtual bool terminateLoop()=0
ServiceHandle< IEventProcessor >
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:90