ATLAS Offline Software
Loading...
Searching...
No Matches
SharedEvtQueueConsumer Class Reference final abstract

#include <SharedEvtQueueConsumer.h>

Inheritance diagram for SharedEvtQueueConsumer:
Collaboration diagram for SharedEvtQueueConsumer:

Public Member Functions

 SharedEvtQueueConsumer (const std::string &type, const std::string &name, const IInterface *parent)
virtual ~SharedEvtQueueConsumer () override
virtual StatusCode initialize () override
virtual StatusCode finalize () override
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t &pid) override
virtual void reportSubprocessStatuses () override
virtual void subProcessLogs (std::vector< std::string > &) override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func () override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func () override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func () override
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
virtual void useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override
virtual void setRandString (const std::string &randStr) override
virtual void setMaxEvt (int maxEvt) override
virtual void setMPRunStop (const AthenaInterprocess::IMPRunStop *runStop) override
virtual void killChildren () override
virtual std::unique_ptr< ScheduledWork > operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0

Protected Types

enum  ESRange_Status {
  ESRANGE_SUCCESS , ESRANGE_NOTFOUND , ESRANGE_SEEKFAILED , ESRANGE_PROCFAILED ,
  ESRANGE_FILENOTMADE , ESRANGE_BADINPFILE
}
enum  Func_Flag { FUNC_BOOTSTRAP , FUNC_EXEC , FUNC_FIN }

Protected Member Functions

int mapAsyncFlag ATLAS_NOT_THREAD_SAFE (Func_Flag flag, pid_t pid=0)
int redirectLog (const std::string &rundir, bool addTimeStamp=true)
int updateIoReg (const std::string &rundir)
std::string fmterror (int errnum)
int reopenFds ()
int handleSavedPfc (const std::filesystem::path &dest_path)
void waitForSignal ()
IEvtSelector * evtSelector ()

Protected Attributes

int m_nprocs {-1}
 Number of workers spawned by the master process.
int m_maxEvt {-1}
 Maximum number of events assigned to the job.
std::string m_subprocTopDir
 Top run directory for subprocesses.
std::string m_subprocDirPrefix
 For example, "worker_".
std::string m_evtSelName
 Name of the event selector.
AthenaInterprocess::ProcessGroup * m_processGroup {nullptr}
const AthenaInterprocess::IMPRunStop * m_mpRunStop {nullptr}
ServiceHandle< IEventProcessor > m_evtProcessor
ServiceHandle< IAppMgrUI > m_appMgr
ServiceHandle< IFileMgr > m_fileMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
SmartIF< IEvtSelector > m_evtSelector
std::string m_fileMgrLog
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
std::string m_randStr
Gaudi::Property< bool > m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}

Private Types

typedef System::ProcessTime::TimeValueType TimeValType

Private Member Functions

int decodeProcessResult ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ProcessResult *presult, bool doFinalize)
int reopenFd (int fd, const std::string &name)

Private Attributes

Gaudi::Property< bool > m_useSharedReader {this, "UseSharedReader", false, "Work in pair with a SharedReader"}
Gaudi::Property< bool > m_useSharedWriter {this, "UseSharedWriter", false, "Work in pair with a SharedWriter"}
Gaudi::Property< bool > m_isRoundRobin {this, "IsRoundRobin", false, "Are we running in the 'reproducible mode'?"}
Gaudi::Property< bool > m_debug {this, "Debug", false}
Gaudi::Property< bool > m_readEventOrders {this, "ReadEventOrders", false}
Gaudi::Property< int > m_nEventsBeforeFork {this, "EventsBeforeFork", 0}
Gaudi::Property< std::string > m_eventOrdersFile {this, "EventOrdersFile", "athenamp_eventorders.txt"}
int m_rankId {-1}
int m_nSkipEvents {0}
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
SmartIF< IEventSeek > m_evtSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
IEvtSelector::Context * m_evtContext {nullptr}
SmartIF< IEventShare > m_evtShare
SmartIF< IDataShare > m_dataShare
AthenaInterprocess::SharedQueue * m_sharedEventQueue {nullptr}
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
std::queue< pid_t > m_finQueue
std::vector< int > m_eventOrders
pid_t m_masterPid

Detailed Description

Definition at line 23 of file SharedEvtQueueConsumer.h.

Member Typedef Documentation

◆ TimeValType

typedef System::ProcessTime::TimeValueType SharedEvtQueueConsumer::TimeValType
private

Definition at line 76 of file SharedEvtQueueConsumer.h.

Member Enumeration Documentation

◆ ESRange_Status

enum AthenaMPToolBase::ESRange_Status
protected inherited
Enumerator
ESRANGE_SUCCESS 
ESRANGE_NOTFOUND 
ESRANGE_SEEKFAILED 
ESRANGE_PROCFAILED 
ESRANGE_FILENOTMADE 
ESRANGE_BADINPFILE 

Definition at line 58 of file AthenaMPToolBase.h.

◆ Func_Flag

enum AthenaMPToolBase::Func_Flag
protected inherited
Enumerator
FUNC_BOOTSTRAP 
FUNC_EXEC 
FUNC_FIN 

Definition at line 67 of file AthenaMPToolBase.h.

Constructor & Destructor Documentation

◆ SharedEvtQueueConsumer()

SharedEvtQueueConsumer::SharedEvtQueueConsumer ( const std::string & type,
const std::string & name,
const IInterface * parent )

Definition at line 34 of file SharedEvtQueueConsumer.cxx.

// Member initializers: forward type/name/parent to the AthenaMPToolBase base class,
// name the "ChronoStatSvc" service handle, and store the PID of the process that
// runs this constructor in m_masterPid (finalize() later compares getpid() to it).
37 : AthenaMPToolBase(type,name,parent)
38 , m_chronoStatSvc("ChronoStatSvc", name)
39 , m_masterPid(getpid())
40{
// Prefix that is combined with the worker index to build per-worker run directory names.
41 m_subprocDirPrefix = "worker_";
42}
std::string m_subprocDirPrefix
For example, "worker_".
ServiceHandle< IChronoStatSvc > m_chronoStatSvc

◆ ~SharedEvtQueueConsumer()

SharedEvtQueueConsumer::~SharedEvtQueueConsumer ( )
override virtual

Definition at line 44 of file SharedEvtQueueConsumer.cxx.

// Empty destructor body: no explicit clean-up is performed here.
45{
46}

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE() [1/5]

int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( Func_Flag flag,
pid_t pid = 0 )
protected inherited

◆ ATLAS_NOT_THREAD_SAFE() [2/5]

virtual StatusCode exec SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( )
override virtual

◆ ATLAS_NOT_THREAD_SAFE() [3/5]

int decodeProcessResult SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( const AthenaInterprocess::ProcessResult * presult,
bool doFinalize )
private

◆ ATLAS_NOT_THREAD_SAFE() [4/5]

virtual int makePool SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( int maxevt,
int nprocs,
const std::string & topdir )
override virtual

◆ ATLAS_NOT_THREAD_SAFE() [5/5]

virtual StatusCode wait_once SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( pid_t & pid)
override virtual

Reimplemented from AthenaMPToolBase.

◆ bootstrap_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueConsumer::bootstrap_func ( )
override virtual

Implements AthenaMPToolBase.

