11#include "GaudiKernel/IEvtSelector.h"
12#include "GaudiKernel/IIoComponentMgr.h"
13#include "GaudiKernel/IFileMgr.h"
14#include "GaudiKernel/IChronoStatSvc.h"
15#include "GaudiKernel/ISvcLocator.h"
16#include "GaudiKernel/IIncidentSvc.h"
17#include "GaudiKernel/FileIncident.h"
18#include "GaudiKernel/Timing.h"
31#include "yampl/SocketFactory.h"
35 ,
const std::string& name
36 ,
const IInterface* parent)
58 return StatusCode::SUCCESS;
61int EvtRangeProcessor::makePool(
int,
int nprocs,
const std::string& topdir)
65 if(nprocs==0 || nprocs<-1) {
66 ATH_MSG_ERROR(
"Invalid value for the nprocs parameter: " << nprocs);
80 std::ostringstream rankQueueName;
81 rankQueueName <<
"EvtRangeProcessor_RankQueue_" << getpid() <<
"_" <<
m_randStr;
105StatusCode EvtRangeProcessor::exec()
111 return StatusCode::SUCCESS;
114StatusCode EvtRangeProcessor::wait_once(
pid_t& pid)
123 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Failed PID Queue");
124 return StatusCode::FAILURE;
139 return StatusCode::FAILURE;
145 switch(itProcState->second) {
148 ATH_MSG_ERROR(
"Worker with process ID=" << pid <<
" failed at initialization!");
149 return StatusCode::FAILURE;
156 ATH_MSG_ERROR(
"Failed to report the crashed pid to the Event Range Scatterer");
157 return StatusCode::FAILURE;
161 if(startProcess().isSuccess()) {
168 return StatusCode::FAILURE;
179 return StatusCode::FAILURE;
189 ATH_MSG_ERROR(
"Detected unexpected state " << itProcState->second <<
" of failed worker with PID=" << pid);
190 return StatusCode::FAILURE;
198 ATH_MSG_ERROR(
"Failed to release the Event Range Scatterer");
199 return StatusCode::FAILURE;
212 return StatusCode::FAILURE;
219 AthenaInterprocess::ProcessResult* presult =
m_processGroup->pullOneResult();
221 if((
unsigned)(presult->
output.
size)>=
sizeof(
int)) {
223 const AthenaInterprocess::ScheduledWork&
output = presult->
output;
229 ATH_MSG_ERROR(
"Unable to find PID=" << childPid <<
" in the Proc States map!");
232 return StatusCode::FAILURE;
235 ATH_MSG_DEBUG(
"Decoding the output of PID=" << childPid <<
" with the size=" <<
output.size);
241 ATH_MSG_ERROR(
"Problem scheduling execution on PID=" << childPid);
244 return StatusCode::FAILURE;
252 memcpy(&func,(
char*)
output.data+
sizeof(
int),
sizeof(func));
257 memcpy(&nevt,(
char*)
output.data+
sizeof(
int)+
sizeof(func),
sizeof(
int));
259 ATH_MSG_DEBUG(
"PID=" << childPid <<
" processed " << nevt <<
" events");
263 ATH_MSG_DEBUG(
"Added PID=" << childPid <<
" to the finalization queue");
269 ATH_MSG_ERROR(
"Problem scheduling finalization on PID=" << childPid);
272 return StatusCode::FAILURE;
275 ATH_MSG_INFO(
"Scheduled finalization of PID=" << childPid);
285 if(pidFront==childPid) {
290 ATH_MSG_ERROR(
"Failed to set the process PID=" << pidFront <<
" free");
293 return StatusCode::FAILURE;
298 ATH_MSG_DEBUG(
"PID=" << childPid <<
" removed from the queue");
305 return StatusCode::FAILURE;
314 ATH_MSG_ERROR(
"Finalized PID=" << childPid <<
" while PID=" << pid <<
" was expected");
317 return StatusCode::FAILURE;
329 return StatusCode::SUCCESS;
335 const std::vector<AthenaInterprocess::ProcessStatus>& statuses =
m_processGroup->getStatuses();
336 for(
size_t i=0; i<statuses.size(); ++i) {
339 std::ostringstream ostr;
345 <<
". Status " << ((statuses[i].exitcode)?
"FAILURE":
"SUCCESS")
346 <<
". Number of events processed: " << ostr.str());
354 std::ostringstream workerIndex;
358 filenames.push_back(worker_rundir.string()+std::string(
"/AthenaMP.log"));
374 *(
int*)(outwork->data) = 1;
375 outwork->size =
sizeof(int);
387 std::ostringstream workindex;
395 if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
396 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
405 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
412 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
415 std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
416 if(std::filesystem::is_regular_file(
"SimParams.db"))
417 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
418 if(std::filesystem::is_regular_file(
"DigitParams.db"))
419 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
420 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
421 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
431 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
435 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
443 IService* evtSelSvc =
dynamic_cast<IService*
>(
evtSelector());
445 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
448 if(!evtSelSvc->start().isSuccess()) {
457 const std::list<IService*>& service_list = serviceLocator()->getServices();
458 std::list<IService*>::const_iterator itSvc = service_list.begin(),
459 itSvcLast = service_list.end();
460 for(;itSvc!=itSvcLast;++itSvc) {
461 IEvtSelector* evtsel =
dynamic_cast<IEvtSelector*
>(*itSvc);
463 if((*itSvc)->start().isSuccess())
464 ATH_MSG_DEBUG(
"Restarted event selector " << (*itSvc)->name());
466 ATH_MSG_ERROR(
"Failed to restart event selector " << (*itSvc)->name());
474 if(chdir(worker_rundir.string().c_str())==-1) {
475 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
480 *(
int*)(outwork->data) = 0;
486 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
489 int nEventsProcessed(0);
491 std::queue<std::string> queueTokens;
494 yampl::ISocketFactory* socketFactory =
new yampl::SocketFactory();
496 yampl::ISocket* socket2Scatterer = socketFactory->createClientSocket(yampl::Channel(socket2ScattererName,yampl::LOCAL),yampl::MOVE_DATA);
497 ATH_MSG_INFO(
"Created CLIENT socket to the Scatterer: " << socket2ScattererName);
498 std::ostringstream pidstr;
502 std::string ping = pidstr.str() + std::string(
" ready for event processing");
506 memcpy(message2scatterer,ping.data(),ping.size());
507 socket2Scatterer->send(message2scatterer,ping.size());
508 ATH_MSG_INFO(
"Sent a welcome message to the Scatterer");
512 char *responseBuffer(0);
513 std::string strPeerId;
514 ssize_t responseSize = socket2Scatterer->recv(responseBuffer,strPeerId);
516 if(responseSize==1) {
517 ATH_MSG_INFO(
"Empty range received. Terminating the loop");
521 std::string responseStr(responseBuffer,responseSize);
522 ATH_MSG_INFO(
"Received response from the Scatterer : " << responseStr);
525 System::ProcessTime time_start = System::getProcessTime();
528 size_t endpos = responseStr.find(
',');
529 while(endpos!=std::string::npos) {
530 queueTokens.push(responseStr.substr(startpos,endpos-startpos));
532 endpos = responseStr.find(
',',startpos);
534 queueTokens.push(responseStr.substr(startpos));
536 std::string rangeID = queueTokens.front();
540 m_incidentSvc->fireIncident(FileIncident(name(),
"NextEventRange",rangeID));
555 std::string filename(
"");
556 if(queueTokens.front().find(
"PFN:")==0) {
559 filename = queueTokens.front().substr(4);
561 ATH_MSG_WARNING(
"Failed to set input file for the range: " << rangeID);
564 m_incidentSvc->fireIncident(FileIncident(name(),
"NextEventRange",
"dummy"));
571 int startEvent = std::atoi(queueTokens.front().c_str());
573 int endEvent = std::atoi(queueTokens.front().c_str());
575 ATH_MSG_INFO(
"Range fields. File Name: " << (filename.empty()?
"N/A":filename)
576 <<
", First Event:" << startEvent
577 <<
", Last Event:" << endEvent);
580 IEvtSelector::Context* ctx =
nullptr;
581 if (
evtSelector()->createContext (ctx).isFailure()) {
586 for(
int i(startEvent-1); i<endEvent; ++i) {
588 if(
sc.isRecoverable()) {
589 ATH_MSG_WARNING(
"Event " << i <<
" from range: " << rangeID <<
" not in the input file");
593 else if(
sc.isFailure()) {
594 ATH_MSG_WARNING(
"Failed to seek to " << i <<
" in range: " << rangeID);
604 ATH_MSG_WARNING(
"Failed to process the event " << i <<
" in range:" << rangeID);
614 if (
evtSelector()->releaseContext (ctx).isFailure()) {
619 m_incidentSvc->fireIncident(FileIncident(name(),
"NextEventRange",
"dummy"));
626 std::string strOutpFile;
628 for(std::filesystem::directory_iterator fdIt(std::filesystem::current_path()); fdIt!=std::filesystem::directory_iterator(); fdIt++) {
629 if(fdIt->path().string().rfind(rangeID) == fdIt->path().string().size()-rangeID.size()) {
630 if(strOutpFile.empty()) {
631 strOutpFile = fdIt->path().string();
634 strOutpFile += (std::string(
",")+fdIt->path().string());
640 System::ProcessTime time_delta = System::getProcessTime() - time_start;
643 if(!strOutpFile.empty()) {
648 std::ostringstream outputReportStream;
649 outputReportStream << strOutpFile
651 <<
",CPU:" << time_delta.cpuTime<System::Sec>()
652 <<
",WALL:" << time_delta.elapsedTime<System::Sec>();
653 std::string outputFileReport = outputReportStream.str();
657 memcpy(message2scatterer,outputFileReport.data(),outputFileReport.size());
658 socket2Scatterer->send(message2scatterer,outputFileReport.size());
659 ATH_MSG_INFO(
"Reported the output " << outputFileReport);
663 ATH_MSG_WARNING(
"Failed to make an output file for range: " << rangeID);
677 *(
int*)(outdata) = 0;
679 memcpy((
char*)outdata+
sizeof(
int),&func,
sizeof(func));
680 memcpy((
char*)outdata+
sizeof(
int)+
sizeof(func),&nEventsProcessed,
sizeof(
int));
682 outwork->data = outdata;
683 outwork->size = outsize;
689 delete socket2Scatterer;
690 delete socketFactory;
697 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
703 if(
m_appMgr->finalize().isFailure()) {
704 std::cout <<
"Unable to finalize AppMgr" << std::endl;
713 *(
int*)(outdata) = 0;
715 memcpy((
char*)outdata+
sizeof(
int),&func,
sizeof(func));
717 memcpy((
char*)outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
719 outwork->data = outdata;
720 outwork->size = outsize;
725StatusCode EvtRangeProcessor::startProcess()
732 return StatusCode::FAILURE;
738 return StatusCode::FAILURE;
743 return StatusCode::FAILURE;
747 return StatusCode::SUCCESS;
752 if(
m_inpFile == newFile)
return StatusCode::SUCCESS;
755 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
evtSelector());
756 if(!propertyServer) {
757 ATH_MSG_ERROR(
"Unable to dyn-cast the event selector to IProperty");
758 return StatusCode::FAILURE;
761 std::string propertyName(
"InputCollections");
763 std::vector<std::string> vect;
764 StringArrayProperty inputFileList(propertyName, vect);
765 if(propertyServer->getProperty(&inputFileList).isFailure()) {
766 ATH_MSG_ERROR(
"Failed to get InputCollections property value of the Event Selector");
767 return StatusCode::FAILURE;
769 if(newFile==inputFileList.value()[0]) {
771 return StatusCode::SUCCESS;
774 std::vector<std::string> vect{newFile,};
775 StringArrayProperty newInputFileList(std::move(propertyName), vect);
776 if(propertyServer->setProperty(newInputFileList).isFailure()) {
777 ATH_MSG_ERROR(
"Unable to update " << newInputFileList.name() <<
" property on the Event Selector");
778 return StatusCode::FAILURE;
781 return StatusCode::SUCCESS;
786 pid_t pid = getpid();
789 memcpy(message2scatterer,&pid,
sizeof(
pid_t));
791 socket->send(message2scatterer,messageSize);
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
Extension to IEvtSelector to allow for seeking.
StatusCode setNewInputFile(const std::string &newFile)
std::string m_inpFile
Cached name of the input file. To avoid reopening.
std::deque< pid_t > m_finQueue
virtual StatusCode initialize() override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Gaudi::Property< bool > m_debug
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
void reportError(yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
virtual void subProcessLogs(std::vector< std::string > &) override
virtual void reportSubprocessStatuses() override
ServiceHandle< IIncidentSvc > m_incidentSvc
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
std::map< pid_t, int > m_nProcessedEvents
int m_rankId
Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
EvtRangeProcessor(const std::string &type, const std::string &name, const IInterface *parent)
virtual ~EvtRangeProcessor() override
Gaudi::Property< std::string > m_channel2Scatterer
AthenaInterprocess::SharedQueue * m_sharedFailedPidQueue
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
std::map< pid_t, ProcessState > m_procStates
SmartIF< IEvtSelectorSeek > m_evtSeek
int m_activeWorkers
Keep track of the number of workers.
#define COPY_FILE_HACK(_src, _dest)
const std::string process
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
void * xmalloc(size_t size)
Trapping version of malloc.
::StatusCode StatusCode
StatusCode definition for legacy code.
retrieve(aClass, aKey=None)
Trapping version of malloc.