ATLAS Offline Software
SharedHiveEvtQueueConsumer.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 #include "copy_file_icc_hack.h"
9 
13 #include "GaudiKernel/IEvtSelector.h"
14 #include "GaudiKernel/IIoComponentMgr.h"
15 #include "GaudiKernel/IFileMgr.h"
16 #include "GaudiKernel/IChronoStatSvc.h"
17 #include "GaudiKernel/ISvcLocator.h"
18 #include "GaudiKernel/IIncidentSvc.h"
19 #include "GaudiKernel/IConversionSvc.h"
20 
21 #include <sys/stat.h>
22 #include <sstream>
23 #include <fstream>
24 #include <unistd.h>
25 #include <stdio.h>
26 #include <stdint.h>
27 #include <stdexcept>
28 #include <cmath> // For pow
29 
30 #include <signal.h>
31 
33  std::atomic<bool> sig_done = false;
34  void pauseForDebug(int /*sig*/) {
35  // std::cout << "Continuing after receiving signal "
36  // << sig << std::endl;
37  sig_done = true;
38  }
39 }
40 
42  , const std::string& name
43  , const IInterface* parent)
45  , m_rankId(-1)
46  , m_chronoStatSvc("ChronoStatSvc", name)
47  , m_evtSelSeek(0)
48  , m_evtContext(0)
49  , m_sharedEventQueue(0)
50  , m_sharedRankQueue(0)
51 {
52  declareInterface<IAthenaMPTool>(this);
53 
54  m_subprocDirPrefix = "worker_";
55 }
56 
57 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
58 
60 {
61 }
62 
63 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
64 
66 {
67  ATH_MSG_DEBUG("In initialize");
68 
70  if(!sc.isSuccess())
71  return sc;
72 
73  m_evtSelSeek = serviceLocator()->service(m_evtSelName);
74  ATH_CHECK( m_evtSelSeek.isValid() );
75  ATH_CHECK( evtSelector()->createContext (m_evtContext) );
76 
77  ATH_CHECK(m_chronoStatSvc.retrieve());
78 
79  SmartIF<IConversionSvc> cnvSvc(serviceLocator()->service("AthenaPoolCnvSvc"));
80  m_dataShare = SmartIF<IDataShare>(cnvSvc);
81  if(!m_dataShare) {
82  if(m_useSharedWriter) {
83  ATH_MSG_ERROR("Error retrieving AthenaPoolCnvSvc " << cnvSvc);
84  return StatusCode::FAILURE;
85  }
86  }
87 
88  return StatusCode::SUCCESS;
89 }
90 
91 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
92 
95 {
96  if (m_evtContext) {
97  ATH_CHECK( evtSelector()->releaseContext (m_evtContext) );
98  m_evtContext = nullptr;
99  }
100 
101  delete m_sharedRankQueue;
102  return StatusCode::SUCCESS;
103 }
104 
105 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
106 
107 int
108 SharedHiveEvtQueueConsumer::makePool(int, int nprocs, const std::string& topdir)
109 {
110  ATH_MSG_DEBUG("In makePool " << getpid());
111 
112  if(nprocs==0 || nprocs<-1) {
113  ATH_MSG_ERROR("Invalid value for the nprocs parameter: ");
114  return -1;
115  }
116 
117  if(topdir.empty()) {
118  ATH_MSG_ERROR("Empty name for the top directory!");
119  return -1;
120  }
121 
122  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
123  m_subprocTopDir = topdir;
124 
125  // Get the shared event queue
126  ATH_MSG_DEBUG("Event queue name " << "AthenaMPEventQueue_" << m_randStr);
127  StatusCode sc = detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr);
128  if(sc.isFailure()) {
129  ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Event Queue");
130  return -1;
131  }
132 
133 
134  // Create rank queue and fill it
135  m_sharedRankQueue = new AthenaInterprocess::SharedQueue("SharedHiveEvtQueueConsumer_RankQueue_"+m_randStr,m_nprocs,sizeof(int));
136  for(int i=0; i<m_nprocs; ++i)
137  if(!m_sharedRankQueue->send_basic<int>(i)) {
138  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
139  return -1;
140  }
141 
142  // Create the process group and map_async bootstrap
144  ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
145  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
146  return -1;
147  ATH_MSG_INFO("Workers bootstraped");
148 
149  return m_nprocs;
150 }
151 
152 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
153 
154 StatusCode
156 {
157  ATH_MSG_DEBUG("In exec " << getpid());
158 
159  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
160  return StatusCode::FAILURE;
161  ATH_MSG_INFO("Workers started processing events");
162 
163  return StatusCode::SUCCESS;
164 }
165 
166 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
167 
168 StatusCode
169 SharedHiveEvtQueueConsumer::wait_once(pid_t& pid)
170 {
171  StatusCode sc = AthenaMPToolBase::wait_once(pid);
173  if(sc.isFailure()) {
174  // We are to stop waiting. Pull all available ProcessResults from the queue
175  // Don't serialize finalizations
176  do {
177  presult = m_processGroup->pullOneResult();
178  if(presult && (unsigned)(presult->output.size)>sizeof(int))
179  decodeProcessResult(presult,false);
180  if(presult) free(presult->output.data);
181  delete presult;
182  } while(presult);
183  } else {
184  // Pull one result and decode it if necessary
185  presult = m_processGroup->pullOneResult();
186  int res(0);
187  if(presult && (unsigned)(presult->output.size)>sizeof(int))
188  res = decodeProcessResult(presult,true);
189  if(presult) free(presult->output.data);
190  delete presult;
191  if(res) return StatusCode::FAILURE;
192  }
193  return sc;
194 }
195 
196 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
197 
198 void
200 {
201  ATH_MSG_INFO("Statuses of event processors");
202  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
203  for(size_t i=0; i<statuses.size(); ++i) {
204  // Get the number of events processed by this worker
205  std::map<pid_t,int>::const_iterator it = m_nProcessedEvents.find(statuses[i].pid);
206  msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
207  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
208  << ". Number of events processed: ";
209  if(it==m_nProcessedEvents.end())
210  msg(MSG::INFO) << "N/A" << endmsg;
211  else
212  msg(MSG::INFO) << it->second << endmsg;
213  }
214 }
215 
216 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
217 
218 void
220 {
221  filenames.clear();
222  for(int i=0; i<m_nprocs; ++i) {
223  std::ostringstream workerIndex;
224  workerIndex << i;
225  std::filesystem::path worker_rundir(m_subprocTopDir);
226  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
227  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
228  }
229 }
230 
231 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
232 
233 std::unique_ptr<AthenaInterprocess::ScheduledWork>
235 {
236  if (m_debug) {
237  ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
238  sigset_t mask, oldmask;
239 
241 
242  sigemptyset (&mask);
243  sigaddset (&mask, SIGUSR1);
244 
245  sigprocmask (SIG_BLOCK, &mask, &oldmask);
247  sigsuspend (&oldmask);
248  sigprocmask (SIG_UNBLOCK, &mask, NULL);
249  }
250 
251  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
252  outwork->data = malloc(sizeof(int));
253  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
254  outwork->size = sizeof(int);
255 
256  // ...
257  // (possible) TODO: extend outwork with some error message, which will be eventually
258  // reported in the master proces
259  // ...
260 
261  // ________________________ Get IncidentSvc and fire PostFork ________________________
262  SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service("IncidentSvc"));
263  if (!p_incidentSvc) {
264  ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
265  return outwork;
266  }
267  p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
268 
269 
270  // ________________________ Get RankID ________________________
271  //
273  ATH_MSG_ERROR("Unable to get rank ID!");
274  return outwork;
275  }
276  std::ostringstream workindex;
277  workindex<<m_rankId;
278 
279  // ________________________ Worker dir: mkdir ________________________
280  std::filesystem::path worker_rundir(m_subprocTopDir);
281  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
282  // TODO: this "worker_" can be made configurable too
283 
284  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
285  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
286  return outwork;
287  }
288 
289  // ________________________ Redirect logs ________________________
290  if(redirectLog(worker_rundir.string()))
291  return outwork;
292 
293  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
294 
295  // ________________________ Update Io Registry ____________________________
296  if(updateIoReg(worker_rundir.string()))
297  return outwork;
298 
299  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
300 
301  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
302  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
303  if(std::filesystem::is_regular_file("SimParams.db"))
304  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
305  if(std::filesystem::is_regular_file("DigitParams.db"))
306  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
307  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
308  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
309 
310  // _______________________ Handle saved PFC (if any) ______________________
311  if(handleSavedPfc(abs_worker_rundir))
312  return outwork;
313 
314  // ________________________ reopen descriptors ____________________________
315  if(reopenFds())
316  return outwork;
317 
318  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
319 
320  // ________________________ Make Shared Writer Client ________________________
321 
323  SmartIF<IProperty> propertyServer(m_dataShare);
324  if (!propertyServer || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
325  ATH_MSG_ERROR("Could not change AthenaPoolCnvSvc MakeClient Property");
326  return outwork;
327  } else {
328  ATH_MSG_DEBUG("Successfully made the conversion service a share client");
329  }
330  }
331 
332  // ________________________ I/O reinit ________________________
333  if(!m_ioMgr->io_reinitialize().isSuccess()) {
334  ATH_MSG_ERROR("Failed to reinitialize I/O");
335  return outwork;
336  } else {
337  ATH_MSG_DEBUG("Successfully reinitialized I/O");
338  }
339 
340  // ________________________ Event selector restart ________________________
341  SmartIF<IService> evtSelSvc(m_evtSelector);
342  if(!evtSelSvc) {
343  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
344  return outwork;
345  }
346  if(!evtSelSvc->start().isSuccess()) {
347  ATH_MSG_ERROR("Failed to restart the event selector");
348  return outwork;
349  } else {
350  ATH_MSG_DEBUG("Successfully restarted the event selector");
351  }
352 
353  // ________________________ Worker dir: chdir ________________________
354  if(chdir(worker_rundir.string().c_str())==-1) {
355  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
356  return outwork;
357  }
358 
359  // ___________________ Fire UpdateAfterFork incident _________________
360  p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
361 
362  // Declare success and return
363  *(int*)(outwork->data) = 0;
364  return outwork;
365 }
366 
367 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
368 
369 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedHiveEvtQueueConsumer::exec_func()
370 {
371  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
372 
373  bool all_ok(true);
374 
375  if (!initHive().isSuccess()) {
376  ATH_MSG_FATAL("unable to initialize Hive");
377  all_ok = false;
378  }
379 
380  // Get the value of SkipEvent
381  int skipEvents(0);
382  SmartIF<IProperty> propertyServer(m_evtSelector);
383  if(propertyServer==0) {
384  ATH_MSG_ERROR("Unable to cast event selector to IProperty");
385  all_ok = false;
386  }
387  else {
388  std::string propertyName("SkipEvents");
389  IntegerProperty skipEventsProp(propertyName,skipEvents);
390  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
391  ATH_MSG_INFO("Event Selector does not have SkipEvents property");
392  }
393  else {
394  skipEvents = skipEventsProp.value();
395  }
396  }
397 
398  IHybridProcessorHelper* hybridHelper = dynamic_cast<IHybridProcessorHelper*>(m_evtProcessor.get());
399  if(!hybridHelper) {
400  ATH_MSG_FATAL("Failed to acquire IHybridProcessorHelper interface");
401  all_ok = false;
402  return std::unique_ptr<AthenaInterprocess::ScheduledWork>();
403  }
404  // Reset the application return code.
405  hybridHelper->resetAppReturnCode();
406 
407  int finishedEvts =0;
408  int createdEvts =0;
409  long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
410  long evtnumAndChunk(0);
411 // unsigned evtCounter(0);
412  int evtnum(0), chunkSize(1);
413 
414  ATH_MSG_INFO("Starting loop on events");
415 
416  StatusCode sc(StatusCode::SUCCESS);
417 
418  while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
419  ATH_MSG_DEBUG("Event queue is empty");
420  usleep(1000);
421  }
422  bool loop_ended = (evtnumAndChunk<0);
423  if(!loop_ended) {
424  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
425  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
426  evtnum = evtnumAndChunk & intmask;
427  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
428  hybridHelper->setCurrentEventNum(++evtnum);
429  }
430 
431  bool no_more_events = false;
432 
433  while(!loop_ended) {
434  ATH_MSG_DEBUG(" -> createdEvts: " << createdEvts);
435 
436  if(!hybridHelper->terminateLoop() // No scheduled loop termination
437  && !no_more_events // We are not yet done getting events
438  && m_schedulerSvc->freeSlots()>0) { // There are still free slots in the scheduler
439  ATH_MSG_DEBUG("createdEvts: " << createdEvts << ", freeslots: " << m_schedulerSvc->freeSlots());
440 
441  auto ctx = m_evtProcessor->createEventContext();
442  if(!ctx.valid()) {
443  sc = StatusCode::FAILURE;
444  }
445  else {
446  sc = m_evtProcessor->executeEvent(std::move(ctx));
447  }
448 
449  if (sc.isFailure()) {
450  ATH_MSG_ERROR("Terminating event processing loop due to errors");
451  loop_ended = true;
452  }
453  else {
454  ++createdEvts;
455  if(--chunkSize==0) {
456  // Fetch next chunk
457  while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
458  ATH_MSG_DEBUG("Event queue is empty");
459  usleep(1000);
460  }
461  if(evtnumAndChunk<0) {
462  no_more_events = true;
463  evtnumAndChunk *= -1;
464  ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
465  }
466  else {
467  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
468  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
469  evtnum = evtnumAndChunk & intmask;
470  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
471  }
472  }
473  // Advance to the next event
474  if(!no_more_events) {
475  hybridHelper->setCurrentEventNum(++evtnum);
476  }
477  }
478  }
479  else {
480  // all the events were created but not all finished or the slots were
481  // all busy: the scheduler should finish its job
482  ATH_MSG_DEBUG("Draining the scheduler");
483 
484  // Pull out of the scheduler the finished events
485  int ir = hybridHelper->drainScheduler(finishedEvts,true);
486  if(ir < 0) {
487  // some sort of error draining scheduler;
488  loop_ended = true;
489  sc = StatusCode::FAILURE;
490  }
491  else if(ir == 0) {
492  // no more events in scheduler
493  if(no_more_events) {
494  // We are done
495  loop_ended = true;
496  sc = StatusCode::SUCCESS;
497  }
498  }
499  else {
500  // keep going!
501  }
502  }
503  } // end main loop on finished events
504 
505  if(all_ok) {
506  if(m_evtProcessor->executeRun(0).isFailure()) {
507  ATH_MSG_ERROR("Could not finalize the Run");
508  all_ok=false;
509  } else {
510  if(m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+skipEvents).isFailure()) {
511  ATH_MSG_DEBUG("Seek past maxevt to " << evtnumAndChunk+skipEvents << " returned failure. As expected...");
512  }
513  }
514  }
515 
516  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
517 
518  // Return value: "ERRCODE|Func_Flag|NEvt"
519  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
520  void* outdata = malloc(outsize);
521  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
523  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
524  memcpy((char*)outdata+sizeof(int)+sizeof(func),&createdEvts,sizeof(int));
525 
526  outwork->data = outdata;
527  outwork->size = outsize;
528  // ...
529  // (possible) TODO: extend outwork with some error message, which will be eventually
530  // reported in the master proces
531  // ...
532  return outwork;
533 }
534 
535 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
536 
537 std::unique_ptr<AthenaInterprocess::ScheduledWork>
539 {
540  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
541 
542  bool all_ok(true);
543 
544  if(m_appMgr->stop().isFailure()) {
545  ATH_MSG_ERROR("Unable to stop AppMgr");
546  all_ok=false;
547  } else {
548  if(m_appMgr->finalize().isFailure()) {
549  std::cerr << "Unable to finalize AppMgr" << std::endl;
550  all_ok=false;
551  }
552  }
553 
554  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
555 
556  // Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
557  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
558  void* outdata = malloc(outsize);
559  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
561  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
562  int nEvt = -1;
563  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
564 
565  outwork->data = outdata;
566  outwork->size = outsize;
567 
568  return outwork;
569 }
570 
571 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
572 
573 int
574 SharedHiveEvtQueueConsumer::decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize)
575 {
576  if(!presult) return 0;
578  ATH_MSG_DEBUG("Decoding the output of PID=" << presult->pid << " with the size=" << output.size);
579  if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)) return 0;
580 
582  memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
583  if(func==AthenaMPToolBase::FUNC_EXEC) {
584  // Store the number of processed events
585  int nevt(0);
586  memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
587  m_nProcessedEvents[presult->pid]=nevt;
588  ATH_MSG_DEBUG("PID=" << presult->pid << " processed " << nevt << " events");
589 
590  if(doFinalize) {
591  // Add PID to the finalization queue
592  m_finQueue.push(presult->pid);
593  ATH_MSG_DEBUG("Added PID=" << presult->pid << " to the finalization queue");
594 
595  // If this is the only element in the queue then start its finalization
596  // Otherwise it has to wait its turn until all previous processes have been finalized
597  if(m_finQueue.size()==1) {
598  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,presult->pid)
599  || m_processGroup->map_async(0,0,presult->pid)) {
600  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << presult->pid);
601  return 1;
602  } else {
603  ATH_MSG_DEBUG("Scheduled finalization of PID=" << presult->pid);
604  }
605  }
606  }
607  } else if(doFinalize && func==AthenaMPToolBase::FUNC_FIN) {
608  ATH_MSG_DEBUG("Finished finalization of PID=" << presult->pid);
609  pid_t pid = m_finQueue.front();
610  if(pid==presult->pid) {
611  // pid received as expected. Remove it from the queue
612  m_finQueue.pop();
613  ATH_MSG_DEBUG("PID=" << presult->pid << " removed from the queue");
614  // Schedule finalization of the next processe in the queue
615  if(m_finQueue.size()) {
616  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())
617  || m_processGroup->map_async(0,0,m_finQueue.front())) {
618  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
619  return 1;
620  } else {
621  ATH_MSG_DEBUG("Scheduled finalization of PID=" << m_finQueue.front());
622  }
623  }
624  } else {
625  // Error: unexpected pid received from presult
626  ATH_MSG_ERROR("Finalized PID=" << presult->pid << " while PID=" << pid << " was expected");
627  return 1;
628  }
629  }
630 
631  return 0;
632 }
633 
634 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
635 
638 
639  if (m_evtProcessor.release().isFailure()) {
640  ATH_MSG_INFO("could not release old EventProcessor ");
641  }
642 
643  ISvcManager* pISM(dynamic_cast<ISvcManager*>(serviceLocator().get()));
644  if (pISM == 0) {
645  ATH_MSG_ERROR("initHive: Could not get SvcManager");
646  } else {
647  if (pISM->removeService(m_evtProcessor.name()).isFailure()) {
648  ATH_MSG_ERROR("initHive: could not remove " << m_evtProcessor.name()
649  << " from SvcManager");
650  }
651  }
652 
653  m_evtProcessor = ServiceHandle<IEventProcessor>("AthenaMtesEventLoopMgr",name());
654 
655  if (m_evtProcessor.retrieve().isFailure()) {
656  ATH_MSG_ERROR("could not setup " << m_evtProcessor.typeAndName());
657  return StatusCode::FAILURE;
658  }
659 
660  m_schedulerSvc = serviceLocator()->service("AvalancheSchedulerSvc");
661 
662  // m_whiteboard = serviceLocator()->service(m_whiteboardName);
663  // if( !m_whiteboard.isValid() ) {
664  // ATH_MSG_FATAL( "Error retrieving " << m_whiteboardName
665  // << " interface IHiveWhiteBoard." );
666  // return StatusCode::FAILURE;
667  // }
668 
669  // m_schedulerSvc = serviceLocator()->service(m_schedulerName);
670  // if ( !m_schedulerSvc.isValid()){
671  // ATH_MSG_FATAL( "Error retrieving SchedulerSvc interface ISchedulerSvc." );
672  // return StatusCode::FAILURE;
673  // }
674  // // Setup algorithm resource pool
675  // m_algResourcePool = serviceLocator()->service("AlgResourcePool");
676  // if( !m_algResourcePool.isValid() ) {
677  // ATH_MSG_FATAL ("Error retrieving AlgResourcePool");
678  // return StatusCode::FAILURE;
679  // }
680 
681  // sc = m_eventStore.retrieve();
682  // if( !sc.isSuccess() ) {
683  // ATH_MSG_FATAL("Error retrieving pointer to StoreGateSvc");
684  // return sc;
685  // }
686 
687 
688  return StatusCode::SUCCESS;
689 
690 }
691 
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
IHybridProcessorHelper::setCurrentEventNum
virtual void setCurrentEventNum(int num)=0
AthenaInterprocess::ProcessGroup::pullOneResult
ProcessResult * pullOneResult()
Definition: ProcessGroup.cxx:177
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
SharedHiveEvtQueueConsumer::m_schedulerSvc
SmartIF< IScheduler > m_schedulerSvc
Definition: SharedHiveEvtQueueConsumer.h:83
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:88
SharedHiveEvtQueueConsumer::~SharedHiveEvtQueueConsumer
virtual ~SharedHiveEvtQueueConsumer() override
Definition: SharedHiveEvtQueueConsumer.cxx:59
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
IDataShare.h
AthenaInterprocess::UpdateAfterFork
Definition: Incidents.h:22
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:83
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:68
SharedHiveEvtQueueConsumer::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedHiveEvtQueueConsumer.cxx:219
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:358
skel.it
it
Definition: skel.GENtoEVGEN.py:396
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Definition: AthenaMPToolBase.h:86
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
ProcessGroup.h
AthenaInterprocess::ProcessResult::pid
pid_t pid
Definition: ProcessGroup.h:23
SharedHiveEvtQueueConsumer::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: SharedHiveEvtQueueConsumer.h:78
AthExHiveOpts.chunkSize
chunkSize
Definition: AthExHiveOpts.py:101
python.utils.AtlRunQueryLookup.mask
string mask
Definition: AtlRunQueryLookup.py:460
sigaddset
#define sigaddset(x, y)
Definition: SealSignal.h:84
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:67
AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore
const ServiceHandle< StoreGateSvc > & detStore() const
The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:95
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:56
SharedHiveEvtQueueConsumer::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:538
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
SharedHiveEvtQueueConsumer_d
Definition: SharedHiveEvtQueueConsumer.cxx:32
SharedHiveEvtQueueConsumer::m_nProcessedEvents
std::map< pid_t, int > m_nProcessedEvents
Definition: SharedHiveEvtQueueConsumer.h:80
IHybridProcessorHelper
Helper interface for implementing hybrid MP+MT. Used by the Hybrid Shared Event Queue Consumer MP too...
Definition: IHybridProcessorHelper.h:16
sigset_t
int sigset_t
Definition: SealSignal.h:80
SharedHiveEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedHiveEvtQueueConsumer.h:77
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:67
SharedHiveEvtQueueConsumer::m_evtSelSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Definition: SharedHiveEvtQueueConsumer.h:74
copy_file_icc_hack.h
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:94
AthenaInterprocess::ProcessResult::output
ScheduledWork output
Definition: ProcessGroup.h:24
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:91
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
lumiFormat.i
int i
Definition: lumiFormat.py:85
SharedHiveEvtQueueConsumer::m_dataShare
SmartIF< IDataShare > m_dataShare
Definition: SharedHiveEvtQueueConsumer.h:73
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:65
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
Incidents.h
SharedHiveEvtQueueConsumer_d::pauseForDebug
void pauseForDebug(int)
Definition: SharedHiveEvtQueueConsumer.cxx:34
res
std::pair< std::vector< unsigned int >, bool > res
Definition: JetGroupProductTest.cxx:14
SharedHiveEvtQueueConsumer::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:369
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
AthenaMPToolBase::evtSelector
IEvtSelector * evtSelector()
Definition: AthenaMPToolBase.h:81
test_pyathena.parent
parent
Definition: test_pyathena.py:15
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:132
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:337
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:55
SharedHiveEvtQueueConsumer.h
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
merge.output
output
Definition: merge.py:17
SharedHiveEvtQueueConsumer::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:234
SharedHiveEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedHiveEvtQueueConsumer.h:75
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:93
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:281
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
SharedHiveEvtQueueConsumer::finalize
virtual StatusCode finalize() override
Definition: SharedHiveEvtQueueConsumer.cxx:94
SharedHiveEvtQueueConsumer_d::sig_done
std::atomic< bool > sig_done
Definition: SharedHiveEvtQueueConsumer.cxx:33
grepfile.filenames
list filenames
Definition: grepfile.py:34
ir
int ir
counter of the current depth
Definition: fastadd.cxx:49
SharedHiveEvtQueueConsumer::SharedHiveEvtQueueConsumer
SharedHiveEvtQueueConsumer()
SharedHiveEvtQueueConsumer::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: SharedHiveEvtQueueConsumer.cxx:199
SharedHiveEvtQueueConsumer::initHive
StatusCode initHive()
Definition: SharedHiveEvtQueueConsumer.cxx:637
SharedHiveEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedHiveEvtQueueConsumer.h:70
IHybridProcessorHelper.h
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
SharedHiveEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedHiveEvtQueueConsumer.h:72
IHybridProcessorHelper::drainScheduler
virtual int drainScheduler(int &finishedEvts, bool report)=0
AthCommonMsg< AlgTool >::msg
MsgStream & msg() const
Definition: AthCommonMsg.h:24
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
IHybridProcessorHelper::resetAppReturnCode
virtual void resetAppReturnCode()=0
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
SharedHiveEvtQueueConsumer::m_finQueue
std::queue< pid_t > m_finQueue
Definition: SharedHiveEvtQueueConsumer.h:81
IEvtSelectorSeek.h
Extension to IEvtSelector to allow for seeking.
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:84
AthenaInterprocess::SharedQueue::send_basic
bool send_basic(T)
Definition: SharedQueue.h:93
pow
constexpr int pow(int base, int exp) noexcept
Definition: ap_fixedTest.cxx:15
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:421
SharedHiveEvtQueueConsumer::initialize
virtual StatusCode initialize() override
Definition: SharedHiveEvtQueueConsumer.cxx:65
SharedHiveEvtQueueConsumer::m_useSharedWriter
Gaudi::Property< bool > m_useSharedWriter
Definition: SharedHiveEvtQueueConsumer.h:65
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:66
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:365
SharedHiveEvtQueueConsumer::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedHiveEvtQueueConsumer.h:61
sigemptyset
#define sigemptyset(x)
Definition: SealSignal.h:82
IHybridProcessorHelper::terminateLoop
virtual bool terminateLoop()=0
ServiceHandle< IEventProcessor >
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:90