Definition at line 254 of file SharedEvtQueueConsumer.cxx.

// Bootstrap step of an AthenaMP event worker (see the log messages below).
// Returns a ScheduledWork whose payload is a single int status code. The code is
// preset to 1 (failure) so that every early `return outwork` reports an error, and
// it is overwritten with 0 only after all steps below have succeeded.
// NOTE(review): source lines 256, 357, 361, 404-405 and 430 are absent from this
// listing, so the conditions opening some of the blocks below are not visible here.
255{
257
258 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
259 outwork->data = CxxUtils::xmalloc(sizeof(int));
260 *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
261 outwork->size = sizeof(int);
262
263 // For PileUp Digi Fork-After-N-Events >>>>
264 // Retrieve curEvent values for all background event selectors, if we forked after N events
// Maps each background event selector service to the event position to restore later
// (0 when EventsBeforeFork is not positive).
265 std::map<IService*,int> bkgEvtSelectors;
266
267 if(m_isPileup) {
268 for(IService* ptrSvc : serviceLocator()->getServices()) {
269 IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(ptrSvc);
// Every IEvtSelector service other than the main one is treated as a background selector.
270 if(evtsel && (evtsel != m_evtSelector)) {
271 if(m_nEventsBeforeFork>0) {
272 IEvtSelectorSeek* evtselseek = dynamic_cast<IEvtSelectorSeek*>(evtsel);
273 if(evtselseek) {
274 bkgEvtSelectors.emplace(ptrSvc,evtselseek->curEvent(*m_evtContext));
275 }
276 else {
277 ATH_MSG_ERROR("Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->name());
278 return outwork;
279 }
280 }
281 else {
282 bkgEvtSelectors.emplace(ptrSvc,0);
283 }
284 }
285 }
286 }
287 // <<<< For PileUp Digi Fork-After-N-Events
288
289 // ...
290 // (possible) TODO: extend outwork with some error message, which will be eventually
291 // reported in the master process
292 // ...
293
294 // ________________________ Get IncidentSvc and fire PostFork ________________________
295 SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service("IncidentSvc"));
296 if(!p_incidentSvc) {
297 ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
298 return outwork;
299 }
300 p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
301
302
303 // ________________________ Get RankID ________________________
304 //
// The rank ID is received from m_sharedRankQueue and is used below as the
// worker index in the run directory name.
305 if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
306 ATH_MSG_ERROR("Unable to get rank ID!");
307 return outwork;
308 }
309 std::ostringstream workindex;
310 workindex<<m_rankId;
311
312 // ________________________ Worker dir: mkdir ________________________
// Directory path is m_subprocTopDir / (m_subprocDirPrefix + rank ID).
313 std::filesystem::path worker_rundir(m_subprocTopDir);
314 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
315 // TODO: this "worker_" can be made configurable too
316
// Any mkdir failure is treated as an error and reported with the errno text.
317 if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
318 ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
319 return outwork;
320 }
321
322 // __________ Redirect logs unless we want to attach debugger ____________
323 if(!m_debug) {
324 if(redirectLog(worker_rundir.string()))
325 return outwork;
326
327 ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
328 }
329
330 // ________________________ Update Io Registry ____________________________
331 if(updateIoReg(worker_rundir.string()))
332 return outwork;
333
334 ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
335
336 // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
// The source file names are relative, so they are looked up in the current working
// directory; the chdir into the worker directory only happens near the end of this function.
337 std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
338 if(std::filesystem::is_regular_file("SimParams.db"))
339 COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
340 if(std::filesystem::is_regular_file("DigitParams.db"))
341 COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
342 if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
343 COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
344
345 // _______________________ Handle saved PFC (if any) ______________________
346 if(handleSavedPfc(abs_worker_rundir))
347 return outwork;
348
349 // ________________________ reopen descriptors ____________________________
350 if(reopenFds())
351 return outwork;
352
353 ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
354
355
356 // ________________________ Make Shared Reader/Writer Client ________________________
// NOTE(review): the condition opening this block (source line 357) is not shown in this listing.
358 ATH_CHECK( m_evtShare->makeClient(m_rankId), outwork);
359 }
360
// NOTE(review): the condition opening this block (source line 361) is not shown in this listing.
// The "MakeStreamingToolClient" property is set to the rank ID plus one.
362 SmartIF<IProperty> propertyServer(m_dataShare);
363 if (!propertyServer || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
364 ATH_MSG_ERROR("Could not change AthenaPoolSharedIOCnvSvc MakeClient Property");
365 return outwork;
366 }
367 else {
368 ATH_MSG_DEBUG("Successfully made the conversion service a share client");
369 }
370 }
371
372 // ________________________ I/O reinit ________________________
373 ATH_CHECK( m_ioMgr->io_reinitialize(), outwork );
374
375 // _______________ Get the value of SkipEvent ________________________
376 if(m_evtSelector) {
377 SmartIF<IProperty> propertyServer(m_evtSelector);
378 ATH_CHECK( propertyServer.isValid(), outwork);
379
// A selector without a SkipEvents property is not an error: m_nSkipEvents keeps its current value.
380 IntegerProperty skipEventsProp("SkipEvents", m_nSkipEvents);
381 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
382 ATH_MSG_INFO("Event Selector does not have SkipEvents property");
383 }
384 else {
385 m_nSkipEvents = skipEventsProp.value();
386 }
387
388 // ________________________ Event selector restart ________________________
389 SmartIF<IService> evtSelSvc(m_evtSelector);
390 ATH_CHECK( evtSelSvc.isValid(), outwork);
391 ATH_CHECK( evtSelSvc->start(), outwork);
392 }
393 // For PileUp jobs >>>>
394 // Main event selector: advance it if we either forked after N events, or skipEvents!=0
395 // Background event selectors: restart, and advance if we forked after N events
396 if(m_isPileup) {
397 // Deal with the main event selector first
398 m_evtSelSeek = serviceLocator()->service(m_evtSelName);
399 if(!m_evtSelSeek) {
400 ATH_MSG_ERROR("Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
401 return outwork;
402 }
403
// NOTE(review): source lines 404-405, which perform the seek whose failure is reported
// here, are not shown in this listing.
406 ATH_MSG_ERROR("Failed to seek to " << m_nEventsBeforeFork+m_nSkipEvents);
407 return outwork;
408 }
409
410 // Deal with background event selectors
411 for(auto [evtsel,curEvt] : bkgEvtSelectors) {
412 if(evtsel->start().isSuccess()) {
413 if (m_nEventsBeforeFork>0) {
// Restore the position that was recorded for this selector at the top of the function.
414 SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
415 if(evtselseek->seek(*m_evtContext,curEvt).isFailure()) {
416 ATH_MSG_ERROR("Failed to seek to " << curEvt << " in the BKG Event Selector " << evtsel->name());
417 return outwork;
418 }
419 }
420 }
421 else {
422 ATH_MSG_ERROR("Failed to restart BKG Event Selector " << evtsel->name());
423 return outwork;
424 }
425 }
426 }
427 // <<<< For PileUp jobs
428
429 // _______________________ Event orders for debugging ________________________________
// NOTE(review): the condition opening this block (source line 430) is not shown in this listing.
431 std::fstream fs(m_eventOrdersFile,std::fstream::in);
432 if(fs.good()) {
433 ATH_MSG_INFO("Reading predefined event orders from " << m_eventOrdersFile);
434 while(fs.good()){
435 std::string line;
436 std::getline(fs,line);
437 if(line.empty())continue;
438
439 // Parse the string
// Each line starts with a rank number. For the line whose rank equals m_rankId, the
// remaining numbers are read one by one, skipping exactly one separator character
// before each number (substr(idx+1)), and appended to m_eventOrders.
// NOTE(review): std::stoi throws on text that does not start with a number; no
// handler is visible in this function.
440 size_t idx(0);
441 int rank = std::stoi(line,&idx);
442 if(rank==m_rankId) {
443 msg(MSG::INFO) << "This worker will proces the following events #";
444 while(idx<line.size()-1) {
445 line = line.substr(idx+1);
446 int evtnum = std::stoi(line,&idx);
447 m_eventOrders.push_back(evtnum);
448 msg(MSG::INFO) << " " << evtnum;
449 }
450 msg(MSG::INFO) << endmsg;
451 }
452 }
// No entries collected for this rank is treated as a failure.
453 if(m_eventOrders.empty()) {
454 ATH_MSG_ERROR("Could not read event orders for the rank " << m_rankId);
455 return outwork;
456 }
457 fs.close();
458 }
459 else {
460 ATH_MSG_ERROR("Unable to read predefined event orders from " << m_eventOrdersFile);
461 return outwork;
462 }
463 }
464
465 // ________________________ Worker dir: chdir ________________________
466 if(chdir(worker_rundir.string().c_str())==-1) {
467 ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
468 return outwork;
469 }
470
471 // ___________________ Fire UpdateAfterFork incident _________________
472 p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
473
474 // Declare success and return
475 *(int*)(outwork->data) = 0;
476 return outwork;
477}
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_DEBUG(x)
static Double_t fs
std::string m_subprocTopDir
Top run directory for subprocesses.
int handleSavedPfc(const std::filesystem::path &dest_path)
int updateIoReg(const std::string &rundir)
std::string m_evtSelName
Name of the event selector.
SmartIF< IEvtSelector > m_evtSelector
Gaudi::Property< bool > m_isPileup
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
ServiceHandle< IIoComponentMgr > m_ioMgr
std::string fmterror(int errnum)
virtual int curEvent(const IEvtSelector::Context &c) const =0
return the current event number.
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
SmartIF< IEventShare > m_evtShare
Gaudi::Property< std::string > m_eventOrdersFile
std::vector< int > m_eventOrders
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Gaudi::Property< int > m_nEventsBeforeFork
Gaudi::Property< bool > m_readEventOrders
Gaudi::Property< bool > m_useSharedWriter
Gaudi::Property< bool > m_useSharedReader
SmartIF< IDataShare > m_dataShare
Gaudi::Property< bool > m_debug
IEvtSelector::Context * m_evtContext
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
#define COPY_FILE_HACK(_src, _dest)
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
mkdir(path, recursive=True)
MsgStream & msg
Definition testRead.cxx:32

