ATLAS Offline Software
Loading...
Searching...
No Matches
SharedEvtQueueConsumer.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
9
14#include "CxxUtils/xmalloc.h"
15#include "GaudiKernel/IEvtSelector.h"
16#include "GaudiKernel/IIoComponentMgr.h"
17#include "GaudiKernel/IFileMgr.h"
18#include "GaudiKernel/IChronoStatSvc.h"
19#include "GaudiKernel/ISvcLocator.h"
20#include "GaudiKernel/IIncidentSvc.h"
21#include "GaudiKernel/IConversionSvc.h"
22
23#include <filesystem>
24
25#include <sys/stat.h>
26#include <sstream>
27#include <fstream>
28#include <unistd.h>
29#include <stdio.h>
30#include <stdint.h>
31#include <stdexcept>
32#include <cmath> // For pow
33
// Constructor: forwards type/name/parent to AthenaMPToolBase, binds the
// ChronoStatSvc handle and records the master process PID so that
// finalize() can distinguish the master from forked workers.
// NOTE(review): the opening line of this signature (line 34) is not
// visible in this extract (doc-generator artifact).
35 , const std::string& name
36 , const IInterface* parent)
37 : AthenaMPToolBase(type,name,parent)
38 , m_chronoStatSvc("ChronoStatSvc", name)
39 , m_masterPid(getpid())
40{
41 m_subprocDirPrefix = "worker_"; // worker run sub-dirs: worker_0, worker_1, ...
42}
43
47
// Initialize the tool: choose the seeking interface (event-loop manager for
// pile-up jobs, event selector otherwise), set up the event-share and
// data-share services, and retrieve the ChronoStatSvc used for timing.
// Returns FAILURE if any required interface cannot be obtained.
49{
50 ATH_MSG_DEBUG("In initialize");
51
53
54 // For pile-up jobs use event loop manager for seeking
55 // otherwise use event selector
56 if(m_isPileup) {
57 m_evtSeek = SmartIF<IEventSeek>(m_evtProcessor.get());
58 if(!m_evtSeek) {
59 ATH_MSG_ERROR("Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
60 return StatusCode::FAILURE;
61 }
62 }
63 else if(m_evtSelector) {
64 m_evtSelSeek = serviceLocator()->service(m_evtSelName);
65 ATH_CHECK(m_evtSelSeek.isValid());
66 }
67
68 if(m_evtSelector) {
69 ATH_CHECK( m_evtSelector->createContext (m_evtContext) );
70
71 m_evtShare = serviceLocator()->service(m_evtSelName);
72 if(!m_evtShare) {
74 ATH_MSG_ERROR("Error retrieving IEventShare");
75 return StatusCode::FAILURE;
76 }
// NOTE(review): this INFO is emitted on the *success* path, yet its text
// says "Could not retrieve" — the message looks inverted; confirm against
// the upstream repository before relying on it for log parsing.
77 ATH_MSG_INFO("Could not retrieve IEventShare");
78 }
79
80 //FIXME: AthenaPool dependent for now
81
// NOTE(review): the numbering gap here (line 82) hides this scope's guard —
// presumably if(m_useSharedWriter) — TODO confirm.
83 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolSharedIOCnvSvc"));
84 if(!m_dataShare) {
85 ATH_MSG_ERROR("Error retrieving AthenaPoolSharedIOCnvSvc");
86 return StatusCode::FAILURE;
87 }
88 }
89 }
90
91 ATH_CHECK(m_chronoStatSvc.retrieve());
92
93 return StatusCode::SUCCESS;
94}
95
// Finalize the tool. In the master process: merge the per-worker event-order
// files into a single file in the master run directory, backing up any
// pre-existing one under a randomized name. In all processes: release the
// event selector context.
97{
98 if(getpid()==m_masterPid) {
99 ATH_MSG_INFO("finalize() in the master process");
100 // Merge saved event orders into one in the master run directory
101
102 // 1. Check if master run directory already contains a file with saved orders
103 // If so, then rename it with random suffix
104 std::filesystem::path ordersFile(m_eventOrdersFile.value());
105 if(std::filesystem::exists(ordersFile)) {
// NOTE(review): rand() seeded with time(0) does not guarantee a unique
// backup suffix (collisions possible), and srand() reseeds the global RNG.
106 srand((unsigned)time(0));
107 std::ostringstream randname;
108 randname << rand();
109 std::string ordersFileBak = m_eventOrdersFile+std::string("-bak-")+randname.str();
110 ATH_MSG_WARNING("File " << m_eventOrdersFile << " already exists in the master run directory!");
111 ATH_MSG_WARNING("Saving a backup with new name " << ordersFileBak);
112
113 std::filesystem::path ordersFileBakpath(ordersFileBak);
114 std::filesystem::rename(ordersFile,ordersFileBakpath);
115 }
116
117 // 2. Merge workers event orders into the master file
118 std::fstream fs(m_eventOrdersFile,std::fstream::out);
119 for(int i=0; i<m_nprocs; ++i) {
120 std::ostringstream workerIndex;
121 workerIndex << i;
122 std::filesystem::path worker_rundir(m_subprocTopDir);
123 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
124 std::string ordersFileWorker(worker_rundir.string()+std::string("/")+m_eventOrdersFile);
125 ATH_MSG_INFO("Processing " << ordersFileWorker << " ...");
126 std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
127 std::string line;
// NOTE(review): the while(good())/getline pattern streams the buffer before
// re-checking the stream state, so one extra (empty) line is written after
// the last line of each worker file.
128 while(fs_worker.good()) {
129 std::getline(fs_worker,line);
130 fs << line << std::endl;
131 }
132 fs_worker.close();
133 }
134 fs.close();
135 } // if(getpid()==m_masterPid)
136
137 if (m_evtContext) {
138 ATH_CHECK( m_evtSelector->releaseContext (m_evtContext) );
139 m_evtContext = nullptr;
140 }
141
142 return StatusCode::SUCCESS;
143}
144
145int SharedEvtQueueConsumer::makePool(int, int nprocs, const std::string& topdir)
146{
147 ATH_MSG_DEBUG("In makePool " << getpid());
148
149 if(nprocs==0 || nprocs<-1) {
150 ATH_MSG_ERROR("Invalid value for the nprocs parameter: " << nprocs);
151 return -1;
152 }
153
154 if(topdir.empty()) {
155 ATH_MSG_ERROR("Empty name for the top directory!");
156 return -1;
157 }
158
159 m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
160 m_subprocTopDir = topdir;
161
162 // Get the shared event queue
163 ATH_MSG_DEBUG("Event queue name AthenaMPEventQueue_" << m_randStr);
164 ATH_CHECK( detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr), -1);
165
166 // Create rank queue and fill it
167 m_sharedRankQueue = std::make_unique<AthenaInterprocess::SharedQueue>("SharedEvtQueueConsumer_RankQueue_"+m_randStr,m_nprocs,sizeof(int));
168 for(int i=0; i<m_nprocs; ++i)
169 if(!m_sharedRankQueue->send_basic<int>(i)) {
170 ATH_MSG_ERROR("Unable to send int to the ranks queue!");
171 return -1;
172 }
173
174 // Create the process group and map_async bootstrap
175 m_processGroup = new AthenaInterprocess::ProcessGroup(m_nprocs);
176 ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
177 if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
178 return -1;
179 ATH_MSG_INFO("Workers bootstrapped");
180
181 return m_nprocs;
182}
183
184StatusCode SharedEvtQueueConsumer::exec()
185{
186 ATH_MSG_DEBUG("In exec " << getpid());
187
188 if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
189 return StatusCode::FAILURE;
190 ATH_MSG_INFO("Workers started processing events");
191
192 return StatusCode::SUCCESS;
193}
194
195StatusCode SharedEvtQueueConsumer::wait_once(pid_t& pid)
196{
197 StatusCode sc = AthenaMPToolBase::wait_once(pid);
198 AthenaInterprocess::ProcessResult* presult(0);
199 if(sc.isFailure()) {
200 // We are to stop waiting. Pull all available ProcessResults from the queue
201 // Don't serialize finalizations
202 do {
203 presult = m_processGroup->pullOneResult();
204 if(presult && (unsigned)(presult->output.size)>sizeof(int))
205 decodeProcessResult(presult,false);
206 if(presult) free(presult->output.data);
207 delete presult;
208 } while(presult);
209 }
210 else {
211 // Pull one result and decode it if necessary
212 presult = m_processGroup->pullOneResult();
213 int res(0);
214 if(presult && (unsigned)(presult->output.size)>sizeof(int))
215 res = decodeProcessResult(presult,true);
216 if(presult) free(presult->output.data);
217 delete presult;
218 if(res) return StatusCode::FAILURE;
219 }
220 return sc;
221}
222
// Print a per-worker summary: PID, exit status and — when available from
// m_eventStat (filled by decodeProcessResult) — the number of processed
// events and the event-loop time.
// NOTE(review): the signature line (223) is not visible in this extract.
224{
225 ATH_MSG_INFO("Statuses of event processors");
226 const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
227 for(size_t i=0; i<statuses.size(); ++i) {
228 // Get the number of events processed by this worker
229 auto it = m_eventStat.find(statuses[i].pid);
230 msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
231 << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
232 << ". Number of events processed: ";
233 if(it==m_eventStat.end())
234 msg(MSG::INFO) << "N/A" << endmsg;
235 else
236 msg(MSG::INFO) << it->second.first
237 << ", Event Loop Time: " << it->second.second << "sec."
238 << endmsg;
239 }
240}
241
242void SharedEvtQueueConsumer::subProcessLogs(std::vector<std::string>& filenames)
243{
244 filenames.clear();
245 for(int i=0; i<m_nprocs; ++i) {
246 std::ostringstream workerIndex;
247 workerIndex << i;
248 std::filesystem::path worker_rundir(m_subprocTopDir);
249 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
250 filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
251 }
252}
253
// ________________________________________________________________________
// Worker bootstrap, executed inside each forked child: obtain the rank ID,
// create and enter the worker run directory, redirect logs, update the I/O
// registry, copy auxiliary DB/table files, reopen descriptors, configure
// shared reader/writer clients, reinitialize I/O, restart/advance the event
// selector(s), and optionally load predefined event orders. The returned
// ScheduledWork carries a single int: it is set to 1 (failure) up front and
// flipped to 0 only when the whole sequence succeeds.
// ________________________________________________________________________
254std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::bootstrap_func()
255{
257
258 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
259 outwork->data = CxxUtils::xmalloc(sizeof(int));
260 *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
261 outwork->size = sizeof(int);
262
263 // For PileUp Digi Fork-After-N-Events >>>>
264 // Retrieve curEvent-s for all background event selectors, if we forked after N events
265 std::map<IService*,int> bkgEvtSelectors;
266
267 if(m_isPileup) {
268 for(IService* ptrSvc : serviceLocator()->getServices()) {
269 IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(ptrSvc);
270 if(evtsel && (evtsel != m_evtSelector)) {
271 if(m_nEventsBeforeFork>0) {
272 IEvtSelectorSeek* evtselseek = dynamic_cast<IEvtSelectorSeek*>(evtsel);
273 if(evtselseek) {
274 bkgEvtSelectors.emplace(ptrSvc,evtselseek->curEvent(*m_evtContext));
275 }
276 else {
277 ATH_MSG_ERROR("Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->name());
278 return outwork;
279 }
280 }
281 else {
282 bkgEvtSelectors.emplace(ptrSvc,0);
283 }
284 }
285 }
286 }
287 // <<<< For PileUp Digi Fork-After-N-Events
288
289 // ...
290 // (possible) TODO: extend outwork with some error message, which will be eventually
291 // reported in the master process
292 // ...
293
294 // ________________________ Get IncidentSvc and fire PostFork ________________________
295 SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service("IncidentSvc"));
296 if(!p_incidentSvc) {
297 ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
298 return outwork;
299 }
300 p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
301
302
303 // ________________________ Get RankID ________________________
304 //
305 if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
306 ATH_MSG_ERROR("Unable to get rank ID!");
307 return outwork;
308 }
309 std::ostringstream workindex;
310 workindex<<m_rankId;
311
312 // ________________________ Worker dir: mkdir ________________________
313 std::filesystem::path worker_rundir(m_subprocTopDir);
314 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
315 // TODO: this "worker_" can be made configurable too
316
317 if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
318 ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
319 return outwork;
320 }
321
322 // __________ Redirect logs unless we want to attach debugger ____________
323 if(!m_debug) {
324 if(redirectLog(worker_rundir.string()))
325 return outwork;
326
327 ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
328 }
329
330 // ________________________ Update Io Registry ____________________________
331 if(updateIoReg(worker_rundir.string()))
332 return outwork;
333
334 ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
335
336 // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
337 std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
338 if(std::filesystem::is_regular_file("SimParams.db"))
339 COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
340 if(std::filesystem::is_regular_file("DigitParams.db"))
341 COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
342 if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
343 COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
344
345 // _______________________ Handle saved PFC (if any) ______________________
346 if(handleSavedPfc(abs_worker_rundir))
347 return outwork;
348
349 // ________________________ reopen descriptors ____________________________
350 if(reopenFds())
351 return outwork;
352
353 ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
354
355
356 // ________________________ Make Shared Reader/Writer Client ________________________
// NOTE(review): the numbering gap here (line 357) hides this scope's guard —
// presumably if(m_useSharedReader) — TODO confirm against upstream.
358 ATH_CHECK( m_evtShare->makeClient(m_rankId), outwork);
359 }
360
// NOTE(review): gap (line 361) presumably holds if(m_useSharedWriter) — TODO confirm.
362 SmartIF<IProperty> propertyServer(m_dataShare);
363 if (!propertyServer || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
364 ATH_MSG_ERROR("Could not change AthenaPoolSharedIOCnvSvc MakeClient Property");
365 return outwork;
366 }
367 else {
368 ATH_MSG_DEBUG("Successfully made the conversion service a share client");
369 }
370 }
371
372 // ________________________ I/O reinit ________________________
373 ATH_CHECK( m_ioMgr->io_reinitialize(), outwork );
374
375 // _______________ Get the value of SkipEvent ________________________
376 if(m_evtSelector) {
377 SmartIF<IProperty> propertyServer(m_evtSelector);
378 ATH_CHECK( propertyServer.isValid(), outwork);
379
380 IntegerProperty skipEventsProp("SkipEvents", m_nSkipEvents);
381 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
382 ATH_MSG_INFO("Event Selector does not have SkipEvents property");
383 }
384 else {
385 m_nSkipEvents = skipEventsProp.value();
386 }
387
388 // ________________________ Event selector restart ________________________
389 SmartIF<IService> evtSelSvc(m_evtSelector);
390 ATH_CHECK( evtSelSvc.isValid(), outwork);
391 ATH_CHECK( evtSelSvc->start(), outwork);
392 }
393 // For PileUp jobs >>>>
394 // Main event selector: advance it if we either forked after N events, or skipEvents!=0
395 // Background event selectors: restart, and advance if we forked after N events
396 if(m_isPileup) {
397 // Deal with the main event selector first
398 m_evtSelSeek = serviceLocator()->service(m_evtSelName);
399 if(!m_evtSelSeek) {
400 ATH_MSG_ERROR("Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
401 return outwork;
402 }
403
// NOTE(review): lines 404-405 (the guarded seek call on m_evtSelSeek to
// m_nEventsBeforeFork+m_nSkipEvents) are not visible in this extract.
406 ATH_MSG_ERROR("Failed to seek to " << m_nEventsBeforeFork+m_nSkipEvents);
407 return outwork;
408 }
409
410 // Deal with background event selectors
411 for(auto [evtsel,curEvt] : bkgEvtSelectors) {
412 if(evtsel->start().isSuccess()) {
413 if (m_nEventsBeforeFork>0) {
414 SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
415 if(evtselseek->seek(*m_evtContext,curEvt).isFailure()) {
416 ATH_MSG_ERROR("Failed to seek to " << curEvt << " in the BKG Event Selector " << evtsel->name());
417 return outwork;
418 }
419 }
420 }
421 else {
422 ATH_MSG_ERROR("Failed to restart BKG Event Selector " << evtsel->name());
423 return outwork;
424 }
425 }
426 }
427 // <<<< For PileUp jobs
428
429 // _______________________ Event orders for debugging ________________________________
// NOTE(review): gap (line 430) presumably holds if(m_readEventOrders) — TODO confirm.
431 std::fstream fs(m_eventOrdersFile,std::fstream::in);
432 if(fs.good()) {
433 ATH_MSG_INFO("Reading predefined event orders from " << m_eventOrdersFile);
434 while(fs.good()){
435 std::string line;
436 std::getline(fs,line);
437 if(line.empty())continue;
438
439 // Parse the string: "<rank>:evt1,evt2,..." — keep only our rank's orders
440 size_t idx(0);
441 int rank = std::stoi(line,&idx);
442 if(rank==m_rankId) {
443 msg(MSG::INFO) << "This worker will proces the following events #";
444 while(idx<line.size()-1) {
445 line = line.substr(idx+1);
446 int evtnum = std::stoi(line,&idx);
447 m_eventOrders.push_back(evtnum);
448 msg(MSG::INFO) << " " << evtnum;
449 }
450 msg(MSG::INFO) << endmsg;
451 }
452 }
453 if(m_eventOrders.empty()) {
454 ATH_MSG_ERROR("Could not read event orders for the rank " << m_rankId);
455 return outwork;
456 }
457 fs.close();
458 }
459 else {
460 ATH_MSG_ERROR("Unable to read predefined event orders from " << m_eventOrdersFile);
461 return outwork;
462 }
463 }
464
465 // ________________________ Worker dir: chdir ________________________
466 if(chdir(worker_rundir.string().c_str())==-1) {
467 ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
468 return outwork;
469 }
470
471 // ___________________ Fire UpdateAfterFork incident _________________
472 p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
473
474 // Declare success and return
475 *(int*)(outwork->data) = 0;
476 return outwork;
477}
478
// ________________________________________________________________________
// Worker event loop: obtains event numbers either round-robin, from the
// predefined orders file, or from the shared event queue; shares/seeks as
// configured; processes each chunk with the event processor and records the
// processed order in m_eventOrdersFile. The returned payload is encoded as
// "ERRCODE|Func_Flag|NEvt|EvtLoopTime".
// ________________________________________________________________________
479std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::exec_func()
480{
481 ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
482
483 bool all_ok(true);
484
// NOTE(review): pow() here is floating-point; for 32-bit int the result
// (2^32) is exactly representable, but an integral shift would be safer.
485 long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
486 int nEvt(m_nEventsBeforeFork);
487 int nEventsProcessed(0);
488 long evtnumAndChunk(0);
489
490 unsigned evtCounter(0);
491 int evtnum(0), chunkSize(1);
492 auto predefinedEvt = m_eventOrders.cbegin();
493
494 // If the event orders file already exists in worker's run directory, then it's an unexpected error!
495 std::filesystem::path ordersFile(m_eventOrdersFile.value());
496 if(std::filesystem::exists(ordersFile)) {
497 ATH_MSG_ERROR(m_eventOrdersFile << " already exists in the worker's run directory!");
498 all_ok = false;
499 }
500
501 System::ProcessTime time_start = System::getProcessTime();
502 if(all_ok) {
503 std::fstream fs(m_eventOrdersFile,std::fstream::out);
504 fs << m_rankId;
505 bool firstOrder(true);
506 while(true) {
507 if(m_isRoundRobin) {
508 evtnum = m_nSkipEvents + m_nprocs*evtCounter + m_rankId;
509 if(m_maxEvt!=-1 && evtnum>=m_maxEvt+m_nSkipEvents) {
510 break;
511 }
512 evtCounter++;
513 }
// NOTE(review): a line is missing from this extract around here — in the
// upstream source this branch is guarded by m_readEventOrders; confirm.
515 else {
516 if(predefinedEvt==m_eventOrders.cend()) break;
517 evtnum = *predefinedEvt;
518 predefinedEvt++;
519 fs << (firstOrder?":":",") << evtnum;
520 fs.flush();
521 firstOrder=false;
522 ATH_MSG_INFO("Read event number from the orders file: " << evtnum);
523 }
524 else {
525 if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
526 // The event queue is empty, but we should check whether there are more events to come or not
527 ATH_MSG_DEBUG("Event queue is empty");
528 usleep(1000);
529 continue;
530 }
531 if(evtnumAndChunk<=0) {
532 evtnumAndChunk *= -1;
533 ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
534 break;
535 }
536 ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
537 chunkSize = evtnumAndChunk >> (sizeof(int)*8);
538 evtnum = evtnumAndChunk & intmask;
539 ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
540
541 // Save event order
542 for(int i(0);i<chunkSize;++i) {
543 fs << (firstOrder?":":",") << evtnum+i;
544 firstOrder=false;
545 }
546 fs.flush();
547 } // Get event numbers from the shared queue
548 } // Not RoundRobin
549 nEvt+=chunkSize;
550 StatusCode sc;
// NOTE(review): gap (line 551) presumably holds if(m_useSharedReader) — confirm.
552 sc = m_evtShare->share(evtnum);
553 if(sc.isFailure()){
554 ATH_MSG_ERROR("Unable to share " << evtnum);
555 all_ok=false;
556 break;
557 }
558 else {
559 ATH_MSG_INFO("Share of " << evtnum << " succeeded");
560 }
561 }
562 else if(m_evtSelector) {
563 m_chronoStatSvc->chronoStart("AthenaMP_seek");
564 if (m_evtSeek) {
565 sc=m_evtSeek->seek(evtnum);
566 }
567 else {
568 sc=m_evtSelSeek->seek(*m_evtContext, evtnum);
569 }
570 if(sc.isFailure()){
571 ATH_MSG_ERROR("Unable to seek to " << evtnum);
572 all_ok=false;
573 break;
574 }
575 else {
576 ATH_MSG_INFO("Seek to " << evtnum << " succeeded");
577 }
578 m_chronoStatSvc->chronoStop("AthenaMP_seek");
579 }
580 m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
581 sc = m_evtProcessor->nextEvent(nEvt);
582 nEventsProcessed += chunkSize;
583 if(sc.isFailure()){
584 if(chunkSize==1) {
585 ATH_MSG_ERROR("Unable to process event " << evtnum);
586 }
587 else {
588 ATH_MSG_ERROR("Unable to process the chunk (" << evtnum << "," << evtnum+chunkSize-1 << ")");
589 }
590 all_ok=false;
591 break;
592 }
593 m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
594 if(m_mpRunStop->stopScheduled()) {
595 ATH_MSG_INFO("Scheduled stop");
596 break;
597 }
598 }
599 fs.close();
600 }
601 System::ProcessTime time_delta = System::getProcessTime() - time_start;
602 TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
603
604 if(all_ok) {
605 if(m_evtProcessor->executeRun(0).isFailure()) {
606 ATH_MSG_ERROR("Could not finalize the Run");
607 all_ok=false;
608 }
609 else if(!m_useSharedReader && m_evtSelector) {
610 StatusCode sc;
611 if (m_evtSeek) {
612 sc = m_evtSeek->seek(evtnumAndChunk+m_nSkipEvents);
613 }
614 else {
615 sc = m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+m_nSkipEvents);
616 }
617 if(sc.isFailure()) {
618 ATH_MSG_WARNING("Seek past maxevt to " << evtnumAndChunk+m_nSkipEvents << " returned failure.");
619 }
620 }
621 }
622
623 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
624
625 // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime"
626 int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(elapsedTime);
627 void* outdata = CxxUtils::xmalloc(outsize);
628 *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
// NOTE(review): the declaration of 'func' (line 629, the FUNC_EXEC flag)
// is not visible in this extract.
630 memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
631 memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
632 memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsedTime,sizeof(elapsedTime));
633 outwork->data = outdata;
634 outwork->size = outsize;
635 // ...
636 // (possible) TODO: extend outwork with some error message, which will be eventually
637 // reported in the master proces
638 // ...
639 return outwork;
640}
641
// Worker finalization: stop and finalize the application manager, then
// report the outcome in the same "ERRCODE|Func_Flag|NEvt|EvtLoopTime"
// layout as exec_func, with NEvt=-1 and EvtLoopTime=-1 as placeholders.
642std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::fin_func()
643{
644 ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
645
646 bool all_ok(true);
647
648 if(m_appMgr->stop().isFailure()) {
649 ATH_MSG_ERROR("Unable to stop AppMgr");
650 all_ok=false;
651 }
652 else {
653 if(m_appMgr->finalize().isFailure()) {
654 std::cerr << "Unable to finalize AppMgr" << std::endl;
655 all_ok=false;
656 }
657 }
658
659 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
660
661 // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime" (Here NEvt=-1 and EvtLoopTime=-1)
662 int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(TimeValType);
663 void* outdata = CxxUtils::xmalloc(outsize);
664 *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
// NOTE(review): the declaration of 'func' (line 665, the FUNC_FIN flag)
// is not visible in this extract.
666 memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
667 int nEvt = -1;
668 memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
669 TimeValType elapsed = -1;
670 memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsed,sizeof(elapsed));
671
672 outwork->data = outdata;
673 outwork->size = outsize;
674
675 return outwork;
676}
677
// Decode a worker's ScheduledWork payload ("ERRCODE|Func_Flag|NEvt|
// EvtLoopTime"): record the event statistics and, for FUNC_EXEC results
// with doFinalize set, schedule the worker's finalization; for FUNC_FIN
// results, advance the serialized finalization queue.
// Returns 0 on success, 1 when scheduling fails or an unexpected PID is
// finalized.
678int SharedEvtQueueConsumer::decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize)
679{
680 if(!presult) return 0;
681 const AthenaInterprocess::ScheduledWork& output = presult->output;
682 ATH_MSG_DEBUG("Decoding the output of PID=" << presult->pid << " with the size=" << output.size);
683 if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(TimeValType)) return 0;
684
// NOTE(review): the declaration of 'func' (line 685) and the branch guard
// (line 687, presumably if(func==AthenaMPToolBase::FUNC_EXEC)) are not
// visible in this extract; the brace at line 714 closes that branch.
686 memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
688 // Store the number of processed events
689 int nevt(0);
690 TimeValType elapsed(0);
691 memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
// NOTE(review): '2*+sizeof(int)' contains a stray unary '+' — behaviorally
// identical to 2*sizeof(int), but worth cleaning upstream.
692 memcpy(&elapsed,(char*)output.data+2*+sizeof(int)+sizeof(func),sizeof(TimeValType));
693 m_eventStat[presult->pid]=std::pair<int,TimeValType>(nevt,elapsed);
694 ATH_MSG_DEBUG("PID=" << presult->pid << " processed " << nevt << " events in " << elapsed << "sec.");
695
696 if(doFinalize) {
697 // Add PID to the finalization queue
698 m_finQueue.push(presult->pid);
699 ATH_MSG_DEBUG("Added PID=" << presult->pid << " to the finalization queue");
700
701 // If this is the only element in the queue then start its finalization
702 // Otherwise it has to wait its turn until all previous processes have been finalized
703 if(m_finQueue.size()==1) {
704 if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,presult->pid)
705 || m_processGroup->map_async(0,0,presult->pid)) {
706 ATH_MSG_ERROR("Problem scheduling finalization on PID=" << presult->pid);
707 return 1;
708 }
709 else {
710 ATH_MSG_DEBUG("Scheduled finalization of PID=" << presult->pid);
711 }
712 }
713 }
714 }
715 else if(doFinalize && func==AthenaMPToolBase::FUNC_FIN) {
716 ATH_MSG_DEBUG("Finished finalization of PID=" << presult->pid);
717 pid_t pid = m_finQueue.front();
718 if(pid==presult->pid) {
719 // pid received as expected. Remove it from the queue
720 m_finQueue.pop();
721 ATH_MSG_DEBUG("PID=" << presult->pid << " removed from the queue");
722 // Schedule finalization of the next processe in the queue
723 if(m_finQueue.size()) {
724 if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())
725 || m_processGroup->map_async(0,0,m_finQueue.front())) {
726 ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
727 return 1;
728 }
729 else {
730 ATH_MSG_DEBUG("Scheduled finalization of PID=" << m_finQueue.front());
731 }
732 }
733 }
734 else {
735 // Error: unexpected pid received from presult
736 ATH_MSG_ERROR("Finalized PID=" << presult->pid << " while PID=" << pid << " was expected");
737 return 1;
738 }
739 }
740
741 return 0;
742}
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
int32_t pid_t
Abstract interface for seeking within an event stream.
Extension to IEvtSelector to allow for seeking.
std::pair< std::vector< unsigned int >, bool > res
static Double_t fs
static Double_t sc
constexpr int pow(int base, int exp) noexcept
std::string m_subprocTopDir
Top run directory for subprocesses.
int handleSavedPfc(const std::filesystem::path &dest_path)
int updateIoReg(const std::string &rundir)
std::string m_evtSelName
Name of the event selector.
SmartIF< IEvtSelector > m_evtSelector
AthenaMPToolBase(const std::string &type, const std::string &name, const IInterface *parent)
int m_maxEvt
Maximum number of events assigned to the job.
virtual StatusCode initialize() override
Gaudi::Property< bool > m_isPileup
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
ServiceHandle< IAppMgrUI > m_appMgr
int m_nprocs
Number of workers spawned by the master process.
const AthenaInterprocess::IMPRunStop * m_mpRunStop
ServiceHandle< IIoComponentMgr > m_ioMgr
AthenaInterprocess::ProcessGroup * m_processGroup
ServiceHandle< IEventProcessor > m_evtProcessor
std::string m_subprocDirPrefix
For ex. "worker__".
std::string fmterror(int errnum)
Abstract interface for seeking for an event selector.
virtual int curEvent(const IEvtSelector::Context &c) const =0
return the current event number.
virtual StatusCode finalize() override
virtual StatusCode initialize() override
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
SharedEvtQueueConsumer(const std::string &type, const std::string &name, const IInterface *parent)
SmartIF< IEventShare > m_evtShare
std::queue< pid_t > m_finQueue
Gaudi::Property< std::string > m_eventOrdersFile
SmartIF< IEventSeek > m_evtSeek
std::vector< int > m_eventOrders
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Gaudi::Property< int > m_nEventsBeforeFork
Gaudi::Property< bool > m_readEventOrders
Gaudi::Property< bool > m_useSharedWriter
Gaudi::Property< bool > m_useSharedReader
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
virtual void reportSubprocessStatuses() override
SmartIF< IDataShare > m_dataShare
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Gaudi::Property< bool > m_isRoundRobin
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Gaudi::Property< bool > m_debug
IEvtSelector::Context * m_evtContext
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
virtual ~SharedEvtQueueConsumer() override
System::ProcessTime::TimeValueType TimeValType
virtual void subProcessLogs(std::vector< std::string > &) override
#define COPY_FILE_HACK(_src, _dest)
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
::StatusCode StatusCode
StatusCode definition for legacy code.
retrieve(aClass, aKey=None)
Definition PyKernel.py:110
MsgStream & msg
Definition testRead.cxx:32
Trapping version of malloc.