ATLAS Offline Software
SharedEvtQueueConsumer.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 #include "copy_file_icc_hack.h"
9 
14 #include "GaudiKernel/IEvtSelector.h"
15 #include "GaudiKernel/IIoComponentMgr.h"
16 #include "GaudiKernel/IFileMgr.h"
17 #include "GaudiKernel/IChronoStatSvc.h"
18 #include "GaudiKernel/ISvcLocator.h"
19 #include "GaudiKernel/IIncidentSvc.h"
20 #include "GaudiKernel/IConversionSvc.h"
21 
22 #include <filesystem>
23 
24 #include <sys/stat.h>
25 #include <sstream>
26 #include <fstream>
27 #include <unistd.h>
28 #include <stdio.h>
29 #include <stdint.h>
30 #include <stdexcept>
31 #include <cmath> // For pow
32 
// Constructor (the first signature line and the base-class initializer are
// not visible in this listing). Default-initialises the configuration flags,
// counters and interface pointers, records the PID of the constructing
// process, registers the IAthenaMPTool interface and declares the job-option
// properties.
34  , const std::string& name
35  , const IInterface* parent)
37  , m_useSharedReader(false)
38  , m_useSharedWriter(false)
39  , m_isRoundRobin(false)
40  , m_nEventsBeforeFork(0)
41  , m_nSkipEvents(0)
42  , m_debug(false)
 // -1 until a rank ID is obtained in bootstrap_func().
43  , m_rankId(-1)
44  , m_chronoStatSvc("ChronoStatSvc", name)
45  , m_evtSeek(nullptr)
46  , m_evtSelSeek(nullptr)
47  , m_evtContext(nullptr)
48  , m_evtShare(nullptr)
49  , m_dataShare(nullptr)
50  , m_sharedEventQueue(nullptr)
51  , m_sharedRankQueue(nullptr)
52  , m_readEventOrders(false)
53  , m_eventOrdersFile("athenamp_eventorders.txt")
 // PID of the process that constructed the tool; finalize() compares getpid()
 // against it so that the event-order merge only runs in the master process.