◆ evtSelector()

IEvtSelector * AthenaMPToolBase::evtSelector ( )
inline protected inherited

Definition at line 83 of file AthenaMPToolBase.h.

// Accessor: returns the m_evtSelector member as an IEvtSelector*.
83{ return m_evtSelector; }

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueConsumer::exec_func ( )
override virtual

Implements AthenaMPToolBase.

Definition at line 479 of file SharedEvtQueueConsumer.cxx.

// Event loop of an AthenaMP worker (see the log message below).
// Event numbers come from one of three sources visible in this listing: a round-robin
// formula, the predefined list in m_eventOrders, or values received from
// m_sharedEventQueue. Each event (or chunk) is shared or seeked to and then processed
// through m_evtProcessor->nextEvent(). The event numbers taken from m_eventOrders or
// from the queue are also written to the file named by m_eventOrdersFile.
// Returns a ScheduledWork whose payload is "ERRCODE|Func_Flag|NEvt|EvtLoopTime".
// NOTE(review): source lines 515, 550-551, 610 and 629 are absent from this listing, so
// the declarations of `sc` and `func` and some opening conditions are not visible here.
480{
481 ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
482
483 bool all_ok(true);
484
// intmask has all bits set in its low sizeof(int) bytes.
485 long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
// nEvt starts from the number of events before the fork and grows by chunkSize per
// iteration; it is the value passed to nextEvent().
486 int nEvt(m_nEventsBeforeFork);
487 int nEventsProcessed(0);
488 long evtnumAndChunk(0);
489
490 unsigned evtCounter(0);
491 int evtnum(0), chunkSize(1);
492 auto predefinedEvt = m_eventOrders.cbegin();
493
494 // If the event orders file already exists in worker's run directory, then it's an unexpected error!
495 std::filesystem::path ordersFile(m_eventOrdersFile.value());
496 if(std::filesystem::exists(ordersFile)) {
497 ATH_MSG_ERROR(m_eventOrdersFile << " already exists in the worker's run directory!");
498 all_ok = false;
499 }
500
501 System::ProcessTime time_start = System::getProcessTime();
502 if(all_ok) {
// The orders file line is written as: rank, then ":" before the first event number
// and "," before each following one.
503 std::fstream fs(m_eventOrdersFile,std::fstream::out);
504 fs << m_rankId;
505 bool firstOrder(true);
506 while(true) {
507 if(m_isRoundRobin) {
// Round-robin: this worker takes event numbers m_nSkipEvents + m_rankId, then every m_nprocs-th after that.
508 evtnum = m_nSkipEvents + m_nprocs*evtCounter + m_rankId;
509 if(m_maxEvt!=-1 && evtnum>=m_maxEvt+m_nSkipEvents) {
510 break;
511 }
512 evtCounter++;
513 }
514 else {
// NOTE(review): the condition opening the predefined-orders branch (source line 515)
// is not shown in this listing.
516 if(predefinedEvt==m_eventOrders.cend()) break;
517 evtnum = *predefinedEvt;
518 predefinedEvt++;
519 fs << (firstOrder?":":",") << evtnum;
520 fs.flush();
521 firstOrder=false;
522 ATH_MSG_INFO("Read event number from the orders file: " << evtnum);
523 }
524 else {
525 if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
526 // The event queue is empty, but we should check whether there are more events to come or not
527 ATH_MSG_DEBUG("Event queue is empty");
// Poll again after a 1000 microsecond sleep.
528 usleep(1000);
529 continue;
530 }
// A non-positive value ends the loop; its negation is logged as the total number of
// events for the job and is reused after the loop for the final seek.
531 if(evtnumAndChunk<=0) {
532 evtnumAndChunk *= -1;
533 ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
534 break;
535 }
536 ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
// Decode: bits above the low sizeof(int)*8 bits carry the chunk size, the low bits carry the event number.
537 chunkSize = evtnumAndChunk >> (sizeof(int)*8);
538 evtnum = evtnumAndChunk & intmask;
539 ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
540
541 // Save event order
542 for(int i(0);i<chunkSize;++i) {
543 fs << (firstOrder?":":",") << evtnum+i;
544 firstOrder=false;
545 }
546 fs.flush();
547 } // Get event numbers from the shared queue
548 } // Not RoundRobin
549 nEvt+=chunkSize;
// NOTE(review): source lines 550-551 (declaration of `sc` and the condition opening the
// share branch) are not shown in this listing.
552 sc = m_evtShare->share(evtnum);
553 if(sc.isFailure()){
554 ATH_MSG_ERROR("Unable to share " << evtnum);
555 all_ok=false;
556 break;
557 }
558 else {
559 ATH_MSG_INFO("Share of " << evtnum << " succeeded");
560 }
561 }
562 else if(m_evtSelector) {
563 m_chronoStatSvc->chronoStart("AthenaMP_seek");
// Seek through m_evtSeek when it is set, otherwise through m_evtSelSeek with the selector context.
564 if (m_evtSeek) {
565 sc=m_evtSeek->seek(evtnum);
566 }
567 else {
568 sc=m_evtSelSeek->seek(*m_evtContext, evtnum);
569 }
570 if(sc.isFailure()){
571 ATH_MSG_ERROR("Unable to seek to " << evtnum);
572 all_ok=false;
573 break;
574 }
575 else {
576 ATH_MSG_INFO("Seek to " << evtnum << " succeeded");
577 }
578 m_chronoStatSvc->chronoStop("AthenaMP_seek");
579 }
580 m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
581 sc = m_evtProcessor->nextEvent(nEvt);
// The counter is advanced before the status check, so a failed event/chunk is still counted.
582 nEventsProcessed += chunkSize;
583 if(sc.isFailure()){
584 if(chunkSize==1) {
585 ATH_MSG_ERROR("Unable to process event " << evtnum);
586 }
587 else {
588 ATH_MSG_ERROR("Unable to process the chunk (" << evtnum << "," << evtnum+chunkSize-1 << ")");
589 }
590 all_ok=false;
591 break;
592 }
593 m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
594 if(m_mpRunStop->stopScheduled()) {
595 ATH_MSG_INFO("Scheduled stop");
596 break;
597 }
598 }
599 fs.close();
600 }
// Elapsed time between the two getProcessTime() calls, in seconds.
601 System::ProcessTime time_delta = System::getProcessTime() - time_start;
602 TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
603
604 if(all_ok) {
605 if(m_evtProcessor->executeRun(0).isFailure()) {
606 ATH_MSG_ERROR("Could not finalize the Run");
607 all_ok=false;
608 }
609 else if(!m_useSharedReader && m_evtSelector) {
// NOTE(review): source line 610 is not shown in this listing.
// Seek to evtnumAndChunk+m_nSkipEvents; a failure is only a warning and does not
// change all_ok. evtnumAndChunk is still 0 here unless a value was received from
// the event queue.
611 if (m_evtSeek) {
612 sc = m_evtSeek->seek(evtnumAndChunk+m_nSkipEvents);
613 }
614 else {
615 sc = m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+m_nSkipEvents);
616 }
617 if(sc.isFailure()) {
618 ATH_MSG_WARNING("Seek past maxevt to " << evtnumAndChunk+m_nSkipEvents << " returned failure.");
619 }
620 }
621 }
622
623 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
624
625 // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime"
// The fields are packed back to back into one xmalloc'ed buffer in that order.
626 int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(elapsedTime);
627 void* outdata = CxxUtils::xmalloc(outsize);
628 *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
// NOTE(review): source line 629, which declares `func`, is not shown in this listing.
630 memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
631 memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
632 memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsedTime,sizeof(elapsedTime));
633 outwork->data = outdata;
634 outwork->size = outsize;
635 // ...
636 // (possible) TODO: extend outwork with some error message, which will be eventually
637 // reported in the master process
638 // ...
639 return outwork;
640}
#define ATH_MSG_WARNING(x)
static Double_t sc
constexpr int pow(int base, int exp) noexcept
int m_maxEvt
Maximum number of events assigned to the job.
int m_nprocs
Number of workers spawned by the master process.
const AthenaInterprocess::IMPRunStop * m_mpRunStop
ServiceHandle< IEventProcessor > m_evtProcessor
SmartIF< IEventSeek > m_evtSeek
Gaudi::Property< bool > m_isRoundRobin
AthenaInterprocess::SharedQueue * m_sharedEventQueue
System::ProcessTime::TimeValueType TimeValType
::StatusCode StatusCode
StatusCode definition for legacy code.

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueConsumer::fin_func ( )
override virtual

