ATLAS Offline Software
EvtRangeProcessor.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "EvtRangeProcessor.h"
6 #include "copy_file_icc_hack.h"
8 
10 #include "GaudiKernel/IEvtSelector.h"
11 #include "GaudiKernel/IIoComponentMgr.h"
12 #include "GaudiKernel/IFileMgr.h"
13 #include "GaudiKernel/IChronoStatSvc.h"
14 #include "GaudiKernel/ISvcLocator.h"
15 #include "GaudiKernel/IIncidentSvc.h"
16 #include "GaudiKernel/FileIncident.h"
17 #include "GaudiKernel/Timing.h"
18 
19 #include <sys/stat.h>
20 #include <sstream>
21 #include <fstream>
22 #include <unistd.h>
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <stdexcept>
26 #include <queue>
27 #include <signal.h>
28 #include <filesystem>
29 
30 #include "yampl/SocketFactory.h"
31 
32 
// Constructor fragment. The first signature line and the base-class
// initializer (source lines 33 and 36) are not part of this listing.
34  , const std::string& name
35  , const IInterface* parent)
// Members start in an "unset" state: rank -1, counters at 0, null pointers,
// empty file/channel names and debug mode off.
37  , m_rankId(-1)
38  , m_nEventsBeforeFork(0)
39  , m_activeWorkers(0)
40  , m_inpFile("")
41  , m_chronoStatSvc("ChronoStatSvc", name)
42  , m_incidentSvc("IncidentSvc", name)
43  , m_evtSeek(nullptr)
44  , m_channel2Scatterer("")
45  , m_channel2EvtSel("")
46  , m_sharedRankQueue(0)
47  , m_sharedFailedPidQueue(0)
48  , m_debug(false)
49 {
50  declareInterface<IAthenaMPTool>(this);
51 
// Make these members settable as properties under the given names
52  declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
53  declareProperty("Channel2Scatterer", m_channel2Scatterer);
54  declareProperty("Channel2EvtSel", m_channel2EvtSel);
55  declareProperty("Debug", m_debug);
56 
// Prefix of the per-worker run directory name; bootstrap_func() and
// subProcessLogs() append the worker rank/index to it.
57  m_subprocDirPrefix = "worker_";
58 }
59 
// Destructor body (the signature, source line 60, is not part of this listing).
// Intentionally empty here: m_sharedRankQueue is deleted in the finalize body instead.
61 {
62 }
63 
// Body of the initialization routine (the signature, source line 64, and
// source line 68 are not part of this listing).
// Looks up the event-selector service named m_evtSelName into m_evtSeek and
// retrieves the chrono-stat and incident service handles. Each ATH_CHECK
// returns early from this function if its call fails.
65 {
66  ATH_MSG_DEBUG("In initialize");
67 
69  ATH_CHECK(serviceLocator()->service(m_evtSelName,m_evtSeek));
70  ATH_CHECK(m_chronoStatSvc.retrieve());
71  ATH_CHECK(m_incidentSvc.retrieve());
72 
73  return StatusCode::SUCCESS;
74 }
75 
// Body of the finalization routine (the signature, source line 76, is not
// part of this listing). Releases the rank queue allocated in makePool().
// NOTE(review): the pointer is not reset to null after the delete, so a second
// call would delete it again - confirm this body runs at most once.
77 {
78  delete m_sharedRankQueue;
79  return StatusCode::SUCCESS;
80 }
81 
// Creates the pool of worker processes and schedules their bootstrap.
//
// Parameters:
//   (unnamed int) - not used in this function
//   nprocs        - number of workers; -1 means "use sysconf(_SC_NPROCESSORS_ONLN)",
//                   0 and values below -1 are rejected
//   topdir        - top directory for the worker run directories; must not be empty
// Returns m_nprocs on success and -1 on any error.
//
// Source lines 97, 111 and 119 are not part of this listing (see the gaps below).
82 int EvtRangeProcessor::makePool(int, int nprocs, const std::string& topdir)
83 {
84  ATH_MSG_DEBUG("In makePool " << getpid());
85 
86  if(nprocs==0 || nprocs<-1) {
87  ATH_MSG_ERROR("Invalid value for the nprocs parameter: " << nprocs);
88  return -1;
89  }
90 
91  if(topdir.empty()) {
92  ATH_MSG_ERROR("Empty name for the top directory!");
93  return -1;
94  }
95 
96  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
98  m_subprocTopDir = topdir;
99 
100  // Create rank queue and fill it
// The queue name combines this process's pid with m_randStr. The queue holds
// m_nprocs int-sized entries and is pre-filled with the ranks 0..m_nprocs-1.
101  std::ostringstream rankQueueName;
102  rankQueueName << "EvtRangeProcessor_RankQueue_" << getpid() << "_" << m_randStr;
103  m_sharedRankQueue = new AthenaInterprocess::SharedQueue(rankQueueName.str(),m_nprocs,sizeof(int));
104  for(int i=0; i<m_nprocs; ++i)
105  if(!m_sharedRankQueue->send_basic<int>(i)) {
106  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
107  return -1;
108  }
109 
110  // Create the process group and map_async bootstrap
// (the statement creating the process group, source line 111, is missing here)
112  ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
// A non-zero return from mapAsyncFlag() is treated as failure
113  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP)) {
114  return -1;
115  }
116  ATH_MSG_INFO("Workers bootstraped");
117 
118  // Populate the m_procStates map
// (the loop header, source line 119, is missing here; the statement below
// marks a process as PROC_STATE_INIT, keyed by its process ID)
120  m_procStates[process.getProcessID()] = PROC_STATE_INIT;
121  }
122 
123  return m_nprocs;
124 }
125 
// Body of the exec step (the signature, source line 126, is not part of this
// listing). It only logs and reports success; wait_once() is what schedules
// FUNC_EXEC on a worker once that worker's bootstrap result arrives.
127 {
128  ATH_MSG_DEBUG("In exec " << getpid());
129 
130  // Do nothing here. The exec will be mapped on workers one at a time ...
131 
132  return StatusCode::SUCCESS;
133 }
134 
// One iteration of the master-side supervision loop.
//
// Parameter:
//   pid - in/out; filled by AthenaMPToolBase::wait_once(). A value >0 is handled
//         as the PID of a worker that finished, <0 as a failure to wait on the
//         process group, 0 as "nothing to report". It is reset to 0 after a
//         crashed worker in EXEC state was replaced successfully.
// Returns FAILURE on any unrecoverable condition, SUCCESS otherwise.
//
// Worker states are tracked in m_procStates: INIT -> EXEC -> FIN -> STOP.
// Source lines 175, 217, 240, 245 and 269 are not part of this listing.
135 StatusCode EvtRangeProcessor::wait_once(pid_t& pid)
136 {
137  // This method performs two tasks:
138  // 1. Checks if any of the workers has changed its state, and if so performs appropriate actions
139  // 2. Tries to pull one result from the workers results queue, and if there is one, then decodes it
140 
141  // First make sure we have a valid pointer to the Failed PID Queue
142  if(m_sharedFailedPidQueue==0) {
143  if(detStore()->retrieve(m_sharedFailedPidQueue,"AthenaMPFailedPidQueue_"+m_randStr).isFailure()) {
144  ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Failed PID Queue");
145  return StatusCode::FAILURE;
146  }
147  }
148 
149  // ____________________ Step 1: check for state changes in the workers ______________________________
150  StatusCode sc = AthenaMPToolBase::wait_once(pid);
151  if(pid>0) {
152  // One of the workers finished. We need to figure out whether or not it finished abnormally
153 
154  auto itProcState = m_procStates.find(pid);
155  if(itProcState==m_procStates.end()) {
156  // Untracked subprocess?? Something's wrong. Exit
157  // To Do: how to report this error to the pilot?
158  sc.ignore();
159  ATH_MSG_ERROR("Detected untracked process ID=" << pid);
160  return StatusCode::FAILURE;
161  }
162 
163  // Deal with failed workers
164  if(sc.isFailure()) {
165 
166  switch(itProcState->second) {
167  case PROC_STATE_INIT:
168  // If the failed process was in INIT state, exit immediately
169  ATH_MSG_ERROR("Worker with process ID=" << pid << " failed at initialization!");
170  return StatusCode::FAILURE;
171  case PROC_STATE_EXEC:
172  // If the failed process was in EXEC state, report pid to EvtRangeScatterer and attempt to start new worker
173 
174  // Report pid to Event Range Scatterer
// (the condition that opens this error branch, source line 175, is missing here)
176  // To Do: how to report this error to the pilot?
177  ATH_MSG_ERROR("Failed to report the crashed pid to the Event Range Scatterer");
178  return StatusCode::FAILURE;
179  }
180 
181  // Start new worker
182  if(startProcess().isSuccess()) {
183  ATH_MSG_INFO("Successfully started new process");
// Reset pid to 0 once the crashed worker has been replaced
184  pid=0;
185  }
186  else {
187  // To Do: how to report this error to the pilot?
188  ATH_MSG_ERROR("Failed to start new process");
189  return StatusCode::FAILURE;
190  }
191  break;
192  case PROC_STATE_FIN:
193  // If the failed process was in FIN state, remove pid from the finQueue and schedule finalization of the next worker
// NOTE(review): pop_front() assumes the queue is non-empty and that the failed
// pid is its front element; neither is checked here.
194  m_finQueue.pop_front();
195 
196  if(m_finQueue.size()) {
197  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())) {
198  // To Do: how to report this error to the pilot?
199  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
200  return StatusCode::FAILURE;
201  }
202  else {
203  ATH_MSG_INFO("Scheduled finalization of PID=" << m_finQueue.front());
204  }
205  }
206  break;
207  case PROC_STATE_STOP:
// A failure after the worker reached STOP needs no recovery action
208  break;
209  default:
210  ATH_MSG_ERROR("Detected unexpected state " << itProcState->second << " of failed worker with PID=" << pid);
211  return StatusCode::FAILURE;
212  }
213  }
214  else {
215  // The worker finished successfully and it was the last worker. Release the Event Range Scatterer
// (the second half of this condition, source line 217, is missing here)
216  if(--m_activeWorkers==0
218  // To Do: how to report this error to the pilot?
219  ATH_MSG_ERROR("Failed to release the Event Range Scatterer");
220  return StatusCode::FAILURE;
221  }
222  }
223 
224  // Erase the pid from m_procStates map
225  m_procStates.erase(itProcState);
226  }
227  else {
228  sc.ignore();
229  if(pid<0) {
230  // Here we failed to wait on the group. Exit immediately
231  // To Do: how to report this error to the pilot?
232  ATH_MSG_ERROR("Failed to wait on the process group!");
233  return StatusCode::FAILURE;
234  }
235  }
236  // ____________________ ______________________________________________ ______________________________
237 
238 
239  // ____________________ Step 2: decode worker result (if any) ______________________________
// (the statement that obtains presult, source line 240, is missing here)
// NOTE(review): every "return StatusCode::FAILURE" inside this block leaves
// before the free()/delete at the end, so presult and its data are not released
// on those paths.
241  if(presult) {
// NOTE(review): no visible line assigns res after this initialization, so the
// "if(res)" check at the end looks dead - confirm.
242  int res{0};
243  if((unsigned)(presult->output.size)>=sizeof(int)) {
244  // Decode result
// (the declaration of "output", source line 245, is missing here)
246 
247  // First extract pid from the ProcessResult and check its validity
248  pid_t childPid = presult->pid;
249  auto itChildState = m_procStates.find(childPid);
250  if(itChildState==m_procStates.end()) {
251  ATH_MSG_ERROR("Unable to find PID=" << childPid << " in the Proc States map!");
252  return StatusCode::FAILURE;
253  }
254 
255  ATH_MSG_DEBUG("Decoding the output of PID=" << childPid << " with the size=" << output.size);
256 
// exec_func() and fin_func() both return 2*sizeof(int)+sizeof(Func_Flag) bytes;
// any other size is taken to be a bootstrap result.
// NOTE(review): the error code stored in the first int of the buffer is not
// inspected by any visible line, so a failed bootstrap is still followed by
// FUNC_EXEC - confirm.
257  if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)) {
258  // We are dealing with the bootstrap function.
259  // Schedule exec_func()
260  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC,childPid)) {
261  ATH_MSG_ERROR("Problem scheduling execution on PID=" << childPid);
262  return StatusCode::FAILURE;
263  }
264 
265  // Update process state in the m_procStates map
266  itChildState->second=PROC_STATE_EXEC;
267  }
268 
// (the declaration of "func", source line 269, is missing here)
// NOTE(review): this copy also runs for bootstrap results. bootstrap_func()
// allocates only sizeof(int) bytes, so if that buffer is what arrives here the
// read at offset sizeof(int) goes past its end - confirm.
270  memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
271 
272  if(func==AthenaMPToolBase::FUNC_EXEC) {
273  // Store the number of processed events
274  int nevt(0);
275  memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
276  m_nProcessedEvents[childPid]=nevt;
277  ATH_MSG_DEBUG("PID=" << childPid << " processed " << nevt << " events");
278 
279  // Add PID to the finalization queue
280  m_finQueue.push_back(childPid);
281  ATH_MSG_DEBUG("Added PID=" << childPid << " to the finalization queue");
282 
283  // If this is the only element in the queue then start its finalization
284  // Otherwise it has to wait its turn until all previous processes have been finalized
285  if(m_finQueue.size()==1) {
286  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,childPid)) {
287  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << childPid);
288  return StatusCode::FAILURE;
289  }
290  else {
291  ATH_MSG_INFO("Scheduled finalization of PID=" << childPid);
292  }
293  }
294 
295  // Update process state in the m_procStates map
296  itChildState->second=PROC_STATE_FIN;
297  }
298  else if(func==AthenaMPToolBase::FUNC_FIN) {
299  ATH_MSG_DEBUG("Finished finalization of PID=" << childPid);
// NOTE(review): front() is called without checking that m_finQueue is non-empty
300  pid_t pidFront = m_finQueue.front();
301  if(pidFront==childPid) {
302  // pid received as expected
303 
304  // Set the process free
305  if(m_processGroup->map_async(0,0,pidFront)) {
306  ATH_MSG_ERROR("Failed to set the process PID=" << pidFront << " free");
307  return StatusCode::FAILURE;
308  }
309 
310  // Remove it from the queue
311  m_finQueue.pop_front();
312  ATH_MSG_DEBUG("PID=" << childPid << " removed from the queue");
313  // Schedule finalization of the next process in the queue
314  if(m_finQueue.size()) {
315  if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())) {
316  ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
317  return StatusCode::FAILURE;
318  }
319  else {
320  ATH_MSG_INFO("Scheduled finalization of PID=" << m_finQueue.front());
321  }
322  }
323  }
324  else {
325  // Error: unexpected pid received from presult
// NOTE(review): the message prints "pid" (the in/out argument), although the
// PID that was actually expected is pidFront.
326  ATH_MSG_ERROR("Finalized PID=" << childPid << " while PID=" << pid << " was expected");
327  return StatusCode::FAILURE;
328  }
329 
330  // Update process state in the m_procStates map
331  itChildState->second=PROC_STATE_STOP;
332  }
333  }
// The result buffer is released with free() and the result object with delete
334  free(presult->output.data);
335  delete presult;
336  if(res) return StatusCode::FAILURE;
337  }
338  // ____________________ ______________________________________________ ______________________________
339 
340  return StatusCode::SUCCESS;
341 }
342 
// Body of the status report (the signature, source line 343, is not part of
// this listing). Logs one INFO line per entry of the process group's status
// list: the PID, "FAILURE" for a non-zero exit code and "SUCCESS" otherwise,
// and the event count recorded in m_nProcessedEvents ("N/A" if none is stored).
344 {
345  ATH_MSG_INFO("Statuses of event processors");
346  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
347  for(size_t i=0; i<statuses.size(); ++i) {
348  // Get the number of events processed by this worker
349  std::map<pid_t,int>::const_iterator it = m_nProcessedEvents.find(statuses[i].pid);
350  std::ostringstream ostr;
351  if(it==m_nProcessedEvents.end())
352  ostr << "N/A";
353  else
354  ostr << it->second;
355  ATH_MSG_INFO("*** Process PID=" << statuses[i].pid
356  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
357  << ". Number of events processed: " << ostr.str());
358  }
359 }
360 
361 void EvtRangeProcessor::subProcessLogs(std::vector<std::string>& filenames)
362 {
363  filenames.clear();
364  for(int i=0; i<m_nprocs; ++i) {
365  std::ostringstream workerIndex;
366  workerIndex << i;
367  std::filesystem::path worker_rundir(m_subprocTopDir);
368  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
369  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
370  }
371 }
372 
// Body of the output-report routine (the signature, source line 373, and the
// declaration of jobOutputs, source line 375, are not part of this listing).
// No visible line adds anything to jobOutputs before it is returned.
374 {
376  return jobOutputs;
377 }
378 
// Bootstrap step for a worker process.
//
// Prepares the worker's run directory and I/O: creates the directory, redirects
// logs (unless m_debug is set), updates the I/O registry, copies a few
// parameter files, handles a saved PFC, reopens file descriptors,
// reinitializes I/O, restarts the event selector(s) and finally changes into
// the run directory.
//
// Returns a ScheduledWork whose data is a single malloc'ed int error code:
// 1 if any step failed (the function returns at the first failure), 0 if all
// steps succeeded. Source line 394 is not part of this listing.
379 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::bootstrap_func()
380 {
// In debug mode, wait for a signal before doing anything else
381  if(m_debug) waitForSignal();
382 
// The result starts out as "failure" so that every early return below reports
// an error without further bookkeeping.
383  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
384  outwork->data = malloc(sizeof(int));
385  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
386  outwork->size = sizeof(int);
387  // ...
388  // (possible) TODO: extend outwork with some error message, which will be eventually
389  // reported in the master process
390  // ...
391 
392  // ________________________ Get RankID ________________________
393  //
// (the condition that opens this error branch, source line 394, is missing here)
395  ATH_MSG_ERROR("Unable to get rank ID!");
396  return outwork;
397  }
398  std::ostringstream workindex;
399  workindex<<m_rankId;
400 
401  // ________________________ Worker dir: mkdir ________________________
402  std::filesystem::path worker_rundir(m_subprocTopDir);
403  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
404  // TODO: this "worker_" can be made configurable too
405 
// Mode: rwx for the owner, r-x for group and others.
// Any mkdir() failure aborts the bootstrap.
406  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
407  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
408  return outwork;
409  }
410 
411  // ________________________ Redirect logs ________________________
// Logs are left untouched in debug mode
412  if(!m_debug) {
413  if(redirectLog(worker_rundir.string()))
414  return outwork;
415 
416  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
417  }
418 
419  // ________________________ Update Io Registry ____________________________
420  if(updateIoReg(worker_rundir.string()))
421  return outwork;
422 
423  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
424 
425  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
// Each file is copied only if it exists as a regular file in the current directory
426  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
427  if(std::filesystem::is_regular_file("SimParams.db"))
428  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
429  if(std::filesystem::is_regular_file("DigitParams.db"))
430  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
431  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
432  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
433 
434  // _______________________ Handle saved PFC (if any) ______________________
435  if(handleSavedPfc(abs_worker_rundir))
436  return outwork;
437 
438  // ________________________ reopen descriptors ____________________________
439  if(reopenFds())
440  return outwork;
441 
442  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
443 
444 
445  // ________________________ I/O reinit ________________________
446  if(!m_ioMgr->io_reinitialize().isSuccess()) {
447  ATH_MSG_ERROR("Failed to reinitialize I/O");
448  return outwork;
449  } else {
450  ATH_MSG_DEBUG("Successfully reinitialized I/O");
451  }
452 
453  // ________________________ Event selector restart ________________________
454  IService* evtSelSvc = dynamic_cast<IService*>(evtSelector());
455  if(!evtSelSvc) {
456  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
457  return outwork;
458  }
459  if(!evtSelSvc->start().isSuccess()) {
460  ATH_MSG_ERROR("Failed to restart the event selector");
461  return outwork;
462  } else {
463  ATH_MSG_DEBUG("Successfully restarted the event selector");
464  }
465 
466  // ________________________ Restart background event selectors in pileup jobs ________________________
// Every service that implements IEvtSelector, other than the main event
// selector restarted above, is started as well.
467  if(m_isPileup) {
468  const std::list<IService*>& service_list = serviceLocator()->getServices();
469  std::list<IService*>::const_iterator itSvc = service_list.begin(),
470  itSvcLast = service_list.end();
471  for(;itSvc!=itSvcLast;++itSvc) {
472  IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(*itSvc);
473  if(evtsel && (evtsel != evtSelector())) {
474  if((*itSvc)->start().isSuccess())
475  ATH_MSG_DEBUG("Restarted event selector " << (*itSvc)->name());
476  else {
477  ATH_MSG_ERROR("Failed to restart event selector " << (*itSvc)->name());
478  return outwork;
479  }
480  }
481  }
482  }
483 
484  // ________________________ Worker dir: chdir ________________________
// Unlike the mkdir failure message above, this one does not include errno
485  if(chdir(worker_rundir.string().c_str())==-1) {
486  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
487  return outwork;
488  }
489 
490  // Declare success and return
491  *(int*)(outwork->data) = 0;
492  return outwork;
493 }
494 
// Event loop of a worker process.
//
// Repeatedly announces itself to the Event Range Scatterer over a yampl client
// socket, receives an event range, seeks to and processes each event of the
// range, and reports either the produced output file(s) or an error status.
// The loop ends when a one-byte response is received.
//
// Returns a ScheduledWork with the layout "ERRCODE|Func_Flag|NEvt". The error
// code written here is always 0 and NEvt is the number of successfully
// processed events.
//
// Source lines 563, 573, 594, 601, 606, 616, 675 and 689 are not part of this
// listing. The declaration of rangeStatus and the places where it is assigned
// are presumably among them.
495 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::exec_func()
496 {
497  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
498 
// nEvt is the running counter handed to nextEvent(); nEventsProcessed counts
// only the events whose processing succeeded.
499  int nEvt(1);
500  int nEventsProcessed(0);
501 
502  std::queue<std::string> queueTokens;
503 
504  // Get the yampl connection channels
// The factory and the socket are raw pointers, deleted at the end of this function
505  yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
506  std::string socket2ScattererName = m_channel2Scatterer.value() + std::string("_") + m_randStr;
507  yampl::ISocket* socket2Scatterer = socketFactory->createClientSocket(yampl::Channel(socket2ScattererName,yampl::LOCAL),yampl::MOVE_DATA);
508  ATH_MSG_INFO("Created CLIENT socket to the Scatterer: " << socket2ScattererName);
509  std::ostringstream pidstr;
510  pidstr << getpid();
511 
512  // Construct a "welcome" message to be sent to the EvtRangeScatterer
513  std::string ping = pidstr.str() + std::string(" ready for event processing");
514 
515  while(true) {
// A fresh buffer is allocated for every send and never freed in this function.
// NOTE(review): presumably the socket takes ownership because it was created
// with yampl::MOVE_DATA - confirm.
516  void* message2scatterer = malloc(ping.size());
517  memcpy(message2scatterer,ping.data(),ping.size());
518  socket2Scatterer->send(message2scatterer,ping.size());
519  ATH_MSG_INFO("Sent a welcome message to the Scatterer");
520 
521  // Get the response - list of tokens - from the scatterer.
522  // The format of the response: | ResponseSize | RangeID, | evtEvtRange[,evtToken] |
// NOTE(review): responseBuffer is not released anywhere in this function;
// confirm who owns it.
523  char *responseBuffer(0);
524  std::string strPeerId;
525  ssize_t responseSize = socket2Scatterer->recv(responseBuffer,strPeerId);
526  // If the response is exactly one byte long then break the loop
527  if(responseSize==1) {
528  ATH_MSG_INFO("Empty range received. Terminating the loop");
529  break;
530  }
531 
532  std::string responseStr(responseBuffer,responseSize);
533  ATH_MSG_INFO("Received response from the Scatterer : " << responseStr);
534 
535  // Start timing
536  System::ProcessTime time_start = System::getProcessTime();
537 
// Split the response at commas and append the pieces to queueTokens
538  size_t startpos(0);
539  size_t endpos = responseStr.find(',');
540  while(endpos!=std::string::npos) {
541  queueTokens.push(responseStr.substr(startpos,endpos-startpos));
542  startpos = endpos+1;
543  endpos = responseStr.find(',',startpos);
544  }
545  queueTokens.push(responseStr.substr(startpos));
546  // Actually the first element in the tokens queue is the RangeID. Get it
547  std::string rangeID = queueTokens.front();
548  queueTokens.pop();
549  ATH_MSG_INFO("Received RangeID=" << rangeID);
550  // Fire an incident
551  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange",rangeID));
552 
553  // Here we need to support two formats of the responseStr
554  // Format 1. RangeID,startEvent,endEvent
555  // Format 2. RangeID,fileName,startEvent,endEvent
556  //
557  // The difference between these two is that for Format 2 we first
558  // need to update InputCollections property on the Event Selector
559  // and only after that proceed with seeking
560  //
561  // The seeking part is identical for Format 1 and 2
562 
564 
565  // Determine the format
// NOTE(review): front() is used here and below without checking that the queue
// still holds a token; a response with fewer fields than expected would call
// front() on an empty queue.
566  std::string filename("");
567  if(queueTokens.front().find("PFN:")==0) {
568  // We have Format 2
569  // Update InputCollections property of the Event Selector with the file name from Event Range
// The file name is the token without its 4-character "PFN:" prefix
570  filename = queueTokens.front().substr(4);
571  if(setNewInputFile(filename).isFailure()) {
572  ATH_MSG_WARNING("Failed to set input file for the range: " << rangeID);
574  reportError(socket2Scatterer,rangeStatus);
575  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
// NOTE(review): the file name, start and end tokens of this range are still in
// queueTokens when the loop continues, so the next response would be parsed
// after these stale tokens - confirm and clear the queue here if so.
576  continue;
577  }
578  queueTokens.pop();
579  }
580 
581  // Get the number of events to process
// std::atoi performs no error reporting on non-numeric tokens
582  int startEvent = std::atoi(queueTokens.front().c_str());
583  queueTokens.pop();
584  int endEvent = std::atoi(queueTokens.front().c_str());
585  queueTokens.pop();
586  ATH_MSG_INFO("Range fields. File Name: " << (filename.empty()?"N/A":filename)
587  << ", First Event:" << startEvent
588  << ", Last Event:" << endEvent);
589 
590  // Process the events
591  IEvtSelector::Context* ctx = nullptr;
592  if (evtSelector()->createContext (ctx).isFailure()) {
593  ATH_MSG_WARNING("Failed to create IEventSelector context.");
595  }
596  else {
// Seek positions run from startEvent-1 to endEvent-1 inclusive; the loop
// stops at the first event that cannot be sought to or processed.
597  for(int i(startEvent-1); i<endEvent; ++i) {
598  StatusCode sc = m_evtSeek->seek(*ctx, i);
599  if(sc.isRecoverable()) {
600  ATH_MSG_WARNING("Event " << i << " from range: " << rangeID << " not in the input file");
602  break;
603  }
604  else if(sc.isFailure()) {
605  ATH_MSG_WARNING("Failed to seek to " << i << " in range: " << rangeID);
607  break;
608  }
609  ATH_MSG_INFO("Seek to " << i << " succeeded");
610  m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
611  sc = m_evtProcessor->nextEvent(nEvt++);
612 
613  m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
614  if(sc.isFailure()){
615  ATH_MSG_WARNING("Failed to process the event " << i << " in range:" << rangeID);
617  break;
618  }
619  else {
620  ATH_MSG_DEBUG("Event processed");
621  nEventsProcessed++;
622  }
623  }
624  }
// releaseContext() is called whether or not createContext() succeeded
625  if (evtSelector()->releaseContext (ctx).isFailure()) {
626  ATH_MSG_WARNING("Failed to release IEventSelector context.");
627  }
628 
629  // Fire dummy NextEventRange incident in order to cut the previous output and report it
630  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
631  if(rangeStatus!=AthenaMPToolBase::ESRANGE_SUCCESS) {
632  reportError(socket2Scatterer,rangeStatus);
633  continue;
634  }
635 
636  // Event range successfully processed
637  std::string strOutpFile;
638  // Get the full path of the event range output file
// Every entry of the current directory whose path ends with rangeID is taken
// as an output file; several matches are joined with commas.
// NOTE(review): for a path exactly one character shorter than rangeID both
// sides of the comparison equal npos (rfind fails, the subtraction wraps), so
// such an entry would match as well.
639  for(std::filesystem::directory_iterator fdIt(std::filesystem::current_path()); fdIt!=std::filesystem::directory_iterator(); fdIt++) {
640  if(fdIt->path().string().rfind(rangeID) == fdIt->path().string().size()-rangeID.size()) {
641  if(strOutpFile.empty()) {
642  strOutpFile = fdIt->path().string();
643  }
644  else {
645  strOutpFile += (std::string(",")+fdIt->path().string());
646  }
647  }
648  }
649 
650  // Stop timing
651  System::ProcessTime time_delta = System::getProcessTime() - time_start;
652 
653  // Prepare the output report
654  if(!strOutpFile.empty()) {
655  // We need to combine the output file name with
656  // 1. RangeID (requested by JEDI)
657  // 2. CPU time
658  // 3. Wall time
659  std::ostringstream outputReportStream;
660  outputReportStream << strOutpFile
661  << ",ID:" << rangeID
662  << ",CPU:" << time_delta.cpuTime<System::Sec>()
663  << ",WALL:" << time_delta.elapsedTime<System::Sec>();
664  std::string outputFileReport = outputReportStream.str();
665 
666  // Report the output
667  message2scatterer = malloc(outputFileReport.size());
668  memcpy(message2scatterer,outputFileReport.data(),outputFileReport.size());
669  socket2Scatterer->send(message2scatterer,outputFileReport.size());
670  ATH_MSG_INFO("Reported the output " << outputFileReport);
671  }
672  else {
673  // This is an error: range successfully processed but no outputs were made
674  ATH_MSG_WARNING("Failed to make an output file for range: " << rangeID);
676  }
677  } // Main "event loop"
678 
679  if(m_evtProcessor->executeRun(0).isFailure()) {
680  ATH_MSG_WARNING("Could not finalize the Run");
681  }
682 
683  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
684 
685  // Return value: "ERRCODE|Func_Flag|NEvt"
686  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
687  void* outdata = malloc(outsize);
688  *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
// (the declaration of "func", source line 689, is missing here)
690  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
691  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
692 
693  outwork->data = outdata;
694  outwork->size = outsize;
695  // ...
696  // (possible) TODO: extend outwork with some error message, which will be eventually
697  // reported in the master process
698  // ...
699 
700  delete socket2Scatterer;
701  delete socketFactory;
702 
703  return outwork;
704 }
705 
// Finalization step for a worker process.
//
// Stops the application manager and, only if that succeeded, finalizes it.
// Failures of either call are reported but do not change the result: the
// returned ScheduledWork always carries error code 0, the function flag and
// NEvt=-1, in the same "ERRCODE|Func_Flag|NEvt" layout that exec_func() uses.
// Source line 725 (the declaration of "func") is not part of this listing.
706 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::fin_func()
707 {
708  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
709 
710  if(m_appMgr->stop().isFailure()) {
711  ATH_MSG_WARNING("Unable to stop AppMgr");
712  }
713  else {
714  if(m_appMgr->finalize().isFailure()) {
// NOTE(review): this failure goes to std::cout while the one above uses
// ATH_MSG_WARNING; possibly deliberate because it follows a finalize() attempt -
// confirm before unifying.
715  std::cout << "Unable to finalize AppMgr" << std::endl;
716  }
717  }
718 
719  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
720 
721  // Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
722  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
723  void* outdata = malloc(outsize);
724  *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
726  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
727  int nEvt = -1;
728  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
729 
730  outwork->data = outdata;
731  outwork->size = outsize;
732 
733  return outwork;
734 }
735 
// Starts one additional worker process; wait_once() calls this to replace a
// worker that crashed in EXEC state.
//
// Adds a new rank (the previous m_nprocs) to the rank queue, launches a process
// in the process group and schedules its bootstrap.
// Returns FAILURE if any of the three steps fails, SUCCESS otherwise.
// Source line 757 is not part of this listing.
736 StatusCode EvtRangeProcessor::startProcess()
737 {
// NOTE(review): m_nprocs is incremented before anything can fail and is not
// rolled back on the failure paths below.
738  m_nprocs++;
739 
740  // Create a rank for the new process
741  if(!m_sharedRankQueue->send_basic<int>(m_nprocs-1)) {
742  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
743  return StatusCode::FAILURE;
744  }
745 
// A returned pid of 0 is treated as "launch failed"
746  pid_t pid = m_processGroup->launchProcess();
747  if(pid==0) {
748  ATH_MSG_ERROR("Unable to start new process");
749  return StatusCode::FAILURE;
750  }
751 
752  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP,pid)) {
753  ATH_MSG_ERROR("Unable to bootstrap new process");
754  return StatusCode::FAILURE;
755  }
756 
// (source line 757 is missing here)
758  return StatusCode::SUCCESS;
759 }
760 
762 {
763  if(m_inpFile == newFile) return StatusCode::SUCCESS;
764 
765  // Get Property Server
766  IProperty* propertyServer = dynamic_cast<IProperty*>(evtSelector());
767  if(!propertyServer) {
768  ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
769  return StatusCode::FAILURE;
770  }
771 
772  std::string propertyName("InputCollections");
773  if(m_inpFile.empty()) {
774  std::vector<std::string> vect;
775  StringArrayProperty inputFileList(propertyName, vect);
776  if(propertyServer->getProperty(&inputFileList).isFailure()) {
777  ATH_MSG_ERROR("Failed to get InputCollections property value of the Event Selector");
778  return StatusCode::FAILURE;
779  }
780  if(newFile==inputFileList.value()[0]) {
781  m_inpFile = newFile;
782  return StatusCode::SUCCESS;
783  }
784  }
785  std::vector<std::string> vect{newFile,};
786  StringArrayProperty newInputFileList(propertyName, vect);
787  if(propertyServer->setProperty(newInputFileList).isFailure()) {
788  ATH_MSG_ERROR("Unable to update " << newInputFileList.name() << " property on the Event Selector");
789  return StatusCode::FAILURE;
790  }
791  m_inpFile=newFile;
792  return StatusCode::SUCCESS;
793 }
794 
// Body of the error-report helper that exec_func() calls as
// reportError(socket2Scatterer,rangeStatus); the signature (source line 795) is
// not part of this listing.
// Sends one message made of this process's pid followed directly by `status`.
796 {
797  pid_t pid = getpid();
798  size_t messageSize = sizeof(pid_t)+sizeof(AthenaMPToolBase::ESRange_Status);
// NOTE(review): the malloc result is not checked, and the buffer is not freed
// here; presumably send() takes ownership because exec_func() creates the
// socket with yampl::MOVE_DATA - confirm.
799  void* message2scatterer = malloc(messageSize);
800  memcpy(message2scatterer,&pid,sizeof(pid_t));
// (pid_t*)ptr+1 advances by sizeof(pid_t) bytes, i.e. to just behind the pid
801  memcpy((pid_t*)message2scatterer+1,&status,sizeof(AthenaMPToolBase::ESRange_Status));
802  socket->send(message2scatterer,messageSize);
803 }
python.PyKernel.retrieve
def retrieve(aClass, aKey=None)
Definition: PyKernel.py:110
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
EvtRangeProcessor::~EvtRangeProcessor
virtual ~EvtRangeProcessor() override
Definition: EvtRangeProcessor.cxx:60
EvtRangeProcessor::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: EvtRangeProcessor.cxx:706
EvtRangeProcessor::EvtRangeProcessor
EvtRangeProcessor()
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:432
EvtRangeProcessor::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: EvtRangeProcessor.cxx:379
AthenaInterprocess::ProcessGroup::pullOneResult
ProcessResult * pullOneResult()
Definition: ProcessGroup.cxx:177
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:126
EvtRangeProcessor::m_activeWorkers
int m_activeWorkers
Definition: EvtRangeProcessor.h:68
AthenaMPToolBase::ESRANGE_BADINPFILE
@ ESRANGE_BADINPFILE
Definition: AthenaMPToolBase.h:62
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
EvtRangeProcessor::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: EvtRangeProcessor.cxx:373
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
LArBadChanBlobUtils::Channel
Identifier32::value_type Channel
Definition: LArBadChanBlobUtils.h:24
EvtRangeProcessor::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: EvtRangeProcessor.cxx:343
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:88
AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
Definition: AthCommonDataStore.h:145
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
EvtRangeProcessor.h
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:83
AthenaMPToolBase::ESRange_Status
ESRange_Status
Definition: AthenaMPToolBase.h:56
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:68
AthenaInterprocess::ProcessGroup::getChildren
const std::vector< Process > & getChildren() const
Definition: ProcessGroup.cxx:197
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:32
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:362
skel.it
it
Definition: skel.GENtoEVGEN.py:423
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Definition: AthenaMPToolBase.h:86
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
athena.exitcode
int exitcode
Definition: athena.py:159
ProcessGroup.h
AthenaMPToolBase::ESRANGE_SEEKFAILED
@ ESRANGE_SEEKFAILED
Definition: AthenaMPToolBase.h:59
AthenaInterprocess::ProcessResult::pid
pid_t pid
Definition: ProcessGroup.h:23
EvtRangeProcessor::m_inpFile
std::string m_inpFile
Definition: EvtRangeProcessor.h:69
EvtRangeProcessor::m_rankId
int m_rankId
Definition: EvtRangeProcessor.h:66
EvtRangeProcessor::PROC_STATE_EXEC
@ PROC_STATE_EXEC
Definition: EvtRangeProcessor.h:61
EvtRangeProcessor::m_finQueue
std::deque< pid_t > m_finQueue
Definition: EvtRangeProcessor.h:82
SUSY_SimplifiedModel_PostInclude.process
string process
Definition: SUSY_SimplifiedModel_PostInclude.py:42
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:67
AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore
const ServiceHandle< StoreGateSvc > & detStore() const
The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:95
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
EvtRangeProcessor::m_evtSeek
IEvtSelectorSeek * m_evtSeek
Definition: EvtRangeProcessor.h:73
EvtRangeProcessor::m_channel2EvtSel
StringProperty m_channel2EvtSel
Definition: EvtRangeProcessor.h:76
IEvtSelectorSeek::seek
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
copy_file_icc_hack.h
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
EvtRangeProcessor::m_procStates
std::map< pid_t, ProcessState > m_procStates
Definition: EvtRangeProcessor.h:83
AthenaInterprocess::ProcessResult::output
ScheduledWork output
Definition: ProcessGroup.h:24
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:91
EvtRangeProcessor::m_nProcessedEvents
std::map< pid_t, int > m_nProcessedEvents
Definition: EvtRangeProcessor.h:81
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EvtRangeProcessor::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: EvtRangeProcessor.h:67
AthenaMPToolBase::m_isPileup
Gaudi::Property< bool > m_isPileup
Definition: AthenaMPToolBase.h:99
lumiFormat.i
int i
Definition: lumiFormat.py:92
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:65
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
EvtRangeProcessor::m_channel2Scatterer
StringProperty m_channel2Scatterer
Definition: EvtRangeProcessor.h:75
res
std::pair< std::vector< unsigned int >, bool > res
Definition: JetGroupProductTest.cxx:14
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
AthenaMPToolBase::evtSelector
IEvtSelector * evtSelector()
Definition: AthenaMPToolBase.h:81
test_pyathena.parent
parent
Definition: test_pyathena.py:15
AthenaMPToolBase::ESRANGE_SUCCESS
@ ESRANGE_SUCCESS
Definition: AthenaMPToolBase.h:57
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
EvtRangeProcessor::m_debug
bool m_debug
Definition: EvtRangeProcessor.h:85
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:129
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:341
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:55
EvtRangeProcessor::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: EvtRangeProcessor.cxx:495
EvtRangeProcessor::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: EvtRangeProcessor.h:78
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
EvtRangeProcessor::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: EvtRangeProcessor.cxx:361
merge.output
output
Definition: merge.py:17
EvtRangeProcessor::reportError
void reportError(yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
Definition: EvtRangeProcessor.cxx:795
EvtRangeProcessor::initialize
virtual StatusCode initialize() override
Definition: EvtRangeProcessor.cxx:64
EvtRangeProcessor::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: EvtRangeProcessor.h:71
AthenaMPToolBase::ESRANGE_PROCFAILED
@ ESRANGE_PROCFAILED
Definition: AthenaMPToolBase.h:60
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:93
EvtRangeProcessor::setNewInputFile
StatusCode setNewInputFile(const std::string &newFile)
Definition: EvtRangeProcessor.cxx:761
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:278
AthenaInterprocess::Process
Definition: Process.h:17
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
grepfile.filenames
list filenames
Definition: grepfile.py:34
EvtRangeProcessor::PROC_STATE_INIT
@ PROC_STATE_INIT
Definition: EvtRangeProcessor.h:60
EvtRangeProcessor::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: EvtRangeProcessor.h:72
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
EvtRangeProcessor::m_sharedFailedPidQueue
AthenaInterprocess::SharedQueue * m_sharedFailedPidQueue
Definition: EvtRangeProcessor.h:79
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:28
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:24
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
merge.status
status
Definition: merge.py:17
EvtRangeProcessor::finalize
virtual StatusCode finalize() override
Definition: EvtRangeProcessor.cxx:76
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
IEvtSelectorSeek.h
Extension to IEvtSelector to allow for seeking.
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:84
EvtRangeProcessor::PROC_STATE_FIN
@ PROC_STATE_FIN
Definition: EvtRangeProcessor.h:62
EvtRangeProcessor::PROC_STATE_STOP
@ PROC_STATE_STOP
Definition: EvtRangeProcessor.h:63
AthenaMPToolBase::ESRANGE_NOTFOUND
@ ESRANGE_NOTFOUND
Definition: AthenaMPToolBase.h:58
AthenaInterprocess::SharedQueue::send_basic
bool send_basic(T)
Definition: SharedQueue.h:93
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:425
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:25
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:66
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:369
AthenaMPToolBase::ESRANGE_FILENOTMADE
@ ESRANGE_FILENOTMADE
Definition: AthenaMPToolBase.h:61
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:90