11 #include "GaudiKernel/IEvtSelector.h" 
   12 #include "GaudiKernel/IIoComponentMgr.h" 
   13 #include "GaudiKernel/IFileMgr.h" 
   21 #include "yampl/SocketFactory.h" 
   28                      , 
const std::string& 
name 
   29                      , 
const IInterface* 
parent)
 
   44   if(!
sc.isSuccess()) 
return sc;
 
   46   return StatusCode::SUCCESS;
 
   51   return StatusCode::SUCCESS;
 
   54 int EvtRangeScatterer::makePool(
int maxevt, 
int nprocs, 
const std::string& topdir)
 
   59     ATH_MSG_ERROR(
"Invalid number of events requested: " << maxevt);
 
   85     return StatusCode::FAILURE;
 
   90     return StatusCode::FAILURE;
 
   92   return StatusCode::SUCCESS;
 
  100   filenames.push_back(reader_rundir.string()+std::string(
"/AthenaMP.log"));
 
  113   *(
int*)(outwork->
data) = 1; 
 
  125   if(
mkdir(reader_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
 
  126     ATH_MSG_ERROR(
"Unable to make reader run directory: " << reader_rundir.string() << 
". " << 
fmterror(errno));
 
  134   ATH_MSG_INFO(
"Logs redirected in the AthenaMP Event Range Scatterer PID=" << getpid());
 
  140   ATH_MSG_INFO(
"Io registry updated in the AthenaMP Event Range Scatterer PID=" << getpid());
 
  151   ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP Event Range Scatterer PID=" << getpid());
 
  154   if(!
m_ioMgr->io_reinitialize().isSuccess()) {
 
  164     ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
 
  167   if(!evtSelSvc->start().isSuccess()) {
 
  175   if(chdir(reader_rundir.string().c_str())==-1) {
 
  176     ATH_MSG_ERROR(
"Failed to chdir to " << reader_rundir.string());
 
  181   *(
int*)(outwork->
data) = 0;
 
  187   ATH_MSG_INFO(
"Exec function in the AthenaMP Token Scatterer PID=" << getpid());
 
  189   yampl::ISocketFactory* socketFactory = 
new yampl::SocketFactory();
 
  192   ATH_MSG_INFO(
"Created CLIENT socket for communicating Event Ranges with the Pilot");
 
  195   yampl::ISocket* socket2Processor = socketFactory->createServerSocket(
yampl::Channel(socket2ProcessorName,yampl::LOCAL),yampl::MOVE_DATA);
 
  196   ATH_MSG_INFO(
"Created SERVER socket to token processors: " << socket2ProcessorName);
 
  199   int procReportPending(0);  
 
  203     ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Failed PID Queue");
 
  208   std::string strReady(
"Ready for events");
 
  209   std::string strStopProcessing(
"No more events");
 
  210   std::string processorWaitRequest(
"");
 
  218       ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors ... ");
 
  219       while(processorWaitRequest.empty()) {
 
  220     processorWaitRequest = 
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
 
  224       ATH_MSG_INFO(
"One of the processors is ready for the next range");
 
  226       workerPid = 
std::atoi(processorWaitRequest.c_str());
 
  235     memcpy(ready_message,strReady.data(),strReady.size());
 
  236     socket2Pilot->send(ready_message,strReady.size());
 
  237     void* eventRangeMessage;
 
  238     std::string strPeerId;
 
  239     ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage,strPeerId);
 
  240     std::string eventRange((
const char*)eventRangeMessage,eventRangeSize);
 
  241     size_t carRet = eventRange.find(
'\n');
 
  242     if(carRet!=std::string::npos)
 
  243       eventRange.resize(carRet);
 
  246     if(eventRange.find(strStopProcessing)!=std::string::npos) {
 
  247       ATH_MSG_INFO(
"Stopped the loop. Last message from the Event Range Channel: " << eventRange);
 
  250     ATH_MSG_INFO(
"Got Event Range from the pilot: " << eventRange);
 
  255     if(eventRange.starts_with( 
"[{")) eventRange=eventRange.substr(2);
 
  256     if(eventRange.ends_with(
"}]")) eventRange.resize(eventRange.size()-2);
 
  258     std::map<std::string,std::string> eventRangeMap;
 
  260     size_t endpos = eventRange.find(
',');
 
  261     while(endpos!=std::string::npos) {
 
  263       std::string 
keyValue(eventRange.substr(startpos,endpos-startpos));
 
  264       size_t colonPos = 
keyValue.find(
':');
 
  265       std::string strKey = 
keyValue.substr(0,colonPos);
 
  266       std::string strVal = 
keyValue.substr(colonPos+1);
 
  269       eventRangeMap[strKey]=std::move(strVal);
 
  272       endpos = eventRange.find(
',',startpos);
 
  275     std::string 
keyValue(eventRange.substr(startpos));
 
  276     size_t colonPos = 
keyValue.find(
':');
 
  277     std::string strKey = 
keyValue.substr(0,colonPos);
 
  278     std::string strVal = 
keyValue.substr(colonPos+1);
 
  281     eventRangeMap[strKey]=std::move(strVal);
 
  283     if(eventRangeMap.find(
"eventRangeID")==eventRangeMap.end()
 
  284        || eventRangeMap.find(
"startEvent")==eventRangeMap.end()
 
  285        || eventRangeMap.find(
"lastEvent")==eventRangeMap.end()
 
  286        || eventRangeMap.find(
"GUID")==eventRangeMap.end()) {
 
  287       std::string 
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange + 
"\": Wrong format");
 
  292       socket2Pilot->send(errorMessage,
errorStr.size());
 
  297       std::map<std::string,std::string>::const_iterator 
it = eventRangeMap.begin(),
 
  298     itEnd = eventRangeMap.end();
 
  304     std::string rangeID = eventRangeMap[
"eventRangeID"];
 
  305     std::string 
guid = eventRangeMap[
"GUID"];
 
  306     int startEvent = 
std::atoi(eventRangeMap[
"startEvent"].c_str());
 
  307     int lastEvent = 
std::atoi(eventRangeMap[
"lastEvent"].c_str());
 
  310        || lastEvent < startEvent) {
 
  311       std::string 
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange + 
"\": Wrong values of range fields");
 
  316       socket2Pilot->send(errorMessage,
errorStr.size());
 
  320     std::string message2ProcessorStr;
 
  321     char* message2Processor(0);
 
  323     std::ostringstream ostr;
 
  325     if(eventRangeMap.find(
"PFN")!=eventRangeMap.end()) {
 
  326       ostr << 
"," << 
"PFN:" << eventRangeMap[
"PFN"];
 
  328     ostr << 
"," << eventRangeMap[
"startEvent"]
 
  329          << 
"," << eventRangeMap[
"lastEvent"];
 
  330     message2ProcessorStr = ostr.str();
 
  335       ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors");
 
  336       while(processorWaitRequest.empty()) {
 
  337     processorWaitRequest = 
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
 
  341       ATH_MSG_INFO(
"One of the processors is ready for the next range");
 
  343       workerPid = 
std::atoi(processorWaitRequest.c_str());
 
  352     memcpy(message2Processor,message2ProcessorStr.data(),message2ProcessorStr.size());
 
  353     socket2Processor->send(message2Processor,message2ProcessorStr.size());
 
  358     processorWaitRequest.clear();
 
  360     ATH_MSG_INFO(
"Sent response to the processor : " << message2ProcessorStr);
 
  367     void* emptyMess4Processor(0);
 
  368     if(!processorWaitRequest.empty()) {
 
  371       socket2Processor->send(emptyMess4Processor,1);
 
  372       ATH_MSG_INFO(
"Set worker PID=" << workerPid << 
" free");
 
  373       processorWaitRequest.clear();
 
  378       while(processorWaitRequest.empty()) {
 
  379     processorWaitRequest = 
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
 
  388       workerPid = 
std::atoi(processorWaitRequest.c_str());
 
  394       socket2Processor->send(emptyMess4Processor,1);
 
  395       ATH_MSG_INFO(
"Set worker PID=" << workerPid << 
" free");
 
  396       ATH_MSG_INFO(
"Still " << procReportPending << 
" pending reports");
 
  397       processorWaitRequest.clear();
 
  406     if(
m_appMgr->finalize().isFailure()) {
 
  407       std::cerr << 
"Unable to finalize AppMgr" << std::endl;
 
  414   *(
int*)(outwork->
data) = (all_ok?0:1); 
 
  421   delete socket2Processor;
 
  423   delete socketFactory;
 
  433   *(
int*)(outwork->
data) = 0; 
 
  445   if(
str.empty()) 
return;  
 
  449   while(
str[
i]==
' ') 
i--;
 
  450   if(
i) 
str.resize(
i+1);
 
  457   if(
str.starts_with( 
"u\'")) {
 
  459     if(
str.ends_with( 
"\'")) {
 
  463   else if(
str.starts_with( 
"\"")) {
 
  465     if(
str.ends_with( 
"\"")) {
 
  472                            , yampl::ISocket* socket2Pilot
 
  473                            , 
int& procReportPending)
 
  475   void* processor_request(0);
 
  476   std::string strPeerId;
 
  477   ssize_t processorRequestSize = socket2Processor->tryRecv(processor_request,0,strPeerId);
 
  479   if(processorRequestSize==-1) 
return std::string(
"");
 
  481     ATH_MSG_INFO(
"Processor reported event range processing error");
 
  487       errorStr+=std::string(
"Not found in the input file");
 
  490       errorStr+=std::string(
"Seek failed");
 
  493       errorStr+=std::string(
"Failed to process event range");
 
  496       errorStr+=std::string(
"Failed to make output file");
 
  499       errorStr+=std::string(
"Failed to set input file");
 
  506     socket2Pilot->send(errorMessage,
errorStr.size());
 
  508     ATH_MSG_INFO(
"Error reported to the pilot. Reports pending: " << procReportPending);
 
  509     return std::string(
"");
 
  511   std::string strProcessorRequest((
const char*)processor_request,processorRequestSize);
 
  512   ATH_MSG_INFO(
"Received request from a processor: " << strProcessorRequest);
 
  514   if(strProcessorRequest.starts_with( 
"/")) {
 
  516     memcpy(outpFileNameMessage,strProcessorRequest.data(),strProcessorRequest.size());
 
  517     socket2Pilot->send(outpFileNameMessage,strProcessorRequest.size());
 
  519     ATH_MSG_INFO(
"Output file reported to the pilot. Reports pending: " << procReportPending);
 
  520     return std::string(
"");
 
  522   return strProcessorRequest;
 
  526                         , yampl::ISocket* socket2Pilot
 
  527                         , 
int& procReportPending)
 
  540       socket2Pilot->send(errorMessage,
errorStr.size());