Implements AthenaMPToolBase.

Definition at line 642 of file SharedEvtQueueConsumer.cxx.

// Finalization step of an AthenaMP worker (see the log message below): stops the
// application manager and, only if that succeeded, finalizes it.
// Returns a ScheduledWork using the same "ERRCODE|Func_Flag|NEvt|EvtLoopTime" layout
// as described below, with NEvt and EvtLoopTime both set to -1.
// NOTE(review): source line 665, which declares `func`, is not shown in this listing.
643{
644 ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
645
646 bool all_ok(true);
647
648 if(m_appMgr->stop().isFailure()) {
649 ATH_MSG_ERROR("Unable to stop AppMgr");
650 all_ok=false;
651 }
652 else {
653 if(m_appMgr->finalize().isFailure()) {
// This failure is written to std::cerr rather than through the ATH_MSG_* macros.
654 std::cerr << "Unable to finalize AppMgr" << std::endl;
655 all_ok=false;
656 }
657 }
658
659 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
660
661 // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime" (Here NEvt=-1 and EvtLoopTime=-1)
// The fields are packed back to back into one xmalloc'ed buffer in that order.
662 int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(TimeValType);
663 void* outdata = CxxUtils::xmalloc(outsize);
664 *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
666 memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
667 int nEvt = -1;
668 memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
669 TimeValType elapsed = -1;
670 memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsed,sizeof(elapsed));
671
672 outwork->data = outdata;
673 outwork->size = outsize;
674
675 return outwork;
676}
ServiceHandle< IAppMgrUI > m_appMgr

◆ finalize()

StatusCode SharedEvtQueueConsumer::finalize ( )
override virtual

Reimplemented from AthenaMPToolBase.

Definition at line 96 of file SharedEvtQueueConsumer.cxx.

