ATLAS Offline Software
Loading...
Searching...
No Matches
EvtRangeProcessor Class Reference (final, abstract)

#include <EvtRangeProcessor.h>

Inheritance diagram for EvtRangeProcessor:
Collaboration diagram for EvtRangeProcessor:

Public Member Functions

 EvtRangeProcessor (const std::string &type, const std::string &name, const IInterface *parent)
virtual ~EvtRangeProcessor () override
virtual StatusCode initialize () override
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t &pid) override
virtual void reportSubprocessStatuses () override
virtual void subProcessLogs (std::vector< std::string > &) override
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func () override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func () override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func () override
virtual StatusCode finalize () override
virtual void useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override
virtual void setRandString (const std::string &randStr) override
virtual void setMaxEvt (int maxEvt) override
virtual void setMPRunStop (const AthenaInterprocess::IMPRunStop *runStop) override
virtual void killChildren () override
virtual std::unique_ptr< ScheduledWork > operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0

Protected Types

enum  ESRange_Status {
  ESRANGE_SUCCESS , ESRANGE_NOTFOUND , ESRANGE_SEEKFAILED , ESRANGE_PROCFAILED ,
  ESRANGE_FILENOTMADE , ESRANGE_BADINPFILE
}
enum  Func_Flag { FUNC_BOOTSTRAP , FUNC_EXEC , FUNC_FIN }

Protected Member Functions

int mapAsyncFlag ATLAS_NOT_THREAD_SAFE (Func_Flag flag, pid_t pid=0)
int redirectLog (const std::string &rundir, bool addTimeStamp=true)
int updateIoReg (const std::string &rundir)
std::string fmterror (int errnum)
int reopenFds ()
int handleSavedPfc (const std::filesystem::path &dest_path)
void waitForSignal ()
IEvtSelector * evtSelector ()

Protected Attributes

int m_nprocs {-1}
 Number of workers spawned by the master process.
int m_maxEvt {-1}
 Maximum number of events assigned to the job.
std::string m_subprocTopDir
 Top run directory for subprocesses.
std::string m_subprocDirPrefix
 For ex. "worker__".
std::string m_evtSelName
 Name of the event selector.
AthenaInterprocess::ProcessGroup * m_processGroup {nullptr}
const AthenaInterprocess::IMPRunStop * m_mpRunStop {nullptr}
ServiceHandle< IEventProcessor > m_evtProcessor
ServiceHandle< IAppMgrUI > m_appMgr
ServiceHandle< IFileMgr > m_fileMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
SmartIF< IEvtSelector > m_evtSelector
std::string m_fileMgrLog
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
std::string m_randStr
Gaudi::Property< bool > m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}

Private Types

enum  ProcessState { PROC_STATE_INIT , PROC_STATE_EXEC , PROC_STATE_FIN , PROC_STATE_STOP }

Private Member Functions

StatusCode startProcess ATLAS_NOT_THREAD_SAFE ()
StatusCode setNewInputFile (const std::string &newFile)
void reportError (yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
int reopenFd (int fd, const std::string &name)

Private Attributes

Gaudi::Property< int > m_nEventsBeforeFork {this, "EventsBeforeFork", 0, "Number of events before forking"}
Gaudi::Property< std::string > m_channel2Scatterer {this, "Channel2Scatterer", {}}
Gaudi::Property< std::string > m_channel2EvtSel {this, "Channel2EvtSel", {}}
Gaudi::Property< bool > m_debug {this, "Debug", false}
int m_rankId {-1}
 Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
int m_activeWorkers {0}
 Keep track of the number of workers.
std::string m_inpFile
 Cached name of the input file. To avoid reopening.
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
SmartIF< IEvtSelectorSeek > m_evtSeek
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedFailedPidQueue {nullptr}
std::map< pid_t, int > m_nProcessedEvents
std::deque< pid_t > m_finQueue
std::map< pid_t, ProcessState > m_procStates

Detailed Description

Definition at line 25 of file EvtRangeProcessor.h.

Member Enumeration Documentation

◆ ESRange_Status

enum AthenaMPToolBase::ESRange_Status
protected, inherited
Enumerator
ESRANGE_SUCCESS 
ESRANGE_NOTFOUND 
ESRANGE_SEEKFAILED 
ESRANGE_PROCFAILED 
ESRANGE_FILENOTMADE 
ESRANGE_BADINPFILE 

Definition at line 58 of file AthenaMPToolBase.h.

◆ Func_Flag

enum AthenaMPToolBase::Func_Flag
protected, inherited
Enumerator
FUNC_BOOTSTRAP 
FUNC_EXEC 
FUNC_FIN 

Definition at line 67 of file AthenaMPToolBase.h.

◆ ProcessState

Enumerator
PROC_STATE_INIT 
PROC_STATE_EXEC 
PROC_STATE_FIN 
PROC_STATE_STOP 

Definition at line 56 of file EvtRangeProcessor.h.

Constructor & Destructor Documentation

◆ EvtRangeProcessor()

EvtRangeProcessor::EvtRangeProcessor ( const std::string & type,
const std::string & name,
const IInterface * parent )

Definition at line 34 of file EvtRangeProcessor.cxx.

37 : AthenaMPToolBase(type,name,parent)
38 , m_chronoStatSvc("ChronoStatSvc", name)
39 , m_incidentSvc("IncidentSvc", name)
40{
41 m_subprocDirPrefix = "worker_";
42}
std::string m_subprocDirPrefix
For ex. "worker__".
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
ServiceHandle< IIncidentSvc > m_incidentSvc

◆ ~EvtRangeProcessor()

EvtRangeProcessor::~EvtRangeProcessor ( )
override, virtual

Definition at line 44 of file EvtRangeProcessor.cxx.

45{
46}

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE() [1/5]

int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( Func_Flag flag,
pid_t pid = 0 )
protected, inherited

◆ ATLAS_NOT_THREAD_SAFE() [2/5]

StatusCode startProcess EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE ( )
private

◆ ATLAS_NOT_THREAD_SAFE() [3/5]

virtual StatusCode exec EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE ( )
override, virtual

◆ ATLAS_NOT_THREAD_SAFE() [4/5]

virtual int makePool EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE ( int maxevt,
int nprocs,
const std::string & topdir )
override, virtual

◆ ATLAS_NOT_THREAD_SAFE() [5/5]

virtual StatusCode wait_once EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE ( pid_t & pid)
override, virtual

Reimplemented from AthenaMPToolBase.

◆ bootstrap_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeProcessor::bootstrap_func ( )
override, virtual

Implements AthenaMPToolBase.

Definition at line 368 of file EvtRangeProcessor.cxx.

369{
371
372 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
373 outwork->data = CxxUtils::xmalloc(sizeof(int));
374 *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
375 outwork->size = sizeof(int);
376 // ...
377 // (possible) TODO: extend outwork with some error message, which will be eventually
378 // reported in the master process
379 // ...
380
381 // ________________________ Get RankID ________________________
382 //
383 if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
384 ATH_MSG_ERROR("Unable to get rank ID!");
385 return outwork;
386 }
387 std::ostringstream workindex;
388 workindex<<m_rankId;
389
390 // ________________________ Worker dir: mkdir ________________________
391 std::filesystem::path worker_rundir(m_subprocTopDir);
392 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
393 // TODO: this "worker_" can be made configurable too
394
395 if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
396 ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
397 return outwork;
398 }
399
400 // ________________________ Redirect logs ________________________
401 if(!m_debug) {
402 if(redirectLog(worker_rundir.string()))
403 return outwork;
404
405 ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
406 }
407
408 // ________________________ Update Io Registry ____________________________
409 if(updateIoReg(worker_rundir.string()))
410 return outwork;
411
412 ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
413
414 // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
415 std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
416 if(std::filesystem::is_regular_file("SimParams.db"))
417 COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
418 if(std::filesystem::is_regular_file("DigitParams.db"))
419 COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
420 if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
421 COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
422
423 // _______________________ Handle saved PFC (if any) ______________________
424 if(handleSavedPfc(abs_worker_rundir))
425 return outwork;
426
427 // ________________________ reopen descriptors ____________________________
428 if(reopenFds())
429 return outwork;
430
431 ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
432
433
434 // ________________________ I/O reinit ________________________
435 if(!m_ioMgr->io_reinitialize().isSuccess()) {
436 ATH_MSG_ERROR("Failed to reinitialize I/O");
437 return outwork;
438 } else {
439 ATH_MSG_DEBUG("Successfully reinitialized I/O");
440 }
441
442 // ________________________ Event selector restart ________________________
443 IService* evtSelSvc = dynamic_cast<IService*>(evtSelector());
444 if(!evtSelSvc) {
445 ATH_MSG_ERROR("Failed to dyncast event selector to IService");
446 return outwork;
447 }
448 if(!evtSelSvc->start().isSuccess()) {
449 ATH_MSG_ERROR("Failed to restart the event selector");
450 return outwork;
451 } else {
452 ATH_MSG_DEBUG("Successfully restarted the event selector");
453 }
454
455 // ________________________ Restart background event selectors in pileup jobs ________________________
456 if(m_isPileup) {
457 const std::list<IService*>& service_list = serviceLocator()->getServices();
458 std::list<IService*>::const_iterator itSvc = service_list.begin(),
459 itSvcLast = service_list.end();
460 for(;itSvc!=itSvcLast;++itSvc) {
461 IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(*itSvc);
462 if(evtsel && (evtsel != evtSelector())) {
463 if((*itSvc)->start().isSuccess())
464 ATH_MSG_DEBUG("Restarted event selector " << (*itSvc)->name());
465 else {
466 ATH_MSG_ERROR("Failed to restart event selector " << (*itSvc)->name());
467 return outwork;
468 }
469 }
470 }
471 }
472
473 // ________________________ Worker dir: chdir ________________________
474 if(chdir(worker_rundir.string().c_str())==-1) {
475 ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
476 return outwork;
477 }
478
479 // Declare success and return
480 *(int*)(outwork->data) = 0;
481 return outwork;
482}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_DEBUG(x)
std::string m_subprocTopDir
Top run directory for subprocesses.
int handleSavedPfc(const std::filesystem::path &dest_path)
int updateIoReg(const std::string &rundir)
IEvtSelector * evtSelector()
Gaudi::Property< bool > m_isPileup
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
ServiceHandle< IIoComponentMgr > m_ioMgr
std::string fmterror(int errnum)
Gaudi::Property< bool > m_debug
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
int m_rankId
Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
#define COPY_FILE_HACK(_src, _dest)
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
mkdir(path, recursive=True)

◆ evtSelector()

IEvtSelector * AthenaMPToolBase::evtSelector ( )
inline, protected, inherited

Definition at line 83 of file AthenaMPToolBase.h.

83{ return m_evtSelector; }
SmartIF< IEvtSelector > m_evtSelector

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeProcessor::exec_func ( )
override, virtual

Implements AthenaMPToolBase.

Definition at line 484 of file EvtRangeProcessor.cxx.

485{
486 ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
487
488 int nEvt(1);
489 int nEventsProcessed(0);
490
491 std::queue<std::string> queueTokens;
492
493 // Get the yampl connection channels
494 yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
495 std::string socket2ScattererName = m_channel2Scatterer.value() + std::string("_") + m_randStr;
496 yampl::ISocket* socket2Scatterer = socketFactory->createClientSocket(yampl::Channel(socket2ScattererName,yampl::LOCAL),yampl::MOVE_DATA);
497 ATH_MSG_INFO("Created CLIENT socket to the Scatterer: " << socket2ScattererName);
498 std::ostringstream pidstr;
499 pidstr << getpid();
500
501 // Construct a "welcome" message to be sent to the EvtRangeScatterer
502 std::string ping = pidstr.str() + std::string(" ready for event processing");
503
504 while(true) {
505 void* message2scatterer = CxxUtils::xmalloc(ping.size());
506 memcpy(message2scatterer,ping.data(),ping.size());
507 socket2Scatterer->send(message2scatterer,ping.size());
508 ATH_MSG_INFO("Sent a welcome message to the Scatterer");
509
510 // Get the response - list of tokens - from the scatterer.
511 // The format of the response: | ResponseSize | RangeID, | evtEvtRange[,evtToken] |
512 char *responseBuffer(0);
513 std::string strPeerId;
514 ssize_t responseSize = socket2Scatterer->recv(responseBuffer,strPeerId);
515 // If the response size is 1 (an empty range was received) then break the loop
516 if(responseSize==1) {
517 ATH_MSG_INFO("Empty range received. Terminating the loop");
518 break;
519 }
520
521 std::string responseStr(responseBuffer,responseSize);
522 ATH_MSG_INFO("Received response from the Scatterer : " << responseStr);
523
524 // Start timing
525 System::ProcessTime time_start = System::getProcessTime();
526
527 size_t startpos(0);
528 size_t endpos = responseStr.find(',');
529 while(endpos!=std::string::npos) {
530 queueTokens.push(responseStr.substr(startpos,endpos-startpos));
531 startpos = endpos+1;
532 endpos = responseStr.find(',',startpos);
533 }
534 queueTokens.push(responseStr.substr(startpos));
535 // Actually the first element in the tokens queue is the RangeID. Get it
536 std::string rangeID = queueTokens.front();
537 queueTokens.pop();
538 ATH_MSG_INFO("Received RangeID=" << rangeID);
539 // Fire an incident
540 m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange",rangeID));
541
542 // Here we need to support two formats of the responseStr
543 // Format 1. RangeID,startEvent,endEvent
544 // Format 2. RangeID,fileName,startEvent,endEvent
545 //
546 // The difference between these two is that for Format 2 we first
547 // need to update InputCollections property on the Event Selector
548 // and only after that proceed with seeking
549 //
550 // The seeking part is identical for Format 1 and 2
551
553
554 // Determine the format
555 std::string filename("");
556 if(queueTokens.front().find("PFN:")==0) {
557 // We have Format 2
558 // Update InputCollections property of the Event Selector with the file name from Event Range
559 filename = queueTokens.front().substr(4);
560 if(setNewInputFile(filename).isFailure()) {
561 ATH_MSG_WARNING("Failed to set input file for the range: " << rangeID);
563 reportError(socket2Scatterer,rangeStatus);
564 m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
565 continue;
566 }
567 queueTokens.pop();
568 }
569
570 // Get the number of events to process
571 int startEvent = std::atoi(queueTokens.front().c_str());
572 queueTokens.pop();
573 int endEvent = std::atoi(queueTokens.front().c_str());
574 queueTokens.pop();
575 ATH_MSG_INFO("Range fields. File Name: " << (filename.empty()?"N/A":filename)
576 << ", First Event:" << startEvent
577 << ", Last Event:" << endEvent);
578
579 // Process the events
580 IEvtSelector::Context* ctx = nullptr;
581 if (evtSelector()->createContext (ctx).isFailure()) {
582 ATH_MSG_WARNING("Failed to create IEventSelector context.");
584 }
585 else {
586 for(int i(startEvent-1); i<endEvent; ++i) {
587 StatusCode sc = m_evtSeek->seek(*ctx, i);
588 if(sc.isRecoverable()) {
589 ATH_MSG_WARNING("Event " << i << " from range: " << rangeID << " not in the input file");
591 break;
592 }
593 else if(sc.isFailure()) {
594 ATH_MSG_WARNING("Failed to seek to " << i << " in range: " << rangeID);
596 break;
597 }
598 ATH_MSG_INFO("Seek to " << i << " succeeded");
599 m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
600 sc = m_evtProcessor->nextEvent(nEvt++);
601
602 m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
603 if(sc.isFailure()){
604 ATH_MSG_WARNING("Failed to process the event " << i << " in range:" << rangeID);
606 break;
607 }
608 else {
609 ATH_MSG_DEBUG("Event processed");
610 nEventsProcessed++;
611 }
612 }
613 }
614 if (evtSelector()->releaseContext (ctx).isFailure()) {
615 ATH_MSG_WARNING("Failed to release IEventSelector context.");
616 }
617
618 // Fire dummy NextEventRange incident in order to cut the previous output and report it
619 m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
620 if(rangeStatus!=AthenaMPToolBase::ESRANGE_SUCCESS) {
621 reportError(socket2Scatterer,rangeStatus);
622 continue;
623 }
624
625 // Event range successfully processed
626 std::string strOutpFile;
627 // Get the full path of the event range output file
628 for(std::filesystem::directory_iterator fdIt(std::filesystem::current_path()); fdIt!=std::filesystem::directory_iterator(); fdIt++) {
629 if(fdIt->path().string().rfind(rangeID) == fdIt->path().string().size()-rangeID.size()) {
630 if(strOutpFile.empty()) {
631 strOutpFile = fdIt->path().string();
632 }
633 else {
634 strOutpFile += (std::string(",")+fdIt->path().string());
635 }
636 }
637 }
638
639 // Stop timing
640 System::ProcessTime time_delta = System::getProcessTime() - time_start;
641
642 // Prepare the output report
643 if(!strOutpFile.empty()) {
644 // We need to combine the output file name with
645 // 1. RangeID (requested by JEDI)
646 // 2. CPU time
647 // 3. Wall time
648 std::ostringstream outputReportStream;
649 outputReportStream << strOutpFile
650 << ",ID:" << rangeID
651 << ",CPU:" << time_delta.cpuTime<System::Sec>()
652 << ",WALL:" << time_delta.elapsedTime<System::Sec>();
653 std::string outputFileReport = outputReportStream.str();
654
655 // Report the output
656 message2scatterer = CxxUtils::xmalloc(outputFileReport.size());
657 memcpy(message2scatterer,outputFileReport.data(),outputFileReport.size());
658 socket2Scatterer->send(message2scatterer,outputFileReport.size());
659 ATH_MSG_INFO("Reported the output " << outputFileReport);
660 }
661 else {
662 // This is an error: range successfully processed but no outputs were made
663 ATH_MSG_WARNING("Failed to make an output file for range: " << rangeID);
665 }
666 } // Main "event loop"
667
668 if(m_evtProcessor->executeRun(0).isFailure()) {
669 ATH_MSG_WARNING("Could not finalize the Run");
670 }
671
672 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
673
674 // Return value: "ERRCODE|Func_Flag|NEvt"
675 int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
676 void* outdata = CxxUtils::xmalloc(outsize);
677 *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
679 memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
680 memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
681
682 outwork->data = outdata;
683 outwork->size = outsize;
684 // ...
685 // (possible) TODO: extend outwork with some error message, which will be eventually
686 // reported in the master process
687 // ...
688
689 delete socket2Scatterer;
690 delete socketFactory;
691
692 return outwork;
693}
#define ATH_MSG_WARNING(x)
static Double_t sc
ServiceHandle< IEventProcessor > m_evtProcessor
StatusCode setNewInputFile(const std::string &newFile)
void reportError(yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
Gaudi::Property< std::string > m_channel2Scatterer
SmartIF< IEvtSelectorSeek > m_evtSeek
::StatusCode StatusCode
StatusCode definition for legacy code.

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeProcessor::fin_func ( )
override, virtual

Implements AthenaMPToolBase.

Definition at line 695 of file EvtRangeProcessor.cxx.

696{
697 ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
698
699 if(m_appMgr->stop().isFailure()) {
700 ATH_MSG_WARNING("Unable to stop AppMgr");
701 }
702 else {
703 if(m_appMgr->finalize().isFailure()) {
704 std::cout << "Unable to finalize AppMgr" << std::endl;
705 }
706 }
707
708 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
709
710 // Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
711 int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
712 void* outdata = CxxUtils::xmalloc(outsize);
713 *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
715 memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
716 int nEvt = -1;
717 memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
718
719 outwork->data = outdata;
720 outwork->size = outsize;
721
722 return outwork;
723}
ServiceHandle< IAppMgrUI > m_appMgr

◆ finalize()

StatusCode AthenaMPToolBase::finalize ( )
override, virtual, inherited

Reimplemented in EvtRangeScatterer, SharedEvtQueueConsumer, SharedHiveEvtQueueConsumer, and SharedWriterTool.

Definition at line 87 of file AthenaMPToolBase.cxx.

88{
89 return StatusCode::SUCCESS;
90}

◆ fmterror()

std::string AthenaMPToolBase::fmterror ( int errnum)
protected, inherited

Definition at line 333 of file AthenaMPToolBase.cxx.

334{
335 char buf[256];
336 strerror_r(errnum, buf, sizeof(buf));
337 return std::string(buf);
338}

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr EvtRangeProcessor::generateOutputReport ( )
override, virtual

Reimplemented from AthenaMPToolBase.

Definition at line 362 of file EvtRangeProcessor.cxx.

363{
365 return jobOutputs;
366}
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr

◆ handleSavedPfc()

int AthenaMPToolBase::handleSavedPfc ( const std::filesystem::path & dest_path)
protected, inherited

Definition at line 396 of file AthenaMPToolBase.cxx.

397{
398 if(std::filesystem::is_regular_file("PoolFileCatalog.xml.AthenaMP-saved"))
399 COPY_FILE_HACK("PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+"/PoolFileCatalog.xml");
400 return 0;
401}

◆ initialize()

StatusCode EvtRangeProcessor::initialize ( )
override, virtual

Reimplemented from AthenaMPToolBase.

Definition at line 48 of file EvtRangeProcessor.cxx.

49{
50 ATH_MSG_DEBUG("In initialize");
51
53 m_evtSeek = serviceLocator()->service(m_evtSelName);
54 ATH_CHECK(m_evtSeek.isValid());
55 ATH_CHECK(m_chronoStatSvc.retrieve());
56 ATH_CHECK(m_incidentSvc.retrieve());
57
58 return StatusCode::SUCCESS;
59}
#define ATH_CHECK
Evaluate an expression and check for errors.
std::string m_evtSelName
Name of the event selector.
virtual StatusCode initialize() override

◆ killChildren()

void AthenaMPToolBase::killChildren ( )
override, virtual, inherited

Definition at line 201 of file AthenaMPToolBase.cxx.

202{
203 for(const AthenaInterprocess::Process& child : m_processGroup->getChildren()) {
204 kill(child.getProcessID(),SIGKILL);
205 }
206}
AthenaInterprocess::ProcessGroup * m_processGroup

◆ operator()

virtual std::unique_ptr< ScheduledWork > AthenaInterprocess::IMessageDecoder::operator() ATLAS_NOT_THREAD_SAFE ( const ScheduledWork & )
pure virtual, inherited

◆ redirectLog()

int AthenaMPToolBase::redirectLog ( const std::string & rundir,
bool addTimeStamp = true )
protected, inherited

Definition at line 269 of file AthenaMPToolBase.cxx.

270{
271 // Redirect both stdout and stderr to the same file AthenaMP.log
272 int dup2result1(0), dup2result2(0);
273
274 int newout = open(std::string(rundir+"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
275 if(newout==-1) {
276 ATH_MSG_ERROR("Unable to open log file in the run directory. " << fmterror(errno));
277 return -1;
278 }
279 dup2result1 = dup2(newout, STDOUT_FILENO);
280 dup2result2 = dup2(newout, STDERR_FILENO);
281 TEMP_FAILURE_RETRY(close(newout));
282 if(dup2result1==-1) {
283 ATH_MSG_ERROR("Unable to redirect standard output. " << fmterror(errno));
284 return -1;
285 }
286 if(dup2result2==-1) {
287 ATH_MSG_ERROR("Unable to redirect standard error. " << fmterror(errno));
288 return -1;
289 }
290
291 if(addTimeStamp) {
292 SmartIF<IProperty> propertyServer(msgSvc());
293 if(propertyServer==0) {
294 ATH_MSG_ERROR("Unable to cast message svc to IProperty");
295 return -1;
296 }
297
298 std::string propertyName("Format");
299 std::string oldFormat("");
300 StringProperty formatProp(propertyName,oldFormat);
301 StatusCode sc = propertyServer->getProperty(&formatProp);
302 if(sc.isFailure()) {
303 ATH_MSG_WARNING("Message Service does not have Format property");
304 }
305 else {
306 oldFormat = formatProp.value();
307 if(oldFormat.find("%t")==std::string::npos) {
308 // Add time stamps
309 std::string newFormat("%t " + oldFormat);
310 StringProperty newFormatProp(std::move(propertyName),newFormat);
311 ATH_CHECK(propertyServer->setProperty(newFormatProp), -1);
312 }
313 else {
314 ATH_MSG_DEBUG("MsgSvc format already contains timestamps. Nothing to be done");
315 }
316 }
317 }
318
319 return 0;
320}
msgSvc
Provide convenience handles for various services.
Definition StdJOSetup.py:36
@ open
Definition BinningType.h:40

◆ reopenFd()

int AthenaMPToolBase::reopenFd ( int fd,
const std::string & name )
private, inherited

Definition at line 419 of file AthenaMPToolBase.cxx.

420{
421 ATH_MSG_DEBUG("Attempting to reopen descriptor for " << name);
422 int old_openflags = fcntl(fd,F_GETFL,0);
423 switch(old_openflags & O_ACCMODE) {
424 case O_RDONLY: {
425 ATH_MSG_DEBUG("The File Access Mode is RDONLY");
426 break;
427 }
428 case O_WRONLY: {
429 ATH_MSG_DEBUG("The File Access Mode is WRONLY");
430 break;
431 }
432 case O_RDWR: {
433 ATH_MSG_DEBUG("The File Access Mode is RDWR");
434 break;
435 }
436 }
437
438 int old_descflags = fcntl(fd,F_GETFD,0);
439 off_t oldpos = lseek(fd,0,SEEK_CUR);
440 if(oldpos==-1) {
441 if(errno==ESPIPE) {
442 ATH_MSG_WARNING("Dealing with PIPE. Skipping ... (FIXME!)");
443 }
444 else {
445 ATH_MSG_ERROR("When re-opening file descriptors lseek failed on " << name << ". " << fmterror(errno));
446 return -1;
447 }
448 }
449 else {
450 Io::Fd newfd = open(name.c_str(),old_openflags);
451 if(newfd==-1) {
452 ATH_MSG_ERROR("When re-opening file descriptors unable to open " << name << " for reading. " << fmterror(errno));
453 return -1;
454 }
455 if(lseek(newfd,oldpos,SEEK_SET)==-1){
456 ATH_MSG_ERROR("When re-opening file descriptors lseek failed on the newly opened " << name << ". " << fmterror(errno));
457 TEMP_FAILURE_RETRY(close(newfd));
458 return -1;
459 }
460 TEMP_FAILURE_RETRY(close(fd));
461 if(dup2(newfd,fd)==-1) {
462 ATH_MSG_ERROR("When re-opening file descriptors unable to duplicate descriptor for " << name << ". " << fmterror(errno));
463 TEMP_FAILURE_RETRY(close(newfd));
464 return -1;
465 }
466 if(fcntl(fd,F_SETFD,old_descflags)==-1) {
467 ATH_MSG_ERROR("When re-opening file descriptors unable to set descriptor flags for " << name << ". " << fmterror(errno));
468 TEMP_FAILURE_RETRY(close(newfd));
469 return -1;
470 }
471 TEMP_FAILURE_RETRY(close(newfd));
472 }
473 return 0;
474}

◆ reopenFds()

int AthenaMPToolBase::reopenFds ( )
protected, inherited

Definition at line 340 of file AthenaMPToolBase.cxx.

341{
342 // Reopen file descriptors.
343 // First go over all open files, which have been registered with the FileMgr
344 // Then also check the FdsRegistry, in case it contains some files not registered with the FileMgr
345 std::set<int> fdLog;
346
347 // Query the FileMgr contents
348 std::vector<const Io::FileAttr*> filemgrFiles;
349 std::vector<const Io::FileAttr*>::const_iterator itFile;
350 unsigned filenum = m_fileMgr->getFiles(filemgrFiles); // Get attributes for open files only. We don't care about closed ones at this point
351 if(filenum!=filemgrFiles.size())
352 ATH_MSG_WARNING("getFiles returned " << filenum << " while vector size is " << filemgrFiles.size());
353
354 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
355 ATH_MSG_DEBUG("* " << **itFile);
356 const std::string& filename = (**itFile).name();
357 Io::Fd fd = (**itFile).fd();
358
359 if(fd==-1) {
360 // It is legal to have fd=-1 for remote inputs
361 // On the other hand, these inputs should not remain open after fork. The issue being tracked at ATEAM-434.
362 // So, this hopefully is a temporary patch
363 ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " << filename);
364 continue;
365 }
366
367 if(reopenFd(fd,filename))
368 return -1;
369
370 fdLog.insert(fd);
371 }
372
373 // Check the FdsRegistry
374 for(const AthenaInterprocess::FdsRegistryEntry& regEntry : *m_fdsRegistry) {
375 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
376 ATH_MSG_DEBUG("The file from FdsRegistry " << regEntry.name << " was registered with FileMgr. Skip reopening");
377 }
378 else {
379 ATH_MSG_WARNING("The file " << regEntry.name << " has not been registered with the FileMgr!");
380
381 if(regEntry.fd==-1) {
382 // Same protection as the one above
383 ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
384 continue;
385 }
386
387 if(reopenFd(regEntry.fd,regEntry.name))
388 return -1;
389
390 fdLog.insert(regEntry.fd);
391 }
392 }
393 return 0;
394}
int reopenFd(int fd, const std::string &name)
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
ServiceHandle< IFileMgr > m_fileMgr

◆ reportError()

void EvtRangeProcessor::reportError ( yampl::ISocket * socket,
AthenaMPToolBase::ESRange_Status status )
private

Definition at line 784 of file EvtRangeProcessor.cxx.

785{
786 pid_t pid = getpid();
787 size_t messageSize = sizeof(pid_t)+sizeof(AthenaMPToolBase::ESRange_Status);
788 void* message2scatterer = CxxUtils::xmalloc(messageSize);
789 memcpy(message2scatterer,&pid,sizeof(pid_t));
790 memcpy((pid_t*)message2scatterer+1,&status,sizeof(AthenaMPToolBase::ESRange_Status));
791 socket->send(message2scatterer,messageSize);
792}
int32_t pid_t

◆ reportSubprocessStatuses()

void EvtRangeProcessor::reportSubprocessStatuses ( )
override, virtual

Reimplemented from AthenaMPToolBase.

Definition at line 332 of file EvtRangeProcessor.cxx.

333{
334 ATH_MSG_INFO("Statuses of event processors");
335 const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
336 for(size_t i=0; i<statuses.size(); ++i) {
337 // Get the number of events processed by this worker
338 std::map<pid_t,int>::const_iterator it = m_nProcessedEvents.find(statuses[i].pid);
339 std::ostringstream ostr;
340 if(it==m_nProcessedEvents.end())
341 ostr << "N/A";
342 else
343 ostr << it->second;
344 ATH_MSG_INFO("*** Process PID=" << statuses[i].pid
345 << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
346 << ". Number of events processed: " << ostr.str());
347 }
348}
std::map< pid_t, int > m_nProcessedEvents
list statuses

◆ setMaxEvt()

virtual void AthenaMPToolBase::setMaxEvt ( int maxEvt)
inline, override, virtual, inherited

Definition at line 44 of file AthenaMPToolBase.h.

44{m_maxEvt=maxEvt;}
int m_maxEvt
Maximum number of events assigned to the job.

◆ setMPRunStop()

virtual void AthenaMPToolBase::setMPRunStop ( const AthenaInterprocess::IMPRunStop * runStop)
inline, override, virtual, inherited

Definition at line 45 of file AthenaMPToolBase.h.

45{m_mpRunStop=runStop;}
const AthenaInterprocess::IMPRunStop * m_mpRunStop

◆ setNewInputFile()

StatusCode EvtRangeProcessor::setNewInputFile ( const std::string & newFile)
private

Definition at line 750 of file EvtRangeProcessor.cxx.

751{
752 if(m_inpFile == newFile) return StatusCode::SUCCESS;
753
754 // Get Property Server
755 IProperty* propertyServer = dynamic_cast<IProperty*>(evtSelector());
756 if(!propertyServer) {
757 ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
758 return StatusCode::FAILURE;
759 }
760
761 std::string propertyName("InputCollections");
762 if(m_inpFile.empty()) {
763 std::vector<std::string> vect;
764 StringArrayProperty inputFileList(propertyName, vect);
765 if(propertyServer->getProperty(&inputFileList).isFailure()) {
766 ATH_MSG_ERROR("Failed to get InputCollections property value of the Event Selector");
767 return StatusCode::FAILURE;
768 }
769 if(newFile==inputFileList.value()[0]) {
770 m_inpFile = newFile;
771 return StatusCode::SUCCESS;
772 }
773 }
774 std::vector<std::string> vect{newFile,};
775 StringArrayProperty newInputFileList(std::move(propertyName), vect);
776 if(propertyServer->setProperty(newInputFileList).isFailure()) {
777 ATH_MSG_ERROR("Unable to update " << newInputFileList.name() << " property on the Event Selector");
778 return StatusCode::FAILURE;
779 }
780 m_inpFile=newFile;
781 return StatusCode::SUCCESS;
782}
std::string m_inpFile
Cached name of the current input file, kept so that a request for the same file does not reopen it.

◆ setRandString()

void AthenaMPToolBase::setRandString ( const std::string & randStr)
override virtual inherited

Definition at line 196 of file AthenaMPToolBase.cxx.

197{
198 m_randStr = randStr;
199}

◆ subProcessLogs()

void EvtRangeProcessor::subProcessLogs ( std::vector< std::string > & filenames)
override virtual

Definition at line 350 of file EvtRangeProcessor.cxx.

351{
352 filenames.clear();
353 for(int i=0; i<m_nprocs; ++i) {
354 std::ostringstream workerIndex;
355 workerIndex << i;
356 std::filesystem::path worker_rundir(m_subprocTopDir);
357 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
358 filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
359 }
360}
int m_nprocs
Number of workers spawned by the master process.
list filenames
Definition grepfile.py:34

◆ updateIoReg()

int AthenaMPToolBase::updateIoReg ( const std::string & rundir)
protected inherited

Definition at line 322 of file AthenaMPToolBase.cxx.

323{
324 ATH_CHECK(m_ioMgr.retrieve(), -1);
325
326 // update the IoRegistry for the new workdir - make sure we use absolute path
327 std::filesystem::path abs_rundir = std::filesystem::absolute(rundir);
328 ATH_CHECK(m_ioMgr->io_update_all(abs_rundir.string()), -1);
329
330 return 0;
331}

◆ useFdsRegistry()

void AthenaMPToolBase::useFdsRegistry ( std::shared_ptr< AthenaInterprocess::FdsRegistry > registry)
override virtual inherited

Definition at line 191 of file AthenaMPToolBase.cxx.

192{
193 m_fdsRegistry = std::move(registry);
194}

◆ waitForSignal()

void AthenaMPToolBase::waitForSignal ( )
protected inherited

Definition at line 403 of file AthenaMPToolBase.cxx.

404{
405 ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
406 sigset_t mask, oldmask;
407
409
410 sigemptyset (&mask);
411 sigaddset (&mask, SIGUSR1);
412
413 sigprocmask (SIG_BLOCK, &mask, &oldmask);
415 sigsuspend (&oldmask);
416 sigprocmask (SIG_UNBLOCK, &mask, NULL);
417}
#define sigemptyset(x)
Definition SealSignal.h:82
#define sigaddset(x, y)
Definition SealSignal.h:84
int sigset_t
Definition SealSignal.h:80
std::atomic< bool > sig_done

Member Data Documentation

◆ m_activeWorkers

int EvtRangeProcessor::m_activeWorkers {0}
private

Keep track of the number of workers.

Definition at line 69 of file EvtRangeProcessor.h.

69{0};

◆ m_appMgr

ServiceHandle<IAppMgrUI> AthenaMPToolBase::m_appMgr
protected inherited

Definition at line 95 of file AthenaMPToolBase.h.

◆ m_channel2EvtSel

Gaudi::Property<std::string> EvtRangeProcessor::m_channel2EvtSel {this, "Channel2EvtSel", {}}
private

Definition at line 65 of file EvtRangeProcessor.h.

65{this, "Channel2EvtSel", {}};

◆ m_channel2Scatterer

Gaudi::Property<std::string> EvtRangeProcessor::m_channel2Scatterer {this, "Channel2Scatterer", {}}
private

Definition at line 64 of file EvtRangeProcessor.h.

64{this, "Channel2Scatterer", {}};

◆ m_chronoStatSvc

ServiceHandle<IChronoStatSvc> EvtRangeProcessor::m_chronoStatSvc
private

Definition at line 72 of file EvtRangeProcessor.h.

◆ m_debug

Gaudi::Property<bool> EvtRangeProcessor::m_debug {this, "Debug", false}
private

Definition at line 66 of file EvtRangeProcessor.h.

66{this, "Debug", false};

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthenaMPToolBase::m_evtProcessor
protected inherited

Definition at line 94 of file AthenaMPToolBase.h.

◆ m_evtSeek

SmartIF<IEvtSelectorSeek> EvtRangeProcessor::m_evtSeek
private

Definition at line 74 of file EvtRangeProcessor.h.

◆ m_evtSelector

SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector
protected inherited

Definition at line 98 of file AthenaMPToolBase.h.

◆ m_evtSelName

std::string AthenaMPToolBase::m_evtSelName
protected inherited

Name of the event selector.

Definition at line 89 of file AthenaMPToolBase.h.

◆ m_fdsRegistry

std::shared_ptr<AthenaInterprocess::FdsRegistry> AthenaMPToolBase::m_fdsRegistry
protected inherited

Definition at line 100 of file AthenaMPToolBase.h.

◆ m_fileMgr

ServiceHandle<IFileMgr> AthenaMPToolBase::m_fileMgr
protected inherited

Definition at line 96 of file AthenaMPToolBase.h.

◆ m_fileMgrLog

std::string AthenaMPToolBase::m_fileMgrLog
protected inherited

Definition at line 99 of file AthenaMPToolBase.h.

◆ m_finQueue

std::deque<pid_t> EvtRangeProcessor::m_finQueue
private

Definition at line 80 of file EvtRangeProcessor.h.

◆ m_incidentSvc

ServiceHandle<IIncidentSvc> EvtRangeProcessor::m_incidentSvc
private

Definition at line 73 of file EvtRangeProcessor.h.

◆ m_inpFile

std::string EvtRangeProcessor::m_inpFile
private

Cached name of the current input file, kept so that a request for the same file does not reopen it.

Definition at line 70 of file EvtRangeProcessor.h.

◆ m_ioMgr

ServiceHandle<IIoComponentMgr> AthenaMPToolBase::m_ioMgr
protected inherited

Definition at line 97 of file AthenaMPToolBase.h.

◆ m_isPileup

Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
protected inherited

Definition at line 103 of file AthenaMPToolBase.h.

103{this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"};

◆ m_maxEvt

int AthenaMPToolBase::m_maxEvt {-1}
protected inherited

Maximum number of events assigned to the job.

Definition at line 86 of file AthenaMPToolBase.h.

86{-1};

◆ m_mpRunStop

const AthenaInterprocess::IMPRunStop* AthenaMPToolBase::m_mpRunStop {nullptr}
protected inherited

Definition at line 92 of file AthenaMPToolBase.h.

92{nullptr};

◆ m_nEventsBeforeFork

Gaudi::Property<int> EvtRangeProcessor::m_nEventsBeforeFork {this, "EventsBeforeFork", 0, "Number of events before forking"}
private

Definition at line 63 of file EvtRangeProcessor.h.

63{this, "EventsBeforeFork", 0, "Number of events before forking"};

◆ m_nProcessedEvents

std::map<pid_t,int> EvtRangeProcessor::m_nProcessedEvents
private

Definition at line 79 of file EvtRangeProcessor.h.

◆ m_nprocs

int AthenaMPToolBase::m_nprocs {-1}
protected inherited

Number of workers spawned by the master process.

Definition at line 85 of file AthenaMPToolBase.h.

85{-1};

◆ m_processGroup

AthenaInterprocess::ProcessGroup* AthenaMPToolBase::m_processGroup {nullptr}
protected inherited

Definition at line 91 of file AthenaMPToolBase.h.

91{nullptr};

◆ m_procStates

std::map<pid_t,ProcessState> EvtRangeProcessor::m_procStates
private

Definition at line 81 of file EvtRangeProcessor.h.

◆ m_randStr

std::string AthenaMPToolBase::m_randStr
protected inherited

Definition at line 101 of file AthenaMPToolBase.h.

◆ m_rankId

int EvtRangeProcessor::m_rankId {-1}
private

Each worker has its own unique RankID from the range (0,...,m_nprocs-1)

Definition at line 68 of file EvtRangeProcessor.h.

68{-1};

◆ m_sharedFailedPidQueue

AthenaInterprocess::SharedQueue* EvtRangeProcessor::m_sharedFailedPidQueue {nullptr}
private

Definition at line 77 of file EvtRangeProcessor.h.

77{nullptr};

◆ m_sharedRankQueue

std::unique_ptr<AthenaInterprocess::SharedQueue> EvtRangeProcessor::m_sharedRankQueue
private

Definition at line 76 of file EvtRangeProcessor.h.

◆ m_subprocDirPrefix

std::string AthenaMPToolBase::m_subprocDirPrefix
protected inherited

Prefix of each worker's run-directory name, for example "worker__"; the worker index is appended to it.

Definition at line 88 of file AthenaMPToolBase.h.

◆ m_subprocTopDir

std::string AthenaMPToolBase::m_subprocTopDir
protected inherited

Top run directory for subprocesses.

Definition at line 87 of file AthenaMPToolBase.h.


The documentation for this class was generated from the following files: