ATLAS Offline Software
Loading...
Searching...
No Matches
EvtRangeProcessor.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "EvtRangeProcessor.h"
8
10#include "CxxUtils/xmalloc.h"
11#include "GaudiKernel/IEvtSelector.h"
12#include "GaudiKernel/IIoComponentMgr.h"
13#include "GaudiKernel/IFileMgr.h"
14#include "GaudiKernel/IChronoStatSvc.h"
15#include "GaudiKernel/ISvcLocator.h"
16#include "GaudiKernel/IIncidentSvc.h"
17#include "GaudiKernel/FileIncident.h"
18#include "GaudiKernel/Timing.h"
19
20#include <sys/stat.h>
21#include <sstream>
22#include <fstream>
23#include <unistd.h>
24#include <stdio.h>
25#include <stdint.h>
26#include <stdexcept>
27#include <queue>
28#include <signal.h>
29#include <filesystem>
30
31#include "yampl/SocketFactory.h"
32
33
35 , const std::string& name
36 , const IInterface* parent)
37 : AthenaMPToolBase(type,name,parent)
38 , m_chronoStatSvc("ChronoStatSvc", name)
39 , m_incidentSvc("IncidentSvc", name)
40{
41 m_subprocDirPrefix = "worker_";
42}
43
47
49{
50 ATH_MSG_DEBUG("In initialize");
51
53 m_evtSeek = serviceLocator()->service(m_evtSelName);
54 ATH_CHECK(m_evtSeek.isValid());
55 ATH_CHECK(m_chronoStatSvc.retrieve());
56 ATH_CHECK(m_incidentSvc.retrieve());
57
58 return StatusCode::SUCCESS;
59}
60
61int EvtRangeProcessor::makePool(int, int nprocs, const std::string& topdir)
62{
63 ATH_MSG_DEBUG("In makePool " << getpid());
64
65 if(nprocs==0 || nprocs<-1) {
66 ATH_MSG_ERROR("Invalid value for the nprocs parameter: " << nprocs);
67 return -1;
68 }
69
70 if(topdir.empty()) {
71 ATH_MSG_ERROR("Empty name for the top directory!");
72 return -1;
73 }
74
75 m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
77 m_subprocTopDir = topdir;
78
79 // Create rank queue and fill it
80 std::ostringstream rankQueueName;
81 rankQueueName << "EvtRangeProcessor_RankQueue_" << getpid() << "_" << m_randStr;
82 m_sharedRankQueue = std::make_unique<AthenaInterprocess::SharedQueue>(rankQueueName.str(),m_nprocs,sizeof(int));
83 for(int i=0; i<m_nprocs; ++i)
84 if(!m_sharedRankQueue->send_basic<int>(i)) {
85 ATH_MSG_ERROR("Unable to send int to the ranks queue!");
86 return -1;
87 }
88
89 // Create the process group and map_async bootstrap
90 m_processGroup = new AthenaInterprocess::ProcessGroup(m_nprocs);
91 ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
92 if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP)) {
93 return -1;
94 }
95 ATH_MSG_INFO("Workers bootstrapped");
96
97 // Populate the m_procStates map
98 for(const AthenaInterprocess::Process& process : m_processGroup->getChildren()) {
99 m_procStates[process.getProcessID()] = PROC_STATE_INIT;
100 }
101
102 return m_nprocs;
103}
104
105StatusCode EvtRangeProcessor::exec()
106{
107 ATH_MSG_DEBUG("In exec " << getpid());
108
109 // Do nothing here. The exec will be mapped on workers one at a time ...
110
111 return StatusCode::SUCCESS;
112}
113
114StatusCode EvtRangeProcessor::wait_once(pid_t& pid)
115{
116 // This method performs two tasks:
117 // 1. Checks if any of the workers has changed its state, and if so performs appropriate actions
118 // 2. Tries to pull one result from the workers results queue, and if there is one, then decodes it
119
120 // First make sure we have a valid pointer to the Failed PID Queue
122 if(detStore()->retrieve(m_sharedFailedPidQueue,"AthenaMPFailedPidQueue_"+m_randStr).isFailure()) {
123 ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Failed PID Queue");
124 return StatusCode::FAILURE;
125 }
126 }
127
128 // ____________________ Step 1: check for state changes in the workers ______________________________
129 StatusCode sc = AthenaMPToolBase::wait_once(pid);
130 if(pid>0) {
131 // One of the workers finished. We need to figure out whether or not it finished abnormally
132
133 auto itProcState = m_procStates.find(pid);
134 if(itProcState==m_procStates.end()) {
135 // Untracked subprocess?? Something's wrong. Exit
136 // To Do: how to report this error to the pilot?
137 sc.ignore();
138 ATH_MSG_ERROR("Detected untracked process ID=" << pid);
139 return StatusCode::FAILURE;
140 }
141
142 // Deal with failed workers
143 if(sc.isFailure()) {
144
145 switch(itProcState->second) {
146 case PROC_STATE_INIT:
147 // If the failed process was in INIT state, exit immediately
148 ATH_MSG_ERROR("Worker with process ID=" << pid << " failed at initialization!");
149 return StatusCode::FAILURE;
150 case PROC_STATE_EXEC:
151 // If the failed process was in EXEC state, report pid to EvtRangeScatterer and attempt to start new worker
152
153 // Report pid to Event Range Scatterer
154 if(!m_sharedFailedPidQueue->send_basic<pid_t>(pid)) {
155 // To Do: how to report this error to the pilot?
156 ATH_MSG_ERROR("Failed to report the crashed pid to the Event Range Scatterer");
157 return StatusCode::FAILURE;
158 }
159
160 // Start new worker
161 if(startProcess().isSuccess()) {
162 ATH_MSG_INFO("Successfully started new process");
163 pid=0;
164 }
165 else {
166 // To Do: how to report this error to the pilot?
167 ATH_MSG_ERROR("Failed to start new process");
168 return StatusCode::FAILURE;
169 }
170 break;
171 case PROC_STATE_FIN:
172 // If the failed process was in FIN state, remove pid from the finQueue and schedule finalization of the next worker
173 m_finQueue.pop_front();
174
175 if(m_finQueue.size()) {
176 if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())) {
177 // To Do: how to report this error to the pilot?
178 ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
179 return StatusCode::FAILURE;
180 }
181 else {
182 ATH_MSG_INFO("Scheduled finalization of PID=" << m_finQueue.front());
183 }
184 }
185 break;
186 case PROC_STATE_STOP:
187 break;
188 default:
189 ATH_MSG_ERROR("Detected unexpected state " << itProcState->second << " of failed worker with PID=" << pid);
190 return StatusCode::FAILURE;
191 }
192 }
193 else {
194 // The worker finished successfully and it was the last worker. Release the Event Range Scatterer
195 if(--m_activeWorkers==0
196 && !m_sharedFailedPidQueue->send_basic<pid_t>(-1)) {
197 // To Do: how to report this error to the pilot?
198 ATH_MSG_ERROR("Failed to release the Event Range Scatterer");
199 return StatusCode::FAILURE;
200 }
201 }
202
203 // Erase the pid from m_procStates map
204 m_procStates.erase(itProcState);
205 }
206 else {
207 sc.ignore();
208 if(pid<0) {
209 // Here we failed to wait on the group. Exit immediately
210 // To Do: how to report this error to the pilot?
211 ATH_MSG_ERROR("Failed to wait on the process group!");
212 return StatusCode::FAILURE;
213 }
214 }
215 // ____________________ ______________________________________________ ______________________________
216
217
218 // ____________________ Step 2: decode worker result (if any) ______________________________
219 AthenaInterprocess::ProcessResult* presult = m_processGroup->pullOneResult();
220 if(presult) {
221 if((unsigned)(presult->output.size)>=sizeof(int)) {
222 // Decode result
223 const AthenaInterprocess::ScheduledWork& output = presult->output;
224
225 // First extract pid from the ProcessResult and check its validity
226 pid_t childPid = presult->pid;
227 auto itChildState = m_procStates.find(childPid);
228 if(itChildState==m_procStates.end()) {
229 ATH_MSG_ERROR("Unable to find PID=" << childPid << " in the Proc States map!");
230 free(presult->output.data);
231 delete presult;
232 return StatusCode::FAILURE;
233 }
234
235 ATH_MSG_DEBUG("Decoding the output of PID=" << childPid << " with the size=" << output.size);
236
237 if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)) {
238 // We are dealing with the bootstrap function.
239 // Schedule exec_func()
240 if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC,childPid)) {
241 ATH_MSG_ERROR("Problem scheduling execution on PID=" << childPid);
242 free(presult->output.data);
243 delete presult;
244 return StatusCode::FAILURE;
245 }
246
247 // Update process state in the m_procStates map
248 itChildState->second=PROC_STATE_EXEC;
249 }
250
252 memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
253
255 // Store the number of processed events
256 int nevt(0);
257 memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
258 m_nProcessedEvents[childPid]=nevt;
259 ATH_MSG_DEBUG("PID=" << childPid << " processed " << nevt << " events");
260
261 // Add PID to the finalization queue
262 m_finQueue.push_back(childPid);
263 ATH_MSG_DEBUG("Added PID=" << childPid << " to the finalization queue");
264
265 // If this is the only element in the queue then start its finalization
266 // Otherwise it has to wait its turn until all previous processes have been finalized
267 if(m_finQueue.size()==1) {
268 if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,childPid)) {
269 ATH_MSG_ERROR("Problem scheduling finalization on PID=" << childPid);
270 free(presult->output.data);
271 delete presult;
272 return StatusCode::FAILURE;
273 }
274 else {
275 ATH_MSG_INFO("Scheduled finalization of PID=" << childPid);
276 }
277 }
278
279 // Update process state in the m_procStates map
280 itChildState->second=PROC_STATE_FIN;
281 }
282 else if(func==AthenaMPToolBase::FUNC_FIN) {
283 ATH_MSG_DEBUG("Finished finalization of PID=" << childPid);
284 pid_t pidFront = m_finQueue.front();
285 if(pidFront==childPid) {
286 // pid received as expected
287
288 // Set the process free
289 if(m_processGroup->map_async(0,0,pidFront)) {
290 ATH_MSG_ERROR("Failed to set the process PID=" << pidFront << " free");
291 free(presult->output.data);
292 delete presult;
293 return StatusCode::FAILURE;
294 }
295
296 // Remove it from the queue
297 m_finQueue.pop_front();
298 ATH_MSG_DEBUG("PID=" << childPid << " removed from the queue");
299 // Schedule finalization of the next process in the queue
300 if(m_finQueue.size()) {
301 if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())) {
302 ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
303 free(presult->output.data);
304 delete presult;
305 return StatusCode::FAILURE;
306 }
307 else {
308 ATH_MSG_INFO("Scheduled finalization of PID=" << m_finQueue.front());
309 }
310 }
311 }
312 else {
313 // Error: unexpected pid received from presult
314 ATH_MSG_ERROR("Finalized PID=" << childPid << " while PID=" << pid << " was expected");
315 free(presult->output.data);
316 delete presult;
317 return StatusCode::FAILURE;
318 }
319
320 // Update process state in the m_procStates map
321 itChildState->second=PROC_STATE_STOP;
322 }
323 }
324 free(presult->output.data);
325 delete presult;
326 }
327 // ____________________ ______________________________________________ ______________________________
328
329 return StatusCode::SUCCESS;
330}
331
333{
334 ATH_MSG_INFO("Statuses of event processors");
335 const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
336 for(size_t i=0; i<statuses.size(); ++i) {
337 // Get the number of events processed by this worker
338 std::map<pid_t,int>::const_iterator it = m_nProcessedEvents.find(statuses[i].pid);
339 std::ostringstream ostr;
340 if(it==m_nProcessedEvents.end())
341 ostr << "N/A";
342 else
343 ostr << it->second;
344 ATH_MSG_INFO("*** Process PID=" << statuses[i].pid
345 << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
346 << ". Number of events processed: " << ostr.str());
347 }
348}
349
350void EvtRangeProcessor::subProcessLogs(std::vector<std::string>& filenames)
351{
352 filenames.clear();
353 for(int i=0; i<m_nprocs; ++i) {
354 std::ostringstream workerIndex;
355 workerIndex << i;
356 std::filesystem::path worker_rundir(m_subprocTopDir);
357 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
358 filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
359 }
360}
361
367
368std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::bootstrap_func()
369{
371
372 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
373 outwork->data = CxxUtils::xmalloc(sizeof(int));
374 *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
375 outwork->size = sizeof(int);
376 // ...
377 // (possible) TODO: extend outwork with some error message, which will be eventually
378 // reported in the master proces
379 // ...
380
381 // ________________________ Get RankID ________________________
382 //
383 if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
384 ATH_MSG_ERROR("Unable to get rank ID!");
385 return outwork;
386 }
387 std::ostringstream workindex;
388 workindex<<m_rankId;
389
390 // ________________________ Worker dir: mkdir ________________________
391 std::filesystem::path worker_rundir(m_subprocTopDir);
392 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
393 // TODO: this "worker_" can be made configurable too
394
395 if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
396 ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
397 return outwork;
398 }
399
400 // ________________________ Redirect logs ________________________
401 if(!m_debug) {
402 if(redirectLog(worker_rundir.string()))
403 return outwork;
404
405 ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
406 }
407
408 // ________________________ Update Io Registry ____________________________
409 if(updateIoReg(worker_rundir.string()))
410 return outwork;
411
412 ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
413
414 // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
415 std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
416 if(std::filesystem::is_regular_file("SimParams.db"))
417 COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
418 if(std::filesystem::is_regular_file("DigitParams.db"))
419 COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
420 if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
421 COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
422
423 // _______________________ Handle saved PFC (if any) ______________________
424 if(handleSavedPfc(abs_worker_rundir))
425 return outwork;
426
427 // ________________________ reopen descriptors ____________________________
428 if(reopenFds())
429 return outwork;
430
431 ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
432
433
434 // ________________________ I/O reinit ________________________
435 if(!m_ioMgr->io_reinitialize().isSuccess()) {
436 ATH_MSG_ERROR("Failed to reinitialize I/O");
437 return outwork;
438 } else {
439 ATH_MSG_DEBUG("Successfully reinitialized I/O");
440 }
441
442 // ________________________ Event selector restart ________________________
443 IService* evtSelSvc = dynamic_cast<IService*>(evtSelector());
444 if(!evtSelSvc) {
445 ATH_MSG_ERROR("Failed to dyncast event selector to IService");
446 return outwork;
447 }
448 if(!evtSelSvc->start().isSuccess()) {
449 ATH_MSG_ERROR("Failed to restart the event selector");
450 return outwork;
451 } else {
452 ATH_MSG_DEBUG("Successfully restarted the event selector");
453 }
454
455 // ________________________ Restart background event selectors in pileup jobs ________________________
456 if(m_isPileup) {
457 const std::list<IService*>& service_list = serviceLocator()->getServices();
458 std::list<IService*>::const_iterator itSvc = service_list.begin(),
459 itSvcLast = service_list.end();
460 for(;itSvc!=itSvcLast;++itSvc) {
461 IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(*itSvc);
462 if(evtsel && (evtsel != evtSelector())) {
463 if((*itSvc)->start().isSuccess())
464 ATH_MSG_DEBUG("Restarted event selector " << (*itSvc)->name());
465 else {
466 ATH_MSG_ERROR("Failed to restart event selector " << (*itSvc)->name());
467 return outwork;
468 }
469 }
470 }
471 }
472
473 // ________________________ Worker dir: chdir ________________________
474 if(chdir(worker_rundir.string().c_str())==-1) {
475 ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
476 return outwork;
477 }
478
479 // Declare success and return
480 *(int*)(outwork->data) = 0;
481 return outwork;
482}
483
484std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::exec_func()
485{
486 ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
487
488 int nEvt(1);
489 int nEventsProcessed(0);
490
491 std::queue<std::string> queueTokens;
492
493 // Get the yampl connection channels
494 yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
495 std::string socket2ScattererName = m_channel2Scatterer.value() + std::string("_") + m_randStr;
496 yampl::ISocket* socket2Scatterer = socketFactory->createClientSocket(yampl::Channel(socket2ScattererName,yampl::LOCAL),yampl::MOVE_DATA);
497 ATH_MSG_INFO("Created CLIENT socket to the Scatterer: " << socket2ScattererName);
498 std::ostringstream pidstr;
499 pidstr << getpid();
500
501 // Construct a "welcome" message to be sent to the EvtRangeScatterer
502 std::string ping = pidstr.str() + std::string(" ready for event processing");
503
504 while(true) {
505 void* message2scatterer = CxxUtils::xmalloc(ping.size());
506 memcpy(message2scatterer,ping.data(),ping.size());
507 socket2Scatterer->send(message2scatterer,ping.size());
508 ATH_MSG_INFO("Sent a welcome message to the Scatterer");
509
510 // Get the response - list of tokens - from the scatterer.
511 // The format of the response: | ResponseSize | RangeID, | evtEvtRange[,evtToken] |
512 char *responseBuffer(0);
513 std::string strPeerId;
514 ssize_t responseSize = socket2Scatterer->recv(responseBuffer,strPeerId);
515 // If response size is 0 then break the loop
516 if(responseSize==1) {
517 ATH_MSG_INFO("Empty range received. Terminating the loop");
518 break;
519 }
520
521 std::string responseStr(responseBuffer,responseSize);
522 ATH_MSG_INFO("Received response from the Scatterer : " << responseStr);
523
524 // Start timing
525 System::ProcessTime time_start = System::getProcessTime();
526
527 size_t startpos(0);
528 size_t endpos = responseStr.find(',');
529 while(endpos!=std::string::npos) {
530 queueTokens.push(responseStr.substr(startpos,endpos-startpos));
531 startpos = endpos+1;
532 endpos = responseStr.find(',',startpos);
533 }
534 queueTokens.push(responseStr.substr(startpos));
535 // Actually the first element in the tokens queue is the RangeID. Get it
536 std::string rangeID = queueTokens.front();
537 queueTokens.pop();
538 ATH_MSG_INFO("Received RangeID=" << rangeID);
539 // Fire an incident
540 m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange",rangeID));
541
542 // Here we need to support two formats of the responseStr
543 // Format 1. RangeID,startEvent,endEvent
544 // Format 2. RangeID,fileName,startEvent,endEvent
545 //
546 // The difference between these two is that for Format 2 we first
547 // need to update InputCollections property on the Event Selector
548 // and only after that proceed with seeking
549 //
550 // The seeking part is identical for Format 1 and 2
551
553
554 // Determine the format
555 std::string filename("");
556 if(queueTokens.front().find("PFN:")==0) {
557 // We have Format 2
558 // Update InputCollections property of the Event Selector with the file name from Event Range
559 filename = queueTokens.front().substr(4);
560 if(setNewInputFile(filename).isFailure()) {
561 ATH_MSG_WARNING("Failed to set input file for the range: " << rangeID);
563 reportError(socket2Scatterer,rangeStatus);
564 m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
565 continue;
566 }
567 queueTokens.pop();
568 }
569
570 // Get the number of events to process
571 int startEvent = std::atoi(queueTokens.front().c_str());
572 queueTokens.pop();
573 int endEvent = std::atoi(queueTokens.front().c_str());
574 queueTokens.pop();
575 ATH_MSG_INFO("Range fields. File Name: " << (filename.empty()?"N/A":filename)
576 << ", First Event:" << startEvent
577 << ", Last Event:" << endEvent);
578
579 // Process the events
580 IEvtSelector::Context* ctx = nullptr;
581 if (evtSelector()->createContext (ctx).isFailure()) {
582 ATH_MSG_WARNING("Failed to create IEventSelector context.");
584 }
585 else {
586 for(int i(startEvent-1); i<endEvent; ++i) {
587 StatusCode sc = m_evtSeek->seek(*ctx, i);
588 if(sc.isRecoverable()) {
589 ATH_MSG_WARNING("Event " << i << " from range: " << rangeID << " not in the input file");
591 break;
592 }
593 else if(sc.isFailure()) {
594 ATH_MSG_WARNING("Failed to seek to " << i << " in range: " << rangeID);
596 break;
597 }
598 ATH_MSG_INFO("Seek to " << i << " succeeded");
599 m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
600 sc = m_evtProcessor->nextEvent(nEvt++);
601
602 m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
603 if(sc.isFailure()){
604 ATH_MSG_WARNING("Failed to process the event " << i << " in range:" << rangeID);
606 break;
607 }
608 else {
609 ATH_MSG_DEBUG("Event processed");
610 nEventsProcessed++;
611 }
612 }
613 }
614 if (evtSelector()->releaseContext (ctx).isFailure()) {
615 ATH_MSG_WARNING("Failed to release IEventSelector context.");
616 }
617
618 // Fire dummy NextEventRange incident in order to cut the previous output and report it
619 m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
620 if(rangeStatus!=AthenaMPToolBase::ESRANGE_SUCCESS) {
621 reportError(socket2Scatterer,rangeStatus);
622 continue;
623 }
624
625 // Event range successfully processed
626 std::string strOutpFile;
627 // Get the full path of the event range output file
628 for(std::filesystem::directory_iterator fdIt(std::filesystem::current_path()); fdIt!=std::filesystem::directory_iterator(); fdIt++) {
629 if(fdIt->path().string().rfind(rangeID) == fdIt->path().string().size()-rangeID.size()) {
630 if(strOutpFile.empty()) {
631 strOutpFile = fdIt->path().string();
632 }
633 else {
634 strOutpFile += (std::string(",")+fdIt->path().string());
635 }
636 }
637 }
638
639 // Stop timing
640 System::ProcessTime time_delta = System::getProcessTime() - time_start;
641
642 // Prepare the output report
643 if(!strOutpFile.empty()) {
644 // We need to combine the output file name with
645 // 1. RangeID (requested by JEDI)
646 // 2. CPU time
647 // 3. Wall time
648 std::ostringstream outputReportStream;
649 outputReportStream << strOutpFile
650 << ",ID:" << rangeID
651 << ",CPU:" << time_delta.cpuTime<System::Sec>()
652 << ",WALL:" << time_delta.elapsedTime<System::Sec>();
653 std::string outputFileReport = outputReportStream.str();
654
655 // Report the output
656 message2scatterer = CxxUtils::xmalloc(outputFileReport.size());
657 memcpy(message2scatterer,outputFileReport.data(),outputFileReport.size());
658 socket2Scatterer->send(message2scatterer,outputFileReport.size());
659 ATH_MSG_INFO("Reported the output " << outputFileReport);
660 }
661 else {
662 // This is an error: range successfully processed but no outputs were made
663 ATH_MSG_WARNING("Failed to make an output file for range: " << rangeID);
665 }
666 } // Main "event loop"
667
668 if(m_evtProcessor->executeRun(0).isFailure()) {
669 ATH_MSG_WARNING("Could not finalize the Run");
670 }
671
672 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
673
674 // Return value: "ERRCODE|Func_Flag|NEvt"
675 int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
676 void* outdata = CxxUtils::xmalloc(outsize);
677 *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
679 memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
680 memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
681
682 outwork->data = outdata;
683 outwork->size = outsize;
684 // ...
685 // (possible) TODO: extend outwork with some error message, which will be eventually
686 // reported in the master proces
687 // ...
688
689 delete socket2Scatterer;
690 delete socketFactory;
691
692 return outwork;
693}
694
695std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeProcessor::fin_func()
696{
697 ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
698
699 if(m_appMgr->stop().isFailure()) {
700 ATH_MSG_WARNING("Unable to stop AppMgr");
701 }
702 else {
703 if(m_appMgr->finalize().isFailure()) {
704 std::cout << "Unable to finalize AppMgr" << std::endl;
705 }
706 }
707
708 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
709
710 // Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
711 int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
712 void* outdata = CxxUtils::xmalloc(outsize);
713 *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
715 memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
716 int nEvt = -1;
717 memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
718
719 outwork->data = outdata;
720 outwork->size = outsize;
721
722 return outwork;
723}
724
725StatusCode EvtRangeProcessor::startProcess()
726{
727 m_nprocs++;
728
729 // Create a rank for the new process
731 ATH_MSG_ERROR("Unable to send int to the ranks queue!");
732 return StatusCode::FAILURE;
733 }
734
735 pid_t pid = m_processGroup->launchProcess();
736 if(pid==0) {
737 ATH_MSG_ERROR("Unable to start new process");
738 return StatusCode::FAILURE;
739 }
740
741 if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP,pid)) {
742 ATH_MSG_ERROR("Unable to bootstrap new process");
743 return StatusCode::FAILURE;
744 }
745
747 return StatusCode::SUCCESS;
748}
749
750StatusCode EvtRangeProcessor::setNewInputFile(const std::string& newFile)
751{
752 if(m_inpFile == newFile) return StatusCode::SUCCESS;
753
754 // Get Property Server
755 IProperty* propertyServer = dynamic_cast<IProperty*>(evtSelector());
756 if(!propertyServer) {
757 ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
758 return StatusCode::FAILURE;
759 }
760
761 std::string propertyName("InputCollections");
762 if(m_inpFile.empty()) {
763 std::vector<std::string> vect;
764 StringArrayProperty inputFileList(propertyName, vect);
765 if(propertyServer->getProperty(&inputFileList).isFailure()) {
766 ATH_MSG_ERROR("Failed to get InputCollections property value of the Event Selector");
767 return StatusCode::FAILURE;
768 }
769 if(newFile==inputFileList.value()[0]) {
770 m_inpFile = newFile;
771 return StatusCode::SUCCESS;
772 }
773 }
774 std::vector<std::string> vect{newFile,};
775 StringArrayProperty newInputFileList(std::move(propertyName), vect);
776 if(propertyServer->setProperty(newInputFileList).isFailure()) {
777 ATH_MSG_ERROR("Unable to update " << newInputFileList.name() << " property on the Event Selector");
778 return StatusCode::FAILURE;
779 }
780 m_inpFile=newFile;
781 return StatusCode::SUCCESS;
782}
783
785{
786 pid_t pid = getpid();
787 size_t messageSize = sizeof(pid_t)+sizeof(AthenaMPToolBase::ESRange_Status);
788 void* message2scatterer = CxxUtils::xmalloc(messageSize);
789 memcpy(message2scatterer,&pid,sizeof(pid_t));
790 memcpy((pid_t*)message2scatterer+1,&status,sizeof(AthenaMPToolBase::ESRange_Status));
791 socket->send(message2scatterer,messageSize);
792}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
int32_t pid_t
Extension to IEvtSelector to allow for seeking.
static Double_t sc
std::string m_subprocTopDir
Top run directory for subprocesses.
int handleSavedPfc(const std::filesystem::path &dest_path)
int updateIoReg(const std::string &rundir)
std::string m_evtSelName
Name of the event selector.
AthenaMPToolBase(const std::string &type, const std::string &name, const IInterface *parent)
IEvtSelector * evtSelector()
virtual StatusCode initialize() override
Gaudi::Property< bool > m_isPileup
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
ServiceHandle< IAppMgrUI > m_appMgr
int m_nprocs
Number of workers spawned by the master process.
ServiceHandle< IIoComponentMgr > m_ioMgr
AthenaInterprocess::ProcessGroup * m_processGroup
ServiceHandle< IEventProcessor > m_evtProcessor
std::string m_subprocDirPrefix
For example, "worker_".
std::string fmterror(int errnum)
StatusCode setNewInputFile(const std::string &newFile)
std::string m_inpFile
Cached name of the input file. To avoid reopening.
std::deque< pid_t > m_finQueue
virtual StatusCode initialize() override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Gaudi::Property< bool > m_debug
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
void reportError(yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
virtual void subProcessLogs(std::vector< std::string > &) override
virtual void reportSubprocessStatuses() override
ServiceHandle< IIncidentSvc > m_incidentSvc
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
std::map< pid_t, int > m_nProcessedEvents
int m_rankId
Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
EvtRangeProcessor(const std::string &type, const std::string &name, const IInterface *parent)
virtual ~EvtRangeProcessor() override
Gaudi::Property< std::string > m_channel2Scatterer
AthenaInterprocess::SharedQueue * m_sharedFailedPidQueue
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
std::map< pid_t, ProcessState > m_procStates
SmartIF< IEvtSelectorSeek > m_evtSeek
int m_activeWorkers
Keep track of the number of workers.
#define COPY_FILE_HACK(_src, _dest)
const std::string process
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
::StatusCode StatusCode
StatusCode definition for legacy code.
output
Definition merge.py:16
retrieve(aClass, aKey=None)
Definition PyKernel.py:110
Trapping version of malloc.