// Finalizes the tool.
// In the master process only (getpid() equals the PID recorded in m_masterPid), the
// per-worker event order files are concatenated into one file named by
// m_eventOrdersFile in the current directory. In every process, the event selector
// context is released if one is held.
// Returns StatusCode::SUCCESS unless ATH_CHECK on releaseContext fails.
97{
98 if(getpid()==m_masterPid) {
99 ATH_MSG_INFO("finalize() in the master process");
100 // Merge saved event orders into one in the master run directory
101
102 // 1. Check if master run directory already contains a file with saved orders
103 // If so, then rename it with random suffix
104 std::filesystem::path ordersFile(m_eventOrdersFile.value());
105 if(std::filesystem::exists(ordersFile)) {
// Backup name is "<orders file>-bak-<rand()>", with rand() seeded from the current time.
106 srand((unsigned)time(0));
107 std::ostringstream randname;
108 randname << rand();
109 std::string ordersFileBak = m_eventOrdersFile+std::string("-bak-")+randname.str();
110 ATH_MSG_WARNING("File " << m_eventOrdersFile << " already exists in the master run directory!");
111 ATH_MSG_WARNING("Saving a backup with new name " << ordersFileBak);
112
113 std::filesystem::path ordersFileBakpath(ordersFileBak);
// NOTE(review): this overload of std::filesystem::rename throws on failure; no handler is visible here.
114 std::filesystem::rename(ordersFile,ordersFileBakpath);
115 }
116
117 // 2. Merge workers event orders into the master file
118 std::fstream fs(m_eventOrdersFile,std::fstream::out);
119 for(int i=0; i<m_nprocs; ++i) {
// Worker file path: m_subprocTopDir / (m_subprocDirPrefix + i) / m_eventOrdersFile.
120 std::ostringstream workerIndex;
121 workerIndex << i;
122 std::filesystem::path worker_rundir(m_subprocTopDir);
123 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
124 std::string ordersFileWorker(worker_rundir.string()+std::string("/")+m_eventOrdersFile);
125 ATH_MSG_INFO("Processing " << ordersFileWorker << " ...");
// A worker file that cannot be opened is skipped silently: the stream is not good(),
// so the copy loop below does not run.
126 std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
127 std::string line;
128 while(fs_worker.good()) {
129 std::getline(fs_worker,line);
130 fs << line << std::endl;
131 }
132 fs_worker.close();
133 }
134 fs.close();
135 } // if(getpid()==m_masterPid)
136
// Release the event selector context and clear the pointer.
137 if (m_evtContext) {
138 ATH_CHECK( m_evtSelector->releaseContext (m_evtContext) );
139 m_evtContext = nullptr;
140 }
141
142 return StatusCode::SUCCESS;
143}
time(flags, cells_name, *args, **kw)

◆ fmterror()

std::string AthenaMPToolBase::fmterror ( int errnum)
protectedinherited

Definition at line 333 of file AthenaMPToolBase.cxx.

// Formats an errno value as text: calls strerror_r with a 256-byte local buffer and
// returns the buffer contents as a std::string.
// NOTE(review): the return value of strerror_r is ignored and buf is not initialized.
// With the GNU variant of strerror_r the message may be returned through the return
// value without buf being filled - confirm which variant this code is built against.
334{
335 char buf[256];
336 strerror_r(errnum, buf, sizeof(buf));
337 return std::string(buf);
338}

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr AthenaMPToolBase::generateOutputReport ( )
override virtual inherited

Reimplemented in EvtRangeProcessor, EvtRangeScatterer, and SharedWriterTool.

Definition at line 119 of file AthenaMPToolBase.cxx.

// Builds a report of worker output files by scanning each worker's FileMgr log
// (m_subprocTopDir / (m_subprocDirPrefix + i) / m_fileMgrLog) for lines containing
// "WRITE". The result maps each output file's base name to one WorkerOutput entry
// per worker that reported it. If m_fileMgrLog is empty only a warning is issued.
// NOTE(review): source line 121, which declares `jobOutputs`, is not shown in this listing.
120{
122
123 if(m_fileMgrLog.empty()) {
124 ATH_MSG_WARNING(name() << " cannot make output report because FileMgr has not been configured to write log file!");
125 }
126 else {
127 // Collect output files made by the workers
128 std::string line;
129 for(int i=0;i<m_nprocs;++i) {
130 // Get the name of FileMgr log
131 std::ostringstream workindex;
132 workindex<<i;
133 std::filesystem::path logFilePath(m_subprocTopDir);
134 logFilePath /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
135 std::filesystem::path logFile(logFilePath);
136 logFile /= std::filesystem::path(m_fileMgrLog);
// Workers without a readable regular log file are skipped with a warning.
137 if(!(std::filesystem::exists(logFile)&&std::filesystem::is_regular_file(logFile))) {
138 ATH_MSG_WARNING(logFile.string() << " either does not exist or is not a regular file. Skipping");
139 continue;
140 }
141
142 ATH_MSG_DEBUG("FileMgr log file (" << i << ") " << logFile);
143
144 std::ifstream inpStream(logFile.string().c_str());
145 std::set<std::string> reportedFiles; // Don't report the same file twice
146 while(!inpStream.eof()) {
147 std::getline(inpStream,line);
148 if(line.find("WRITE")!=std::string::npos) {
149 // Parse the entry
// Split the line into space-separated fields, skipping runs of spaces before each field.
150 size_t startpos(0);
151 std::vector<std::string> entries;
152 while(startpos<line.size()) {
153 while(line[startpos]==' ')
154 startpos++;
155
156 size_t endpos = line.find(' ',startpos);
157 if(endpos==std::string::npos) endpos = line.size();
158 entries.push_back(line.substr(startpos,endpos-startpos));
159 startpos=endpos+1;
160 }
161
162 // entries[0] is filename
// Only the base name (final path component) is used for de-duplication and as the map key.
163 std::filesystem::path filenamePath(entries[0]);
164 std::filesystem::path basename = filenamePath.filename();
165 if(reportedFiles.find(basename.string())==reportedFiles.end())
166 reportedFiles.insert(basename.string());
167 else
168 continue;
// A relative base name is resolved against the worker's directory (logFilePath).
169 std::filesystem::path absolutename = basename.is_absolute() ? basename : std::filesystem::absolute(std::filesystem::path(logFilePath)/=basename);
170 AthenaMP::AllWorkerOutputsIterator it1 = jobOutputs->find(basename.string());
171 if(it1==jobOutputs->end()) {
172 (*jobOutputs)[basename.string()] = AthenaMP::SingleWorkerOutputs();
173 (*jobOutputs)[basename.string()].reserve(m_nprocs);
174 }
175
// NOTE(review): entries[1]..entries[3] are read without checking entries.size();
// this presumably relies on every "WRITE" line having at least four fields - confirm.
176 AthenaMP::WorkerOutput newOutput;
177 newOutput.filename = absolutename.string();
178 newOutput.technology = entries[1];
179 newOutput.description = entries[2];
180 newOutput.access_mode = entries[3];
181 newOutput.shared = (line.find("SHARED")!=std::string::npos);
182
183 (*jobOutputs)[basename.string()].emplace_back(std::move(newOutput));
184 }
185 }
186 }
187 }
188 return jobOutputs;
189}
std::string m_fileMgrLog
double entries
Definition listroot.cxx:49
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
AllWorkerOutputs::iterator AllWorkerOutputsIterator
std::vector< WorkerOutput > SingleWorkerOutputs
std::string basename(std::string name)
Definition utils.cxx:207

◆ handleSavedPfc()

int AthenaMPToolBase::handleSavedPfc ( const std::filesystem::path & dest_path)
protected inherited

Definition at line 396 of file AthenaMPToolBase.cxx.

// If "PoolFileCatalog.xml.AthenaMP-saved" is a regular file in the current working
// directory, passes it to COPY_FILE_HACK with destination
// <dest_path>/PoolFileCatalog.xml (the macro presumably copies the file - its
// definition is not shown here). Always returns 0.
397{
398 if(std::filesystem::is_regular_file("PoolFileCatalog.xml.AthenaMP-saved"))
399 COPY_FILE_HACK("PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+"/PoolFileCatalog.xml");
400 return 0;
401}

◆ initialize()

StatusCode SharedEvtQueueConsumer::initialize ( )
override virtual

Reimplemented from AthenaMPToolBase.

Definition at line 48 of file SharedEvtQueueConsumer.cxx.

// Initializes the tool: selects the seek interface (event processor for pile-up jobs,
// event selector otherwise), creates the event selector context, looks up the
// IEventShare and IDataShare interfaces, and retrieves the ChronoStatSvc handle.
// Returns StatusCode::FAILURE on the error paths below, StatusCode::SUCCESS otherwise.
// NOTE(review): source lines 52, 73 and 82 are absent from this listing, so some
// statements and opening conditions are not visible here.
49{
50 ATH_MSG_DEBUG("In initialize");
51
53
54 // For pile-up jobs use event loop manager for seeking
55 // otherwise use event selector
56 if(m_isPileup) {
57 m_evtSeek = SmartIF<IEventSeek>(m_evtProcessor.get());
58 if(!m_evtSeek) {
59 ATH_MSG_ERROR("Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
60 return StatusCode::FAILURE;
61 }
62 }
63 else if(m_evtSelector) {
64 m_evtSelSeek = serviceLocator()->service(m_evtSelName);
65 ATH_CHECK(m_evtSelSeek.isValid());
66 }
67
68 if(m_evtSelector) {
69 ATH_CHECK( m_evtSelector->createContext (m_evtContext) );
70
// The IEventShare interface is looked up on the same service name as the event selector.
71 m_evtShare = serviceLocator()->service(m_evtSelName);
72 if(!m_evtShare) {
// NOTE(review): the condition on source line 73 that decides between the error below
// and the INFO message is not shown in this listing.
74 ATH_MSG_ERROR("Error retrieving IEventShare");
75 return StatusCode::FAILURE;
76 }
77 ATH_MSG_INFO("Could not retrieve IEventShare");
78 }
79
80 //FIXME: AthenaPool dependent for now
81
// NOTE(review): the condition opening this block (source line 82) is not shown in this listing.
83 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolSharedIOCnvSvc"));
84 if(!m_dataShare) {
85 ATH_MSG_ERROR("Error retrieving AthenaPoolSharedIOCnvSvc");
86 return StatusCode::FAILURE;
87 }
88 }
89 }
90
91 ATH_CHECK(m_chronoStatSvc.retrieve());
92
93 return StatusCode::SUCCESS;
94}
virtual StatusCode initialize() override

◆ killChildren()

void AthenaMPToolBase::killChildren ( )
override virtual inherited

Definition at line 201 of file AthenaMPToolBase.cxx.

// Sends SIGKILL to every child process reported by m_processGroup->getChildren().
// NOTE(review): m_processGroup is dereferenced without a null check, while the member
// is declared with a nullptr initializer - presumably callers only invoke this after
// the process group has been created; confirm.
202{
203 for(const AthenaInterprocess::Process& child : m_processGroup->getChildren()) {
204 kill(child.getProcessID(),SIGKILL);
205 }
206}
AthenaInterprocess::ProcessGroup * m_processGroup

◆ operator()

virtual std::unique_ptr< ScheduledWork > AthenaInterprocess::IMessageDecoder::operator ( ) const &
pure virtual inherited

◆ redirectLog()

int AthenaMPToolBase::redirectLog ( const std::string & rundir,
bool addTimeStamp = true )
protectedinherited

Definition at line 269 of file AthenaMPToolBase.cxx.

270{
271 // Redirect both stdout and stderr to the same file AthenaMP.log
272 int dup2result1(0), dup2result2(0);
273
274 int newout = open(std::string(rundir+"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
275 if(newout==-1) {
276 ATH_MSG_ERROR("Unable to open log file in the run directory. " << fmterror(errno));
277 return -1;
278 }
279 dup2result1 = dup2(newout, STDOUT_FILENO);
280 dup2result2 = dup2(newout, STDERR_FILENO);
281 TEMP_FAILURE_RETRY(close(newout));
282 if(dup2result1==-1) {
283 ATH_MSG_ERROR("Unable to redirect standard output. " << fmterror(errno));
284 return -1;
285 }
286 if(dup2result2==-1) {
287 ATH_MSG_ERROR("Unable to redirect standard error. " << fmterror(errno));
288 return -1;
289 }
290
291 if(addTimeStamp) {
292 SmartIF<IProperty> propertyServer(msgSvc());
293 if(propertyServer==0) {
294 ATH_MSG_ERROR("Unable to cast message svc to IProperty");
295 return -1;
296 }
297
298 std::string propertyName("Format");
299 std::string oldFormat("");
300 StringProperty formatProp(propertyName,oldFormat);
301 StatusCode sc = propertyServer->getProperty(&formatProp);
302 if(sc.isFailure()) {
303 ATH_MSG_WARNING("Message Service does not have Format property");
304 }
305 else {
306 oldFormat = formatProp.value();
307 if(oldFormat.find("%t")==std::string::npos) {
308 // Add time stamps
309 std::string newFormat("%t " + oldFormat);
310 StringProperty newFormatProp(std::move(propertyName),newFormat);
311 ATH_CHECK(propertyServer->setProperty(newFormatProp), -1);
312 }
313 else {
314 ATH_MSG_DEBUG("MsgSvc format already contains timestamps. Nothing to be done");
315 }
316 }
317 }
318
319 return 0;
320}
msgSvc
Provide convenience handles for various services.
Definition StdJOSetup.py:36
@ open
Definition BinningType.h:40

◆ reopenFd()

int AthenaMPToolBase::reopenFd ( int fd,
const std::string & name )
privateinherited

Definition at line 419 of file AthenaMPToolBase.cxx.

420{
421 ATH_MSG_DEBUG("Attempting to reopen descriptor for " << name);
422 int old_openflags = fcntl(fd,F_GETFL,0);
423 switch(old_openflags & O_ACCMODE) {
424 case O_RDONLY: {
425 ATH_MSG_DEBUG("The File Access Mode is RDONLY");
426 break;
427 }
428 case O_WRONLY: {
429 ATH_MSG_DEBUG("The File Access Mode is WRONLY");
430 break;
431 }
432 case O_RDWR: {
433 ATH_MSG_DEBUG("The File Access Mode is RDWR");
434 break;
435 }
436 }
437
438 int old_descflags = fcntl(fd,F_GETFD,0);
439 off_t oldpos = lseek(fd,0,SEEK_CUR);
440 if(oldpos==-1) {
441 if(errno==ESPIPE) {
442 ATH_MSG_WARNING("Dealing with PIPE. Skipping ... (FIXME!)");
443 }
444 else {
445 ATH_MSG_ERROR("When re-opening file descriptors lseek failed on " << name << ". " << fmterror(errno));
446 return -1;
447 }
448 }
449 else {
450 Io::Fd newfd = open(name.c_str(),old_openflags);
451 if(newfd==-1) {
452 ATH_MSG_ERROR("When re-opening file descriptors unable to open " << name << " for reading. " << fmterror(errno));
453 return -1;
454 }
455 if(lseek(newfd,oldpos,SEEK_SET)==-1){
456 ATH_MSG_ERROR("When re-opening file descriptors lseek failed on the newly opened " << name << ". " << fmterror(errno));
457 TEMP_FAILURE_RETRY(close(newfd));
458 return -1;
459 }
460 TEMP_FAILURE_RETRY(close(fd));
461 if(dup2(newfd,fd)==-1) {
462 ATH_MSG_ERROR("When re-opening file descriptors unable to duplicate descriptor for " << name << ". " << fmterror(errno));
463 TEMP_FAILURE_RETRY(close(newfd));
464 return -1;
465 }
466 if(fcntl(fd,F_SETFD,old_descflags)==-1) {
467 ATH_MSG_ERROR("When re-opening file descriptors unable to set descriptor flags for " << name << ". " << fmterror(errno));
468 TEMP_FAILURE_RETRY(close(newfd));
469 return -1;
470 }
471 TEMP_FAILURE_RETRY(close(newfd));
472 }
473 return 0;
474}

◆ reopenFds()

int AthenaMPToolBase::reopenFds ( )
protectedinherited

Definition at line 340 of file AthenaMPToolBase.cxx.

341{
342 // Reopen file descriptors.
343 // First go over all open files, which have been registered with the FileMgr
344 // Then also check the FdsRegistry, in case it contains some files not registered with the FileMgr
345 std::set<int> fdLog;
346
347 // Query the FileMgr contents
348 std::vector<const Io::FileAttr*> filemgrFiles;
349 std::vector<const Io::FileAttr*>::const_iterator itFile;
350 unsigned filenum = m_fileMgr->getFiles(filemgrFiles); // Get attributes for open files only. We don't care about closed ones at this point
351 if(filenum!=filemgrFiles.size())
352 ATH_MSG_WARNING("getFiles returned " << filenum << " while vector size is " << filemgrFiles.size());
353
354 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
355 ATH_MSG_DEBUG("* " << **itFile);
356 const std::string& filename = (**itFile).name();
357 Io::Fd fd = (**itFile).fd();
358
359 if(fd==-1) {
360 // It is legal to have fd=-1 for remote inputs
361 // On the other hand, these inputs should not remain open after fork. The issue being tracked at ATEAM-434.
362 // So, this hopefully is a temporary patch
363 ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " << filename);
364 continue;
365 }
366
367 if(reopenFd(fd,filename))
368 return -1;
369
370 fdLog.insert(fd);
371 }
372
373 // Check the FdsRegistry
374 for(const AthenaInterprocess::FdsRegistryEntry& regEntry : *m_fdsRegistry) {
375 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
376 ATH_MSG_DEBUG("The file from FdsRegistry " << regEntry.name << " was registered with FileMgr. Skip reopening");
377 }
378 else {
379 ATH_MSG_WARNING("The file " << regEntry.name << " has not been registered with the FileMgr!");
380
381 if(regEntry.fd==-1) {
382 // Same protection as the one above
383 ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
384 continue;
385 }
386
387 if(reopenFd(regEntry.fd,regEntry.name))
388 return -1;
389
390 fdLog.insert(regEntry.fd);
391 }
392 }
393 return 0;
394}
int reopenFd(int fd, const std::string &name)
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
ServiceHandle< IFileMgr > m_fileMgr

