ATLAS Offline Software
SharedEvtQueueConsumer.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 #include "copy_file_icc_hack.h"
9 
14 #include "CxxUtils/xmalloc.h"
15 #include "GaudiKernel/IEvtSelector.h"
16 #include "GaudiKernel/IIoComponentMgr.h"
17 #include "GaudiKernel/IFileMgr.h"
18 #include "GaudiKernel/IChronoStatSvc.h"
19 #include "GaudiKernel/ISvcLocator.h"
20 #include "GaudiKernel/IIncidentSvc.h"
21 #include "GaudiKernel/IConversionSvc.h"
22 
23 #include <filesystem>
24 
25 #include <sys/stat.h>
26 #include <sstream>
27 #include <fstream>
28 #include <unistd.h>
29 #include <stdio.h>
30 #include <stdint.h>
31 #include <stdexcept>
32 #include <cmath> // For pow
33 
35  , const std::string& name
36  , const IInterface* parent)
38  , m_chronoStatSvc("ChronoStatSvc", name)
39  , m_masterPid(getpid())
40 {
41  m_subprocDirPrefix = "worker_";
42 }
43 
45 {
46 }
47 
49 {
50  ATH_MSG_DEBUG("In initialize");
51 
53 
54  // For pile-up jobs use event loop manager for seeking
55  // otherwise use event selector
56  if(m_isPileup) {
57  m_evtSeek = SmartIF<IEventSeek>(m_evtProcessor.get());
58  if(!m_evtSeek) {
59  ATH_MSG_ERROR("Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
60  return StatusCode::FAILURE;
61  }
62  }
63  else if(m_evtSelector) {
64  m_evtSelSeek = serviceLocator()->service(m_evtSelName);
65  ATH_CHECK(m_evtSelSeek.isValid());
66  }
67 
68  if(m_evtSelector) {
69  ATH_CHECK( m_evtSelector->createContext (m_evtContext) );
70 
71  m_evtShare = serviceLocator()->service(m_evtSelName);
72  if(!m_evtShare) {
73  if(m_useSharedReader) {
74  ATH_MSG_ERROR("Error retrieving IEventShare");
75  return StatusCode::FAILURE;
76  }
77  ATH_MSG_INFO("Could not retrieve IEventShare");
78  }
79 
80  //FIXME: AthenaPool dependent for now
81 
82  if(m_useSharedWriter) {
83  m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolSharedIOCnvSvc"));
84  if(!m_dataShare) {
85  ATH_MSG_ERROR("Error retrieving AthenaPoolSharedIOCnvSvc");
86  return StatusCode::FAILURE;
87  }
88  }
89  }
90 
91  ATH_CHECK(m_chronoStatSvc.retrieve());
92 
93  return StatusCode::SUCCESS;
94 }
95 
97 {
98  if(getpid()==m_masterPid) {
99  ATH_MSG_INFO("finalize() in the master process");
100  // Merge saved event orders into one in the master run directory
101 
102  // 1. Check if master run directory already contains a file with saved orders
103  // If so, then rename it with random suffix
104  std::filesystem::path ordersFile(m_eventOrdersFile.value());
105  if(std::filesystem::exists(ordersFile)) {
106  srand((unsigned)time(0));
107  std::ostringstream randname;
108  randname << rand();
109  std::string ordersFileBak = m_eventOrdersFile+std::string("-bak-")+randname.str();
110  ATH_MSG_WARNING("File " << m_eventOrdersFile << " already exists in the master run directory!");
111  ATH_MSG_WARNING("Saving a backup with new name " << ordersFileBak);
112 
113  std::filesystem::path ordersFileBakpath(ordersFileBak);
114  std::filesystem::rename(ordersFile,ordersFileBakpath);
115  }
116 
117  // 2. Merge workers event orders into the master file
118  std::fstream fs(m_eventOrdersFile,std::fstream::out);
119  for(int i=0; i<m_nprocs; ++i) {
120  std::ostringstream workerIndex;
121  workerIndex << i;
122  std::filesystem::path worker_rundir(m_subprocTopDir);
123  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
124  std::string ordersFileWorker(worker_rundir.string()+std::string("/")+m_eventOrdersFile);
125  ATH_MSG_INFO("Processing " << ordersFileWorker << " ...");
126  std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
127  std::string line;
128  while(fs_worker.good()) {
129  std::getline(fs_worker,line);
130  fs << line << std::endl;
131  }
132  fs_worker.close();
133  }
134  fs.close();
135  } // if(getpid()==m_masterPid)
136 
137  if (m_evtContext) {
138  ATH_CHECK( m_evtSelector->releaseContext (m_evtContext) );
139  m_evtContext = nullptr;
140  }
141 
142  return StatusCode::SUCCESS;
143 }
144 
145 int SharedEvtQueueConsumer::makePool(int, int nprocs, const std::string& topdir)
146 {
147  ATH_MSG_DEBUG("In makePool " << getpid());
148 
149  if(nprocs==0 || nprocs<-1) {
150  ATH_MSG_ERROR("Invalid value for the nprocs parameter: " << nprocs);
151  return -1;
152  }
153 
154  if(topdir.empty()) {
155  ATH_MSG_ERROR("Empty name for the top directory!");
156  return -1;
157  }
158 
159  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
160  m_subprocTopDir = topdir;
161 
162  // Get the shared event queue
163  ATH_MSG_DEBUG("Event queue name AthenaMPEventQueue_" << m_randStr);
164  ATH_CHECK( detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr), -1);
165 
166  // Create rank queue and fill it
167  m_sharedRankQueue = std::make_unique<AthenaInterprocess::SharedQueue>("SharedEvtQueueConsumer_RankQueue_"+m_randStr,m_nprocs,sizeof(int));
168  for(int i=0; i<m_nprocs; ++i)
169  if(!m_sharedRankQueue->send_basic<int>(i)) {
170  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
171  return -1;
172  }
173 
174  // Create the process group and map_async bootstrap
176  ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
177  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
178  return -1;
179  ATH_MSG_INFO("Workers bootstrapped");
180 
181  return m_nprocs;
182 }
183 
185 {
186  ATH_MSG_DEBUG("In exec " << getpid());
187 
188  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
189  return StatusCode::FAILURE;
190  ATH_MSG_INFO("Workers started processing events");
191 
192  return StatusCode::SUCCESS;
193 }
194 
195 StatusCode SharedEvtQueueConsumer::wait_once(pid_t& pid)
196 {
197  StatusCode sc = AthenaMPToolBase::wait_once(pid);
199  if(sc.isFailure()) {
200  // We are to stop waiting. Pull all available ProcessResults from the queue
201  // Don't serialize finalizations
202  do {
203  presult = m_processGroup->pullOneResult();
204  if(presult && (unsigned)(presult->output.size)>sizeof(int))
205  decodeProcessResult(presult,false);
206  if(presult) free(presult->output.data);
207  delete presult;
208  } while(presult);
209  }
210  else {
211  // Pull one result and decode it if necessary
212  presult = m_processGroup->pullOneResult();
213  int res(0);
214  if(presult && (unsigned)(presult->output.size)>sizeof(int))
215  res = decodeProcessResult(presult,true);
216  if(presult) free(presult->output.data);
217  delete presult;
218  if(res) return StatusCode::FAILURE;
219  }
220  return sc;
221 }
222 
224 {
225  ATH_MSG_INFO("Statuses of event processors");
226  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
227  for(size_t i=0; i<statuses.size(); ++i) {
228  // Get the number of events processed by this worker
229  auto it = m_eventStat.find(statuses[i].pid);
230  msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
231  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
232  << ". Number of events processed: ";
233  if(it==m_eventStat.end())
234  msg(MSG::INFO) << "N/A" << endmsg;
235  else
236  msg(MSG::INFO) << it->second.first
237  << ", Event Loop Time: " << it->second.second << "sec."
238  << endmsg;
239  }
240 }
241 
242 void SharedEvtQueueConsumer::subProcessLogs(std::vector<std::string>& filenames)
243 {
244  filenames.clear();
245  for(int i=0; i<m_nprocs; ++i) {
246  std::ostringstream workerIndex;
247  workerIndex << i;
248  std::filesystem::path worker_rundir(m_subprocTopDir);
249  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
250  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
251  }
252 }
253 
254 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::bootstrap_func()
255 {
256  if(m_debug) waitForSignal();
257 
258  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
259  outwork->data = CxxUtils::xmalloc(sizeof(int));
260  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
261  outwork->size = sizeof(int);
262 
263  // For PileUp Digi Fork-After-N-Events >>>>
264  // Retrieve cuEvent-s for all background event selectors, if we forked after N events
265  std::map<IService*,int> bkgEvtSelectors;
266 
267  if(m_isPileup) {
268  for(IService* ptrSvc : serviceLocator()->getServices()) {
269  IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(ptrSvc);
270  if(evtsel && (evtsel != m_evtSelector)) {
271  if(m_nEventsBeforeFork>0) {
272  IEvtSelectorSeek* evtselseek = dynamic_cast<IEvtSelectorSeek*>(evtsel);
273  if(evtselseek) {
274  bkgEvtSelectors.emplace(ptrSvc,evtselseek->curEvent(*m_evtContext));
275  }
276  else {
277  ATH_MSG_ERROR("Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->name());
278  return outwork;
279  }
280  }
281  else {
282  bkgEvtSelectors.emplace(ptrSvc,0);
283  }
284  }
285  }
286  }
287  // <<<< For PileUp Digi Fork-After-N-Events
288 
289  // ...
290  // (possible) TODO: extend outwork with some error message, which will be eventually
291  // reported in the master proces
292  // ...
293 
294  // ________________________ Get IncidentSvc and fire PostFork ________________________
295  SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service("IncidentSvc"));
296  if(!p_incidentSvc) {
297  ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
298  return outwork;
299  }
300  p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
301 
302 
303  // ________________________ Get RankID ________________________
304  //
305  if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
306  ATH_MSG_ERROR("Unable to get rank ID!");
307  return outwork;
308  }
309  std::ostringstream workindex;
310  workindex<<m_rankId;
311 
312  // ________________________ Worker dir: mkdir ________________________
313  std::filesystem::path worker_rundir(m_subprocTopDir);
314  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
315  // TODO: this "worker_" can be made configurable too
316 
317  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
318  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
319  return outwork;
320  }
321 
322  // __________ Redirect logs unless we want to attach debugger ____________
323  if(!m_debug) {
324  if(redirectLog(worker_rundir.string()))
325  return outwork;
326 
327  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
328  }
329 
330  // ________________________ Update Io Registry ____________________________
331  if(updateIoReg(worker_rundir.string()))
332  return outwork;
333 
334  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
335 
336  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
337  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
338  if(std::filesystem::is_regular_file("SimParams.db"))
339  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
340  if(std::filesystem::is_regular_file("DigitParams.db"))
341  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
342  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
343  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
344 
345  // _______________________ Handle saved PFC (if any) ______________________
346  if(handleSavedPfc(abs_worker_rundir))
347  return outwork;
348 
349  // ________________________ reopen descriptors ____________________________
350  if(reopenFds())
351  return outwork;
352 
353  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
354 
355 
356  // ________________________ Make Shared Reader/Writer Client ________________________
358  ATH_CHECK( m_evtShare->makeClient(m_rankId), outwork);
359  }
360 
362  SmartIF<IProperty> propertyServer(m_dataShare);
363  if (!propertyServer || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
364  ATH_MSG_ERROR("Could not change AthenaPoolSharedIOCnvSvc MakeClient Property");
365  return outwork;
366  }
367  else {
368  ATH_MSG_DEBUG("Successfully made the conversion service a share client");
369  }
370  }
371 
372  // ________________________ I/O reinit ________________________
373  ATH_CHECK( m_ioMgr->io_reinitialize(), outwork );
374 
375  // _______________ Get the value of SkipEvent ________________________
376  if(m_evtSelector) {
377  SmartIF<IProperty> propertyServer(m_evtSelector);
378  ATH_CHECK( propertyServer.isValid(), outwork);
379 
380  IntegerProperty skipEventsProp("SkipEvents", m_nSkipEvents);
381  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
382  ATH_MSG_INFO("Event Selector does not have SkipEvents property");
383  }
384  else {
385  m_nSkipEvents = skipEventsProp.value();
386  }
387 
388  // ________________________ Event selector restart ________________________
389  SmartIF<IService> evtSelSvc(m_evtSelector);
390  ATH_CHECK( evtSelSvc.isValid(), outwork);
391  ATH_CHECK( evtSelSvc->start(), outwork);
392  }
393  // For PileUp jobs >>>>
394  // Main event selector: advance it if we either forked after N events, or skipEvents!=0
395  // Background event selectors: restart, and advance if we forked after N events
396  if(m_isPileup) {
397  // Deal with the main event selector first
398  m_evtSelSeek = serviceLocator()->service(m_evtSelName);
399  if(!m_evtSelSeek) {
400  ATH_MSG_ERROR("Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
401  return outwork;
402  }
403 
406  ATH_MSG_ERROR("Failed to seek to " << m_nEventsBeforeFork+m_nSkipEvents);
407  return outwork;
408  }
409 
410  // Deal with background event selectors
411  for(auto [evtsel,curEvt] : bkgEvtSelectors) {
412  if(evtsel->start().isSuccess()) {
413  if (m_nEventsBeforeFork>0) {
414  SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
415  if(evtselseek->seek(*m_evtContext,curEvt).isFailure()) {
416  ATH_MSG_ERROR("Failed to seek to " << curEvt << " in the BKG Event Selector " << evtsel->name());
417  return outwork;
418  }
419  }
420  }
421  else {
422  ATH_MSG_ERROR("Failed to restart BKG Event Selector " << evtsel->name());
423  return outwork;
424  }
425  }
426  }
427  // <<<< For PileUp jobs
428 
429  // _______________________ Event orders for debugging ________________________________
430  if(m_readEventOrders) {
431  std::fstream fs(m_eventOrdersFile,std::fstream::in);
432  if(fs.good()) {
433  ATH_MSG_INFO("Reading predefined event orders from " << m_eventOrdersFile);
434  while(fs.good()){
435  std::string line;
436  std::getline(fs,line);
437  if(line.empty())continue;
438 
439  // Parse the string
440  size_t idx(0);
441  int rank = std::stoi(line,&idx);
442  if(rank==m_rankId) {
443  msg(MSG::INFO) << "This worker will proces the following events #";
444  while(idx<line.size()-1) {
445  line = line.substr(idx+1);
446  int evtnum = std::stoi(line,&idx);
447  m_eventOrders.push_back(evtnum);
448  msg(MSG::INFO) << " " << evtnum;
449  }
450  msg(MSG::INFO) << endmsg;
451  }
452  }
453  if(m_eventOrders.empty()) {
454  ATH_MSG_ERROR("Could not read event orders for the rank " << m_rankId);
455  return outwork;
456  }
457  fs.close();
458  }
459  else {
460  ATH_MSG_ERROR("Unable to read predefined event orders from " << m_eventOrdersFile);
461  return outwork;
462  }
463  }
464 
465  // ________________________ Worker dir: chdir ________________________
466  if(chdir(worker_rundir.string().c_str())==-1) {
467  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
468  return outwork;
469  }
470 
471  // ___________________ Fire UpdateAfterFork incident _________________
472  p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
473 
474  // Declare success and return
475  *(int*)(outwork->data) = 0;
476  return outwork;
477 }
478 
479 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::exec_func()
480 {
481  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
482 
483  bool all_ok(true);
484 
485  long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
486  int nEvt(m_nEventsBeforeFork);
487  int nEventsProcessed(0);
488  long evtnumAndChunk(0);
489 
490  unsigned evtCounter(0);
491  int evtnum(0), chunkSize(1);
492  auto predefinedEvt = m_eventOrders.cbegin();
493 
494  // If the event orders file already exists in worker's run directory, then it's an unexpected error!
495  std::filesystem::path ordersFile(m_eventOrdersFile.value());
496  if(std::filesystem::exists(ordersFile)) {
497  ATH_MSG_ERROR(m_eventOrdersFile << " already exists in the worker's run directory!");
498  all_ok = false;
499  }
500 
501  System::ProcessTime time_start = System::getProcessTime();
502  if(all_ok) {
503  std::fstream fs(m_eventOrdersFile,std::fstream::out);
504  fs << m_rankId;
505  bool firstOrder(true);
506  while(true) {
507  if(m_isRoundRobin) {
508  evtnum = m_nSkipEvents + m_nprocs*evtCounter + m_rankId;
509  if(m_maxEvt!=-1 && evtnum>=m_maxEvt+m_nSkipEvents) {
510  break;
511  }
512  evtCounter++;
513  }
514  else {
515  if(m_readEventOrders) {
516  if(predefinedEvt==m_eventOrders.cend()) break;
517  evtnum = *predefinedEvt;
518  predefinedEvt++;
519  fs << (firstOrder?":":",") << evtnum;
520  fs.flush();
521  firstOrder=false;
522  ATH_MSG_INFO("Read event number from the orders file: " << evtnum);
523  }
524  else {
525  if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
526  // The event queue is empty, but we should check whether there are more events to come or not
527  ATH_MSG_DEBUG("Event queue is empty");
528  usleep(1000);
529  continue;
530  }
531  if(evtnumAndChunk<=0) {
532  evtnumAndChunk *= -1;
533  ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
534  break;
535  }
536  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
537  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
538  evtnum = evtnumAndChunk & intmask;
539  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
540 
541  // Save event order
542  for(int i(0);i<chunkSize;++i) {
543  fs << (firstOrder?":":",") << evtnum+i;
544  firstOrder=false;
545  }
546  fs.flush();
547  } // Get event numbers from the shared queue
548  } // Not RoundRobin
549  nEvt+=chunkSize;
550  StatusCode sc;
551  if(m_useSharedReader) {
552  sc = m_evtShare->share(evtnum);
553  if(sc.isFailure()){
554  ATH_MSG_ERROR("Unable to share " << evtnum);
555  all_ok=false;
556  break;
557  }
558  else {
559  ATH_MSG_INFO("Share of " << evtnum << " succeeded");
560  }
561  }
562  else if(m_evtSelector) {
563  m_chronoStatSvc->chronoStart("AthenaMP_seek");
564  if (m_evtSeek) {
565  sc=m_evtSeek->seek(evtnum);
566  }
567  else {
568  sc=m_evtSelSeek->seek(*m_evtContext, evtnum);
569  }
570  if(sc.isFailure()){
571  ATH_MSG_ERROR("Unable to seek to " << evtnum);
572  all_ok=false;
573  break;
574  }
575  else {
576  ATH_MSG_INFO("Seek to " << evtnum << " succeeded");
577  }
578  m_chronoStatSvc->chronoStop("AthenaMP_seek");
579  }
580  m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
581  sc = m_evtProcessor->nextEvent(nEvt);
582  nEventsProcessed += chunkSize;
583  if(sc.isFailure()){
584  if(chunkSize==1) {
585  ATH_MSG_ERROR("Unable to process event " << evtnum);
586  }
587  else {
588  ATH_MSG_ERROR("Unable to process the chunk (" << evtnum << "," << evtnum+chunkSize-1 << ")");
589  }
590  all_ok=false;
591  break;
592  }
593  m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
594  if(m_mpRunStop->stopScheduled()) {
595  ATH_MSG_INFO("Scheduled stop");
596  break;
597  }
598  }
599  fs.close();
600  }
601  System::ProcessTime time_delta = System::getProcessTime() - time_start;
602  TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
603 
604  if(all_ok) {
605  if(m_evtProcessor->executeRun(0).isFailure()) {
606  ATH_MSG_ERROR("Could not finalize the Run");
607  all_ok=false;
608  }
609  else if(!m_useSharedReader && m_evtSelector) {
610  StatusCode sc;
611  if (m_evtSeek) {
612  sc = m_evtSeek->seek(evtnumAndChunk+m_nSkipEvents);
613  }
614  else {
615  sc = m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+m_nSkipEvents);
616  }
617  if(sc.isFailure()) {
618  ATH_MSG_WARNING("Seek past maxevt to " << evtnumAndChunk+m_nSkipEvents << " returned failure.");
619  }
620  }
621  }
622 
623  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
624 
625  // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime"
626  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(elapsedTime);
627  void* outdata = CxxUtils::xmalloc(outsize);
628  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
630  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
631  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
632  memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsedTime,sizeof(elapsedTime));
633  outwork->data = outdata;
634  outwork->size = outsize;
635  // ...
636  // (possible) TODO: extend outwork with some error message, which will be eventually
637  // reported in the master proces
638  // ...
639  return outwork;
640 }
641 
642 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::fin_func()
643 {
644  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
645 
646  bool all_ok(true);
647 
648  if(m_appMgr->stop().isFailure()) {
649  ATH_MSG_ERROR("Unable to stop AppMgr");
650  all_ok=false;
651  }
652  else {
653  if(m_appMgr->finalize().isFailure()) {
654  std::cerr << "Unable to finalize AppMgr" << std::endl;
655  all_ok=false;
656  }
657  }
658 
659  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
660 
661  // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime" (Here NEvt=-1 and EvtLoopTime=-1)
662  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(TimeValType);
663  void* outdata = CxxUtils::xmalloc(outsize);
664  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
666  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
667  int nEvt = -1;
668  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
669  TimeValType elapsed = -1;
670  memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsed,sizeof(elapsed));
671 
672  outwork->data = outdata;
673  outwork->size = outsize;
674 
675  return outwork;
676 }
677 
678 int SharedEvtQueueConsumer::decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize)
679 {
680  if(!presult) return 0;
682  ATH_MSG_DEBUG("Decoding the output of PID=" << presult->pid << " with the size=" << output.size);
683  if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(TimeValType)) return 0;
684 
686  memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
687  if(func==AthenaMPToolBase::FUNC_EXEC) {
688  // Store the number of processed events
689  int nevt(0);
690  TimeValType elapsed(0);
691  memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
692  memcpy(&elapsed,(char*)output.data+2*+sizeof(int)+sizeof(func),sizeof(TimeValType));
693  m_eventStat[presult->pid]=std::pair<int,TimeValType>(nevt,elapsed);
694  ATH_MSG_DEBUG("PID=" << presult->pid << " processed " << nevt << " events in " << elapsed << "sec.");
695 
696  if(doFinalize) {
697  // Add PID to the finalization queue
698  m_finQueue.push(presult->pid);
699  ATH_MSG_DEBUG("Added PID=" << presult->pid << " to the finalization queue");
700 
701  // If this is the only element in the queue then start its finalization
702  // Otherwise it has to wait its turn until all previous processes have been finalized
703  if(m_finQueue.size()==1) {
704  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,presult->pid)
705  || m_processGroup->map_async(0,0,presult->pid)) {
706  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << presult->pid);
707  return 1;
708  }
709  else {
710  ATH_MSG_DEBUG("Scheduled finalization of PID=" << presult->pid);
711  }
712  }
713  }
714  }
715  else if(doFinalize && func==AthenaMPToolBase::FUNC_FIN) {
716  ATH_MSG_DEBUG("Finished finalization of PID=" << presult->pid);
717  pid_t pid = m_finQueue.front();
718  if(pid==presult->pid) {
719  // pid received as expected. Remove it from the queue
720  m_finQueue.pop();
721  ATH_MSG_DEBUG("PID=" << presult->pid << " removed from the queue");
722  // Schedule finalization of the next processe in the queue
723  if(m_finQueue.size()) {
724  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())
725  || m_processGroup->map_async(0,0,m_finQueue.front())) {
726  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
727  return 1;
728  }
729  else {
730  ATH_MSG_DEBUG("Scheduled finalization of PID=" << m_finQueue.front());
731  }
732  }
733  }
734  else {
735  // Error: unexpected pid received from presult
736  ATH_MSG_ERROR("Finalized PID=" << presult->pid << " while PID=" << pid << " was expected");
737  return 1;
738  }
739  }
740 
741  return 0;
742 }
SharedEvtQueueConsumer::m_nSkipEvents
int m_nSkipEvents
Definition: SharedEvtQueueConsumer.h:64
python.PyKernel.retrieve
def retrieve(aClass, aKey=None)
Definition: PyKernel.py:110
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
SharedEvtQueueConsumer::m_isRoundRobin
Gaudi::Property< bool > m_isRoundRobin
Definition: SharedEvtQueueConsumer.h:57
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:403
SharedEvtQueueConsumer::m_finQueue
std::queue< pid_t > m_finQueue
Definition: SharedEvtQueueConsumer.h:78
SharedEvtQueueConsumer::m_useSharedReader
Gaudi::Property< bool > m_useSharedReader
Definition: SharedEvtQueueConsumer.h:55
AthenaInterprocess::ProcessGroup::pullOneResult
ProcessResult * pullOneResult()
Definition: ProcessGroup.cxx:177
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:15
IEventSeek.h
Abstract interface for seeking within an event stream.
IEventShare.h
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:91
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
IDataShare.h
SharedEvtQueueConsumer::m_readEventOrders
Gaudi::Property< bool > m_readEventOrders
Definition: SharedEvtQueueConsumer.h:59
AthenaInterprocess::UpdateAfterFork
Definition: Incidents.h:22
SharedEvtQueueConsumer::initialize
virtual StatusCode initialize() override
Definition: SharedEvtQueueConsumer.cxx:48
SharedEvtQueueConsumer::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedEvtQueueConsumer.cxx:642
SharedEvtQueueConsumer::m_evtSeek
SmartIF< IEventSeek > m_evtSeek
Definition: SharedEvtQueueConsumer.h:67
SharedEvtQueueConsumer::m_evtSelSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Definition: SharedEvtQueueConsumer.h:68
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:101
AthenaMPToolBase::m_nprocs
int m_nprocs
Number of workers spawned by the master process.
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:70
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:333
skel.it
it
Definition: skel.GENtoEVGEN.py:407
python.AthDsoLogger.out
out
Definition: AthDsoLogger.py:70
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Name of the event selector.
Definition: AthenaMPToolBase.h:89
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
ProcessGroup.h
AthenaInterprocess::ProcessResult::pid
pid_t pid
Definition: ProcessGroup.h:23
dq_defect_bulk_create_defects.line
line
Definition: dq_defect_bulk_create_defects.py:27
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
SharedEvtQueueConsumer::m_eventStat
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
Definition: SharedEvtQueueConsumer.h:77
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:69
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
SharedEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedEvtQueueConsumer.h:63
SharedEvtQueueConsumer::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedEvtQueueConsumer.cxx:242
copy_file_icc_hack.h
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:98
AthenaInterprocess::ProcessResult::output
ScheduledWork output
Definition: ProcessGroup.h:24
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:95
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
IEvtSelectorSeek::curEvent
virtual int curEvent(const IEvtSelector::Context &c) const =0
return the current event number.
LArG4FSStartPointFilter.rand
rand
Definition: LArG4FSStartPointFilter.py:80
AthenaMPToolBase::m_isPileup
Gaudi::Property< bool > m_isPileup
Definition: AthenaMPToolBase.h:103
lumiFormat.i
int i
Definition: lumiFormat.py:85
SharedEvtQueueConsumer::m_dataShare
SmartIF< IDataShare > m_dataShare
Definition: SharedEvtQueueConsumer.h:71
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:67
SharedEvtQueueConsumer::m_nEventsBeforeFork
Gaudi::Property< int > m_nEventsBeforeFork
Definition: SharedEvtQueueConsumer.h:60
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
Incidents.h
res
std::pair< std::vector< unsigned int >, bool > res
Definition: JetGroupProductTest.cxx:11
SharedEvtQueueConsumer::m_eventOrders
std::vector< int > m_eventOrders
Definition: SharedEvtQueueConsumer.h:81
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
SharedEvtQueueConsumer::m_sharedRankQueue
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
Definition: SharedEvtQueueConsumer.h:74
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
SharedEvtQueueConsumer::m_eventOrdersFile
Gaudi::Property< std::string > m_eventOrdersFile
Definition: SharedEvtQueueConsumer.h:61
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:322
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:48
SharedEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedEvtQueueConsumer.h:66
AthenaMPToolBase::m_mpRunStop
const AthenaInterprocess::IMPRunStop * m_mpRunStop
Definition: AthenaMPToolBase.h:92
SharedEvtQueueConsumer::finalize
virtual StatusCode finalize() override
Definition: SharedEvtQueueConsumer.cxx:96
SharedEvtQueueConsumer.h
SharedEvtQueueConsumer::m_evtShare
SmartIF< IEventShare > m_evtShare
Definition: SharedEvtQueueConsumer.h:70
merge.output
output
Definition: merge.py:16
python.PyKernel.detStore
detStore
Definition: PyKernel.py:41
SharedEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedEvtQueueConsumer.h:73
SharedEvtQueueConsumer::SharedEvtQueueConsumer
SharedEvtQueueConsumer(const std::string &type, const std::string &name, const IInterface *parent)
Definition: SharedEvtQueueConsumer.cxx:34
AthenaInterprocess::IMPRunStop::stopScheduled
virtual bool stopScheduled() const =0
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:269
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
python.plotting.G4DebuggerUtils.rename
def rename(label)
Definition: G4DebuggerUtils.py:11
SharedEvtQueueConsumer::m_masterPid
pid_t m_masterPid
Definition: SharedEvtQueueConsumer.h:82
SharedEvtQueueConsumer::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedEvtQueueConsumer.cxx:254
grepfile.filenames
list filenames
Definition: grepfile.py:34
SharedEvtQueueConsumer::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedEvtQueueConsumer.cxx:479
AthenaMPToolBase::m_maxEvt
int m_maxEvt
Maximum number of events assigned to the job.
Definition: AthenaMPToolBase.h:86
SharedEvtQueueConsumer::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: SharedEvtQueueConsumer.cxx:223
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
python.Constants.INFO
int INFO
Definition: Control/AthenaCommon/python/Constants.py:15
SharedEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedEvtQueueConsumer.h:69
Herwig7_QED_EvtGen_ll.fs
dictionary fs
Definition: Herwig7_QED_EvtGen_ll.py:17
LArNewCalib_DelayDump_OFC_Cali.idx
idx
Definition: LArNewCalib_DelayDump_OFC_Cali.py:69
SharedEvtQueueConsumer::m_useSharedWriter
Gaudi::Property< bool > m_useSharedWriter
Definition: SharedEvtQueueConsumer.h:56
SharedEvtQueueConsumer::~SharedEvtQueueConsumer
virtual ~SharedEvtQueueConsumer() override
Definition: SharedEvtQueueConsumer.cxx:44
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
IEvtSelectorSeek.h
Extension to IEvtSelector to allow for seeking.
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
For example, "worker__".
Definition: AthenaMPToolBase.h:88
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Top run directory for subprocesses.
Definition: AthenaMPToolBase.h:87
python.dummyaccess.exists
def exists(filename)
Definition: dummyaccess.py:9
CxxUtils::xmalloc
void * xmalloc(size_t size)
Trapping version of malloc.
Definition: xmalloc.cxx:31
pow
constexpr int pow(int base, int exp) noexcept
Definition: ap_fixedTest.cxx:15
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:396
SharedEvtQueueConsumer::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedEvtQueueConsumer.h:58
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:68
xmalloc.h
Trapping version of malloc.
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:340
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
SharedEvtQueueConsumer::TimeValType
System::ProcessTime::TimeValueType TimeValType
Definition: SharedEvtQueueConsumer.h:76
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:94