11 #include "GaudiKernel/IEvtSelector.h"
12 #include "GaudiKernel/IIoComponentMgr.h"
13 #include "GaudiKernel/IFileMgr.h"
21 #include "yampl/SocketFactory.h"
28 ,
const std::string&
name
29 ,
const IInterface*
parent)
44 if(!
sc.isSuccess())
return sc;
46 return StatusCode::SUCCESS;
51 return StatusCode::SUCCESS;
54 int EvtRangeScatterer::makePool(
int maxevt,
int nprocs,
const std::string& topdir)
59 ATH_MSG_ERROR(
"Invalid number of events requested: " << maxevt);
85 return StatusCode::FAILURE;
90 return StatusCode::FAILURE;
92 return StatusCode::SUCCESS;
100 filenames.push_back(reader_rundir.string()+std::string(
"/AthenaMP.log"));
113 *(
int*)(outwork->
data) = 1;
125 if(
mkdir(reader_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
126 ATH_MSG_ERROR(
"Unable to make reader run directory: " << reader_rundir.string() <<
". " <<
fmterror(errno));
134 ATH_MSG_INFO(
"Logs redirected in the AthenaMP Event Range Scatterer PID=" << getpid());
140 ATH_MSG_INFO(
"Io registry updated in the AthenaMP Event Range Scatterer PID=" << getpid());
151 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP Event Range Scatterer PID=" << getpid());
154 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
164 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
167 if(!evtSelSvc->start().isSuccess()) {
175 if(chdir(reader_rundir.string().c_str())==-1) {
176 ATH_MSG_ERROR(
"Failed to chdir to " << reader_rundir.string());
181 *(
int*)(outwork->
data) = 0;
187 ATH_MSG_INFO(
"Exec function in the AthenaMP Token Scatterer PID=" << getpid());
189 yampl::ISocketFactory* socketFactory =
new yampl::SocketFactory();
192 ATH_MSG_INFO(
"Created CLIENT socket for communicating Event Ranges with the Pilot");
195 yampl::ISocket* socket2Processor = socketFactory->createServerSocket(
yampl::Channel(socket2ProcessorName,yampl::LOCAL),yampl::MOVE_DATA);
196 ATH_MSG_INFO(
"Created SERVER socket to token processors: " << socket2ProcessorName);
199 int procReportPending(0);
203 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Failed PID Queue");
208 std::string strReady(
"Ready for events");
209 std::string strStopProcessing(
"No more events");
210 std::string processorWaitRequest(
"");
218 ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors ... ");
219 while(processorWaitRequest.empty()) {
220 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
224 ATH_MSG_INFO(
"One of the processors is ready for the next range");
226 workerPid =
std::atoi(processorWaitRequest.c_str());
235 memcpy(ready_message,strReady.data(),strReady.size());
236 socket2Pilot->send(ready_message,strReady.size());
237 void* eventRangeMessage;
238 std::string strPeerId;
239 ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage,strPeerId);
240 std::string eventRange((
const char*)eventRangeMessage,eventRangeSize);
241 size_t carRet = eventRange.find(
'\n');
242 if(carRet!=std::string::npos)
243 eventRange.resize(carRet);
246 if(eventRange.find(strStopProcessing)!=std::string::npos) {
247 ATH_MSG_INFO(
"Stopped the loop. Last message from the Event Range Channel: " << eventRange);
250 ATH_MSG_INFO(
"Got Event Range from the pilot: " << eventRange);
255 if(eventRange.starts_with(
"[{")) eventRange=eventRange.substr(2);
256 if(eventRange.ends_with(
"}]")) eventRange.resize(eventRange.size()-2);
258 std::map<std::string,std::string> eventRangeMap;
260 size_t endpos = eventRange.find(
',');
261 while(endpos!=std::string::npos) {
263 std::string
keyValue(eventRange.substr(startpos,endpos-startpos));
264 size_t colonPos =
keyValue.find(
':');
265 std::string strKey =
keyValue.substr(0,colonPos);
266 std::string strVal =
keyValue.substr(colonPos+1);
269 eventRangeMap[strKey]=std::move(strVal);
272 endpos = eventRange.find(
',',startpos);
275 std::string
keyValue(eventRange.substr(startpos));
276 size_t colonPos =
keyValue.find(
':');
277 std::string strKey =
keyValue.substr(0,colonPos);
278 std::string strVal =
keyValue.substr(colonPos+1);
281 eventRangeMap[strKey]=std::move(strVal);
283 if(eventRangeMap.find(
"eventRangeID")==eventRangeMap.end()
284 || eventRangeMap.find(
"startEvent")==eventRangeMap.end()
285 || eventRangeMap.find(
"lastEvent")==eventRangeMap.end()
286 || eventRangeMap.find(
"GUID")==eventRangeMap.end()) {
287 std::string
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange +
"\": Wrong format");
292 socket2Pilot->send(errorMessage,
errorStr.size());
297 std::map<std::string,std::string>::const_iterator
it = eventRangeMap.begin(),
298 itEnd = eventRangeMap.end();
304 std::string rangeID = eventRangeMap[
"eventRangeID"];
305 std::string
guid = eventRangeMap[
"GUID"];
306 int startEvent =
std::atoi(eventRangeMap[
"startEvent"].c_str());
307 int lastEvent =
std::atoi(eventRangeMap[
"lastEvent"].c_str());
310 || lastEvent < startEvent) {
311 std::string
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange +
"\": Wrong values of range fields");
316 socket2Pilot->send(errorMessage,
errorStr.size());
320 std::string message2ProcessorStr;
321 char* message2Processor(0);
323 std::ostringstream ostr;
325 if(eventRangeMap.find(
"PFN")!=eventRangeMap.end()) {
326 ostr <<
"," <<
"PFN:" << eventRangeMap[
"PFN"];
328 ostr <<
"," << eventRangeMap[
"startEvent"]
329 <<
"," << eventRangeMap[
"lastEvent"];
330 message2ProcessorStr = ostr.str();
335 ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors");
336 while(processorWaitRequest.empty()) {
337 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
341 ATH_MSG_INFO(
"One of the processors is ready for the next range");
343 workerPid =
std::atoi(processorWaitRequest.c_str());
352 memcpy(message2Processor,message2ProcessorStr.data(),message2ProcessorStr.size());
353 socket2Processor->send(message2Processor,message2ProcessorStr.size());
358 processorWaitRequest.clear();
360 ATH_MSG_INFO(
"Sent response to the processor : " << message2ProcessorStr);
367 void* emptyMess4Processor(0);
368 if(!processorWaitRequest.empty()) {
371 socket2Processor->send(emptyMess4Processor,1);
372 ATH_MSG_INFO(
"Set worker PID=" << workerPid <<
" free");
373 processorWaitRequest.clear();
378 while(processorWaitRequest.empty()) {
379 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
388 workerPid =
std::atoi(processorWaitRequest.c_str());
394 socket2Processor->send(emptyMess4Processor,1);
395 ATH_MSG_INFO(
"Set worker PID=" << workerPid <<
" free");
396 ATH_MSG_INFO(
"Still " << procReportPending <<
" pending reports");
397 processorWaitRequest.clear();
406 if(
m_appMgr->finalize().isFailure()) {
407 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
414 *(
int*)(outwork->
data) = (all_ok?0:1);
421 delete socket2Processor;
423 delete socketFactory;
433 *(
int*)(outwork->
data) = 0;
445 if(
str.empty())
return;
449 while(
str[
i]==
' ')
i--;
450 if(
i)
str.resize(
i+1);
457 if(
str.starts_with(
"u\'")) {
459 if(
str.ends_with(
"\'")) {
463 else if(
str.starts_with(
"\"")) {
465 if(
str.ends_with(
"\"")) {
472 , yampl::ISocket* socket2Pilot
473 ,
int& procReportPending)
475 void* processor_request(0);
476 std::string strPeerId;
477 ssize_t processorRequestSize = socket2Processor->tryRecv(processor_request,0,strPeerId);
479 if(processorRequestSize==-1)
return std::string(
"");
481 ATH_MSG_INFO(
"Processor reported event range processing error");
487 errorStr+=std::string(
"Not found in the input file");
490 errorStr+=std::string(
"Seek failed");
493 errorStr+=std::string(
"Failed to process event range");
496 errorStr+=std::string(
"Failed to make output file");
499 errorStr+=std::string(
"Failed to set input file");
506 socket2Pilot->send(errorMessage,
errorStr.size());
508 ATH_MSG_INFO(
"Error reported to the pilot. Reports pending: " << procReportPending);
509 return std::string(
"");
511 std::string strProcessorRequest((
const char*)processor_request,processorRequestSize);
512 ATH_MSG_INFO(
"Received request from a processor: " << strProcessorRequest);
514 if(strProcessorRequest.starts_with(
"/")) {
516 memcpy(outpFileNameMessage,strProcessorRequest.data(),strProcessorRequest.size());
517 socket2Pilot->send(outpFileNameMessage,strProcessorRequest.size());
519 ATH_MSG_INFO(
"Output file reported to the pilot. Reports pending: " << procReportPending);
520 return std::string(
"");
522 return strProcessorRequest;
526 , yampl::ISocket* socket2Pilot
527 ,
int& procReportPending)
540 socket2Pilot->send(errorMessage,
errorStr.size());