ATLAS Offline Software
EvtRangeProcessor.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "EvtRangeProcessor.h"
6 #include "copy_file_icc_hack.h"
8 
10 #include "CxxUtils/xmalloc.h"
11 #include "GaudiKernel/IEvtSelector.h"
12 #include "GaudiKernel/IIoComponentMgr.h"
13 #include "GaudiKernel/IFileMgr.h"
14 #include "GaudiKernel/IChronoStatSvc.h"
15 #include "GaudiKernel/ISvcLocator.h"
16 #include "GaudiKernel/IIncidentSvc.h"
17 #include "GaudiKernel/FileIncident.h"
18 #include "GaudiKernel/Timing.h"
19 
20 #include <sys/stat.h>
21 #include <sstream>
22 #include <fstream>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <stdint.h>
26 #include <stdexcept>
27 #include <queue>
28 #include <signal.h>
29 #include <filesystem>
30 
31 #include "yampl/SocketFactory.h"
32 
33 
35  , const std::string& name
36  , const IInterface* parent)
38  , m_chronoStatSvc("ChronoStatSvc", name)
39  , m_incidentSvc("IncidentSvc", name)
40 {
41  m_subprocDirPrefix = "worker_";
42 }
43 
45 {
46 }
47 
49 {
50  ATH_MSG_DEBUG("In initialize");
51 
53  m_evtSeek = serviceLocator()->service(m_evtSelName);
54  ATH_CHECK(m_evtSeek.isValid());
55  ATH_CHECK(m_chronoStatSvc.retrieve());
56  ATH_CHECK(m_incidentSvc.retrieve());
57 
58  return StatusCode::SUCCESS;
59 }
60 
// Sets up the pool of worker processes.
//
// The first (unnamed) int parameter is ignored.
// nprocs - number of workers; must be positive, or -1 to use sysconf(_SC_NPROCESSORS_ONLN).
// topdir - top directory for the workers; must be non-empty, stored in m_subprocTopDir.
//
// Creates the shared rank queue and fills it with the ranks 0..m_nprocs-1, schedules
// FUNC_BOOTSTRAP through mapAsyncFlag() and records PROC_STATE_INIT in m_procStates.
// Returns m_nprocs on success and -1 on any error.
//
// NOTE(review): the embedded numbering of this listing skips source lines 76, 90 and 98, so the
// statements on those lines (presumably including the creation of the process group and the
// header of the loop that closes on line 100) are not visible here.
61 int EvtRangeProcessor::makePool(int, int nprocs, const std::string& topdir)
62 {
63  ATH_MSG_DEBUG("In makePool " << getpid());
64 
65  if(nprocs==0 || nprocs<-1) {
66  ATH_MSG_ERROR("Invalid value for the nprocs parameter: " << nprocs);
67  return -1;
68  }
69 
70  if(topdir.empty()) {
71  ATH_MSG_ERROR("Empty name for the top directory!");
72  return -1;
73  }
74 
 // nprocs==-1 selects the number of processors currently online
 // NOTE(review): the sysconf() result is not validated on this line — confirm whether the missing line 76 handles an error return
75  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
77  m_subprocTopDir = topdir;
78 
79  // Create rank queue and fill it
 // The queue name combines the current pid with m_randStr
80  std::ostringstream rankQueueName;
81  rankQueueName << "EvtRangeProcessor_RankQueue_" << getpid() << "_" << m_randStr;
82  m_sharedRankQueue = std::make_unique<AthenaInterprocess::SharedQueue>(rankQueueName.str(),m_nprocs,sizeof(int));
83  for(int i=0; i<m_nprocs; ++i)
84  if(!m_sharedRankQueue->send_basic<int>(i)) {
85  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
86  return -1;
87  }
88 
89  // Create the process group and map_async bootstrap
91  ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
 // A non-zero return from mapAsyncFlag() is treated as failure
92  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP)) {
93  return -1;
94  }
95  ATH_MSG_INFO("Workers bootstrapped");
96 
97  // Populate the m_procStates map
99  m_procStates[process.getProcessID()] = PROC_STATE_INIT;
100  }
101 
102  return m_nprocs;
103 }
104 
106 {
107  ATH_MSG_DEBUG("In exec " << getpid());
108 
109  // Do nothing here. The exec will be mapped on workers one at a time ...
110 
111  return StatusCode::SUCCESS;
112 }
113 
// Polls the worker group once and handles at most one pending worker result.
//
// pid is handed to AthenaMPToolBase::wait_once(). Afterwards pid>0 is treated as "a worker
// finished" and pid<0 as "waiting on the process group failed". When a worker that crashed in
// PROC_STATE_EXEC has been replaced successfully, pid is reset to 0.
// Returns StatusCode::FAILURE on any condition handled as fatal below, StatusCode::SUCCESS otherwise.
//
// NOTE(review): the embedded numbering of this listing skips source lines 154, 196, 219, 223 and
// 251; the statements on those lines (presumably including the declarations of presult, output
// and func) are not visible here.
114 StatusCode EvtRangeProcessor::wait_once(pid_t& pid)
115 {
116  // This method performs two tasks:
117  // 1. Checks if any of the workers has changed its state, and if so performs appropriate actions
118  // 2. Tries to pull one result from the workers results queue, and if there is one, then decodes it
119 
120  // First make sure we have a valid pointer to the Failed PID Queue
121  if(m_sharedFailedPidQueue==0) {
122  if(detStore()->retrieve(m_sharedFailedPidQueue,"AthenaMPFailedPidQueue_"+m_randStr).isFailure()) {
123  ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Failed PID Queue");
124  return StatusCode::FAILURE;
125  }
126  }
127 
128  // ____________________ Step 1: check for state changes in the workers ______________________________
129  StatusCode sc = AthenaMPToolBase::wait_once(pid);
130  if(pid>0) {
131  // One of the workers finished. We need to figure out whether or not it finished abnormally
132 
133  auto itProcState = m_procStates.find(pid);
134  if(itProcState==m_procStates.end()) {
135  // Untracked subprocess?? Something's wrong. Exit
136  // To Do: how to report this error to the pilot?
137  sc.ignore();
138  ATH_MSG_ERROR("Detected untracked process ID=" << pid);
139  return StatusCode::FAILURE;
140  }
141 
142  // Deal with failed workers
143  if(sc.isFailure()) {
144 
145  switch(itProcState->second) {
146  case PROC_STATE_INIT:
147  // If the failed process was in INIT state, exit immediately
148  ATH_MSG_ERROR("Worker with process ID=" << pid << " failed at initialization!");
149  return StatusCode::FAILURE;
150  case PROC_STATE_EXEC:
151  // If the failed process was in EXEC state, report pid to EvtRangeScatterer and attempt to start new worker
152 
153  // Report pid to Event Range Scatterer
 // NOTE(review): source line 154 (the condition guarding this error branch) is missing from the listing
155  // To Do: how to report this error to the pilot?
156  ATH_MSG_ERROR("Failed to report the crashed pid to the Event Range Scatterer");
157  return StatusCode::FAILURE;
158  }
159 
160  // Start new worker
161  if(startProcess().isSuccess()) {
162  ATH_MSG_INFO("Successfully started new process");
 // Reset pid to 0 once the replacement worker has been started
163  pid=0;
164  }
165  else {
166  // To Do: how to report this error to the pilot?
167  ATH_MSG_ERROR("Failed to start new process");
168  return StatusCode::FAILURE;
169  }
170  break;
171  case PROC_STATE_FIN:
172  // If the failed process was in FIN state, remove pid from the finQueue and schedule finalization of the next worker
 // NOTE(review): the front of m_finQueue is popped without checking that it equals the failed pid — confirm this always holds
173  m_finQueue.pop_front();
174 
175  if(m_finQueue.size()) {
176  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())) {
177  // To Do: how to report this error to the pilot?
178  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
179  return StatusCode::FAILURE;
180  }
181  else {
182  ATH_MSG_INFO("Scheduled finalization of PID=" << m_finQueue.front());
183  }
184  }
185  break;
186  case PROC_STATE_STOP:
 // Nothing to do for a worker that failed in the STOP state
187  break;
188  default:
189  ATH_MSG_ERROR("Detected unexpected state " << itProcState->second << " of failed worker with PID=" << pid);
190  return StatusCode::FAILURE;
191  }
192  }
193  else {
194  // The worker finished successfully and it was the last worker. Release the Event Range Scatterer
 // NOTE(review): source line 196 (the remainder of this condition) is missing from the listing
195  if(--m_activeWorkers==0
197  // To Do: how to report this error to the pilot?
198  ATH_MSG_ERROR("Failed to release the Event Range Scatterer");
199  return StatusCode::FAILURE;
200  }
201  }
202 
203  // Erase the pid from m_procStates map
204  m_procStates.erase(itProcState);
205  }
206  else {
207  sc.ignore();
208  if(pid<0) {
209  // Here we failed to wait on the group. Exit immediately
210  // To Do: how to report this error to the pilot?
211  ATH_MSG_ERROR("Failed to wait on the process group!");
212  return StatusCode::FAILURE;
213  }
214  }
215  // ____________________ ______________________________________________ ______________________________
216 
217 
218  // ____________________ Step 2: decode worker result (if any) ______________________________
 // NOTE(review): source line 219 (where presult is obtained) is missing from the listing
 // Every exit path below frees presult->output.data and deletes presult
220  if(presult) {
221  if((unsigned)(presult->output.size)>=sizeof(int)) {
222  // Decode result
224 
225  // First extract pid from the ProcessResult and check its validity
226  pid_t childPid = presult->pid;
227  auto itChildState = m_procStates.find(childPid);
228  if(itChildState==m_procStates.end()) {
229  ATH_MSG_ERROR("Unable to find PID=" << childPid << " in the Proc States map!");
230  free(presult->output.data);
231  delete presult;
232  return StatusCode::FAILURE;
233  }
234 
235  ATH_MSG_DEBUG("Decoding the output of PID=" << childPid << " with the size=" << output.size);
236 
 // Any payload whose size differs from "int + Func_Flag + int" is taken to be a bootstrap result
 // NOTE(review): in that case execution still continues to the memcpy calls below, which read at
 // offset sizeof(int); bootstrap_func() sets its payload size to sizeof(int) — confirm the buffer
 // is large enough and that func cannot accidentally match FUNC_EXEC or FUNC_FIN
237  if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)) {
238  // We are dealing with the bootstrap function.
239  // Schedule exec_func()
240  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC,childPid)) {
241  ATH_MSG_ERROR("Problem scheduling execution on PID=" << childPid);
242  free(presult->output.data);
243  delete presult;
244  return StatusCode::FAILURE;
245  }
246 
247  // Update process state in the m_procStates map
248  itChildState->second=PROC_STATE_EXEC;
249  }
250 
 // The function flag is stored right after the leading int of the payload
252  memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
253 
254  if(func==AthenaMPToolBase::FUNC_EXEC) {
255  // Store the number of processed events
256  int nevt(0);
257  memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
258  m_nProcessedEvents[childPid]=nevt;
259  ATH_MSG_DEBUG("PID=" << childPid << " processed " << nevt << " events");
260 
261  // Add PID to the finalization queue
262  m_finQueue.push_back(childPid);
263  ATH_MSG_DEBUG("Added PID=" << childPid << " to the finalization queue");
264 
265  // If this is the only element in the queue then start its finalization
266  // Otherwise it has to wait its turn until all previous processes have been finalized
267  if(m_finQueue.size()==1) {
268  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,childPid)) {
269  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << childPid);
270  free(presult->output.data);
271  delete presult;
272  return StatusCode::FAILURE;
273  }
274  else {
275  ATH_MSG_INFO("Scheduled finalization of PID=" << childPid);
276  }
277  }
278 
279  // Update process state in the m_procStates map
280  itChildState->second=PROC_STATE_FIN;
281  }
282  else if(func==AthenaMPToolBase::FUNC_FIN) {
283  ATH_MSG_DEBUG("Finished finalization of PID=" << childPid);
 // NOTE(review): front() is called without checking that m_finQueue is non-empty — confirm this cannot happen
284  pid_t pidFront = m_finQueue.front();
285  if(pidFront==childPid) {
286  // pid received as expected
287 
288  // Set the process free
289  if(m_processGroup->map_async(0,0,pidFront)) {
290  ATH_MSG_ERROR("Failed to set the process PID=" << pidFront << " free");
291  free(presult->output.data);
292  delete presult;
293  return StatusCode::FAILURE;
294  }
295 
296  // Remove it from the queue
297  m_finQueue.pop_front();
298  ATH_MSG_DEBUG("PID=" << childPid << " removed from the queue");
299  // Schedule finalization of the next process in the queue
300  if(m_finQueue.size()) {
301  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())) {
302  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
303  free(presult->output.data);
304  delete presult;
305  return StatusCode::FAILURE;
306  }
307  else {
308  ATH_MSG_INFO("Scheduled finalization of PID=" << m_finQueue.front());
309  }
310  }
311  }
312  else {
313  // Error: unexpected pid received from presult
 // NOTE(review): the message prints pid as the expected PID, while the comparison above is against pidFront — confirm which was intended
314  ATH_MSG_ERROR("Finalized PID=" << childPid << " while PID=" << pid << " was expected");
315  free(presult->output.data);
316  delete presult;
317  return StatusCode::FAILURE;
318  }
319 
320  // Update process state in the m_procStates map
321  itChildState->second=PROC_STATE_STOP;
322  }
323  }
324  free(presult->output.data);
325  delete presult;
326  }
327  // ____________________ ______________________________________________ ______________________________
328 
329  return StatusCode::SUCCESS;
330 }
331 
333 {
334  ATH_MSG_INFO("Statuses of event processors");
335  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
336  for(size_t i=0; i<statuses.size(); ++i) {
337  // Get the number of events processed by this worker
338  std::map<pid_t,int>::const_iterator it = m_nProcessedEvents.find(statuses[i].pid);
339  std::ostringstream ostr;
340  if(it==m_nProcessedEvents.end())
341  ostr << "N/A";
342  else
343  ostr << it->second;
344  ATH_MSG_INFO("*** Process PID=" << statuses[i].pid
345  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
346  << ". Number of events processed: " << ostr.str());
347  }
348 }
349 
350 void EvtRangeProcessor::subProcessLogs(std::vector<std::string>& filenames)
351 {
352  filenames.clear();
353  for(int i=0; i<m_nprocs; ++i) {
354  std::ostringstream workerIndex;
355  workerIndex << i;
356  std::filesystem::path worker_rundir(m_subprocTopDir);
357  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
358  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
359  }
360 }
361 
363 {
365  return jobOutputs;
366 }
367 
368 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::bootstrap_func()
369 {
370  if(m_debug) waitForSignal();
371 
372  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
373  outwork->data = CxxUtils::xmalloc(sizeof(int));
374  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
375  outwork->size = sizeof(int);
376  // ...
377  // (possible) TODO: extend outwork with some error message, which will be eventually
378  // reported in the master proces
379  // ...
380 
381  // ________________________ Get RankID ________________________
382  //
383  if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
384  ATH_MSG_ERROR("Unable to get rank ID!");
385  return outwork;
386  }
387  std::ostringstream workindex;
388  workindex<<m_rankId;
389 
390  // ________________________ Worker dir: mkdir ________________________
391  std::filesystem::path worker_rundir(m_subprocTopDir);
392  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
393  // TODO: this "worker_" can be made configurable too
394 
395  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
396  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
397  return outwork;
398  }
399 
400  // ________________________ Redirect logs ________________________
401  if(!m_debug) {
402  if(redirectLog(worker_rundir.string()))
403  return outwork;
404 
405  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
406  }
407 
408  // ________________________ Update Io Registry ____________________________
409  if(updateIoReg(worker_rundir.string()))
410  return outwork;
411 
412  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
413 
414  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
415  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
416  if(std::filesystem::is_regular_file("SimParams.db"))
417  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
418  if(std::filesystem::is_regular_file("DigitParams.db"))
419  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
420  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
421  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
422 
423  // _______________________ Handle saved PFC (if any) ______________________
424  if(handleSavedPfc(abs_worker_rundir))
425  return outwork;
426 
427  // ________________________ reopen descriptors ____________________________
428  if(reopenFds())
429  return outwork;
430 
431  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
432 
433 
434  // ________________________ I/O reinit ________________________
435  if(!m_ioMgr->io_reinitialize().isSuccess()) {
436  ATH_MSG_ERROR("Failed to reinitialize I/O");
437  return outwork;
438  } else {
439  ATH_MSG_DEBUG("Successfully reinitialized I/O");
440  }
441 
442  // ________________________ Event selector restart ________________________
443  IService* evtSelSvc = dynamic_cast<IService*>(evtSelector());
444  if(!evtSelSvc) {
445  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
446  return outwork;
447  }
448  if(!evtSelSvc->start().isSuccess()) {
449  ATH_MSG_ERROR("Failed to restart the event selector");
450  return outwork;
451  } else {
452  ATH_MSG_DEBUG("Successfully restarted the event selector");
453  }
454 
455  // ________________________ Restart background event selectors in pileup jobs ________________________
456  if(m_isPileup) {
457  const std::list<IService*>& service_list = serviceLocator()->getServices();
458  std::list<IService*>::const_iterator itSvc = service_list.begin(),
459  itSvcLast = service_list.end();
460  for(;itSvc!=itSvcLast;++itSvc) {
461  IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(*itSvc);
462  if(evtsel && (evtsel != evtSelector())) {
463  if((*itSvc)->start().isSuccess())
464  ATH_MSG_DEBUG("Restarted event selector " << (*itSvc)->name());
465  else {
466  ATH_MSG_ERROR("Failed to restart event selector " << (*itSvc)->name());
467  return outwork;
468  }
469  }
470  }
471  }
472 
473  // ________________________ Worker dir: chdir ________________________
474  if(chdir(worker_rundir.string().c_str())==-1) {
475  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
476  return outwork;
477  }
478 
479  // Declare success and return
480  *(int*)(outwork->data) = 0;
481  return outwork;
482 }
483 
// Worker-side event loop.
//
// Creates a yampl client socket to the scatterer and then loops: sends a
// "<pid> ready for event processing" message, receives a comma-separated response
// (RangeID[,PFN:fileName],startEvent,endEvent), seeks to and processes the events of the range,
// and reports back either the produced output file(s) together with the range ID and CPU/wall
// times, or an error status through reportError(). The loop ends when a response of size 1 arrives.
// After the loop m_evtProcessor->executeRun(0) is called.
//
// Returns a ScheduledWork laid out as "ERRCODE|Func_Flag|NEvt", where ERRCODE is always written
// as 0 and NEvt is the number of events for which nextEvent() succeeded.
//
// NOTE(review): the embedded numbering of this listing skips source lines 552, 562, 583, 590, 595,
// 605, 664 and 678; the statements on those lines (presumably the declaration of and assignments
// to rangeStatus, and the declaration of func) are not visible here.
484 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::exec_func()
485 {
486  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
487 
 // nEvt is passed (post-incremented) to nextEvent(); nEventsProcessed counts the successful calls
488  int nEvt(1);
489  int nEventsProcessed(0);
490 
 // Declared outside the loop: tokens left unpopped in one iteration are still present in the next
491  std::queue<std::string> queueTokens;
492 
493  // Get the yampl connection channels
 // socketFactory and socket2Scatterer are deleted at the end of this function
494  yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
495  std::string socket2ScattererName = m_channel2Scatterer.value() + std::string("_") + m_randStr;
496  yampl::ISocket* socket2Scatterer = socketFactory->createClientSocket(yampl::Channel(socket2ScattererName,yampl::LOCAL),yampl::MOVE_DATA);
497  ATH_MSG_INFO("Created CLIENT socket to the Scatterer: " << socket2ScattererName);
498  std::ostringstream pidstr;
499  pidstr << getpid();
500 
501  // Construct a "welcome" message to be sent to the EvtRangeScatterer
502  std::string ping = pidstr.str() + std::string(" ready for event processing");
503 
504  while(true) {
 // NOTE(review): buffers passed to send() are never freed in this function — presumably ownership passes to the socket (created with yampl::MOVE_DATA); confirm
505  void* message2scatterer = CxxUtils::xmalloc(ping.size());
506  memcpy(message2scatterer,ping.data(),ping.size());
507  socket2Scatterer->send(message2scatterer,ping.size());
508  ATH_MSG_INFO("Sent a welcome message to the Scatterer");
509 
510  // Get the response - list of tokens - from the scatterer.
511  // The format of the response: | ResponseSize | RangeID, | evtEvtRange[,evtToken] |
512  char *responseBuffer(0);
513  std::string strPeerId;
 // NOTE(review): responseBuffer is not released anywhere in the visible code, and no value of responseSize other than 1 is checked before it is used as a string length — confirm against the yampl recv() contract
514  ssize_t responseSize = socket2Scatterer->recv(responseBuffer,strPeerId);
515  // A response of size 1 denotes an empty range: break the loop
516  if(responseSize==1) {
517  ATH_MSG_INFO("Empty range received. Terminating the loop");
518  break;
519  }
520 
521  std::string responseStr(responseBuffer,responseSize);
522  ATH_MSG_INFO("Received response from the Scatterer : " << responseStr);
523 
524  // Start timing
525  System::ProcessTime time_start = System::getProcessTime();
526 
 // Split the response on ',' and append every field to queueTokens
527  size_t startpos(0);
528  size_t endpos = responseStr.find(',');
529  while(endpos!=std::string::npos) {
530  queueTokens.push(responseStr.substr(startpos,endpos-startpos));
531  startpos = endpos+1;
532  endpos = responseStr.find(',',startpos);
533  }
534  queueTokens.push(responseStr.substr(startpos));
535  // Actually the first element in the tokens queue is the RangeID. Get it
536  std::string rangeID = queueTokens.front();
537  queueTokens.pop();
538  ATH_MSG_INFO("Received RangeID=" << rangeID);
539  // Fire an incident
540  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange",rangeID));
541 
542  // Here we need to support two formats of the responseStr
543  // Format 1. RangeID,startEvent,endEvent
544  // Format 2. RangeID,fileName,startEvent,endEvent
545  //
546  // The difference between these two is that for Format 2 we first
547  // need to update InputCollections property on the Event Selector
548  // and only after that proceed with seeking
549  //
550  // The seeking part is identical for Format 1 and 2
551 
553 
554  // Determine the format
 // NOTE(review): front() is called here and below without checking that tokens remain after the RangeID — assumes a well-formed response
555  std::string filename("");
556  if(queueTokens.front().find("PFN:")==0) {
557  // We have Format 2
558  // Update InputCollections property of the Event Selector with the file name from Event Range
559  filename = queueTokens.front().substr(4);
560  if(setNewInputFile(filename).isFailure()) {
561  ATH_MSG_WARNING("Failed to set input file for the range: " << rangeID);
563  reportError(socket2Scatterer,rangeStatus);
564  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
 // NOTE(review): on this path the PFN token and the event-number tokens are still in queueTokens, so they would precede the tokens of the next response (and the stale PFN token would be read as the next RangeID) — confirm and clear the queue if so
565  continue;
566  }
567  queueTokens.pop();
568  }
569 
570  // Get the number of events to process
571  int startEvent = std::atoi(queueTokens.front().c_str());
572  queueTokens.pop();
573  int endEvent = std::atoi(queueTokens.front().c_str());
574  queueTokens.pop();
575  ATH_MSG_INFO("Range fields. File Name: " << (filename.empty()?"N/A":filename)
576  << ", First Event:" << startEvent
577  << ", Last Event:" << endEvent);
578 
579  // Process the events
580  IEvtSelector::Context* ctx = nullptr;
581  if (evtSelector()->createContext (ctx).isFailure()) {
582  ATH_MSG_WARNING("Failed to create IEventSelector context.");
584  }
585  else {
 // Seek positions run from startEvent-1 up to and including endEvent-1; the loop stops at the first failure
586  for(int i(startEvent-1); i<endEvent; ++i) {
587  StatusCode sc = m_evtSeek->seek(*ctx, i);
588  if(sc.isRecoverable()) {
589  ATH_MSG_WARNING("Event " << i << " from range: " << rangeID << " not in the input file");
591  break;
592  }
593  else if(sc.isFailure()) {
594  ATH_MSG_WARNING("Failed to seek to " << i << " in range: " << rangeID);
596  break;
597  }
598  ATH_MSG_INFO("Seek to " << i << " succeeded");
599  m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
600  sc = m_evtProcessor->nextEvent(nEvt++);
601 
602  m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
603  if(sc.isFailure()){
604  ATH_MSG_WARNING("Failed to process the event " << i << " in range:" << rangeID);
606  break;
607  }
608  else {
609  ATH_MSG_DEBUG("Event processed");
610  nEventsProcessed++;
611  }
612  }
613  }
 // releaseContext() is called whether or not createContext() succeeded
614  if (evtSelector()->releaseContext (ctx).isFailure()) {
615  ATH_MSG_WARNING("Failed to release IEventSelector context.");
616  }
617 
618  // Fire dummy NextEventRange incident in order to cut the previous output and report it
619  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
620  if(rangeStatus!=AthenaMPToolBase::ESRANGE_SUCCESS) {
621  reportError(socket2Scatterer,rangeStatus);
622  continue;
623  }
624 
625  // Event range successfully processed
626  std::string strOutpFile;
627  // Get the full path of the event range output file
 // Every entry of the current directory whose path ends with rangeID is added to a comma-separated list
 // NOTE(review): if rfind() returns npos and the path is exactly one character shorter than rangeID, the unsigned subtraction also yields npos and the entry matches — confirm whether this can occur
628  for(std::filesystem::directory_iterator fdIt(std::filesystem::current_path()); fdIt!=std::filesystem::directory_iterator(); fdIt++) {
629  if(fdIt->path().string().rfind(rangeID) == fdIt->path().string().size()-rangeID.size()) {
630  if(strOutpFile.empty()) {
631  strOutpFile = fdIt->path().string();
632  }
633  else {
634  strOutpFile += (std::string(",")+fdIt->path().string());
635  }
636  }
637  }
638 
639  // Stop timing
640  System::ProcessTime time_delta = System::getProcessTime() - time_start;
641 
642  // Prepare the output report
643  if(!strOutpFile.empty()) {
644  // We need to combine the output file name with
645  // 1. RangeID (requested by JEDI)
646  // 2. CPU time
647  // 3. Wall time
648  std::ostringstream outputReportStream;
649  outputReportStream << strOutpFile
650  << ",ID:" << rangeID
651  << ",CPU:" << time_delta.cpuTime<System::Sec>()
652  << ",WALL:" << time_delta.elapsedTime<System::Sec>();
653  std::string outputFileReport = outputReportStream.str();
654 
655  // Report the output
656  message2scatterer = CxxUtils::xmalloc(outputFileReport.size());
657  memcpy(message2scatterer,outputFileReport.data(),outputFileReport.size());
658  socket2Scatterer->send(message2scatterer,outputFileReport.size());
659  ATH_MSG_INFO("Reported the output " << outputFileReport);
660  }
661  else {
662  // This is an error: range successfully processed but no outputs were made
663  ATH_MSG_WARNING("Failed to make an output file for range: " << rangeID);
665  }
666  } // Main "event loop"
667 
668  if(m_evtProcessor->executeRun(0).isFailure()) {
669  ATH_MSG_WARNING("Could not finalize the Run");
670  }
671 
672  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
673 
674  // Return value: "ERRCODE|Func_Flag|NEvt"
675  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
676  void* outdata = CxxUtils::xmalloc(outsize);
677  *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
679  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
680  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
681 
682  outwork->data = outdata;
683  outwork->size = outsize;
684  // ...
685  // (possible) TODO: extend outwork with some error message, which will be eventually
686  // reported in the master process
687  // ...
688 
689  delete socket2Scatterer;
690  delete socketFactory;
691 
692  return outwork;
693 }
694 
// Worker-side finalization step.
//
// Stops the application manager and, only if that succeeded, finalizes it. A failure of either
// call is merely reported (ATH_MSG_WARNING for stop(), std::cout for finalize()); the error code
// written to the result is 0 in all cases.
//
// Returns a ScheduledWork laid out as "ERRCODE|Func_Flag|NEvt" with NEvt=-1.
//
// NOTE(review): the embedded numbering of this listing skips source line 714 (presumably the
// declaration of func), so that statement is not visible here.
695 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::fin_func()
696 {
697  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
698 
699  if(m_appMgr->stop().isFailure()) {
700  ATH_MSG_WARNING("Unable to stop AppMgr");
701  }
702  else {
703  if(m_appMgr->finalize().isFailure()) {
 // NOTE(review): this failure goes to std::cout rather than through the ATH_MSG_* macros used elsewhere — confirm whether that is deliberate
704  std::cout << "Unable to finalize AppMgr" << std::endl;
705  }
706  }
707 
708  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
709 
710  // Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
711  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
712  void* outdata = CxxUtils::xmalloc(outsize);
713  *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
715  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
716  int nEvt = -1;
717  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
718 
719  outwork->data = outdata;
720  outwork->size = outsize;
721 
722  return outwork;
723 }
724 
// Starts one additional worker process.
//
// Increments m_nprocs, sends the new rank (m_nprocs-1) to the shared rank queue, launches a
// process through m_processGroup->launchProcess() and schedules FUNC_BOOTSTRAP on it.
// Returns StatusCode::FAILURE as soon as one of these steps fails, StatusCode::SUCCESS otherwise.
//
// NOTE(review): the embedded numbering of this listing skips source line 746, so the statement
// between the bootstrap scheduling and the final return is not visible here.
725 StatusCode EvtRangeProcessor::startProcess()
726 {
 // NOTE(review): m_nprocs stays incremented even if one of the steps below fails — confirm this is intended
727  m_nprocs++;
728 
729  // Create a rank for the new process
730  if(!m_sharedRankQueue->send_basic<int>(m_nprocs-1)) {
731  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
732  return StatusCode::FAILURE;
733  }
734 
 // A returned pid of 0 is treated as a launch failure
735  pid_t pid = m_processGroup->launchProcess();
736  if(pid==0) {
737  ATH_MSG_ERROR("Unable to start new process");
738  return StatusCode::FAILURE;
739  }
740 
741  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP,pid)) {
742  ATH_MSG_ERROR("Unable to bootstrap new process");
743  return StatusCode::FAILURE;
744  }
745 
747  return StatusCode::SUCCESS;
748 }
749 
751 {
752  if(m_inpFile == newFile) return StatusCode::SUCCESS;
753 
754  // Get Property Server
755  IProperty* propertyServer = dynamic_cast<IProperty*>(evtSelector());
756  if(!propertyServer) {
757  ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
758  return StatusCode::FAILURE;
759  }
760 
761  std::string propertyName("InputCollections");
762  if(m_inpFile.empty()) {
763  std::vector<std::string> vect;
764  StringArrayProperty inputFileList(propertyName, vect);
765  if(propertyServer->getProperty(&inputFileList).isFailure()) {
766  ATH_MSG_ERROR("Failed to get InputCollections property value of the Event Selector");
767  return StatusCode::FAILURE;
768  }
769  if(newFile==inputFileList.value()[0]) {
770  m_inpFile = newFile;
771  return StatusCode::SUCCESS;
772  }
773  }
774  std::vector<std::string> vect{newFile,};
775  StringArrayProperty newInputFileList(std::move(propertyName), vect);
776  if(propertyServer->setProperty(newInputFileList).isFailure()) {
777  ATH_MSG_ERROR("Unable to update " << newInputFileList.name() << " property on the Event Selector");
778  return StatusCode::FAILURE;
779  }
780  m_inpFile=newFile;
781  return StatusCode::SUCCESS;
782 }
783 
785 {
786  pid_t pid = getpid();
787  size_t messageSize = sizeof(pid_t)+sizeof(AthenaMPToolBase::ESRange_Status);
788  void* message2scatterer = CxxUtils::xmalloc(messageSize);
789  memcpy(message2scatterer,&pid,sizeof(pid_t));
790  memcpy((pid_t*)message2scatterer+1,&status,sizeof(AthenaMPToolBase::ESRange_Status));
791  socket->send(message2scatterer,messageSize);
792 }
python.PyKernel.retrieve
def retrieve(aClass, aKey=None)
Definition: PyKernel.py:110
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
EvtRangeProcessor::~EvtRangeProcessor
virtual ~EvtRangeProcessor() override
Definition: EvtRangeProcessor.cxx:44
EvtRangeProcessor::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: EvtRangeProcessor.cxx:695
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:403
EvtRangeProcessor::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: EvtRangeProcessor.cxx:368
AthenaInterprocess::ProcessGroup::pullOneResult
ProcessResult * pullOneResult()
Definition: ProcessGroup.cxx:177
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
EvtRangeProcessor::m_activeWorkers
int m_activeWorkers
Keep track of the number of workers.
Definition: EvtRangeProcessor.h:69
AthenaMPToolBase::ESRANGE_BADINPFILE
@ ESRANGE_BADINPFILE
Definition: AthenaMPToolBase.h:64
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
EvtRangeProcessor::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: EvtRangeProcessor.cxx:362
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:15
LArBadChanBlobUtils::Channel
Identifier32::value_type Channel
Definition: LArBadChanBlobUtils.h:24
EvtRangeProcessor::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: EvtRangeProcessor.cxx:332
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:91
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
EvtRangeProcessor.h
EvtRangeProcessor::m_debug
Gaudi::Property< bool > m_debug
Definition: EvtRangeProcessor.h:66
EvtRangeProcessor::m_sharedRankQueue
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
Definition: EvtRangeProcessor.h:76
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:101
AthenaMPToolBase::m_nprocs
int m_nprocs
Number of workers spawned by the master process.
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::ESRange_Status
ESRange_Status
Definition: AthenaMPToolBase.h:58
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:70
AthenaInterprocess::ProcessGroup::getChildren
const std::vector< Process > & getChildren() const
Definition: ProcessGroup.cxx:197
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:333
skel.it
it
Definition: skel.GENtoEVGEN.py:407
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Name of the event selector.
Definition: AthenaMPToolBase.h:89
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
athena.exitcode
int exitcode
Definition: athena.py:161
ProcessGroup.h
AthenaMPToolBase::ESRANGE_SEEKFAILED
@ ESRANGE_SEEKFAILED
Definition: AthenaMPToolBase.h:61
AthenaInterprocess::ProcessResult::pid
pid_t pid
Definition: ProcessGroup.h:23
EvtRangeProcessor::m_inpFile
std::string m_inpFile
Cached name of the input file. To avoid reopening.
Definition: EvtRangeProcessor.h:70
EvtRangeProcessor::m_evtSeek
SmartIF< IEvtSelectorSeek > m_evtSeek
Definition: EvtRangeProcessor.h:74
EvtRangeProcessor::m_rankId
int m_rankId
Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
Definition: EvtRangeProcessor.h:68
EvtRangeProcessor::PROC_STATE_EXEC
@ PROC_STATE_EXEC
Definition: EvtRangeProcessor.h:58
EvtRangeProcessor::m_finQueue
std::deque< pid_t > m_finQueue
Definition: EvtRangeProcessor.h:80
SUSY_SimplifiedModel_PostInclude.process
string process
Definition: SUSY_SimplifiedModel_PostInclude.py:43
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:69
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
copy_file_icc_hack.h
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
EvtRangeProcessor::m_procStates
std::map< pid_t, ProcessState > m_procStates
Definition: EvtRangeProcessor.h:81
AthenaInterprocess::ProcessResult::output
ScheduledWork output
Definition: ProcessGroup.h:24
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:95
EvtRangeProcessor::m_nProcessedEvents
std::map< pid_t, int > m_nProcessedEvents
Definition: EvtRangeProcessor.h:79
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
AthenaMPToolBase::m_isPileup
Gaudi::Property< bool > m_isPileup
Definition: AthenaMPToolBase.h:103
lumiFormat.i
int i
Definition: lumiFormat.py:85
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:67
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
AthenaMPToolBase::evtSelector
IEvtSelector * evtSelector()
Definition: AthenaMPToolBase.h:83
test_pyathena.parent
parent
Definition: test_pyathena.py:15
AthenaMPToolBase::ESRANGE_SUCCESS
@ ESRANGE_SUCCESS
Definition: AthenaMPToolBase.h:59
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:322
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:48
EvtRangeProcessor::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: EvtRangeProcessor.cxx:484
EvtRangeProcessor::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: EvtRangeProcessor.cxx:350
EvtRangeProcessor::EvtRangeProcessor
EvtRangeProcessor(const std::string &type, const std::string &name, const IInterface *parent)
Definition: EvtRangeProcessor.cxx:34
merge.output
output
Definition: merge.py:16
python.PyKernel.detStore
detStore
Definition: PyKernel.py:41
EvtRangeProcessor::reportError
void reportError(yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
Definition: EvtRangeProcessor.cxx:784
EvtRangeProcessor::initialize
virtual StatusCode initialize() override
Definition: EvtRangeProcessor.cxx:48
EvtRangeProcessor::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: EvtRangeProcessor.h:72
AthenaMPToolBase::ESRANGE_PROCFAILED
@ ESRANGE_PROCFAILED
Definition: AthenaMPToolBase.h:62
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:97
EvtRangeProcessor::setNewInputFile
StatusCode setNewInputFile(const std::string &newFile)
Definition: EvtRangeProcessor.cxx:750
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:269
AthenaInterprocess::Process
Definition: Process.h:17
EvtRangeProcessor::m_channel2Scatterer
Gaudi::Property< std::string > m_channel2Scatterer
Definition: EvtRangeProcessor.h:64
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
grepfile.filenames
list filenames
Definition: grepfile.py:34
EvtRangeProcessor::PROC_STATE_INIT
@ PROC_STATE_INIT
Definition: EvtRangeProcessor.h:57
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
EvtRangeProcessor::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: EvtRangeProcessor.h:73
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
EvtRangeProcessor::m_sharedFailedPidQueue
AthenaInterprocess::SharedQueue * m_sharedFailedPidQueue
Definition: EvtRangeProcessor.h:77
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:23
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers encoded in strings into integers and doubles. The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
merge.status
status
Definition: merge.py:16
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
IEvtSelectorSeek.h
Extension to IEvtSelector to allow for seeking.
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Prefix of the subprocess run directory names, for example "worker_".
Definition: AthenaMPToolBase.h:88
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Top run directory for subprocesses.
Definition: AthenaMPToolBase.h:87
EvtRangeProcessor::PROC_STATE_FIN
@ PROC_STATE_FIN
Definition: EvtRangeProcessor.h:59
EvtRangeProcessor::PROC_STATE_STOP
@ PROC_STATE_STOP
Definition: EvtRangeProcessor.h:60
AthenaMPToolBase::ESRANGE_NOTFOUND
@ ESRANGE_NOTFOUND
Definition: AthenaMPToolBase.h:60
CxxUtils::xmalloc
void * xmalloc(size_t size)
Trapping version of malloc.
Definition: xmalloc.cxx:31
AthenaInterprocess::SharedQueue::send_basic
bool send_basic(T)
Definition: SharedQueue.h:93
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:396
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:29
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:68
xmalloc.h
Trapping version of malloc.
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:340
AthenaMPToolBase::ESRANGE_FILENOTMADE
@ ESRANGE_FILENOTMADE
Definition: AthenaMPToolBase.h:63
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:94