10 #include "GaudiKernel/IEvtSelector.h"
11 #include "GaudiKernel/IIoComponentMgr.h"
12 #include "GaudiKernel/IFileMgr.h"
13 #include "GaudiKernel/IChronoStatSvc.h"
14 #include "GaudiKernel/ISvcLocator.h"
15 #include "GaudiKernel/IIncidentSvc.h"
16 #include "GaudiKernel/FileIncident.h"
17 #include "GaudiKernel/Timing.h"
30 #include "yampl/SocketFactory.h"
34 ,
const std::string&
name
35 ,
const IInterface*
parent)
38 , m_nEventsBeforeFork(0)
41 , m_chronoStatSvc(
"ChronoStatSvc",
name)
42 , m_incidentSvc(
"IncidentSvc",
name)
44 , m_channel2Scatterer(
"")
45 , m_channel2EvtSel(
"")
46 , m_sharedRankQueue(0)
47 , m_sharedFailedPidQueue(0)
50 declareInterface<IAthenaMPTool>(
this);
74 return StatusCode::SUCCESS;
80 return StatusCode::SUCCESS;
83 int EvtRangeProcessor::makePool(
int,
int nprocs,
const std::string& topdir)
102 std::ostringstream rankQueueName;
103 rankQueueName <<
"EvtRangeProcessor_RankQueue_" << getpid() <<
"_" <<
m_randStr;
133 return StatusCode::SUCCESS;
145 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Failed PID Queue");
146 return StatusCode::FAILURE;
161 return StatusCode::FAILURE;
167 switch(itProcState->second) {
170 ATH_MSG_ERROR(
"Worker with process ID=" <<
pid <<
" failed at initialization!");
171 return StatusCode::FAILURE;
178 ATH_MSG_ERROR(
"Failed to report the crashed pid to the Event Range Scatterer");
179 return StatusCode::FAILURE;
183 if(startProcess().isSuccess()) {
190 return StatusCode::FAILURE;
201 return StatusCode::FAILURE;
211 ATH_MSG_ERROR(
"Detected unexpected state " << itProcState->second <<
" of failed worker with PID=" <<
pid);
212 return StatusCode::FAILURE;
220 ATH_MSG_ERROR(
"Failed to release the Event Range Scatterer");
221 return StatusCode::FAILURE;
234 return StatusCode::FAILURE;
244 if((
unsigned)(presult->
output.
size)>=
sizeof(
int)) {
252 ATH_MSG_ERROR(
"Unable to find PID=" << childPid <<
" in the Proc States map!");
253 return StatusCode::FAILURE;
256 ATH_MSG_DEBUG(
"Decoding the output of PID=" << childPid <<
" with the size=" <<
output.size);
262 ATH_MSG_ERROR(
"Problem scheduling execution on PID=" << childPid);
263 return StatusCode::FAILURE;
271 memcpy(&func,(
char*)
output.data+
sizeof(
int),
sizeof(func));
276 memcpy(&nevt,(
char*)
output.data+
sizeof(
int)+
sizeof(func),
sizeof(
int));
278 ATH_MSG_DEBUG(
"PID=" << childPid <<
" processed " << nevt <<
" events");
282 ATH_MSG_DEBUG(
"Added PID=" << childPid <<
" to the finalization queue");
288 ATH_MSG_ERROR(
"Problem scheduling finalization on PID=" << childPid);
289 return StatusCode::FAILURE;
292 ATH_MSG_INFO(
"Scheduled finalization of PID=" << childPid);
302 if(pidFront==childPid) {
307 ATH_MSG_ERROR(
"Failed to set the process PID=" << pidFront <<
" free");
308 return StatusCode::FAILURE;
313 ATH_MSG_DEBUG(
"PID=" << childPid <<
" removed from the queue");
318 return StatusCode::FAILURE;
327 ATH_MSG_ERROR(
"Finalized PID=" << childPid <<
" while PID=" <<
pid <<
" was expected");
328 return StatusCode::FAILURE;
337 if(
res)
return StatusCode::FAILURE;
341 return StatusCode::SUCCESS;
351 std::ostringstream ostr;
358 <<
". Number of events processed: " << ostr.str());
366 std::ostringstream workerIndex;
370 filenames.push_back(worker_rundir.string()+std::string(
"/AthenaMP.log"));
386 *(
int*)(outwork->
data) = 1;
399 std::ostringstream workindex;
407 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
408 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
417 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
424 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
428 if(std::filesystem::is_regular_file(
"SimParams.db"))
429 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
430 if(std::filesystem::is_regular_file(
"DigitParams.db"))
431 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
432 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
433 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
443 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
447 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
455 IService* evtSelSvc =
dynamic_cast<IService*
>(
evtSelector());
457 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
460 if(!evtSelSvc->start().isSuccess()) {
469 const std::list<IService*>& service_list = serviceLocator()->getServices();
470 std::list<IService*>::const_iterator itSvc = service_list.begin(),
471 itSvcLast = service_list.end();
472 for(;itSvc!=itSvcLast;++itSvc) {
473 IEvtSelector* evtsel =
dynamic_cast<IEvtSelector*
>(*itSvc);
475 if((*itSvc)->start().isSuccess())
476 ATH_MSG_DEBUG(
"Restarted event selector " << (*itSvc)->name());
478 ATH_MSG_ERROR(
"Failed to restart event selector " << (*itSvc)->name());
486 if(chdir(worker_rundir.string().c_str())==-1) {
487 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
492 *(
int*)(outwork->
data) = 0;
498 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
501 int nEventsProcessed(0);
503 std::queue<std::string> queueTokens;
506 yampl::ISocketFactory* socketFactory =
new yampl::SocketFactory();
508 yampl::ISocket* socket2Scatterer = socketFactory->createClientSocket(
yampl::Channel(socket2ScattererName,yampl::LOCAL),yampl::MOVE_DATA);
509 ATH_MSG_INFO(
"Created CLIENT socket to the Scatterer: " << socket2ScattererName);
510 std::ostringstream pidstr;
514 std::string ping = pidstr.str() + std::string(
" ready for event processing");
517 void* message2scatterer =
malloc(ping.size());
518 memcpy(message2scatterer,ping.data(),ping.size());
519 socket2Scatterer->send(message2scatterer,ping.size());
520 ATH_MSG_INFO(
"Sent a welcome message to the Scatterer");
524 char *responseBuffer(0);
525 std::string strPeerId;
526 ssize_t responseSize = socket2Scatterer->recv(responseBuffer,strPeerId);
528 if(responseSize==1) {
529 ATH_MSG_INFO(
"Empty range received. Terminating the loop");
533 std::string responseStr(responseBuffer,responseSize);
534 ATH_MSG_INFO(
"Received response from the Scatterer : " << responseStr);
537 System::ProcessTime time_start = System::getProcessTime();
540 size_t endpos = responseStr.find(
',');
541 while(endpos!=std::string::npos) {
542 queueTokens.push(responseStr.substr(startpos,endpos-startpos));
544 endpos = responseStr.find(
',',startpos);
546 queueTokens.push(responseStr.substr(startpos));
548 std::string rangeID = queueTokens.front();
568 if(queueTokens.front().find(
"PFN:")==0) {
571 filename = queueTokens.front().substr(4);
573 ATH_MSG_WARNING(
"Failed to set input file for the range: " << rangeID);
583 int startEvent =
std::atoi(queueTokens.front().c_str());
585 int endEvent =
std::atoi(queueTokens.front().c_str());
588 <<
", First Event:" << startEvent
589 <<
", Last Event:" << endEvent);
592 IEvtSelector::Context* ctx =
nullptr;
593 if (
evtSelector()->createContext (ctx).isFailure()) {
598 for(
int i(startEvent-1);
i<endEvent; ++
i) {
600 if(
sc.isRecoverable()) {
601 ATH_MSG_WARNING(
"Event " <<
i <<
" from range: " << rangeID <<
" not in the input file");
605 else if(
sc.isFailure()) {
616 ATH_MSG_WARNING(
"Failed to process the event " <<
i <<
" in range:" << rangeID);
626 if (
evtSelector()->releaseContext (ctx).isFailure()) {
638 std::string strOutpFile;
640 for(std::filesystem::directory_iterator fdIt(std::filesystem::current_path()); fdIt!=std::filesystem::directory_iterator(); fdIt++) {
641 if(fdIt->path().string().rfind(rangeID) == fdIt->path().string().size()-rangeID.size()) {
642 if(strOutpFile.empty()) {
643 strOutpFile = fdIt->path().string();
646 strOutpFile += (std::string(
",")+fdIt->path().string());
652 System::ProcessTime time_delta = System::getProcessTime() - time_start;
655 if(!strOutpFile.empty()) {
660 std::ostringstream outputReportStream;
661 outputReportStream << strOutpFile
663 <<
",CPU:" << time_delta.cpuTime<System::Sec>()
664 <<
",WALL:" << time_delta.elapsedTime<System::Sec>();
665 std::string outputFileReport = outputReportStream.str();
668 message2scatterer =
malloc(outputFileReport.size());
669 memcpy(message2scatterer,outputFileReport.data(),outputFileReport.size());
670 socket2Scatterer->send(message2scatterer,outputFileReport.size());
671 ATH_MSG_INFO(
"Reported the output " << outputFileReport);
675 ATH_MSG_WARNING(
"Failed to make an output file for range: " << rangeID);
691 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
692 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEventsProcessed,
sizeof(
int));
695 outwork->
size = outsize;
701 delete socket2Scatterer;
702 delete socketFactory;
709 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
715 if(
m_appMgr->finalize().isFailure()) {
716 std::cout <<
"Unable to finalize AppMgr" << std::endl;
727 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
729 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
732 outwork->
size = outsize;
744 return StatusCode::FAILURE;
750 return StatusCode::FAILURE;
755 return StatusCode::FAILURE;
759 return StatusCode::SUCCESS;
764 if(
m_inpFile == newFile)
return StatusCode::SUCCESS;
767 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
evtSelector());
768 if(!propertyServer) {
769 ATH_MSG_ERROR(
"Unable to dyn-cast the event selector to IProperty");
770 return StatusCode::FAILURE;
773 std::string propertyName(
"InputCollections");
775 std::vector<std::string> vect;
776 StringArrayProperty inputFileList(propertyName, vect);
777 if(propertyServer->getProperty(&inputFileList).isFailure()) {
778 ATH_MSG_ERROR(
"Failed to get InputCollections property value of the Event Selector");
779 return StatusCode::FAILURE;
781 if(newFile==inputFileList.value()[0]) {
783 return StatusCode::SUCCESS;
786 std::vector<std::string> vect{newFile,};
787 StringArrayProperty newInputFileList(propertyName, vect);
788 if(propertyServer->setProperty(newInputFileList).isFailure()) {
789 ATH_MSG_ERROR(
"Unable to update " << newInputFileList.name() <<
" property on the Event Selector");
790 return StatusCode::FAILURE;
793 return StatusCode::SUCCESS;
800 void* message2scatterer =
malloc(messageSize);
801 memcpy(message2scatterer,&
pid,
sizeof(
pid_t));
803 socket->send(message2scatterer,messageSize);