ATLAS Offline Software
EvtRangeProcessor.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "EvtRangeProcessor.h"
6 #include "copy_file_icc_hack.h"
8 
10 #include "GaudiKernel/IEvtSelector.h"
11 #include "GaudiKernel/IIoComponentMgr.h"
12 #include "GaudiKernel/IFileMgr.h"
13 #include "GaudiKernel/IChronoStatSvc.h"
14 #include "GaudiKernel/ISvcLocator.h"
15 #include "GaudiKernel/IIncidentSvc.h"
16 #include "GaudiKernel/FileIncident.h"
17 #include "GaudiKernel/Timing.h"
18 
19 #include <sys/stat.h>
20 #include <sstream>
21 #include <fstream>
22 #include <unistd.h>
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <stdexcept>
26 #include <queue>
27 #include <signal.h>
28 #include <filesystem>
29 
30 #include "yampl/SocketFactory.h"
31 
32 
34  , const std::string& name
35  , const IInterface* parent)
37  , m_rankId(-1)
38  , m_nEventsBeforeFork(0)
39  , m_activeWorkers(0)
40  , m_inpFile("")
41  , m_chronoStatSvc("ChronoStatSvc", name)
42  , m_incidentSvc("IncidentSvc", name)
43  , m_evtSeek(nullptr)
44  , m_channel2Scatterer("")
45  , m_channel2EvtSel("")
46  , m_sharedRankQueue(0)
47  , m_sharedFailedPidQueue(0)
48  , m_debug(false)
49 {
// Register the AthenaMP tool interface and the job-option properties.
// - Channel2Scatterer: yampl channel used by exec_func() (suffixed with "_" + m_randStr there).
// - Debug: makes bootstrap_func() wait in waitForSignal() and skip log redirection.
// NOTE(review): EventsBeforeFork and Channel2EvtSel are not read anywhere in the code visible
// in this file; confirm whether they are still needed.
50  declareInterface<IAthenaMPTool>(this);
51 
52  declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
53  declareProperty("Channel2Scatterer", m_channel2Scatterer);
54  declareProperty("Channel2EvtSel", m_channel2EvtSel);
55  declareProperty("Debug", m_debug);
56 
// Worker run directories are <topdir>/worker_<rank> (built in bootstrap_func() and
// subProcessLogs()). The prefix is fixed here and is not exposed as a property.
57  m_subprocDirPrefix = "worker_";
58 }
59 
61 {
// Intentionally empty: m_sharedRankQueue (allocated in makePool()) is deleted elsewhere in
// this file, not in the destructor.
62 }
63 
65 {
// Acquire the services used by the workers: the seek interface of the event selector named
// m_evtSelName, the ChronoStatSvc (event timing) and the IncidentSvc (NextEventRange incidents).
// Any missing service aborts initialization through ATH_CHECK.
66  ATH_MSG_DEBUG("In initialize");
67 
// m_evtSeek is a SmartIF<IEvtSelectorSeek>; isValid() is false if the service could not be
// obtained through that interface, which exec_func() needs for seek().
69  m_evtSeek = serviceLocator()->service(m_evtSelName);
70  ATH_CHECK(m_evtSeek.isValid());
71  ATH_CHECK(m_chronoStatSvc.retrieve());
72  ATH_CHECK(m_incidentSvc.retrieve());
73 
74  return StatusCode::SUCCESS;
75 }
76 
78 {
79  delete m_sharedRankQueue;
80  return StatusCode::SUCCESS;
81 }
82 
// Create the pool of event-range worker processes.
//   (first int)  unnamed and unused by this implementation
//   nprocs       number of workers; -1 means one per online CPU (sysconf(_SC_NPROCESSORS_ONLN)).
//                0 and values below -1 are rejected.
//   topdir       parent directory of the per-worker run directories; must be non-empty
// Returns the number of workers on success, -1 on any failure.
83 int EvtRangeProcessor::makePool(int, int nprocs, const std::string& topdir)
84 {
85  ATH_MSG_DEBUG("In makePool " << getpid());
86 
87  if(nprocs==0 || nprocs<-1) {
88  ATH_MSG_ERROR("Invalid value for the nprocs parameter: " << nprocs);
89  return -1;
90  }
91 
92  if(topdir.empty()) {
93  ATH_MSG_ERROR("Empty name for the top directory!");
94  return -1;
95  }
96 
// NOTE(review): sysconf() returns -1 on error. That value is not checked here and would flow
// into m_nprocs and into the SharedQueue capacity below.
97  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
99  m_subprocTopDir = topdir;
100 
// Each rank 0..m_nprocs-1 is queued exactly once. Every worker later obtains its m_rankId in
// bootstrap_func() (presumably from this queue) and uses it to name its run directory, so the
// ranks must be unique.
// NOTE(review): any previous m_sharedRankQueue is overwritten without being deleted, so a second
// makePool() call would leak the first queue.
101  // Create rank queue and fill it
102  std::ostringstream rankQueueName;
103  rankQueueName << "EvtRangeProcessor_RankQueue_" << getpid() << "_" << m_randStr;
104  m_sharedRankQueue = new AthenaInterprocess::SharedQueue(rankQueueName.str(),m_nprocs,sizeof(int));
105  for(int i=0; i<m_nprocs; ++i)
106  if(!m_sharedRankQueue->send_basic<int>(i)) {
107  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
108  return -1;
109  }
110 
// NOTE(review): the statement that creates m_processGroup is missing from this listing.
// mapAsyncFlag() is treated as returning non-zero on failure, as everywhere in this file.
111  // Create the process group and map_async bootstrap
113  ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
114  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP)) {
115  return -1;
116  }
117  ATH_MSG_INFO("Workers bootstraped");
118 
// Every worker starts in PROC_STATE_INIT; wait_once() advances the state as replies arrive.
// NOTE(review): the loop header over the process group's children is missing from this listing.
119  // Populate the m_procStates map
121  m_procStates[process.getProcessID()] = PROC_STATE_INIT;
122  }
123 
124  return m_nprocs;
125 }
126 
128 {
129  ATH_MSG_DEBUG("In exec " << getpid());
130 
131  // Do nothing here. The exec will be mapped on workers one at a time ...
132 
133  return StatusCode::SUCCESS;
134 }
135 
// Poll the worker pool once.
// Step 1: AthenaMPToolBase::wait_once() may report a finished worker (pid>0) or a failure to
//   wait on the group (pid<0, fatal). A finished worker is handled according to its tracked state:
//   - INIT failure: fatal.
//   - EXEC failure: the pid is reported to the Event Range Scatterer and a replacement worker is
//     started.
//   - FIN failure: the next queued worker is allowed to finalize.
//   - Successful exit: m_activeWorkers is decremented.
// Step 2: at most one result message is pulled from the process group and used to advance that
//   worker: bootstrap reply -> PROC_STATE_EXEC, exec reply -> PROC_STATE_FIN,
//   fin reply -> PROC_STATE_STOP.
// pid: output of AthenaMPToolBase::wait_once(); reset to 0 here when a crashed EXEC worker has
//   been replaced.
// Returns FAILURE on any unrecoverable condition, SUCCESS otherwise.
136 StatusCode EvtRangeProcessor::wait_once(pid_t& pid)
137 {
138  // This method performs two tasks:
139  // 1. Checks if any of the workers has changed its state, and if so performs appropriate actions
140  // 2. Tries to pull one result from the workers results queue, and if there is one, then decodes it
141 
142  // First make sure we have a valid pointer to the Failed PID Queue
143  if(m_sharedFailedPidQueue==0) {
144  if(detStore()->retrieve(m_sharedFailedPidQueue,"AthenaMPFailedPidQueue_"+m_randStr).isFailure()) {
145  ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Failed PID Queue");
146  return StatusCode::FAILURE;
147  }
148  }
149 
150  // ____________________ Step 1: check for state changes in the workers ______________________________
151  StatusCode sc = AthenaMPToolBase::wait_once(pid);
152  if(pid>0) {
153  // One of the workers finished. We need to figure out whether or not it finished abnormally
154 
155  auto itProcState = m_procStates.find(pid);
156  if(itProcState==m_procStates.end()) {
157  // Untracked subprocess?? Something's wrong. Exit
158  // To Do: how to report this error to the pilot?
159  sc.ignore();
160  ATH_MSG_ERROR("Detected untracked process ID=" << pid);
161  return StatusCode::FAILURE;
162  }
163 
164  // Deal with failed workers
165  if(sc.isFailure()) {
166 
167  switch(itProcState->second) {
168  case PROC_STATE_INIT:
169  // If the failed process was in INIT state, exit immediately
170  ATH_MSG_ERROR("Worker with process ID=" << pid << " failed at initialization!");
171  return StatusCode::FAILURE;
172  case PROC_STATE_EXEC:
173  // If the failed process was in EXEC state, report pid to EvtRangeScatterer and attempt to start new worker
174 
175  // Report pid to Event Range Scatterer
177  // To Do: how to report this error to the pilot?
178  ATH_MSG_ERROR("Failed to report the crashed pid to the Event Range Scatterer");
179  return StatusCode::FAILURE;
180  }
181 
// Once the replacement is running, pid is cleared (presumably so the caller does not treat the
// crashed worker as a normally finished one).
182  // Start new worker
183  if(startProcess().isSuccess()) {
184  ATH_MSG_INFO("Successfully started new process");
185  pid=0;
186  }
187  else {
188  // To Do: how to report this error to the pilot?
189  ATH_MSG_ERROR("Failed to start new process");
190  return StatusCode::FAILURE;
191  }
192  break;
193  case PROC_STATE_FIN:
// NOTE(review): this assumes the failed pid is at the front of m_finQueue, which is not checked.
// pop_front() on an empty deque is undefined behaviour.
194  // If the failed process was in FIN state, remove pid from the finQueue and schedule finalization of the next worker
195  m_finQueue.pop_front();
196 
197  if(m_finQueue.size()) {
198  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())) {
199  // To Do: how to report this error to the pilot?
200  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
201  return StatusCode::FAILURE;
202  }
203  else {
204  ATH_MSG_INFO("Scheduled finalization of PID=" << m_finQueue.front());
205  }
206  }
207  break;
208  case PROC_STATE_STOP:
209  break;
210  default:
211  ATH_MSG_ERROR("Detected unexpected state " << itProcState->second << " of failed worker with PID=" << pid);
212  return StatusCode::FAILURE;
213  }
214  }
215  else {
216  // The worker finished successfully and it was the last worker. Release the Event Range Scatterer
217  if(--m_activeWorkers==0
219  // To Do: how to report this error to the pilot?
220  ATH_MSG_ERROR("Failed to release the Event Range Scatterer");
221  return StatusCode::FAILURE;
222  }
223  }
224 
225  // Erase the pid from m_procStates map
226  m_procStates.erase(itProcState);
227  }
228  else {
229  sc.ignore();
230  if(pid<0) {
231  // Here we failed to wait on the group. Exit immediately
232  // To Do: how to report this error to the pilot?
233  ATH_MSG_ERROR("Failed to wait on the process group!");
234  return StatusCode::FAILURE;
235  }
236  }
237  // ____________________ ______________________________________________ ______________________________
238 
239 
240  // ____________________ Step 2: decode worker result (if any) ______________________________
// NOTE(review): every `return StatusCode::FAILURE` inside the block below skips the
// free(presult->output.data) / delete presult at its end, leaking both.
242  if(presult) {
// NOTE(review): in the lines visible here res is never assigned, so the `if(res)` check at the end
// never fires. The worker's error code (first int of the output) does not seem to be acted upon.
243  int res{0};
244  if((unsigned)(presult->output.size)>=sizeof(int)) {
245  // Decode result
247 
248  // First extract pid from the ProcessResult and check its validity
249  pid_t childPid = presult->pid;
250  auto itChildState = m_procStates.find(childPid);
251  if(itChildState==m_procStates.end()) {
252  ATH_MSG_ERROR("Unable to find PID=" << childPid << " in the Proc States map!");
253  return StatusCode::FAILURE;
254  }
255 
256  ATH_MSG_DEBUG("Decoding the output of PID=" << childPid << " with the size=" << output.size);
257 
// exec_func() and fin_func() reply with ERRCODE|Func_Flag|NEvt
// (2*sizeof(int)+sizeof(Func_Flag) bytes). Any other size is taken to be the bootstrap reply,
// which bootstrap_func() makes sizeof(int) bytes long.
258  if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)) {
259  // We are dealing with the bootstrap function.
260  // Schedule exec_func()
261  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC,childPid)) {
262  ATH_MSG_ERROR("Problem scheduling execution on PID=" << childPid);
263  return StatusCode::FAILURE;
264  }
265 
266  // Update process state in the m_procStates map
267  itChildState->second=PROC_STATE_EXEC;
268  }
269 
// NOTE(review): as visible here there is no `else` after the bootstrap branch. For a sizeof(int)
// bootstrap reply this memcpy therefore reads sizeof(func) bytes past the end of output.data, and
// func is indeterminate. The Func_Flag decoding should only run when the size check above matched.
271  memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
272 
273  if(func==AthenaMPToolBase::FUNC_EXEC) {
274  // Store the number of processed events
275  int nevt(0);
276  memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
277  m_nProcessedEvents[childPid]=nevt;
278  ATH_MSG_DEBUG("PID=" << childPid << " processed " << nevt << " events");
279 
280  // Add PID to the finalization queue
281  m_finQueue.push_back(childPid);
282  ATH_MSG_DEBUG("Added PID=" << childPid << " to the finalization queue");
283 
284  // If this is the only element in the queue then start its finalization
285  // Otherwise it has to wait its turn until all previous processes have been finalized
286  if(m_finQueue.size()==1) {
287  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,childPid)) {
288  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << childPid);
289  return StatusCode::FAILURE;
290  }
291  else {
292  ATH_MSG_INFO("Scheduled finalization of PID=" << childPid);
293  }
294  }
295 
296  // Update process state in the m_procStates map
297  itChildState->second=PROC_STATE_FIN;
298  }
299  else if(func==AthenaMPToolBase::FUNC_FIN) {
300  ATH_MSG_DEBUG("Finished finalization of PID=" << childPid);
// NOTE(review): front() on an empty m_finQueue is undefined behaviour. This assumes a FIN reply
// only arrives while its pid is still queued.
301  pid_t pidFront = m_finQueue.front();
302  if(pidFront==childPid) {
303  // pid received as expected
304 
305  // Set the process free
306  if(m_processGroup->map_async(0,0,pidFront)) {
307  ATH_MSG_ERROR("Failed to set the process PID=" << pidFront << " free");
308  return StatusCode::FAILURE;
309  }
310 
311  // Remove it from the queue
312  m_finQueue.pop_front();
313  ATH_MSG_DEBUG("PID=" << childPid << " removed from the queue");
314  // Schedule finalization of the next process in the queue
315  if(m_finQueue.size()) {
316  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())) {
317  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
318  return StatusCode::FAILURE;
319  }
320  else {
321  ATH_MSG_INFO("Scheduled finalization of PID=" << m_finQueue.front());
322  }
323  }
324  }
325  else {
326  // Error: unexpected pid received from presult
// NOTE(review): `pid` here is the Step-1 value, not the expected PID. pidFront is most likely
// what this message meant to print.
327  ATH_MSG_ERROR("Finalized PID=" << childPid << " while PID=" << pid << " was expected");
328  return StatusCode::FAILURE;
329  }
330 
331  // Update process state in the m_procStates map
332  itChildState->second=PROC_STATE_STOP;
333  }
334  }
335  free(presult->output.data);
336  delete presult;
337  if(res) return StatusCode::FAILURE;
338  }
339  // ____________________ ______________________________________________ ______________________________
340 
341  return StatusCode::SUCCESS;
342 }
343 
345 {
346  ATH_MSG_INFO("Statuses of event processors");
347  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
348  for(size_t i=0; i<statuses.size(); ++i) {
349  // Get the number of events processed by this worker
350  std::map<pid_t,int>::const_iterator it = m_nProcessedEvents.find(statuses[i].pid);
351  std::ostringstream ostr;
352  if(it==m_nProcessedEvents.end())
353  ostr << "N/A";
354  else
355  ostr << it->second;
356  ATH_MSG_INFO("*** Process PID=" << statuses[i].pid
357  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
358  << ". Number of events processed: " << ostr.str());
359  }
360 }
361 
362 void EvtRangeProcessor::subProcessLogs(std::vector<std::string>& filenames)
363 {
364  filenames.clear();
365  for(int i=0; i<m_nprocs; ++i) {
366  std::ostringstream workerIndex;
367  workerIndex << i;
368  std::filesystem::path worker_rundir(m_subprocTopDir);
369  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
370  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
371  }
372 }
373 
375 {
// Returns the AthenaMP::AllWorkerOutputs_ptr report for this tool.
// NOTE(review): the line that defines jobOutputs is missing from this listing, so whether any
// worker outputs are recorded in it cannot be confirmed from here.
377  return jobOutputs;
378 }
379 
// Worker-side bootstrap, mapped onto each worker from makePool() and startProcess().
// In order, it:
//   - obtains the worker's rank and creates <m_subprocTopDir>/<m_subprocDirPrefix><rank>;
//   - redirects logs (unless Debug) and updates the I/O registry;
//   - copies SimParams.db / DigitParams.db / PDGTABLE.MeV into the run directory when present;
//   - handles a saved PFC and reopens file descriptors;
//   - reinitializes I/O and restarts the event selector(s);
//   - finally chdir()s into the run directory.
// Returns a sizeof(int) reply: 0 on success, 1 on any failure (every early return reports 1).
380 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::bootstrap_func()
381 {
// In Debug mode, block in waitForSignal() first (presumably so that a debugger can be attached).
// Debug also keeps the logs un-redirected below.
382  if(m_debug) waitForSignal();
383 
// The reply is pre-set to failure (1) and only flipped to 0 at the very end. Its sizeof(int)
// length is what wait_once() uses to tell a bootstrap reply apart from exec/fin replies.
384  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
385  outwork->data = malloc(sizeof(int));
386  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
387  outwork->size = sizeof(int);
388  // ...
389  // (possible) TODO: extend outwork with some error message, which will be eventually
390  // reported in the master process
391  // ...
392 
393  // ________________________ Get RankID ________________________
394  //
396  ATH_MSG_ERROR("Unable to get rank ID!");
397  return outwork;
398  }
399  std::ostringstream workindex;
400  workindex<<m_rankId;
401 
402  // ________________________ Worker dir: mkdir ________________________
403  std::filesystem::path worker_rundir(m_subprocTopDir);
404  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
405  // NOTE(review): the prefix already comes from m_subprocDirPrefix (set to "worker_" in the
// constructor), but it is not exposed as a job property.
406 
// mkdir() fails if the directory already exists, so each worker must have a distinct rank.
407  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
408  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
409  return outwork;
410  }
411 
412  // ________________________ Redirect logs ________________________
413  if(!m_debug) {
414  if(redirectLog(worker_rundir.string()))
415  return outwork;
416 
417  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
418  }
419 
420  // ________________________ Update Io Registry ____________________________
421  if(updateIoReg(worker_rundir.string()))
422  return outwork;
423 
424  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
425 
426  // ________________________ SimParams.db & DigitParams.db & PDGTABLE.MeV ____________________________
// Files are copied from the current directory (the master's) into the worker's run directory.
427  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
428  if(std::filesystem::is_regular_file("SimParams.db"))
429  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
430  if(std::filesystem::is_regular_file("DigitParams.db"))
431  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
432  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
433  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
434 
435  // _______________________ Handle saved PFC (if any) ______________________
436  if(handleSavedPfc(abs_worker_rundir))
437  return outwork;
438 
439  // ________________________ reopen descriptors ____________________________
440  if(reopenFds())
441  return outwork;
442 
443  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
444 
445 
446  // ________________________ I/O reinit ________________________
447  if(!m_ioMgr->io_reinitialize().isSuccess()) {
448  ATH_MSG_ERROR("Failed to reinitialize I/O");
449  return outwork;
450  } else {
451  ATH_MSG_DEBUG("Successfully reinitialized I/O");
452  }
453 
454  // ________________________ Event selector restart ________________________
455  IService* evtSelSvc = dynamic_cast<IService*>(evtSelector());
456  if(!evtSelSvc) {
457  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
458  return outwork;
459  }
460  if(!evtSelSvc->start().isSuccess()) {
461  ATH_MSG_ERROR("Failed to restart the event selector");
462  return outwork;
463  } else {
464  ATH_MSG_DEBUG("Successfully restarted the event selector");
465  }
466 
467  // ________________________ Restart background event selectors in pileup jobs ________________________
// Every service implementing IEvtSelector other than the main selector is restarted; the first
// failure aborts the bootstrap.
468  if(m_isPileup) {
469  const std::list<IService*>& service_list = serviceLocator()->getServices();
470  std::list<IService*>::const_iterator itSvc = service_list.begin(),
471  itSvcLast = service_list.end();
472  for(;itSvc!=itSvcLast;++itSvc) {
473  IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(*itSvc);
474  if(evtsel && (evtsel != evtSelector())) {
475  if((*itSvc)->start().isSuccess())
476  ATH_MSG_DEBUG("Restarted event selector " << (*itSvc)->name());
477  else {
478  ATH_MSG_ERROR("Failed to restart event selector " << (*itSvc)->name());
479  return outwork;
480  }
481  }
482  }
483  }
484 
485  // ________________________ Worker dir: chdir ________________________
// NOTE(review): unlike the mkdir() failure above, errno is not reported here.
486  if(chdir(worker_rundir.string().c_str())==-1) {
487  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
488  return outwork;
489  }
490 
491  // Declare success and return
492  *(int*)(outwork->data) = 0;
493  return outwork;
494 }
495 
// Worker-side event loop, mapped onto a worker by wait_once() after its bootstrap reply.
// Each iteration:
//   - sends "<pid> ready for event processing" to the Event Range Scatterer over yampl;
//   - receives one event range and optionally switches the input file (PFN: field);
//   - seeks to and processes the requested events;
//   - reports either the output file(s) with ID/CPU/WALL or an error status.
// The loop ends when the scatterer answers with a 1-byte message.
// Returns an ERRCODE|Func_Flag|NEvt reply (2*sizeof(int)+sizeof(Func_Flag) bytes). Its error
// code is always 0 and NEvt is the number of successfully processed events.
496 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::exec_func()
497 {
498  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
499 
500  int nEvt(1);
501  int nEventsProcessed(0);
502 
// NOTE(review): queueTokens lives outside the range loop and is never cleared between ranges.
503  std::queue<std::string> queueTokens;
504 
505  // Get the yampl connection channels
506  yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
507  std::string socket2ScattererName = m_channel2Scatterer.value() + std::string("_") + m_randStr;
508  yampl::ISocket* socket2Scatterer = socketFactory->createClientSocket(yampl::Channel(socket2ScattererName,yampl::LOCAL),yampl::MOVE_DATA);
509  ATH_MSG_INFO("Created CLIENT socket to the Scatterer: " << socket2ScattererName);
510  std::ostringstream pidstr;
511  pidstr << getpid();
512 
513  // Construct a "welcome" message to be sent to the EvtRangeScatterer
514  std::string ping = pidstr.str() + std::string(" ready for event processing");
515 
516  while(true) {
// The welcome message is re-sent at the start of every iteration, including after `continue`.
// The malloc'd buffer is not freed here (presumably yampl takes it over with MOVE_DATA).
517  void* message2scatterer = malloc(ping.size());
518  memcpy(message2scatterer,ping.data(),ping.size());
519  socket2Scatterer->send(message2scatterer,ping.size());
520  ATH_MSG_INFO("Sent a welcome message to the Scatterer");
521 
522  // Get the response - list of tokens - from the scatterer.
523  // The format of the response: | ResponseSize | RangeID, | evtEvtRange[,evtToken] |
// NOTE(review): responseBuffer is filled by recv() and never released in this function. Check
// yampl's ownership rules for receives into a null buffer; this may leak one buffer per range.
524  char *responseBuffer(0);
525  std::string strPeerId;
526  ssize_t responseSize = socket2Scatterer->recv(responseBuffer,strPeerId);
527  // A response of size 1 (not 0) is treated as an empty range and terminates the loop
528  if(responseSize==1) {
529  ATH_MSG_INFO("Empty range received. Terminating the loop");
530  break;
531  }
532 
533  std::string responseStr(responseBuffer,responseSize);
534  ATH_MSG_INFO("Received response from the Scatterer : " << responseStr);
535 
536  // Start timing
537  System::ProcessTime time_start = System::getProcessTime();
538 
// Split the response on ',' into queueTokens (always pushes at least one token).
539  size_t startpos(0);
540  size_t endpos = responseStr.find(',');
541  while(endpos!=std::string::npos) {
542  queueTokens.push(responseStr.substr(startpos,endpos-startpos));
543  startpos = endpos+1;
544  endpos = responseStr.find(',',startpos);
545  }
546  queueTokens.push(responseStr.substr(startpos));
547  // Actually the first element in the tokens queue is the RangeID. Get it
548  std::string rangeID = queueTokens.front();
549  queueTokens.pop();
550  ATH_MSG_INFO("Received RangeID=" << rangeID);
551  // Fire an incident
552  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange",rangeID));
553 
554  // Here we need to support two formats of the responseStr
555  // Format 1. RangeID,startEvent,endEvent
556  // Format 2. RangeID,fileName,startEvent,endEvent
557  //
558  // The difference between these two is that for Format 2 we first
559  // need to update InputCollections property on the Event Selector
560  // and only after that proceed with seeking
561  //
562  // The seeking part is identical for Format 1 and 2
563 
565 
// NOTE(review): the front() calls below are not guarded. A response with fewer fields than
// expected makes front() run on an empty queue (undefined behaviour), and std::atoi silently
// yields 0 for non-numeric text.
566  // Determine the format
567  std::string filename("");
568  if(queueTokens.front().find("PFN:")==0) {
569  // We have Format 2
570  // Update InputCollections property of the Event Selector with the file name from Event Range
571  filename = queueTokens.front().substr(4);
572  if(setNewInputFile(filename).isFailure()) {
573  ATH_MSG_WARNING("Failed to set input file for the range: " << rangeID);
575  reportError(socket2Scatterer,rangeStatus);
576  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
// NOTE(review): this `continue` leaves the PFN, start and end tokens in queueTokens. The next
// response's tokens are appended behind them, so the next rangeID would be a stale token.
577  continue;
578  }
579  queueTokens.pop();
580  }
581 
582  // Get the number of events to process
583  int startEvent = std::atoi(queueTokens.front().c_str());
584  queueTokens.pop();
585  int endEvent = std::atoi(queueTokens.front().c_str());
586  queueTokens.pop();
587  ATH_MSG_INFO("Range fields. File Name: " << (filename.empty()?"N/A":filename)
588  << ", First Event:" << startEvent
589  << ", Last Event:" << endEvent);
590 
// startEvent/endEvent are treated as a 1-based inclusive range: indices startEvent-1 .. endEvent-1
// are passed to seek(). The first seek or processing failure stops the range.
591  // Process the events
592  IEvtSelector::Context* ctx = nullptr;
593  if (evtSelector()->createContext (ctx).isFailure()) {
594  ATH_MSG_WARNING("Failed to create IEventSelector context.");
596  }
597  else {
598  for(int i(startEvent-1); i<endEvent; ++i) {
599  StatusCode sc = m_evtSeek->seek(*ctx, i);
600  if(sc.isRecoverable()) {
601  ATH_MSG_WARNING("Event " << i << " from range: " << rangeID << " not in the input file");
603  break;
604  }
605  else if(sc.isFailure()) {
606  ATH_MSG_WARNING("Failed to seek to " << i << " in range: " << rangeID);
608  break;
609  }
610  ATH_MSG_INFO("Seek to " << i << " succeeded");
611  m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
612  sc = m_evtProcessor->nextEvent(nEvt++);
613 
614  m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
615  if(sc.isFailure()){
616  ATH_MSG_WARNING("Failed to process the event " << i << " in range:" << rangeID);
618  break;
619  }
620  else {
621  ATH_MSG_DEBUG("Event processed");
622  nEventsProcessed++;
623  }
624  }
625  }
// NOTE(review): releaseContext() is also called when createContext() failed (ctx may still be
// nullptr); confirm the selector tolerates that.
626  if (evtSelector()->releaseContext (ctx).isFailure()) {
627  ATH_MSG_WARNING("Failed to release IEventSelector context.");
628  }
629 
630  // Fire dummy NextEventRange incident in order to cut the previous output and report it
631  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
632  if(rangeStatus!=AthenaMPToolBase::ESRANGE_SUCCESS) {
633  reportError(socket2Scatterer,rangeStatus);
634  continue;
635  }
636 
637  // Event range successfully processed
638  std::string strOutpFile;
639  // Get the full path of the event range output file
// Collects, comma-separated, every entry of the current directory whose path ends with rangeID.
// NOTE(review): size()-rangeID.size() underflows (size_t) when rangeID is longer than the path.
// The check is a plain suffix match, so e.g. rangeID "1" also matches "...11".
640  for(std::filesystem::directory_iterator fdIt(std::filesystem::current_path()); fdIt!=std::filesystem::directory_iterator(); fdIt++) {
641  if(fdIt->path().string().rfind(rangeID) == fdIt->path().string().size()-rangeID.size()) {
642  if(strOutpFile.empty()) {
643  strOutpFile = fdIt->path().string();
644  }
645  else {
646  strOutpFile += (std::string(",")+fdIt->path().string());
647  }
648  }
649  }
650 
651  // Stop timing
652  System::ProcessTime time_delta = System::getProcessTime() - time_start;
653 
654  // Prepare the output report
655  if(!strOutpFile.empty()) {
656  // We need to combine the output file name with
657  // 1. RangeID (requested by JEDI)
658  // 2. CPU time
659  // 3. Wall time
660  std::ostringstream outputReportStream;
661  outputReportStream << strOutpFile
662  << ",ID:" << rangeID
663  << ",CPU:" << time_delta.cpuTime<System::Sec>()
664  << ",WALL:" << time_delta.elapsedTime<System::Sec>();
665  std::string outputFileReport = outputReportStream.str();
666 
667  // Report the output
668  message2scatterer = malloc(outputFileReport.size());
669  memcpy(message2scatterer,outputFileReport.data(),outputFileReport.size());
670  socket2Scatterer->send(message2scatterer,outputFileReport.size());
671  ATH_MSG_INFO("Reported the output " << outputFileReport);
672  }
673  else {
674  // This is an error: range successfully processed but no outputs were made
675  ATH_MSG_WARNING("Failed to make an output file for range: " << rangeID);
677  }
678  } // Main "event loop"
679 
680  if(m_evtProcessor->executeRun(0).isFailure()) {
681  ATH_MSG_WARNING("Could not finalize the Run");
682  }
683 
684  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
685 
686  // Return value: "ERRCODE|Func_Flag|NEvt"
687  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
688  void* outdata = malloc(outsize);
689  *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
691  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
692  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
693 
694  outwork->data = outdata;
695  outwork->size = outsize;
696  // ...
697  // (possible) TODO: extend outwork with some error message, which will be eventually
698  // reported in the master process
699  // ...
700 
701  delete socket2Scatterer;
702  delete socketFactory;
703 
704  return outwork;
705 }
706 
// Worker-side finalization, mapped onto a worker by wait_once() (one worker at a time, in
// m_finQueue order). It stops the application manager and, only if stop() succeeded,
// finalizes it.
// Returns an ERRCODE|Func_Flag|NEvt reply with NEvt=-1.
// NOTE(review): the error code is always 0, even when stop() or finalize() failed.
707 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::fin_func()
708 {
709  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
710 
711  if(m_appMgr->stop().isFailure()) {
712  ATH_MSG_WARNING("Unable to stop AppMgr");
713  }
714  else {
// Plain std::cout is used here rather than ATH_MSG_* (presumably because messaging may no
// longer be usable once finalize() has run; confirm).
715  if(m_appMgr->finalize().isFailure()) {
716  std::cout << "Unable to finalize AppMgr" << std::endl;
717  }
718  }
719 
720  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
721 
722  // Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
723  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
724  void* outdata = malloc(outsize);
725  *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
727  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
728  int nEvt = -1;
729  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
730 
731  outwork->data = outdata;
732  outwork->size = outsize;
733 
734  return outwork;
735 }
736 
// Launch one replacement worker (used by wait_once() after an EXEC-state crash) and map the
// bootstrap function onto it.
// Returns FAILURE if the rank cannot be queued, the process cannot be launched, or bootstrap
// cannot be scheduled.
737 StatusCode EvtRangeProcessor::startProcess()
738 {
// The new worker gets a never-used rank (the old m_nprocs). bootstrap_func() mkdir()s
// <topdir>/<prefix><rank> and fails if that directory already exists, so ranks cannot be reused.
// NOTE(review): m_nprocs stays incremented even when a later step fails, so subProcessLogs()
// would then list a log for a worker that never started.
739  m_nprocs++;
740 
741  // Create a rank for the new process
742  if(!m_sharedRankQueue->send_basic<int>(m_nprocs-1)) {
743  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
744  return StatusCode::FAILURE;
745  }
746 
// NOTE(review): only pid==0 is treated as a launch failure; confirm launchProcess() never
// signals failure with a negative value.
747  pid_t pid = m_processGroup->launchProcess();
748  if(pid==0) {
749  ATH_MSG_ERROR("Unable to start new process");
750  return StatusCode::FAILURE;
751  }
752 
753  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP,pid)) {
754  ATH_MSG_ERROR("Unable to bootstrap new process");
755  return StatusCode::FAILURE;
756  }
757 
759  return StatusCode::SUCCESS;
760 }
761 
763 {
764  if(m_inpFile == newFile) return StatusCode::SUCCESS;
765 
766  // Get Property Server
767  IProperty* propertyServer = dynamic_cast<IProperty*>(evtSelector());
768  if(!propertyServer) {
769  ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
770  return StatusCode::FAILURE;
771  }
772 
773  std::string propertyName("InputCollections");
774  if(m_inpFile.empty()) {
775  std::vector<std::string> vect;
776  StringArrayProperty inputFileList(propertyName, vect);
777  if(propertyServer->getProperty(&inputFileList).isFailure()) {
778  ATH_MSG_ERROR("Failed to get InputCollections property value of the Event Selector");
779  return StatusCode::FAILURE;
780  }
781  if(newFile==inputFileList.value()[0]) {
782  m_inpFile = newFile;
783  return StatusCode::SUCCESS;
784  }
785  }
786  std::vector<std::string> vect{newFile,};
787  StringArrayProperty newInputFileList(propertyName, vect);
788  if(propertyServer->setProperty(newInputFileList).isFailure()) {
789  ATH_MSG_ERROR("Unable to update " << newInputFileList.name() << " property on the Event Selector");
790  return StatusCode::FAILURE;
791  }
792  m_inpFile=newFile;
793  return StatusCode::SUCCESS;
794 }
795 
797 {
798  pid_t pid = getpid();
799  size_t messageSize = sizeof(pid_t)+sizeof(AthenaMPToolBase::ESRange_Status);
800  void* message2scatterer = malloc(messageSize);
801  memcpy(message2scatterer,&pid,sizeof(pid_t));
802  memcpy((pid_t*)message2scatterer+1,&status,sizeof(AthenaMPToolBase::ESRange_Status));
803  socket->send(message2scatterer,messageSize);
804 }
python.PyKernel.retrieve
def retrieve(aClass, aKey=None)
Definition: PyKernel.py:110
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
EvtRangeProcessor::~EvtRangeProcessor
virtual ~EvtRangeProcessor() override
Definition: EvtRangeProcessor.cxx:60
EvtRangeProcessor::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: EvtRangeProcessor.cxx:707
EvtRangeProcessor::EvtRangeProcessor
EvtRangeProcessor()
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:428
EvtRangeProcessor::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: EvtRangeProcessor.cxx:380
AthenaInterprocess::ProcessGroup::pullOneResult
ProcessResult * pullOneResult()
Definition: ProcessGroup.cxx:177
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
EvtRangeProcessor::m_activeWorkers
int m_activeWorkers
Definition: EvtRangeProcessor.h:68
AthenaMPToolBase::ESRANGE_BADINPFILE
@ ESRANGE_BADINPFILE
Definition: AthenaMPToolBase.h:62
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
EvtRangeProcessor::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: EvtRangeProcessor.cxx:374
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
LArBadChanBlobUtils::Channel
Identifier32::value_type Channel
Definition: LArBadChanBlobUtils.h:24
EvtRangeProcessor::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: EvtRangeProcessor.cxx:344
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:88
AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
Definition: AthCommonDataStore.h:145
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
EvtRangeProcessor.h
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:83
AthenaMPToolBase::ESRange_Status
ESRange_Status
Definition: AthenaMPToolBase.h:56
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:68
AthenaInterprocess::ProcessGroup::getChildren
const std::vector< Process > & getChildren() const
Definition: ProcessGroup.cxx:197
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:358
skel.it
it
Definition: skel.GENtoEVGEN.py:396
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Definition: AthenaMPToolBase.h:86
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
athena.exitcode
int exitcode
Definition: athena.py:161
ProcessGroup.h
AthenaMPToolBase::ESRANGE_SEEKFAILED
@ ESRANGE_SEEKFAILED
Definition: AthenaMPToolBase.h:59
AthenaInterprocess::ProcessResult::pid
pid_t pid
Definition: ProcessGroup.h:23
EvtRangeProcessor::m_inpFile
std::string m_inpFile
Definition: EvtRangeProcessor.h:69
EvtRangeProcessor::m_evtSeek
SmartIF< IEvtSelectorSeek > m_evtSeek
Definition: EvtRangeProcessor.h:73
EvtRangeProcessor::m_rankId
int m_rankId
Definition: EvtRangeProcessor.h:66
EvtRangeProcessor::PROC_STATE_EXEC
@ PROC_STATE_EXEC
Definition: EvtRangeProcessor.h:61
EvtRangeProcessor::m_finQueue
std::deque< pid_t > m_finQueue
Definition: EvtRangeProcessor.h:82
SUSY_SimplifiedModel_PostInclude.process
string process
Definition: SUSY_SimplifiedModel_PostInclude.py:42
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:67
AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore
const ServiceHandle< StoreGateSvc > & detStore() const
The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:95
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
EvtRangeProcessor::m_channel2EvtSel
StringProperty m_channel2EvtSel
Definition: EvtRangeProcessor.h:76
copy_file_icc_hack.h
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
EvtRangeProcessor::m_procStates
std::map< pid_t, ProcessState > m_procStates
Definition: EvtRangeProcessor.h:83
AthenaInterprocess::ProcessResult::output
ScheduledWork output
Definition: ProcessGroup.h:24
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:91
EvtRangeProcessor::m_nProcessedEvents
std::map< pid_t, int > m_nProcessedEvents
Definition: EvtRangeProcessor.h:81
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EvtRangeProcessor::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: EvtRangeProcessor.h:67
AthenaMPToolBase::m_isPileup
Gaudi::Property< bool > m_isPileup
Definition: AthenaMPToolBase.h:99
lumiFormat.i
int i
Definition: lumiFormat.py:85
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:65
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
EvtRangeProcessor::m_channel2Scatterer
StringProperty m_channel2Scatterer
Definition: EvtRangeProcessor.h:75
res
std::pair< std::vector< unsigned int >, bool > res
Definition: JetGroupProductTest.cxx:14
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
AthenaMPToolBase::evtSelector
IEvtSelector * evtSelector()
Definition: AthenaMPToolBase.h:81
test_pyathena.parent
parent
Definition: test_pyathena.py:15
AthenaMPToolBase::ESRANGE_SUCCESS
@ ESRANGE_SUCCESS
Definition: AthenaMPToolBase.h:57
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
EvtRangeProcessor::m_debug
bool m_debug
Definition: EvtRangeProcessor.h:85
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:132
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:337
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:55
EvtRangeProcessor::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: EvtRangeProcessor.cxx:496
EvtRangeProcessor::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: EvtRangeProcessor.h:78
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
EvtRangeProcessor::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: EvtRangeProcessor.cxx:362
merge.output
output
Definition: merge.py:17
EvtRangeProcessor::reportError
void reportError(yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
Definition: EvtRangeProcessor.cxx:796
EvtRangeProcessor::initialize
virtual StatusCode initialize() override
Definition: EvtRangeProcessor.cxx:64
EvtRangeProcessor::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: EvtRangeProcessor.h:71
AthenaMPToolBase::ESRANGE_PROCFAILED
@ ESRANGE_PROCFAILED
Definition: AthenaMPToolBase.h:60
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:93
EvtRangeProcessor::setNewInputFile
StatusCode setNewInputFile(const std::string &newFile)
Definition: EvtRangeProcessor.cxx:762
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:281
AthenaInterprocess::Process
Definition: Process.h:17
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
grepfile.filenames
list filenames
Definition: grepfile.py:34
EvtRangeProcessor::PROC_STATE_INIT
@ PROC_STATE_INIT
Definition: EvtRangeProcessor.h:60
EvtRangeProcessor::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: EvtRangeProcessor.h:72
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
EvtRangeProcessor::m_sharedFailedPidQueue
AthenaInterprocess::SharedQueue * m_sharedFailedPidQueue
Definition: EvtRangeProcessor.h:79
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:28
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:24
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
merge.status
status
Definition: merge.py:17
EvtRangeProcessor::finalize
virtual StatusCode finalize() override
Definition: EvtRangeProcessor.cxx:77
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
IEvtSelectorSeek.h
Extension to IEvtSelector to allow for seeking.
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:84
EvtRangeProcessor::PROC_STATE_FIN
@ PROC_STATE_FIN
Definition: EvtRangeProcessor.h:62
EvtRangeProcessor::PROC_STATE_STOP
@ PROC_STATE_STOP
Definition: EvtRangeProcessor.h:63
AthenaMPToolBase::ESRANGE_NOTFOUND
@ ESRANGE_NOTFOUND
Definition: AthenaMPToolBase.h:58
AthenaInterprocess::SharedQueue::send_basic
bool send_basic(T)
Definition: SharedQueue.h:93
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:421
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:25
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:66
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:365
AthenaMPToolBase::ESRANGE_FILENOTMADE
@ ESRANGE_FILENOTMADE
Definition: AthenaMPToolBase.h:61
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:90