◆ reportSubprocessStatuses()

void SharedEvtQueueConsumer::reportSubprocessStatuses ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 223 of file SharedEvtQueueConsumer.cxx.

224{
225 ATH_MSG_INFO("Statuses of event processors");
226 const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
227 for(size_t i=0; i<statuses.size(); ++i) {
228 // Get the number of events processed by this worker
229 auto it = m_eventStat.find(statuses[i].pid);
230 msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
231 << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
232 << ". Number of events processed: ";
233 if(it==m_eventStat.end())
234 msg(MSG::INFO) << "N/A" << endmsg;
235 else
236 msg(MSG::INFO) << it->second.first
237 << ", Event Loop Time: " << it->second.second << "sec."
238 << endmsg;
239 }
240}
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
list statuses

◆ setMaxEvt()

virtual void AthenaMPToolBase::setMaxEvt ( int maxEvt)
inlineoverridevirtualinherited

Definition at line 44 of file AthenaMPToolBase.h.

44{m_maxEvt=maxEvt;}

◆ setMPRunStop()

virtual void AthenaMPToolBase::setMPRunStop ( const AthenaInterprocess::IMPRunStop * runStop)
inlineoverridevirtualinherited

Definition at line 45 of file AthenaMPToolBase.h.

45{m_mpRunStop=runStop;}

◆ setRandString()

void AthenaMPToolBase::setRandString ( const std::string & randStr)
overridevirtualinherited

Definition at line 196 of file AthenaMPToolBase.cxx.

197{
198 m_randStr = randStr;
199}

◆ subProcessLogs()

void SharedEvtQueueConsumer::subProcessLogs ( std::vector< std::string > & filenames)
overridevirtual

Definition at line 242 of file SharedEvtQueueConsumer.cxx.

243{
244 filenames.clear();
245 for(int i=0; i<m_nprocs; ++i) {
246 std::ostringstream workerIndex;
247 workerIndex << i;
248 std::filesystem::path worker_rundir(m_subprocTopDir);
249 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
250 filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
251 }
252}
list filenames
Definition grepfile.py:34

◆ updateIoReg()

int AthenaMPToolBase::updateIoReg ( const std::string & rundir)
protectedinherited

Definition at line 322 of file AthenaMPToolBase.cxx.

323{
324 ATH_CHECK(m_ioMgr.retrieve(), -1);
325
326 // update the IoRegistry for the new workdir - make sure we use absolute path
327 std::filesystem::path abs_rundir = std::filesystem::absolute(rundir);
328 ATH_CHECK(m_ioMgr->io_update_all(abs_rundir.string()), -1);
329
330 return 0;
331}

◆ useFdsRegistry()

void AthenaMPToolBase::useFdsRegistry ( std::shared_ptr< AthenaInterprocess::FdsRegistry > registry)
overridevirtualinherited

Definition at line 191 of file AthenaMPToolBase.cxx.

192{
193 m_fdsRegistry = std::move(registry);
194}

◆ waitForSignal()

void AthenaMPToolBase::waitForSignal ( )
protectedinherited

Definition at line 403 of file AthenaMPToolBase.cxx.

404{
405 ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
406 sigset_t mask, oldmask;
407
408 signal(SIGUSR1, AthenaMPToolBase_d::pauseForDebug);
409
410 sigemptyset (&mask);
411 sigaddset (&mask, SIGUSR1);
412
413 sigprocmask (SIG_BLOCK, &mask, &oldmask);
414 while(!AthenaMPToolBase_d::sig_done)
415 sigsuspend (&oldmask);
416 sigprocmask (SIG_UNBLOCK, &mask, NULL);
417}
#define sigemptyset(x)
Definition SealSignal.h:82
#define sigaddset(x, y)
Definition SealSignal.h:84
int sigset_t
Definition SealSignal.h:80
std::atomic< bool > sig_done

Member Data Documentation

◆ m_appMgr

ServiceHandle<IAppMgrUI> AthenaMPToolBase::m_appMgr
protectedinherited

Definition at line 95 of file AthenaMPToolBase.h.

◆ m_chronoStatSvc

ServiceHandle<IChronoStatSvc> SharedEvtQueueConsumer::m_chronoStatSvc
private

Definition at line 66 of file SharedEvtQueueConsumer.h.

◆ m_dataShare

SmartIF<IDataShare> SharedEvtQueueConsumer::m_dataShare
private

Definition at line 71 of file SharedEvtQueueConsumer.h.

◆ m_debug

Gaudi::Property<bool> SharedEvtQueueConsumer::m_debug {this, "Debug", false}
private