54  , m_masterPid(getpid())
55 {
56  declareInterface<IAthenaMPTool>(this);
57 
 // Job options:
 //  UseSharedReader / UseSharedWriter - make workers clients of the shared
 //    reader (IEventShare) / shared writer (IDataShare);
 //  IsRoundRobin - workers compute their event numbers themselves instead of
 //    pulling each one from the shared event queue;
 //  EventsBeforeFork - number of events processed before forking;
 //  Debug - workers wait for a signal (to attach a debugger) and do not
 //    redirect their logs;
 //  ReadEventOrders / EventOrdersFile - replay event-to-worker assignments
 //    from a file instead of the shared queue.
58  declareProperty("UseSharedReader",m_useSharedReader);
59  declareProperty("UseSharedWriter",m_useSharedWriter);
60  declareProperty("IsRoundRobin",m_isRoundRobin);
61  declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
62  declareProperty("Debug", m_debug);
63  declareProperty("ReadEventOrders",m_readEventOrders);
64  declareProperty("EventOrdersFile",m_eventOrdersFile);
65 
 // Worker run directories are named <topdir>/<prefix><rank>.
66  m_subprocDirPrefix = "worker_";
67 }
68 
// Destructor (signature line not visible in this listing): intentionally
// empty. The rank queue allocated in makePool() is deleted in finalize().
70 {
71 }
72 
// initialize(): prepares the seeking and sharing interfaces used later by the
// workers.
//  - Pile-up jobs: the event processor itself must implement IEventSeek.
//  - Otherwise, if an event selector is configured, its IEvtSelectorSeek
//    interface is retrieved.
// With an event selector configured, it also:
//  - creates an event-selector context;
//  - looks up IEventShare on the selector and IDataShare on AthenaPoolCnvSvc.
//    Each is mandatory only when UseSharedReader / UseSharedWriter is set.
// Finally it retrieves the ChronoStatSvc.
// Returns FAILURE when a mandatory interface is missing; ATH_CHECK failures
// are propagated.
// NOTE(review): the signature line and one line after the debug message are
// not visible in this listing.
74 {
75  ATH_MSG_DEBUG("In initialize");
76 
78 
79  // For pile-up jobs use event loop manager for seeking
80  // otherwise use event selector
81  if(m_isPileup) {
82  m_evtSeek = dynamic_cast<IEventSeek*>(m_evtProcessor.operator->());
83  if(!m_evtSeek) {
84  ATH_MSG_ERROR("Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
85  return StatusCode::FAILURE;
86  }
87  }
88  else if(m_evtSelector) {
89  ATH_CHECK(serviceLocator()->service(m_evtSelName,m_evtSelSeek));
90  }
91 
92  if(m_evtSelector) {
93  ATH_CHECK( m_evtSelector->createContext (m_evtContext) );
94 
 // IEventShare is optional unless the shared reader was requested.
95  StatusCode sc = serviceLocator()->service(m_evtSelName,m_evtShare);
96  if(sc.isFailure() || m_evtShare==0) {
97  if(m_useSharedReader) {
98  ATH_MSG_ERROR("Error retrieving IEventShare");
99  return StatusCode::FAILURE;
100  }
101  ATH_MSG_INFO("Could not retrieve IEventShare");
102  }
103 
104  //FIXME: AthenaPool dependent for now
105  IConversionSvc* cnvSvc{nullptr};
106  sc = serviceLocator()->service("AthenaPoolCnvSvc",cnvSvc);
 // A missing IDataShare is only an error when the shared writer was
 // requested; otherwise it is tolerated silently.
107  m_dataShare = dynamic_cast<IDataShare*>(cnvSvc);
108  if(sc.isFailure() || m_dataShare==0) {
109  if(m_useSharedWriter) {
110  ATH_MSG_ERROR("Error retrieving AthenaPoolCnvSvc " << cnvSvc);
111  return StatusCode::FAILURE;
112  }
113  }
114  }
115 
116  ATH_CHECK(m_chronoStatSvc.retrieve());
117 
118  return StatusCode::SUCCESS;
119 }
120 
// finalize(): in the master process only, merges the per-worker event-order
// files (<topdir>/<prefix><i>/<EventOrdersFile>) into <EventOrdersFile> in the
// master run directory. A pre-existing file there is first renamed with a
// random backup suffix. In every process it then releases the
// event-selector context and deletes the shared rank queue.
// NOTE(review): the signature line and the declaration of `ordersFile`
// (presumably a std::filesystem::path built from m_eventOrdersFile) are not
// visible in this listing.
122 {
123  if(getpid()==m_masterPid) {
124  ATH_MSG_INFO("finalize() in the master process");
125  // Merge saved event orders into one in the master run directory
126 
127  // 1. Check if master run directory already contains a file with saved orders
128  // If so, then rename it with random suffix
130  if(std::filesystem::exists(ordersFile)) {
 // Backup name is <EventOrdersFile>-bak-<rand()>.
131  srand((unsigned)time(0));
132  std::ostringstream randname;
133  randname << rand();
134  std::string ordersFileBak = m_eventOrdersFile+std::string("-bak-")+randname.str();
135  ATH_MSG_WARNING("File " << m_eventOrdersFile << " already exists in the master run directory!");
136  ATH_MSG_WARNING("Saving a backup with new name " << ordersFileBak);
137 
138  std::filesystem::path ordersFileBakpath(ordersFileBak);
 // NOTE(review): std::filesystem::rename throws std::filesystem::filesystem_error
 // on failure, and nothing here catches it.
139  std::filesystem::rename(ordersFile,ordersFileBakpath);
140  }
141 
142  // 2. Merge workers event orders into the master file
143  std::fstream fs(m_eventOrdersFile.c_str(),std::fstream::out);
144  for(int i=0; i<m_nprocs; ++i) {
145  std::ostringstream workerIndex;
146  workerIndex << i;
147  std::filesystem::path worker_rundir(m_subprocTopDir);
148  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
149  std::string ordersFileWorker(worker_rundir.string()+std::string("/")+m_eventOrdersFile);
150  ATH_MSG_INFO("Processing " << ordersFileWorker << " ...");
 // A missing or unreadable worker file contributes nothing: the stream is
 // not good() right after the failed open.
151  std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
152  std::string line;
153  while(fs_worker.good()) {
154  std::getline(fs_worker,line);
155  fs << line << std::endl;
156  }
157  fs_worker.close();
158  }
159  fs.close();
160  } // if(getpid()==m_masterPid)
161 
162  if (m_evtContext) {
 // NOTE(review): if releaseContext() fails, ATH_CHECK returns here and
 // m_sharedRankQueue below is never deleted.
163  ATH_CHECK( m_evtSelector->releaseContext (m_evtContext) );
164  m_evtContext = nullptr;
165  }
166 
 // Allocated in makePool(); the pointer is not reset to nullptr afterwards.
167  delete m_sharedRankQueue;
168  return StatusCode::SUCCESS;
169 }
170 
// makePool(): sets up the worker pool in the master process. It:
//  - validates the requested worker count (-1 means one per online CPU);
//  - stores the top run directory;
//  - retrieves the shared event queue "AthenaMPEventQueue_<randStr>" from the
//    DetectorStore;
//  - creates a rank queue pre-filled with 0..nprocs-1;
//  - dispatches the FUNC_BOOTSTRAP flag to the workers.
// The first (unnamed) argument is ignored.
// Returns the number of workers on success, -1 on any failure.
171 int SharedEvtQueueConsumer::makePool(int, int nprocs, const std::string& topdir)
172 {
173  ATH_MSG_DEBUG("In makePool " << getpid());
174 
175  if(nprocs==0 || nprocs<-1) {
176  ATH_MSG_ERROR("Invalid value for the nprocs parameter: " << nprocs);
177  return -1;
178  }
179 
180  if(topdir.empty()) {
181  ATH_MSG_ERROR("Empty name for the top directory!");
182  return -1;
183  }
184 
 // NOTE(review): sysconf() returns -1 on error, which would leave m_nprocs
 // at -1 and make the loops below no-ops.
185  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
186  m_subprocTopDir = topdir;
187 
188  // Get the shared event queue
189  ATH_MSG_DEBUG("Event queue name AthenaMPEventQueue_" << m_randStr);
190  StatusCode sc = detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr);
191  if(sc.isFailure()) {
192  ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Event Queue");
193  return -1;
194  }
195 
196 
197  // Create rank queue and fill it
 // Presumably each worker takes its rank ID from this queue in
 // bootstrap_func() — TODO confirm.
 // NOTE(review): allocated with new and only deleted in finalize(); a second
 // makePool() call would leak the previous queue.
198  m_sharedRankQueue = new AthenaInterprocess::SharedQueue("SharedEvtQueueConsumer_RankQueue_"+m_randStr,m_nprocs,sizeof(int));
199  for(int i=0; i<m_nprocs; ++i)
200  if(!m_sharedRankQueue->send_basic<int>(i)) {
201  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
202  return -1;
203  }
204 
205  // Create the process group and map_async bootstrap
 // NOTE(review): the line creating the process group is not visible in
 // this listing.
207  ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
208  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
209  return -1;
210  ATH_MSG_INFO("Workers bootstraped");
211 
212  return m_nprocs;
213 }
214 
// exec() (signature line not visible in this listing): dispatches the
// FUNC_EXEC flag to all workers so that they start their event loops.
// Returns FAILURE if mapAsyncFlag() reports an error, SUCCESS otherwise.
216 {
217  ATH_MSG_DEBUG("In exec " << getpid());
218 
219  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
220  return StatusCode::FAILURE;
221  ATH_MSG_INFO("Workers started processing events");
222 
223  return StatusCode::SUCCESS;
224 }
225 
// wait_once(): waits via AthenaMPToolBase::wait_once(), then consumes worker
// results from the process group.
//  - If the base-class wait failed, every pending result is pulled and
//    decoded with doFinalize=false, so no further finalizations are scheduled.
//  - Otherwise one result is pulled and decoded with doFinalize=true; a
//    non-zero decode result turns into FAILURE.
// In both cases the payload buffer is free()d and the result object deleted.
// NOTE(review): the declaration of `presult` is not visible in this listing.
226 StatusCode SharedEvtQueueConsumer::wait_once(pid_t& pid)
227 {
228  StatusCode sc = AthenaMPToolBase::wait_once(pid);
230  if(sc.isFailure()) {
231  // We are to stop waiting. Pull all available ProcessResults from the queue
232  // Don't serialize finalizations
233  do {
234  presult = m_processGroup->pullOneResult();
 // Payloads of only sizeof(int) bytes (e.g. bootstrap_func() results) carry
 // just an error code and are not decoded.
235  if(presult && (unsigned)(presult->output.size)>sizeof(int))
236  decodeProcessResult(presult,false);
237  if(presult) free(presult->output.data);
238  delete presult;
 // NOTE(review): the loop condition reads `presult` after it was deleted. The
 // pointer is only compared, not dereferenced, but using an invalid pointer
 // value is implementation-defined; consider testing a saved bool instead.
239  } while(presult);
240  }
241  else {
242  // Pull one result and decode it if necessary
243  presult = m_processGroup->pullOneResult();
244  int res(0);
245  if(presult && (unsigned)(presult->output.size)>sizeof(int))
246  res = decodeProcessResult(presult,true);
247  if(presult) free(presult->output.data);
248  delete presult;
249  if(res) return StatusCode::FAILURE;
250  }
251  return sc;
252 }
253 
// reportSubprocessStatuses() (signature line not visible in this listing):
// logs one INFO line per worker in the process group. Each line gives:
//  - the PID;
//  - SUCCESS for a zero exit code, FAILURE otherwise;
//  - the number of processed events and the event-loop time recorded by
//    decodeProcessResult() in m_eventStat, or "N/A" if none were received.
255 {
256  ATH_MSG_INFO("Statuses of event processors");
257  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
258  for(size_t i=0; i<statuses.size(); ++i) {
259  // Get the number of events processed by this worker
260  auto it = m_eventStat.find(statuses[i].pid);
261  msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
262  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
263  << ". Number of events processed: ";
264  if(it==m_eventStat.end())
265  msg(MSG::INFO) << "N/A" << endmsg;
266  else
267  msg(MSG::INFO) << it->second.first
268  << ", Event Loop Time: " << it->second.second << "sec."
269  << endmsg;
270  }
271 }
272 
273 void SharedEvtQueueConsumer::subProcessLogs(std::vector<std::string>& filenames)
274 {
275  filenames.clear();
276  for(int i=0; i<m_nprocs; ++i) {
277  std::ostringstream workerIndex;
278  workerIndex << i;
279  std::filesystem::path worker_rundir(m_subprocTopDir);
280  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
281  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
282  }
283 }
284 
// bootstrap_func(): runs in each freshly forked worker. In order, it:
//  - (pile-up only) records, for every background event selector, the event
//    to seek to after the fork;
//  - fires the PostFork incident;
//  - obtains a rank ID, creates <topdir>/<prefix><rank> and, unless Debug is
//    set, redirects the logs there;
//  - updates the I/O registry;
//  - copies SimParams.db / DigitParams.db / PDGTABLE.MeV (if present) into
//    the worker directory;
//  - handles a saved PFC and reopens file descriptors;
//  - makes the event selector / conversion service shared-reader /
//    shared-writer clients when configured;
//  - reinitialises I/O, reads SkipEvents and restarts the event selector;
//  - (pile-up only) seeks the main and background event selectors;
//  - optionally reads predefined event orders;
//  - chdirs into the worker directory and fires UpdateAfterFork.
// Returns a ScheduledWork whose payload is a single int: 0 on success, 1 on
// failure. Every failing step returns early with the pre-set failure code.
// NOTE(review): several lines are not visible in this listing: the rank-ID
// statement, the shared reader/writer guard conditions and the main pile-up
// seek condition.
285 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::bootstrap_func()
286 {
287  if(m_debug) waitForSignal();
288 
289  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
290  outwork->data = malloc(sizeof(int));
291  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
292  outwork->size = sizeof(int);
293 
294  // For PileUp Digi Fork-After-N-Events >>>>
295  // Retrieve curEvent-s for all background event selectors, if we forked after N events
 // Maps each background (non-main) event selector to the event number it must
 // be sought to after the fork. The value is 0 when EventsBeforeFork is 0, in
 // which case no seek is done below.
296  std::map<IService*,int> bkgEvtSelectors;
297 
298  if(m_isPileup) {
299  for(IService* ptrSvc : serviceLocator()->getServices()) {
300  IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(ptrSvc);
301  if(evtsel && (evtsel != m_evtSelector)) {
302  if(m_nEventsBeforeFork>0) {
303  IEvtSelectorSeek* evtselseek = dynamic_cast<IEvtSelectorSeek*>(evtsel);
304  if(evtselseek) {
 // NOTE(review): the main selector's context (*m_evtContext) is passed to the
 // background selector here — confirm this is intended.
305  bkgEvtSelectors.emplace(ptrSvc,evtselseek->curEvent(*m_evtContext));
306  }
307  else {
308  ATH_MSG_ERROR("Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->name());
309  return outwork;
310  }
311  }
312  else {
313  bkgEvtSelectors.emplace(ptrSvc,0);
314  }
315  }
316  }
317  }
318  // <<<< For PileUp Digi Fork-After-N-Events
319 
320  // ...
321  // (possible) TODO: extend outwork with some error message, which will be eventually
322  // reported in the master process
323  // ...
324 
325  // ________________________ Get IncidentSvc and fire PostFork ________________________
326  IIncidentSvc* p_incidentSvc(0);
327  if(!serviceLocator()->service("IncidentSvc", p_incidentSvc).isSuccess()) {
328  ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
329  return outwork;
330  }
331  p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
332 
333 
334  // ________________________ Get RankID ________________________
335  //
 // NOTE(review): the statement that obtains m_rankId (and opens this error
 // branch) is not visible in this listing. Presumably it receives a rank
 // from m_sharedRankQueue, filled in makePool().
337  ATH_MSG_ERROR("Unable to get rank ID!");
338  return outwork;
339  }
340  std::ostringstream workindex;
341  workindex<<m_rankId;
342 
343  // ________________________ Worker dir: mkdir ________________________
344  std::filesystem::path worker_rundir(m_subprocTopDir);
345  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
346  // TODO: this "worker_" can be made configurable too
347 
 // mkdir() fails, aborting the bootstrap, if the directory already exists.
348  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
349  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
350  return outwork;
351  }
352 
353  // __________ Redirect logs unless we want to attach debugger ____________
354  if(!m_debug) {
355  if(redirectLog(worker_rundir.string()))
356  return outwork;
357 
358  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
359  }
360 
361  // ________________________ Update Io Registry ____________________________
362  if(updateIoReg(worker_rundir.string()))
363  return outwork;
364 
365  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
366 
367  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
 // Copy these job-local files into the worker directory when they exist in
 // the current directory.
368  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
369  if(std::filesystem::is_regular_file("SimParams.db"))
370  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
371  if(std::filesystem::is_regular_file("DigitParams.db"))
372  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
373  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
374  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
375 
376  // _______________________ Handle saved PFC (if any) ______________________
377  if(handleSavedPfc(abs_worker_rundir))
378  return outwork;
379 
380  // ________________________ reopen descriptors ____________________________
381  if(reopenFds())
382  return outwork;
383 
384  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
385 
386 
387  // ________________________ Make Shared Reader/Writer Client ________________________
 // NOTE(review): the guard lines opening the shared-reader block and the
 // shared-writer block below are not visible in this listing.
389  if(!m_evtShare->makeClient(m_rankId).isSuccess()) {
390  ATH_MSG_ERROR("Failed to make the event selector a share client");
391  return outwork;
392  }
393  else {
394  ATH_MSG_DEBUG("Successfully made the event selector a share client");
395  }
396  }
397 
 // The conversion service's MakeStreamingToolClient property is set to
 // rank + 1.
399  IProperty* propertyServer = dynamic_cast<IProperty*>(m_dataShare);
400  if (propertyServer==0 || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
401  ATH_MSG_ERROR("Could not change AthenaPoolCnvSvc MakeClient Property");
402  return outwork;
403  }
404  else {
405  ATH_MSG_DEBUG("Successfully made the conversion service a share client");
406  }
407  }
408 
409  // ________________________ I/O reinit ________________________
410  if(!m_ioMgr->io_reinitialize().isSuccess()) {
411  ATH_MSG_ERROR("Failed to reinitialize I/O");
412  return outwork;
413  }
414  else {
415  ATH_MSG_DEBUG("Successfully reinitialized I/O");
416  }
417 
418  // _______________ Get the value of SkipEvent ________________________
419  if(m_evtSelector) {
420  IProperty* propertyServer = dynamic_cast<IProperty*>(m_evtSelector);
421  if(!propertyServer) {
422  ATH_MSG_ERROR("Unable to cast event selector to IProperty");
423  return outwork;
424  }
425  else {
 // m_nSkipEvents is left unchanged if the selector has no SkipEvents property.
426  std::string propertyName("SkipEvents");
427  IntegerProperty skipEventsProp(propertyName,m_nSkipEvents);
428  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
429  ATH_MSG_INFO("Event Selector does not have SkipEvents property");
430  }
431  else {
432  m_nSkipEvents = skipEventsProp.value();
433  }
434  }
435 
436 
437  // ________________________ Event selector restart ________________________
438  IService* evtSelSvc = dynamic_cast<IService*>(m_evtSelector);
439  if(!evtSelSvc) {
440  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
441  return outwork;
442  }
443  if(!evtSelSvc->start().isSuccess()) {
444  ATH_MSG_ERROR("Failed to restart the event selector");
445  return outwork;
446  }
447  else {
448  ATH_MSG_DEBUG("Successfully restarted the event selector");
449  }
450  }
451  // For PileUp jobs >>>>
452  // Main event selector: advance it if we either forked after N events, or skipEvents!=0
453  // Background event selectors: restart, and advance if we forked after N events
454  if(m_isPileup) {
455  // Deal with the main event selector first
456  if(serviceLocator()->service(m_evtSelName,m_evtSelSeek).isFailure() || !m_evtSelSeek) {
457  ATH_MSG_ERROR("Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
458  return outwork;
459  }
460 
 // NOTE(review): the condition and seek call for the main selector, which
 // lead to the error branch below, are not visible in this listing.
463  ATH_MSG_ERROR("Failed to seek to " << m_nEventsBeforeFork+m_nSkipEvents);
464  return outwork;
465  }
466 
467  // Deal with background event selectors
468  for(auto [evtsel,curEvt] : bkgEvtSelectors) {
469  if(evtsel->start().isSuccess()) {
470  if (m_nEventsBeforeFork>0) {
471  IEvtSelectorSeek* evtselseek = dynamic_cast<IEvtSelectorSeek*>(evtsel);
 // evtselseek is not null-checked here. This relies on the pre-fork loop
 // above, which aborts when the cast fails and EventsBeforeFork > 0.
472  if(evtselseek->seek(*m_evtContext,curEvt).isFailure()) {
473  ATH_MSG_ERROR("Failed to seek to " << curEvt << " in the BKG Event Selector " << evtsel->name());
474  return outwork;
475  }
476  }
477  }
478  else {
479  ATH_MSG_ERROR("Failed to restart BKG Event Selector " << evtsel->name());
480  return outwork;
481  }
482  }
483  }
484  // <<<< For PileUp jobs
485 
486  // _______________________ Event orders for debugging ________________________________
 // The file is read before the chdir below, i.e. relative to the directory the
 // worker was forked in. Expected line format: <rank>:<evt>,<evt>,... (as
 // written by exec_func). Only the line for this worker's rank is used.
487  if(m_readEventOrders) {
488  std::fstream fs(m_eventOrdersFile.c_str(),std::fstream::in);
489  if(fs.good()) {
490  ATH_MSG_INFO("Reading predefined event orders from " << m_eventOrdersFile);
491  while(fs.good()){
492  std::string line;
493  std::getline(fs,line);
494  if(line.empty())continue;
495 
496  // Parse the string
 // NOTE(review): std::stoi throws std::invalid_argument / std::out_of_range on
 // malformed input, and nothing here catches it.
497  size_t idx(0);
498  int rank = std::stoi(line,&idx);
499  if(rank==m_rankId) {
500  msg(MSG::INFO) << "This worker will proces the following events #";
501  while(idx<line.size()-1) {
 // Skip the separator (':' or ',') and parse the next event number.
502  line = line.substr(idx+1);
503  int evtnum = std::stoi(line,&idx);
504  m_eventOrders.push_back(evtnum);
505  msg(MSG::INFO) << " " << evtnum;
506  }
507  msg(MSG::INFO) << endmsg;
508  }
509  }
510  if(m_eventOrders.empty()) {
511  ATH_MSG_ERROR("Could not read event orders for the rank " << m_rankId);
512  return outwork;
513  }
514  fs.close();
515  }
516  else {
517  ATH_MSG_ERROR("Unable to read predefined event orders from " << m_eventOrdersFile);
518  return outwork;
519  }
520  }
521 
522  // ________________________ Worker dir: chdir ________________________
 // From here on, relative paths (e.g. the event-orders file written by
 // exec_func) resolve inside the worker directory.
523  if(chdir(worker_rundir.string().c_str())==-1) {
524  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
525  return outwork;
526  }
527 
528  // ___________________ Fire UpdateAfterFork incident _________________
529  p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
530 
531  // Declare success and return
532  *(int*)(outwork->data) = 0;
533  return outwork;
534 }
535 
// exec_func(): the worker event loop. Event numbers come from one of three
// sources:
//  - Round-robin: event m_nSkipEvents + m_nprocs*k + rank for k = 0, 1, ...,
//    while below total + m_nSkipEvents. The total is taken from the negated
//    terminator read from the shared queue before the loop.
//  - Predefined orders loaded by bootstrap_func() (ReadEventOrders).
//  - Otherwise the shared event queue. Each value holds the first event
//    number in the low sizeof(int) bytes and the chunk size in the higher
//    bits. A value <= 0 ends the loop and carries minus the total event count.
// For each event/chunk the input is positioned first:
//  - IEventShare::share in shared-reader mode;
//  - otherwise IEventSeek::seek or IEvtSelectorSeek::seek.
// Then nextEvent() is called. The processed order is appended to the
// worker's event-orders file.
// After the loop, executeRun(0) is called. Without a shared reader, the
// selector is then sought to total + m_nSkipEvents.
// The whole loop is skipped, and the result reports failure, if the
// event-orders file already exists in the worker directory.
// Returns a ScheduledWork laid out as
// "ERRCODE(int)|Func_Flag|NEvt(int)|EvtLoopTime(TimeValType)".
// NOTE(review): the declarations of `ordersFile` and `func` are not visible in
// this listing.
536 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::exec_func()
537 {
538  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
539 
540  bool all_ok(true);
541 
 // 0x100^sizeof(int) - 1: all bits of the low sizeof(int) bytes set.
542  long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
 // Running count passed to nextEvent(); starts from the events processed
 // before the fork.
543  int nEvt(m_nEventsBeforeFork);
544  int nEventsProcessed(0);
545  long evtnumAndChunk(0);
546 
547  unsigned evtCounter(0);
548  int evtnum(0), chunkSize(1);
549  auto predefinedEvt = m_eventOrders.cbegin();
550 
551  // If the event orders file already exists in worker's run directory, then it's an unexpected error!
553  if(std::filesystem::exists(ordersFile)) {
554  ATH_MSG_ERROR(m_eventOrdersFile << " already exists in the worker's run directory!");
555  all_ok = false;
556  }
557 
558  // For the round robin we need to know the maximum number of events for this job
559  if(m_isRoundRobin) {
 // Poll (1 ms sleeps), discarding positive values, until the terminator
 // (<= 0) arrives; its negation is the total number of events.
560  evtnumAndChunk = 1;
561  while(evtnumAndChunk>0) {
562  if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
563  usleep(1000);
564  }
565  }
566  evtnumAndChunk *= -1;
567  }
568 
569  System::ProcessTime time_start = System::getProcessTime();
570  if(all_ok) {
 // Per-worker record of processed events: "<rank>:<evt>,<evt>,...".
 // NOTE(review): in round-robin mode only the rank is written.
571  std::fstream fs(m_eventOrdersFile.c_str(),std::fstream::out);
572  fs << m_rankId;
573  bool firstOrder(true);
574  while(true) {
575  if(m_isRoundRobin) {
576  evtnum = m_nSkipEvents + m_nprocs*evtCounter + m_rankId;
577  if(evtnum>=evtnumAndChunk+m_nSkipEvents) {
578  break;
579  }
580  evtCounter++;
581  }
582  else {
583  if(m_readEventOrders) {
584  if(predefinedEvt==m_eventOrders.cend()) break;
585  evtnum = *predefinedEvt;
586  predefinedEvt++;
587  fs << (firstOrder?":":",") << evtnum;
588  fs.flush();
589  firstOrder=false;
590  ATH_MSG_INFO("Read event number from the orders file: " << evtnum);
591  }
592  else {
593  if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
594  // The event queue is empty, but we should check whether there are more events to come or not
595  ATH_MSG_DEBUG("Event queue is empty");
596  usleep(1000);
597  continue;
598  }
599  if(evtnumAndChunk<=0) {
600  evtnumAndChunk *= -1;
601  ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
602  break;
603  }
604  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
 // Higher bits: chunk size; low sizeof(int) bytes: first event of the chunk.
605  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
606  evtnum = evtnumAndChunk & intmask;
607  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
608 
609  // Save event order
610  for(int i(0);i<chunkSize;++i) {
611  fs << (firstOrder?":":",") << evtnum+i;
612  firstOrder=false;
613  }
614  fs.flush();
615  } // Get event numbers from the shared queue
616  } // Not RoundRobin
617  nEvt+=chunkSize;
618  StatusCode sc;
619  if(m_useSharedReader) {
620  sc = m_evtShare->share(evtnum);
621  if(sc.isFailure()){
622  ATH_MSG_ERROR("Unable to share " << evtnum);
623  all_ok=false;
624  break;
625  }
626  else {
627  ATH_MSG_INFO("Share of " << evtnum << " succeeded");
628  }
629  }
630  else if(m_evtSelector) {
631  m_chronoStatSvc->chronoStart("AthenaMP_seek");
632  if (m_evtSeek) {
633  sc=m_evtSeek->seek(evtnum);
634  }
635  else {
636  sc=m_evtSelSeek->seek(*m_evtContext, evtnum);
637  }
 // NOTE(review): on failure the loop breaks before
 // chronoStop("AthenaMP_seek"), leaving that chrono running.
638  if(sc.isFailure()){
639  ATH_MSG_ERROR("Unable to seek to " << evtnum);
640  all_ok=false;
641  break;
642  }
643  else {
644  ATH_MSG_INFO("Seek to " << evtnum << " succeeded");
645  }
646  m_chronoStatSvc->chronoStop("AthenaMP_seek");
647  }
648  m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
649  sc = m_evtProcessor->nextEvent(nEvt);
 // Counted before the status check, so a failed chunk is included in
 // nEventsProcessed.
650  nEventsProcessed += chunkSize;
 // NOTE(review): on failure the loop breaks before
 // chronoStop("AthenaMP_nextEvent").
651  if(sc.isFailure()){
652  if(chunkSize==1) {
653  ATH_MSG_ERROR("Unable to process event " << evtnum);
654  }
655  else {
656  ATH_MSG_ERROR("Unable to process the chunk (" << evtnum << "," << evtnum+chunkSize-1 << ")");
657  }
658  all_ok=false;
659  break;
660  }
661  m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
662  }
663  fs.close();
664  }
665  System::ProcessTime time_delta = System::getProcessTime() - time_start;
666  TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
667 
668  if(all_ok) {
669  if(m_evtProcessor->executeRun(0).isFailure()) {
670  ATH_MSG_ERROR("Could not finalize the Run");
671  all_ok=false;
672  }
673  else if(!m_useSharedReader && m_evtSelector) {
 // Seek past the last event of the job; a failure is only a warning.
 // NOTE(review): with ReadEventOrders (and no round-robin), evtnumAndChunk is
 // never updated and stays 0, so this seeks to SkipEvents instead.
674  StatusCode sc;
675  if (m_evtSeek) {
676  sc = m_evtSeek->seek(evtnumAndChunk+m_nSkipEvents);
677  }
678  else {
679  sc = m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+m_nSkipEvents);
680  }
681  if(sc.isFailure()) {
682  ATH_MSG_WARNING("Seek past maxevt to " << evtnumAndChunk+m_nSkipEvents << " returned failure.");
683  }
684  }
685  }
686 
687  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
688 
689  // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime"
 // The buffer is malloc()ed; the master side releases it with free() after
 // decoding.
690  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(elapsedTime);
691  void* outdata = malloc(outsize);
692  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
694  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
695  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
696  memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsedTime,sizeof(elapsedTime));
697  outwork->data = outdata;
698  outwork->size = outsize;
699  // ...
700  // (possible) TODO: extend outwork with some error message, which will be eventually
701  // reported in the master process
702  // ...
703  return outwork;
704 }
705 
// fin_func(): runs in a worker when the master schedules its finalization.
// It stops and then finalizes the application manager; finalize() is skipped
// if stop() fails.
// Returns a ScheduledWork with the same layout as exec_func()'s result,
// "ERRCODE|Func_Flag|NEvt|EvtLoopTime", with NEvt = -1 and EvtLoopTime = -1.
// ERRCODE is 0 on success and 1 otherwise.
// NOTE(review): the declaration of `func` is not visible in this listing.
// Presumably it is FUNC_FIN, which decodeProcessResult() expects.
706 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::fin_func()
707 {
708  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
709 
710  bool all_ok(true);
711 
712  if(m_appMgr->stop().isFailure()) {
713  ATH_MSG_ERROR("Unable to stop AppMgr");
714  all_ok=false;
715  }
716  else {
717  if(m_appMgr->finalize().isFailure()) {
 // std::cerr rather than ATH_MSG_ERROR, presumably because the message
 // service may already be finalized at this point — TODO confirm.
718  std::cerr << "Unable to finalize AppMgr" << std::endl;
719  all_ok=false;
720  }
721  }
722 
723  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
724 
725  // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime" (Here NEvt=-1 and EvtLoopTime=-1)
726  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(TimeValType);
727  void* outdata = malloc(outsize);
728  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
730  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
731  int nEvt = -1;
732  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
733  TimeValType elapsed = -1;
734  memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsed,sizeof(elapsed));
735 
736  outwork->data = outdata;
737  outwork->size = outsize;
738 
739  return outwork;
740 }
741 
// decodeProcessResult(): decodes a worker result laid out as
// "ERRCODE|Func_Flag|NEvt|EvtLoopTime". Null results, and results of any
// other size, are ignored and 0 is returned.
//  - FUNC_EXEC: stores (NEvt, EvtLoopTime) in m_eventStat keyed by the
//    worker PID. If doFinalize is set, the PID is appended to m_finQueue.
//    Finalization is scheduled immediately only when the queue was empty,
//    so workers are finalized one at a time.
//  - FUNC_FIN (only when doFinalize is set): checks that the PID is at the
//    head of m_finQueue, pops it and schedules finalization of the next PID,
//    if any.
// Returns 1 on a scheduling error or an unexpected finalized PID, 0 otherwise.
// NOTE(review): the declarations of `output` (presumably presult->output) and
// `func` are not visible in this listing.
742 int SharedEvtQueueConsumer::decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize)
743 {
744  if(!presult) return 0;
746  ATH_MSG_DEBUG("Decoding the output of PID=" << presult->pid << " with the size=" << output.size);
 // Only the full-size layout is decoded. The leading ERRCODE itself is not
 // read by this function.
747  if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(TimeValType)) return 0;
748 
750  memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
751  if(func==AthenaMPToolBase::FUNC_EXEC) {
752  // Store the number of processed events
753  int nevt(0);
754  TimeValType elapsed(0);
755  memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
 // NOTE(review): `2*+sizeof(int)` parses as 2*(+sizeof(int)), i.e. the
 // intended 2*sizeof(int) offset; the stray '+' looks like a typo.
756  memcpy(&elapsed,(char*)output.data+2*+sizeof(int)+sizeof(func),sizeof(TimeValType));
757  m_eventStat[presult->pid]=std::pair<int,TimeValType>(nevt,elapsed);
758  ATH_MSG_DEBUG("PID=" << presult->pid << " processed " << nevt << " events in " << elapsed << "sec.");
759 
760  if(doFinalize) {
761  // Add PID to the finalization queue
762  m_finQueue.push(presult->pid);
763  ATH_MSG_DEBUG("Added PID=" << presult->pid << " to the finalization queue");
764 
765  // If this is the only element in the queue then start its finalization
766  // Otherwise it has to wait its turn until all previous processes have been finalized
767  if(m_finQueue.size()==1) {
768  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,presult->pid)
769  || m_processGroup->map_async(0,0,presult->pid)) {
770  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << presult->pid);
771  return 1;
772  }
773  else {
774  ATH_MSG_DEBUG("Scheduled finalization of PID=" << presult->pid);
775  }
776  }
777  }
778  }
779  else if(doFinalize && func==AthenaMPToolBase::FUNC_FIN) {
780  ATH_MSG_DEBUG("Finished finalization of PID=" << presult->pid);
 // NOTE(review): front() on an empty m_finQueue is undefined behaviour. This
 // relies on every FUNC_FIN result belonging to a PID queued above.
781  pid_t pid = m_finQueue.front();
782  if(pid==presult->pid) {
783  // pid received as expected. Remove it from the queue
784  m_finQueue.pop();
785  ATH_MSG_DEBUG("PID=" << presult->pid << " removed from the queue");
786  // Schedule finalization of the next process in the queue
787  if(m_finQueue.size()) {
788  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())
789  || m_processGroup->map_async(0,0,m_finQueue.front())) {
790  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
791  return 1;
792  }
793  else {
794  ATH_MSG_DEBUG("Scheduled finalization of PID=" << m_finQueue.front());
795  }
796  }
797  }
798  else {
799  // Error: unexpected pid received from presult
800  ATH_MSG_ERROR("Finalized PID=" << presult->pid << " while PID=" << pid << " was expected");
801  return 1;
802  }
803  }
804 
805  return 0;
806 }
SharedEvtQueueConsumer::m_nSkipEvents
int m_nSkipEvents
Definition: SharedEvtQueueConsumer.h:61
IEventShare::makeClient
virtual StatusCode makeClient(int num)=0
Make this a client.
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:432
SharedEvtQueueConsumer::m_finQueue
std::queue< pid_t > m_finQueue
Definition: SharedEvtQueueConsumer.h:78
checkFileSG.line
line
Definition: checkFileSG.py:75
AthenaInterprocess::ProcessGroup::pullOneResult
ProcessResult * pullOneResult()
Definition: ProcessGroup.cxx:177
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:126
SharedEvtQueueConsumer::m_evtShare
IEventShare * m_evtShare
Definition: SharedEvtQueueConsumer.h:70
AthenaMPToolBase::m_evtSelector
IEvtSelector * m_evtSelector
Definition: AthenaMPToolBase.h:94
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
IEventSeek.h
Abstract interface for seeking within an event stream.
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
SharedEvtQueueConsumer::m_evtSeek
IEventSeek * m_evtSeek
Definition: SharedEvtQueueConsumer.h:67
IEventShare.h
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:88
AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
Definition: AthCommonDataStore.h:145
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
IDataShare.h
AthenaInterprocess::UpdateAfterFork
Definition: Incidents.h:22
SharedEvtQueueConsumer::initialize
virtual StatusCode initialize() override
Definition: SharedEvtQueueConsumer.cxx:73
SharedEvtQueueConsumer::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedEvtQueueConsumer.cxx:706
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:83
conifer::pow
constexpr int pow(int x)
Definition: conifer.h:20
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:68
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:32
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:362
skel.it
it
Definition: skel.GENtoEVGEN.py:423
python.AthDsoLogger.out
out
Definition: AthDsoLogger.py:71
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Definition: AthenaMPToolBase.h:86
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
ProcessGroup.h
AthenaInterprocess::ProcessResult::pid
pid_t pid
Definition: ProcessGroup.h:23
IEventSeek::seek
virtual StatusCode seek(int evtnum)=0
Seek to a given event number.
AthExHiveOpts.chunkSize
chunkSize
Definition: AthExHiveOpts.py:101
SharedEvtQueueConsumer::m_eventStat
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
Definition: SharedEvtQueueConsumer.h:77
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:67
AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore
const ServiceHandle< StoreGateSvc > & detStore() const
The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:95
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
SharedEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedEvtQueueConsumer.h:64
IDataShare
Abstract interface for sharing data.
Definition: IDataShare.h:28
IEventShare::share
virtual StatusCode share(int evtnum)=0
Request to share a given event.
SharedEvtQueueConsumer::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedEvtQueueConsumer.cxx:273
IEvtSelectorSeek::seek
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
copy_file_icc_hack.h
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaInterprocess::ProcessResult::output
ScheduledWork output
Definition: ProcessGroup.h:24
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:91
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
IEvtSelectorSeek::curEvent
virtual int curEvent(const IEvtSelector::Context &c) const =0
return the current event number.
LArG4FSStartPointFilter.rand
rand
Definition: LArG4FSStartPointFilter.py:80
AthenaMPToolBase::m_isPileup
Gaudi::Property< bool > m_isPileup
Definition: AthenaMPToolBase.h:99
lumiFormat.i
int i
Definition: lumiFormat.py:92
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:65
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
Incidents.h
res
std::pair< std::vector< unsigned int >, bool > res
Definition: JetGroupProductTest.cxx:14
SharedEvtQueueConsumer::m_eventOrders
std::vector< int > m_eventOrders
Definition: SharedEvtQueueConsumer.h:83
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
SharedEvtQueueConsumer::m_isRoundRobin
bool m_isRoundRobin
Definition: SharedEvtQueueConsumer.h:59
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:129
SharedEvtQueueConsumer::m_useSharedWriter
bool m_useSharedWriter
Definition: SharedEvtQueueConsumer.h:58
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:341
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:55
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
SharedEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedEvtQueueConsumer.h:66
SharedEvtQueueConsumer::m_useSharedReader
bool m_useSharedReader
Definition: SharedEvtQueueConsumer.h:57
SharedEvtQueueConsumer::m_readEventOrders
bool m_readEventOrders
Definition: SharedEvtQueueConsumer.h:81
SharedEvtQueueConsumer::finalize
virtual StatusCode finalize() override
Definition: SharedEvtQueueConsumer.cxx:121
SharedEvtQueueConsumer.h
merge.output
output
Definition: merge.py:17
SharedEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedEvtQueueConsumer.h:73
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:93
SharedEvtQueueConsumer::m_evtSelSeek
IEvtSelectorSeek * m_evtSelSeek
Definition: SharedEvtQueueConsumer.h:68
SharedEvtQueueConsumer::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: SharedEvtQueueConsumer.h:60
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:278
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
SharedEvtQueueConsumer::m_masterPid
pid_t m_masterPid
Definition: SharedEvtQueueConsumer.h:84
SharedEvtQueueConsumer::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedEvtQueueConsumer.cxx:285
grepfile.filenames
list filenames
Definition: grepfile.py:34
SharedEvtQueueConsumer::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedEvtQueueConsumer.cxx:536
SharedEvtQueueConsumer::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: SharedEvtQueueConsumer.cxx:254
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
SharedEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedEvtQueueConsumer.h:69
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthCommonMsg< AlgTool >::msg
MsgStream & msg() const
Definition: AthCommonMsg.h:24
Herwig7_QED_EvtGen_ll.fs
dictionary fs
Definition: Herwig7_QED_EvtGen_ll.py:17
LArNewCalib_DelayDump_OFC_Cali.idx
idx
Definition: LArNewCalib_DelayDump_OFC_Cali.py:69
IEventSeek
Abstract interface for seeking within an event stream.
Definition: IEventSeek.h:27
SharedEvtQueueConsumer::~SharedEvtQueueConsumer
virtual ~SharedEvtQueueConsumer() override
Definition: SharedEvtQueueConsumer.cxx:69
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
SharedEvtQueueConsumer::SharedEvtQueueConsumer
SharedEvtQueueConsumer()
IEvtSelectorSeek.h
Extension to IEvtSelector to allow for seeking.
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:84
python.dummyaccess.exists
def exists(filename)
Definition: dummyaccess.py:9
AthenaInterprocess::SharedQueue::send_basic
bool send_basic(T)
Definition: SharedQueue.h:93
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:425
SharedEvtQueueConsumer::m_dataShare
IDataShare * m_dataShare
Definition: SharedEvtQueueConsumer.h:71
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:66
SharedEvtQueueConsumer::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: SharedEvtQueueConsumer.h:74
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:369
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
SharedEvtQueueConsumer::TimeValType
System::ProcessTime::TimeValueType TimeValType
Definition: SharedEvtQueueConsumer.h:76
SharedEvtQueueConsumer::m_debug
bool m_debug
Definition: SharedEvtQueueConsumer.h:62
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:90
SharedEvtQueueConsumer::m_eventOrdersFile
std::string m_eventOrdersFile
Definition: SharedEvtQueueConsumer.h:82