ATLAS Offline Software
SharedEvtQueueConsumer.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 #include "copy_file_icc_hack.h"
9 
14 #include "GaudiKernel/IEvtSelector.h"
15 #include "GaudiKernel/IIoComponentMgr.h"
16 #include "GaudiKernel/IFileMgr.h"
17 #include "GaudiKernel/IChronoStatSvc.h"
18 #include "GaudiKernel/ISvcLocator.h"
19 #include "GaudiKernel/IIncidentSvc.h"
20 #include "GaudiKernel/IConversionSvc.h"
21 
22 #include <filesystem>
23 
24 #include <sys/stat.h>
25 #include <sstream>
26 #include <fstream>
27 #include <unistd.h>
28 #include <stdio.h>
29 #include <stdint.h>
30 #include <stdexcept>
31 #include <cmath> // For pow
32 
// NOTE(review): this is the tail of what appears to be the SharedEvtQueueConsumer
// constructor. The line carrying the constructor name and first parameter, and
// the first member-initializer line, are not present in this extract (the
// embedded listing numbers skip 33 and 36), so only the visible part is
// documented here.
34  , const std::string& name
35  , const IInterface* parent)
// Defaults for the configuration switches, the counters and the interface
// pointers/queues, which all start out unset.
37  , m_useSharedReader(false)
38  , m_useSharedWriter(false)
39  , m_isRoundRobin(false)
40  , m_nEventsBeforeFork(0)
41  , m_nSkipEvents(0)
42  , m_debug(false)
43  , m_rankId(-1)
44  , m_chronoStatSvc("ChronoStatSvc", name)
45  , m_evtSeek(nullptr)
46  , m_evtSelSeek(nullptr)
47  , m_evtContext(nullptr)
48  , m_evtShare(nullptr)
49  , m_dataShare(nullptr)
50  , m_sharedEventQueue(nullptr)
51  , m_sharedRankQueue(nullptr)
52  , m_readEventOrders(false)
53  , m_eventOrdersFile("athenamp_eventorders.txt")
// PID of the process that constructs the tool. finalize() compares getpid()
// against it to decide whether to run the master-only merge of event orders.
54  , m_masterPid(getpid())
55 {
56  declareInterface<IAthenaMPTool>(this);
57 
// Values exposed as properties. m_nSkipEvents is not declared here; it is read
// from the event selector's "SkipEvents" property in bootstrap_func().
58  declareProperty("UseSharedReader",m_useSharedReader);
59  declareProperty("UseSharedWriter",m_useSharedWriter);
60  declareProperty("IsRoundRobin",m_isRoundRobin);
61  declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
62  declareProperty("Debug", m_debug);
63  declareProperty("ReadEventOrders",m_readEventOrders);
64  declareProperty("EventOrdersFile",m_eventOrdersFile);
65 
// Prefix of the per-worker run directory name; the worker index is appended to it.
66  m_subprocDirPrefix = "worker_";
67 }
68 
// NOTE(review): empty body whose signature line is missing from this extract
// (listing line 69); presumably the destructor - confirm against the header.
70 {
71 }
72 
// Body of SharedEvtQueueConsumer::initialize(). The signature line (listing
// line 73) and listing line 77 are missing from this extract.
// NOTE(review): line 77 presumably invokes the base-class initialize - confirm.
//
// Resolves the interfaces used later by the workers:
//  - a seek interface (event loop manager for pile-up, event selector otherwise),
//  - an event selector context,
//  - the optional IEventShare / IDataShare interfaces,
//  - the chrono statistics service.
// Returns StatusCode::FAILURE as soon as a mandatory piece cannot be obtained.
74 {
75  ATH_MSG_DEBUG("In initialize");
76 
78 
79  // For pile-up jobs use event loop manager for seeking
80  // otherwise use event selector
81  if(m_isPileup) {
82  m_evtSeek = SmartIF<IEventSeek>(m_evtProcessor.get());
83  if(!m_evtSeek) {
84  ATH_MSG_ERROR("Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
85  return StatusCode::FAILURE;
86  }
87  }
88  else if(m_evtSelector) {
89  m_evtSelSeek = serviceLocator()->service(m_evtSelName);
90  ATH_CHECK(m_evtSelSeek.isValid());
91  }
92 
93  if(m_evtSelector) {
94  ATH_CHECK( m_evtSelector->createContext (m_evtContext) );
95 
  // IEventShare is mandatory only when the shared reader is enabled;
  // otherwise its absence is merely reported at INFO level.
96  m_evtShare = serviceLocator()->service(m_evtSelName);
97  if(!m_evtShare) {
98  if(m_useSharedReader) {
99  ATH_MSG_ERROR("Error retrieving IEventShare");
100  return StatusCode::FAILURE;
101  }
102  ATH_MSG_INFO("Could not retrieve IEventShare");
103  }
104 
105  //FIXME: AthenaPool dependent for now
106 
  // IDataShare is mandatory only when the shared writer is enabled; a missing
  // interface is silently tolerated otherwise.
107  m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolCnvSvc"));
108  if(!m_dataShare) {
109  if(m_useSharedWriter) {
110  ATH_MSG_ERROR("Error retrieving AthenaPoolCnvSvc");
111  return StatusCode::FAILURE;
112  }
113  }
114  }
115 
116  ATH_CHECK(m_chronoStatSvc.retrieve());
117 
118  return StatusCode::SUCCESS;
119 }
120 
// Body of what the log message identifies as finalize(). The signature line
// (listing line 121) is missing from this extract.
//
// In the process whose PID equals m_masterPid, the per-worker event-order files
// are concatenated into m_eventOrdersFile in the current directory. In every
// process the event selector context is released and the rank queue is deleted.
122 {
123  if(getpid()==m_masterPid) {
124  ATH_MSG_INFO("finalize() in the master process");
125  // Merge saved event orders into one in the master run directory
126 
127  // 1. Check if master run directory already contains a file with saved orders
128  // If so, then rename it with random suffix
  // NOTE(review): the declaration of `ordersFile` (listing line 129) is missing
  // from this extract; presumably a std::filesystem::path built from
  // m_eventOrdersFile - confirm.
130  if(std::filesystem::exists(ordersFile)) {
  // The backup suffix comes from rand() seeded with the current time, so it is
  // not guaranteed to be unique; std::filesystem::rename throws on failure.
131  srand((unsigned)time(0));
132  std::ostringstream randname;
133  randname << rand();
134  std::string ordersFileBak = m_eventOrdersFile+std::string("-bak-")+randname.str();
135  ATH_MSG_WARNING("File " << m_eventOrdersFile << " already exists in the master run directory!");
136  ATH_MSG_WARNING("Saving a backup with new name " << ordersFileBak);
137 
138  std::filesystem::path ordersFileBakpath(ordersFileBak);
139  std::filesystem::rename(ordersFile,ordersFileBakpath);
140  }
141 
142  // 2. Merge workers event orders into the master file
143  std::fstream fs(m_eventOrdersFile.c_str(),std::fstream::out);
144  for(int i=0; i<m_nprocs; ++i) {
145  std::ostringstream workerIndex;
146  workerIndex << i;
147  std::filesystem::path worker_rundir(m_subprocTopDir);
148  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
149  std::string ordersFileWorker(worker_rundir.string()+std::string("/")+m_eventOrdersFile);
150  ATH_MSG_INFO("Processing " << ordersFileWorker << " ...");
151  std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
152  std::string line;
  // A worker file that cannot be opened is skipped silently (good() is false).
  // NOTE(review): the stream state is tested before getline, so a worker file
  // that is empty or ends with a newline contributes one extra empty line.
  // exec_func() writes its file without a trailing newline, which avoids this
  // for non-empty files.
153  while(fs_worker.good()) {
154  std::getline(fs_worker,line);
155  fs << line << std::endl;
156  }
157  fs_worker.close();
158  }
159  fs.close();
160  } // if(getpid()==m_masterPid)
161 
162  if (m_evtContext) {
163  ATH_CHECK( m_evtSelector->releaseContext (m_evtContext) );
164  m_evtContext = nullptr;
165  }
166 
  // NOTE(review): the pointer is not reset after delete; a second call would
  // delete it again.
167  delete m_sharedRankQueue;
168  return StatusCode::SUCCESS;
169 }
170 
// Prepare the worker pool and schedule the bootstrap step on it.
//
// Parameters:
//   (unnamed int) - not used by this implementation.
//   nprocs        - number of workers; -1 means "number of processors currently
//                   online" (sysconf(_SC_NPROCESSORS_ONLN)); 0 and values below
//                   -1 are rejected.
//   topdir        - top directory for the worker run directories; must not be empty.
// Returns the number of workers (m_nprocs) on success, -1 on any error.
171 int SharedEvtQueueConsumer::makePool(int, int nprocs, const std::string& topdir)
172 {
173  ATH_MSG_DEBUG("In makePool " << getpid());
174 
175  if(nprocs==0 || nprocs<-1) {
176  ATH_MSG_ERROR("Invalid value for the nprocs parameter: " << nprocs);
177  return -1;
178  }
179 
180  if(topdir.empty()) {
181  ATH_MSG_ERROR("Empty name for the top directory!");
182  return -1;
183  }
184 
185  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
186  m_subprocTopDir = topdir;
187 
188  // Get the shared event queue
189  ATH_MSG_DEBUG("Event queue name AthenaMPEventQueue_" << m_randStr);
190  StatusCode sc = detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr);
191  if(sc.isFailure()) {
192  ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Event Queue");
193  return -1;
194  }
195 
196 
197  // Create rank queue and fill it
  // One int per worker: the ranks 0..m_nprocs-1. The queue is owned through a
  // raw pointer and deleted in finalize().
198  m_sharedRankQueue = new AthenaInterprocess::SharedQueue("SharedEvtQueueConsumer_RankQueue_"+m_randStr,m_nprocs,sizeof(int));
199  for(int i=0; i<m_nprocs; ++i)
200  if(!m_sharedRankQueue->send_basic<int>(i)) {
201  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
202  return -1;
203  }
204 
205  // Create the process group and map_async bootstrap
  // NOTE(review): the statement that creates the process group (listing line
  // 206) is missing from this extract.
207  ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
  // A non-zero return from mapAsyncFlag is treated as failure.
208  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
209  return -1;
210  ATH_MSG_INFO("Workers bootstraped");
211 
212  return m_nprocs;
213 }
214 
// Body of what the debug message identifies as exec(). The signature line
// (listing line 215) is missing from this extract.
// Schedules FUNC_EXEC through mapAsyncFlag; a non-zero return from mapAsyncFlag
// is reported as StatusCode::FAILURE.
216 {
217  ATH_MSG_DEBUG("In exec " << getpid());
218 
219  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
220  return StatusCode::FAILURE;
221  ATH_MSG_INFO("Workers started processing events");
222 
223  return StatusCode::SUCCESS;
224 }
225 
226 StatusCode SharedEvtQueueConsumer::wait_once(pid_t& pid)
227 {
228  StatusCode sc = AthenaMPToolBase::wait_once(pid);
230  if(sc.isFailure()) {
231  // We are to stop waiting. Pull all available ProcessResults from the queue
232  // Don't serialize finalizations
233  do {
234  presult = m_processGroup->pullOneResult();
235  if(presult && (unsigned)(presult->output.size)>sizeof(int))
236  decodeProcessResult(presult,false);
237  if(presult) free(presult->output.data);
238  delete presult;
239  } while(presult);
240  }
241  else {
242  // Pull one result and decode it if necessary
243  presult = m_processGroup->pullOneResult();
244  int res(0);
245  if(presult && (unsigned)(presult->output.size)>sizeof(int))
246  res = decodeProcessResult(presult,true);
247  if(presult) free(presult->output.data);
248  delete presult;
249  if(res) return StatusCode::FAILURE;
250  }
251  return sc;
252 }
253 
// Body of the routine that logs one INFO line per worker process. The signature
// line (listing line 254) is missing from this extract.
// For every status returned by the process group it prints the PID, SUCCESS or
// FAILURE (a non-zero exit code counts as FAILURE) and, when the PID has an
// entry in m_eventStat, the number of processed events and the event loop time;
// otherwise "N/A".
255 {
256  ATH_MSG_INFO("Statuses of event processors");
257  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
258  for(size_t i=0; i<statuses.size(); ++i) {
259  // Get the number of events processed by this worker
260  auto it = m_eventStat.find(statuses[i].pid);
261  msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
262  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
263  << ". Number of events processed: ";
264  if(it==m_eventStat.end())
265  msg(MSG::INFO) << "N/A" << endmsg;
266  else
  // m_eventStat maps pid -> (number of events, event loop time); it is filled
  // in decodeProcessResult().
267  msg(MSG::INFO) << it->second.first
268  << ", Event Loop Time: " << it->second.second << "sec."
269  << endmsg;
270  }
271 }
272 
273 void SharedEvtQueueConsumer::subProcessLogs(std::vector<std::string>& filenames)
274 {
275  filenames.clear();
276  for(int i=0; i<m_nprocs; ++i) {
277  std::ostringstream workerIndex;
278  workerIndex << i;
279  std::filesystem::path worker_rundir(m_subprocTopDir);
280  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
281  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
282  }
283 }
284 
// Bootstrap step of a worker; makePool() schedules it via FUNC_BOOTSTRAP
// (presumably it runs in the freshly forked worker process - confirm against
// AthenaMPToolBase).
//
// Returns a ScheduledWork whose payload is a single int error code. The payload
// is pre-set to 1 (failure), so every early `return outwork` reports failure;
// it is set to 0 only after all steps below have succeeded:
//   PostFork incident, rank ID, worker run directory, log redirection, Io
//   registry update, parameter file copies, saved PFC, file descriptors,
//   shared reader/writer clients, I/O reinit, event selector restart, pile-up
//   selector seeking, predefined event orders, chdir, UpdateAfterFork incident.
//
// NOTE(review): this extract has gaps - listing lines 336, 388, 398 and 462-463
// are missing. Each gap is the opening condition of an error branch whose body
// is still visible below.
285 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::bootstrap_func()
286 {
287  if(m_debug) waitForSignal();
288 
289  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
290  outwork->data = malloc(sizeof(int));
291  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
292  outwork->size = sizeof(int);
293 
294  // For PileUp Digi Fork-After-N-Events >>>>
295  // Retrieve cuEvent-s for all background event selectors, if we forked after N events
  // Maps each background event selector service to the event position recorded
  // before it is restarted further down (0 when no events ran before the fork).
296  std::map<IService*,int> bkgEvtSelectors;
297 
298  if(m_isPileup) {
299  for(IService* ptrSvc : serviceLocator()->getServices()) {
300  IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(ptrSvc);
301  if(evtsel && (evtsel != m_evtSelector)) {
302  if(m_nEventsBeforeFork>0) {
303  IEvtSelectorSeek* evtselseek = dynamic_cast<IEvtSelectorSeek*>(evtsel);
304  if(evtselseek) {
305  bkgEvtSelectors.emplace(ptrSvc,evtselseek->curEvent(*m_evtContext));
306  }
307  else {
308  ATH_MSG_ERROR("Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->name());
309  return outwork;
310  }
311  }
312  else {
313  bkgEvtSelectors.emplace(ptrSvc,0);
314  }
315  }
316  }
317  }
318  // <<<< For PileUp Digi Fork-After-N-Events
319 
320  // ...
321  // (possible) TODO: extend outwork with some error message, which will be eventually
322  // reported in the master proces
323  // ...
324 
325  // ________________________ Get IncidentSvc and fire PostFork ________________________
326  SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service("IncidentSvc"));
327  if(!p_incidentSvc) {
328  ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
329  return outwork;
330  }
331  p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
332 
333 
334  // ________________________ Get RankID ________________________
335  //
  // NOTE(review): the condition opening this error branch (listing line 336) is
  // missing from this extract; presumably it reads m_rankId from the rank queue
  // filled in makePool() - confirm.
337  ATH_MSG_ERROR("Unable to get rank ID!");
338  return outwork;
339  }
340  std::ostringstream workindex;
341  workindex<<m_rankId;
342 
343  // ________________________ Worker dir: mkdir ________________________
344  std::filesystem::path worker_rundir(m_subprocTopDir);
345  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
346  // TODO: this "worker_" can be made configurable too
347 
  // Mode rwxr-xr-x. An already existing directory is also an error here, since
  // mkdir() returns -1 in that case.
348  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
349  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
350  return outwork;
351  }
352 
353  // __________ Redirect logs unless we want to attach debugger ____________
354  if(!m_debug) {
355  if(redirectLog(worker_rundir.string()))
356  return outwork;
357 
358  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
359  }
360 
361  // ________________________ Update Io Registry ____________________________
362  if(updateIoReg(worker_rundir.string()))
363  return outwork;
364 
365  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
366 
367  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
  // Each file is copied into the worker directory only if it exists as a
  // regular file in the current directory.
368  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
369  if(std::filesystem::is_regular_file("SimParams.db"))
370  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
371  if(std::filesystem::is_regular_file("DigitParams.db"))
372  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
373  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
374  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
375 
376  // _______________________ Handle saved PFC (if any) ______________________
377  if(handleSavedPfc(abs_worker_rundir))
378  return outwork;
379 
380  // ________________________ reopen descriptors ____________________________
381  if(reopenFds())
382  return outwork;
383 
384  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
385 
386 
387  // ________________________ Make Shared Reader/Writer Client ________________________
  // NOTE(review): the condition opening this block (listing line 388) is
  // missing from this extract; presumably it tests the shared-reader
  // configuration - confirm.
389  if(!m_evtShare->makeClient(m_rankId).isSuccess()) {
390  ATH_MSG_ERROR("Failed to make the event selector a share client");
391  return outwork;
392  }
393  else {
394  ATH_MSG_DEBUG("Successfully made the event selector a share client");
395  }
396  }
397 
  // NOTE(review): the condition opening this block (listing line 398) is
  // missing from this extract; presumably it tests the shared-writer
  // configuration - confirm.
399  SmartIF<IProperty> propertyServer(m_dataShare);
  // The property is set to rank + 1, not to the rank itself.
400  if (!propertyServer || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
401  ATH_MSG_ERROR("Could not change AthenaPoolCnvSvc MakeClient Property");
402  return outwork;
403  }
404  else {
405  ATH_MSG_DEBUG("Successfully made the conversion service a share client");
406  }
407  }
408 
409  // ________________________ I/O reinit ________________________
410  if(!m_ioMgr->io_reinitialize().isSuccess()) {
411  ATH_MSG_ERROR("Failed to reinitialize I/O");
412  return outwork;
413  }
414  else {
415  ATH_MSG_DEBUG("Successfully reinitialized I/O");
416  }
417 
418  // _______________ Get the value of SkipEvent ________________________
419  if(m_evtSelector) {
420  SmartIF<IProperty> propertyServer(m_evtSelector);
421  if(!propertyServer) {
422  ATH_MSG_ERROR("Unable to cast event selector to IProperty");
423  return outwork;
424  }
425  else {
  // A selector without a SkipEvents property is not an error: m_nSkipEvents
  // simply keeps its current value.
426  std::string propertyName("SkipEvents");
427  IntegerProperty skipEventsProp(propertyName,m_nSkipEvents);
428  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
429  ATH_MSG_INFO("Event Selector does not have SkipEvents property");
430  }
431  else {
432  m_nSkipEvents = skipEventsProp.value();
433  }
434  }
435 
436 
437  // ________________________ Event selector restart ________________________
438  SmartIF<IService> evtSelSvc(m_evtSelector);
439  if(!evtSelSvc) {
440  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
441  return outwork;
442  }
443  if(!evtSelSvc->start().isSuccess()) {
444  ATH_MSG_ERROR("Failed to restart the event selector");
445  return outwork;
446  }
447  else {
448  ATH_MSG_DEBUG("Successfully restarted the event selector");
449  }
450  }
451  // For PileUp jobs >>>>
452  // Main event selector: advance it if we either forked after N events, or skipEvents!=0
453  // Background event selectors: restart, and advance if we forked after N events
454  if(m_isPileup) {
455  // Deal with the main event selector first
456  m_evtSelSeek = serviceLocator()->service(m_evtSelName);
457  if(!m_evtSelSeek) {
458  ATH_MSG_ERROR("Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
459  return outwork;
460  }
461 
  // NOTE(review): the seek call and its condition (listing lines 462-463) are
  // missing from this extract; per the error message the target position is
  // m_nEventsBeforeFork+m_nSkipEvents.
464  ATH_MSG_ERROR("Failed to seek to " << m_nEventsBeforeFork+m_nSkipEvents);
465  return outwork;
466  }
467 
468  // Deal with background event selectors
469  for(auto [evtsel,curEvt] : bkgEvtSelectors) {
470  if(evtsel->start().isSuccess()) {
471  if (m_nEventsBeforeFork>0) {
  // NOTE(review): evtselseek is dereferenced without a validity check; the
  // selectors stored for m_nEventsBeforeFork>0 passed a dynamic_cast to
  // IEvtSelectorSeek above.
472  SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
473  if(evtselseek->seek(*m_evtContext,curEvt).isFailure()) {
474  ATH_MSG_ERROR("Failed to seek to " << curEvt << " in the BKG Event Selector " << evtsel->name());
475  return outwork;
476  }
477  }
478  }
479  else {
480  ATH_MSG_ERROR("Failed to restart BKG Event Selector " << evtsel->name());
481  return outwork;
482  }
483  }
484  }
485  // <<<< For PileUp jobs
486 
487  // _______________________ Event orders for debugging ________________________________
  // File format as parsed here: one line per rank, "<rank><sep><evt><sep><evt>...",
  // where each separator is a single character that is skipped without being
  // checked (exec_func() writes ':' after the rank and ',' between events).
  // NOTE(review): std::stoi throws std::invalid_argument / std::out_of_range on
  // malformed input and nothing in this function catches it.
488  if(m_readEventOrders) {
489  std::fstream fs(m_eventOrdersFile.c_str(),std::fstream::in);
490  if(fs.good()) {
491  ATH_MSG_INFO("Reading predefined event orders from " << m_eventOrdersFile);
492  while(fs.good()){
493  std::string line;
494  std::getline(fs,line);
495  if(line.empty())continue;
496 
497  // Parse the string
498  size_t idx(0);
499  int rank = std::stoi(line,&idx);
500  if(rank==m_rankId) {
501  msg(MSG::INFO) << "This worker will proces the following events #";
  // idx is the number of characters consumed by the last stoi; the character
  // at idx is the separator, so the remainder starts at idx+1.
502  while(idx<line.size()-1) {
503  line = line.substr(idx+1);
504  int evtnum = std::stoi(line,&idx);
505  m_eventOrders.push_back(evtnum);
506  msg(MSG::INFO) << " " << evtnum;
507  }
508  msg(MSG::INFO) << endmsg;
509  }
510  }
511  if(m_eventOrders.empty()) {
512  ATH_MSG_ERROR("Could not read event orders for the rank " << m_rankId);
513  return outwork;
514  }
515  fs.close();
516  }
517  else {
518  ATH_MSG_ERROR("Unable to read predefined event orders from " << m_eventOrdersFile);
519  return outwork;
520  }
521  }
522 
523  // ________________________ Worker dir: chdir ________________________
  // Done after the event orders file has been read, so the relative
  // m_eventOrdersFile path above is resolved against the directory the process
  // was in before this chdir.
524  if(chdir(worker_rundir.string().c_str())==-1) {
525  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
526  return outwork;
527  }
528 
529  // ___________________ Fire UpdateAfterFork incident _________________
530  p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
531 
532  // Declare success and return
533  *(int*)(outwork->data) = 0;
534  return outwork;
535 }
536 
537 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::exec_func()
538 {
539  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
540 
541  bool all_ok(true);
542 
543  long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
544  int nEvt(m_nEventsBeforeFork);
545  int nEventsProcessed(0);
546  long evtnumAndChunk(0);
547 
548  unsigned evtCounter(0);
549  int evtnum(0), chunkSize(1);
550  auto predefinedEvt = m_eventOrders.cbegin();
551 
552  // If the event orders file already exists in worker's run directory, then it's an unexpected error!
554  if(std::filesystem::exists(ordersFile)) {
555  ATH_MSG_ERROR(m_eventOrdersFile << " already exists in the worker's run directory!");
556  all_ok = false;
557  }
558 
559  // For the round robin we need to know the maximum number of events for this job
560  if(m_isRoundRobin) {
561  evtnumAndChunk = 1;
562  while(evtnumAndChunk>0) {
563  if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
564  usleep(1000);
565  }
566  }
567  evtnumAndChunk *= -1;
568  }
569 
570  System::ProcessTime time_start = System::getProcessTime();
571  if(all_ok) {
572  std::fstream fs(m_eventOrdersFile.c_str(),std::fstream::out);
573  fs << m_rankId;
574  bool firstOrder(true);
575  while(true) {
576  if(m_isRoundRobin) {
577  evtnum = m_nSkipEvents + m_nprocs*evtCounter + m_rankId;
578  if(evtnum>=evtnumAndChunk+m_nSkipEvents) {
579  break;
580  }
581  evtCounter++;
582  }
583  else {
584  if(m_readEventOrders) {
585  if(predefinedEvt==m_eventOrders.cend()) break;
586  evtnum = *predefinedEvt;
587  predefinedEvt++;
588  fs << (firstOrder?":":",") << evtnum;
589  fs.flush();
590  firstOrder=false;
591  ATH_MSG_INFO("Read event number from the orders file: " << evtnum);
592  }
593  else {
594  if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
595  // The event queue is empty, but we should check whether there are more events to come or not
596  ATH_MSG_DEBUG("Event queue is empty");
597  usleep(1000);
598  continue;
599  }
600  if(evtnumAndChunk<=0) {
601  evtnumAndChunk *= -1;
602  ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
603  break;
604  }
605  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
606  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
607  evtnum = evtnumAndChunk & intmask;
608  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
609 
610  // Save event order
611  for(int i(0);i<chunkSize;++i) {
612  fs << (firstOrder?":":",") << evtnum+i;
613  firstOrder=false;
614  }
615  fs.flush();
616  } // Get event numbers from the shared queue
617  } // Not RoundRobin
618  nEvt+=chunkSize;
619  StatusCode sc;
620  if(m_useSharedReader) {
621  sc = m_evtShare->share(evtnum);
622  if(sc.isFailure()){
623  ATH_MSG_ERROR("Unable to share " << evtnum);
624  all_ok=false;
625  break;
626  }
627  else {
628  ATH_MSG_INFO("Share of " << evtnum << " succeeded");
629  }
630  }
631  else if(m_evtSelector) {
632  m_chronoStatSvc->chronoStart("AthenaMP_seek");
633  if (m_evtSeek) {
634  sc=m_evtSeek->seek(evtnum);
635  }
636  else {
637  sc=m_evtSelSeek->seek(*m_evtContext, evtnum);
638  }
639  if(sc.isFailure()){
640  ATH_MSG_ERROR("Unable to seek to " << evtnum);
641  all_ok=false;
642  break;
643  }
644  else {
645  ATH_MSG_INFO("Seek to " << evtnum << " succeeded");
646  }
647  m_chronoStatSvc->chronoStop("AthenaMP_seek");
648  }
649  m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
650  sc = m_evtProcessor->nextEvent(nEvt);
651  nEventsProcessed += chunkSize;
652  if(sc.isFailure()){
653  if(chunkSize==1) {
654  ATH_MSG_ERROR("Unable to process event " << evtnum);
655  }
656  else {
657  ATH_MSG_ERROR("Unable to process the chunk (" << evtnum << "," << evtnum+chunkSize-1 << ")");
658  }
659  all_ok=false;
660  break;
661  }
662  m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
663  }
664  fs.close();
665  }
666  System::ProcessTime time_delta = System::getProcessTime() - time_start;
667  TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
668 
669  if(all_ok) {
670  if(m_evtProcessor->executeRun(0).isFailure()) {
671  ATH_MSG_ERROR("Could not finalize the Run");
672  all_ok=false;
673  }
674  else if(!m_useSharedReader && m_evtSelector) {
675  StatusCode sc;
676  if (m_evtSeek) {
677  sc = m_evtSeek->seek(evtnumAndChunk+m_nSkipEvents);
678  }
679  else {
680  sc = m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+m_nSkipEvents);
681  }
682  if(sc.isFailure()) {
683  ATH_MSG_WARNING("Seek past maxevt to " << evtnumAndChunk+m_nSkipEvents << " returned failure.");
684  }
685  }
686  }
687 
688  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
689 
690  // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime"
691  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(elapsedTime);
692  void* outdata = malloc(outsize);
693  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
695  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
696  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
697  memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsedTime,sizeof(elapsedTime));
698  outwork->data = outdata;
699  outwork->size = outsize;
700  // ...
701  // (possible) TODO: extend outwork with some error message, which will be eventually
702  // reported in the master proces
703  // ...
704  return outwork;
705 }
706 
707 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::fin_func()
708 {
709  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
710 
711  bool all_ok(true);
712 
713  if(m_appMgr->stop().isFailure()) {
714  ATH_MSG_ERROR("Unable to stop AppMgr");
715  all_ok=false;
716  }
717  else {
718  if(m_appMgr->finalize().isFailure()) {
719  std::cerr << "Unable to finalize AppMgr" << std::endl;
720  all_ok=false;
721  }
722  }
723 
724  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
725 
726  // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime" (Here NEvt=-1 and EvtLoopTime=-1)
727  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(TimeValType);
728  void* outdata = malloc(outsize);
729  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
731  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
732  int nEvt = -1;
733  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
734  TimeValType elapsed = -1;
735  memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsed,sizeof(elapsed));
736 
737  outwork->data = outdata;
738  outwork->size = outsize;
739 
740  return outwork;
741 }
742 
743 int SharedEvtQueueConsumer::decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize)
744 {
745  if(!presult) return 0;
747  ATH_MSG_DEBUG("Decoding the output of PID=" << presult->pid << " with the size=" << output.size);
748  if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(TimeValType)) return 0;
749 
751  memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
752  if(func==AthenaMPToolBase::FUNC_EXEC) {
753  // Store the number of processed events
754  int nevt(0);
755  TimeValType elapsed(0);
756  memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
757  memcpy(&elapsed,(char*)output.data+2*+sizeof(int)+sizeof(func),sizeof(TimeValType));
758  m_eventStat[presult->pid]=std::pair<int,TimeValType>(nevt,elapsed);
759  ATH_MSG_DEBUG("PID=" << presult->pid << " processed " << nevt << " events in " << elapsed << "sec.");
760 
761  if(doFinalize) {
762  // Add PID to the finalization queue
763  m_finQueue.push(presult->pid);
764  ATH_MSG_DEBUG("Added PID=" << presult->pid << " to the finalization queue");
765 
766  // If this is the only element in the queue then start its finalization
767  // Otherwise it has to wait its turn until all previous processes have been finalized
768  if(m_finQueue.size()==1) {
769  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,presult->pid)
770  || m_processGroup->map_async(0,0,presult->pid)) {
771  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << presult->pid);
772  return 1;
773  }
774  else {
775  ATH_MSG_DEBUG("Scheduled finalization of PID=" << presult->pid);
776  }
777  }
778  }
779  }
780  else if(doFinalize && func==AthenaMPToolBase::FUNC_FIN) {
781  ATH_MSG_DEBUG("Finished finalization of PID=" << presult->pid);
782  pid_t pid = m_finQueue.front();
783  if(pid==presult->pid) {
784  // pid received as expected. Remove it from the queue
785  m_finQueue.pop();
786  ATH_MSG_DEBUG("PID=" << presult->pid << " removed from the queue");
787  // Schedule finalization of the next processe in the queue
788  if(m_finQueue.size()) {
789  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())
790  || m_processGroup->map_async(0,0,m_finQueue.front())) {
791  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
792  return 1;
793  }
794  else {
795  ATH_MSG_DEBUG("Scheduled finalization of PID=" << m_finQueue.front());
796  }
797  }
798  }
799  else {
800  // Error: unexpected pid received from presult
801  ATH_MSG_ERROR("Finalized PID=" << presult->pid << " while PID=" << pid << " was expected");
802  return 1;
803  }
804  }
805 
806  return 0;
807 }
SharedEvtQueueConsumer::m_nSkipEvents
int m_nSkipEvents
Definition: SharedEvtQueueConsumer.h:61
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:428
SharedEvtQueueConsumer::m_finQueue
std::queue< pid_t > m_finQueue
Definition: SharedEvtQueueConsumer.h:78
checkFileSG.line
line
Definition: checkFileSG.py:75
AthenaInterprocess::ProcessGroup::pullOneResult
ProcessResult * pullOneResult()
Definition: ProcessGroup.cxx:177
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
IEventSeek.h
Abstract interface for seeking within an event stream.
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
IEventShare.h
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:88
AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
Definition: AthCommonDataStore.h:145
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
IDataShare.h
AthenaInterprocess::UpdateAfterFork
Definition: Incidents.h:22
SharedEvtQueueConsumer::initialize
virtual StatusCode initialize() override
Definition: SharedEvtQueueConsumer.cxx:73
SharedEvtQueueConsumer::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedEvtQueueConsumer.cxx:707
SharedEvtQueueConsumer::m_evtSeek
SmartIF< IEventSeek > m_evtSeek
Definition: SharedEvtQueueConsumer.h:67
SharedEvtQueueConsumer::m_evtSelSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Definition: SharedEvtQueueConsumer.h:68
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:83
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:68
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:358
skel.it
it
Definition: skel.GENtoEVGEN.py:396
python.AthDsoLogger.out
out
Definition: AthDsoLogger.py:71
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Definition: AthenaMPToolBase.h:86
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
ProcessGroup.h
AthenaInterprocess::ProcessResult::pid
pid_t pid
Definition: ProcessGroup.h:23
AthExHiveOpts.chunkSize
chunkSize
Definition: AthExHiveOpts.py:101
SharedEvtQueueConsumer::m_eventStat
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
Definition: SharedEvtQueueConsumer.h:77
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:67
AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore
const ServiceHandle< StoreGateSvc > & detStore() const
The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:95
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
SharedEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedEvtQueueConsumer.h:64
SharedEvtQueueConsumer::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedEvtQueueConsumer.cxx:273
copy_file_icc_hack.h
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:94
AthenaInterprocess::ProcessResult::output
ScheduledWork output
Definition: ProcessGroup.h:24
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:91
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
IEvtSelectorSeek::curEvent
virtual int curEvent(const IEvtSelector::Context &c) const =0
return the current event number.
LArG4FSStartPointFilter.rand
rand
Definition: LArG4FSStartPointFilter.py:80
AthenaMPToolBase::m_isPileup
Gaudi::Property< bool > m_isPileup
Definition: AthenaMPToolBase.h:99
lumiFormat.i
int i
Definition: lumiFormat.py:85
SharedEvtQueueConsumer::m_dataShare
SmartIF< IDataShare > m_dataShare
Definition: SharedEvtQueueConsumer.h:71
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:65
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
Incidents.h
res
std::pair< std::vector< unsigned int >, bool > res
Definition: JetGroupProductTest.cxx:14
SharedEvtQueueConsumer::m_eventOrders
std::vector< int > m_eventOrders
Definition: SharedEvtQueueConsumer.h:83
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
SharedEvtQueueConsumer::m_isRoundRobin
bool m_isRoundRobin
Definition: SharedEvtQueueConsumer.h:59
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:132
SharedEvtQueueConsumer::m_useSharedWriter
bool m_useSharedWriter
Definition: SharedEvtQueueConsumer.h:58
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:337
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:55
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
SharedEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedEvtQueueConsumer.h:66
SharedEvtQueueConsumer::m_useSharedReader
bool m_useSharedReader
Definition: SharedEvtQueueConsumer.h:57
SharedEvtQueueConsumer::m_readEventOrders
bool m_readEventOrders
Definition: SharedEvtQueueConsumer.h:81
SharedEvtQueueConsumer::finalize
virtual StatusCode finalize() override
Definition: SharedEvtQueueConsumer.cxx:121
SharedEvtQueueConsumer.h
SharedEvtQueueConsumer::m_evtShare
SmartIF< IEventShare > m_evtShare
Definition: SharedEvtQueueConsumer.h:70
merge.output
output
Definition: merge.py:17
SharedEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedEvtQueueConsumer.h:73
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:93
SharedEvtQueueConsumer::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: SharedEvtQueueConsumer.h:60
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:281
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
SharedEvtQueueConsumer::m_masterPid
pid_t m_masterPid
Definition: SharedEvtQueueConsumer.h:84
SharedEvtQueueConsumer::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedEvtQueueConsumer.cxx:285
grepfile.filenames
list filenames
Definition: grepfile.py:34
SharedEvtQueueConsumer::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedEvtQueueConsumer.cxx:537
SharedEvtQueueConsumer::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: SharedEvtQueueConsumer.cxx:254
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
SharedEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedEvtQueueConsumer.h:69
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthCommonMsg< AlgTool >::msg
MsgStream & msg() const
Definition: AthCommonMsg.h:24
Herwig7_QED_EvtGen_ll.fs
dictionary fs
Definition: Herwig7_QED_EvtGen_ll.py:17
LArNewCalib_DelayDump_OFC_Cali.idx
idx
Definition: LArNewCalib_DelayDump_OFC_Cali.py:69
SharedEvtQueueConsumer::~SharedEvtQueueConsumer
virtual ~SharedEvtQueueConsumer() override
Definition: SharedEvtQueueConsumer.cxx:69
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
SharedEvtQueueConsumer::SharedEvtQueueConsumer
SharedEvtQueueConsumer()
IEvtSelectorSeek.h
Extension to IEvtSelector to allow for seeking.
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:84
python.dummyaccess.exists
def exists(filename)
Definition: dummyaccess.py:9
AthenaInterprocess::SharedQueue::send_basic
bool send_basic(T)
Definition: SharedQueue.h:93
pow
constexpr int pow(int base, int exp) noexcept
Definition: ap_fixedTest.cxx:15
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:421
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:66
SharedEvtQueueConsumer::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: SharedEvtQueueConsumer.h:74
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:365
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
SharedEvtQueueConsumer::TimeValType
System::ProcessTime::TimeValueType TimeValType
Definition: SharedEvtQueueConsumer.h:76
SharedEvtQueueConsumer::m_debug
bool m_debug
Definition: SharedEvtQueueConsumer.h:62
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:90
SharedEvtQueueConsumer::m_eventOrdersFile
std::string m_eventOrdersFile
Definition: SharedEvtQueueConsumer.h:82