Definition at line 58 of file SharedEvtQueueConsumer.h.

58{this, "Debug", false};

◆ m_eventOrders

std::vector<int> SharedEvtQueueConsumer::m_eventOrders
private

Definition at line 81 of file SharedEvtQueueConsumer.h.

◆ m_eventOrdersFile

Gaudi::Property<std::string> SharedEvtQueueConsumer::m_eventOrdersFile {this, "EventOrdersFile", "athenamp_eventorders.txt"}
private

Definition at line 61 of file SharedEvtQueueConsumer.h.

61{this, "EventOrdersFile", "athenamp_eventorders.txt"};

◆ m_eventStat

std::map<pid_t,std::pair<int,TimeValType> > SharedEvtQueueConsumer::m_eventStat
private

Definition at line 77 of file SharedEvtQueueConsumer.h.

◆ m_evtContext

IEvtSelector::Context* SharedEvtQueueConsumer::m_evtContext {nullptr}
private

Definition at line 69 of file SharedEvtQueueConsumer.h.

69{nullptr};

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthenaMPToolBase::m_evtProcessor
protectedinherited

Definition at line 94 of file AthenaMPToolBase.h.

◆ m_evtSeek

SmartIF<IEventSeek> SharedEvtQueueConsumer::m_evtSeek
private

Definition at line 67 of file SharedEvtQueueConsumer.h.

◆ m_evtSelector

SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector
protectedinherited

Definition at line 98 of file AthenaMPToolBase.h.

◆ m_evtSelName

std::string AthenaMPToolBase::m_evtSelName
protectedinherited

Name of the event selector.

Definition at line 89 of file AthenaMPToolBase.h.

◆ m_evtSelSeek

SmartIF<IEvtSelectorSeek> SharedEvtQueueConsumer::m_evtSelSeek
private

Definition at line 68 of file SharedEvtQueueConsumer.h.

◆ m_evtShare

SmartIF<IEventShare> SharedEvtQueueConsumer::m_evtShare
private

Definition at line 70 of file SharedEvtQueueConsumer.h.

◆ m_fdsRegistry

std::shared_ptr<AthenaInterprocess::FdsRegistry> AthenaMPToolBase::m_fdsRegistry
protectedinherited

Definition at line 100 of file AthenaMPToolBase.h.

◆ m_fileMgr

ServiceHandle<IFileMgr> AthenaMPToolBase::m_fileMgr
protectedinherited

Definition at line 96 of file AthenaMPToolBase.h.

◆ m_fileMgrLog

std::string AthenaMPToolBase::m_fileMgrLog
protectedinherited

Definition at line 99 of file AthenaMPToolBase.h.

◆ m_finQueue

std::queue<pid_t> SharedEvtQueueConsumer::m_finQueue
private

Definition at line 78 of file SharedEvtQueueConsumer.h.

◆ m_ioMgr

ServiceHandle<IIoComponentMgr> AthenaMPToolBase::m_ioMgr
protectedinherited

Definition at line 97 of file AthenaMPToolBase.h.

◆ m_isPileup

Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
protectedinherited

Definition at line 103 of file AthenaMPToolBase.h.

103{this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"};

◆ m_isRoundRobin

Gaudi::Property<bool> SharedEvtQueueConsumer::m_isRoundRobin {this, "IsRoundRobin", false, "Are we running in the 'reproducible mode'?"}
private

Definition at line 57 of file SharedEvtQueueConsumer.h.

57{this, "IsRoundRobin", false, "Are we running in the 'reproducible mode'?"};

◆ m_masterPid

pid_t SharedEvtQueueConsumer::m_masterPid
private

Definition at line 82 of file SharedEvtQueueConsumer.h.

◆ m_maxEvt

int AthenaMPToolBase::m_maxEvt {-1}
protectedinherited

Maximum number of events assigned to the job.

Definition at line 86 of file AthenaMPToolBase.h.

86{-1};

◆ m_mpRunStop

const AthenaInterprocess::IMPRunStop* AthenaMPToolBase::m_mpRunStop {nullptr}
protectedinherited

Definition at line 92 of file AthenaMPToolBase.h.

92{nullptr};

◆ m_nEventsBeforeFork

Gaudi::Property<int> SharedEvtQueueConsumer::m_nEventsBeforeFork {this, "EventsBeforeFork", 0}
private

Definition at line 60 of file SharedEvtQueueConsumer.h.

60{this, "EventsBeforeFork", 0};

◆ m_nprocs

int AthenaMPToolBase::m_nprocs {-1}
protectedinherited

Number of workers spawned by the master process.

Definition at line 85 of file AthenaMPToolBase.h.

85{-1};

◆ m_nSkipEvents

int SharedEvtQueueConsumer::m_nSkipEvents {0}
private

Definition at line 64 of file SharedEvtQueueConsumer.h.

64{0};

◆ m_processGroup

AthenaInterprocess::ProcessGroup* AthenaMPToolBase::m_processGroup {nullptr}
protectedinherited

Definition at line 91 of file AthenaMPToolBase.h.

91{nullptr};

◆ m_randStr

std::string AthenaMPToolBase::m_randStr
protectedinherited

Definition at line 101 of file AthenaMPToolBase.h.

◆ m_rankId

int SharedEvtQueueConsumer::m_rankId {-1}
private

Definition at line 63 of file SharedEvtQueueConsumer.h.

63{-1}; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)

◆ m_readEventOrders

Gaudi::Property<bool> SharedEvtQueueConsumer::m_readEventOrders {this, "ReadEventOrders", false}
private

Definition at line 59 of file SharedEvtQueueConsumer.h.

59{this, "ReadEventOrders", false};

◆ m_sharedEventQueue

AthenaInterprocess::SharedQueue* SharedEvtQueueConsumer::m_sharedEventQueue {nullptr}
private

Definition at line 73 of file SharedEvtQueueConsumer.h.

73{nullptr};

◆ m_sharedRankQueue

std::unique_ptr<AthenaInterprocess::SharedQueue> SharedEvtQueueConsumer::m_sharedRankQueue
private

Definition at line 74 of file SharedEvtQueueConsumer.h.

◆ m_subprocDirPrefix

std::string AthenaMPToolBase::m_subprocDirPrefix
protectedinherited

For example, "worker__".

Definition at line 88 of file AthenaMPToolBase.h.

◆ m_subprocTopDir

std::string AthenaMPToolBase::m_subprocTopDir
protectedinherited

Top run directory for subprocesses.

Definition at line 87 of file AthenaMPToolBase.h.

◆ m_useSharedReader

Gaudi::Property<bool> SharedEvtQueueConsumer::m_useSharedReader {this, "UseSharedReader", false, "Work in pair with a SharedReader"}
private

Definition at line 55 of file SharedEvtQueueConsumer.h.

55{this, "UseSharedReader", false, "Work in pair with a SharedReader"};

◆ m_useSharedWriter

Gaudi::Property<bool> SharedEvtQueueConsumer::m_useSharedWriter {this, "UseSharedWriter", false, "Work in pair with a SharedWriter"}
private

Definition at line 56 of file SharedEvtQueueConsumer.h.

56{this, "UseSharedWriter", false, "Work in pair with a SharedWriter"};

The documentation for this class was generated from the following files: