10 #include "GaudiKernel/IEvtSelector.h"
11 #include "GaudiKernel/IIoComponentMgr.h"
12 #include "GaudiKernel/IFileMgr.h"
20 #include "yampl/SocketFactory.h"
27 ,
const std::string&
name
28 ,
const IInterface*
parent)
30 , m_processorChannel(
"")
31 , m_eventRangeChannel(
"")
49 if(!
sc.isSuccess())
return sc;
51 return StatusCode::SUCCESS;
56 return StatusCode::SUCCESS;
59 int EvtRangeScatterer::makePool(
int maxevt,
int nprocs,
const std::string& topdir)
64 ATH_MSG_ERROR(
"Invalid number of events requested: " << maxevt);
90 return StatusCode::FAILURE;
95 return StatusCode::FAILURE;
97 return StatusCode::SUCCESS;
105 filenames.push_back(reader_rundir.string()+std::string(
"/AthenaMP.log"));
118 *(
int*)(outwork->
data) = 1;
130 if(
mkdir(reader_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
131 ATH_MSG_ERROR(
"Unable to make reader run directory: " << reader_rundir.string() <<
". " <<
fmterror(errno));
139 ATH_MSG_INFO(
"Logs redirected in the AthenaMP Event Range Scatterer PID=" << getpid());
145 ATH_MSG_INFO(
"Io registry updated in the AthenaMP Event Range Scatterer PID=" << getpid());
156 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP Event Range Scatterer PID=" << getpid());
159 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
169 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
172 if(!evtSelSvc->start().isSuccess()) {
180 if(chdir(reader_rundir.string().c_str())==-1) {
181 ATH_MSG_ERROR(
"Failed to chdir to " << reader_rundir.string());
186 *(
int*)(outwork->
data) = 0;
192 ATH_MSG_INFO(
"Exec function in the AthenaMP Token Scatterer PID=" << getpid());
194 yampl::ISocketFactory* socketFactory =
new yampl::SocketFactory();
197 ATH_MSG_INFO(
"Created CLIENT socket for communicating Event Ranges with the Pilot");
200 yampl::ISocket* socket2Processor = socketFactory->createServerSocket(
yampl::Channel(socket2ProcessorName,yampl::LOCAL),yampl::MOVE_DATA);
201 ATH_MSG_INFO(
"Created SERVER socket to token processors: " << socket2ProcessorName);
204 int procReportPending(0);
208 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Failed PID Queue");
213 std::string strReady(
"Ready for events");
214 std::string strStopProcessing(
"No more events");
215 std::string processorWaitRequest(
"");
223 ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors ... ");
224 while(processorWaitRequest.empty()) {
225 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
229 ATH_MSG_INFO(
"One of the processors is ready for the next range");
231 workerPid =
std::atoi(processorWaitRequest.c_str());
239 void* ready_message =
malloc(strReady.size());
240 memcpy(ready_message,strReady.data(),strReady.size());
241 socket2Pilot->send(ready_message,strReady.size());
242 void* eventRangeMessage;
243 std::string strPeerId;
244 ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage,strPeerId);
245 std::string eventRange((
const char*)eventRangeMessage,eventRangeSize);
246 size_t carRet = eventRange.find(
'\n');
247 if(carRet!=std::string::npos)
248 eventRange.resize(carRet);
251 if(eventRange.find(strStopProcessing)!=std::string::npos) {
252 ATH_MSG_INFO(
"Stopped the loop. Last message from the Event Range Channel: " << eventRange);
255 ATH_MSG_INFO(
"Got Event Range from the pilot: " << eventRange);
260 if(eventRange.starts_with(
"[{")) eventRange=eventRange.substr(2);
261 if(eventRange.ends_with(
"}]")) eventRange.resize(eventRange.size()-2);
263 std::map<std::string,std::string> eventRangeMap;
265 size_t endpos = eventRange.find(
',');
266 while(endpos!=std::string::npos) {
268 std::string
keyValue(eventRange.substr(startpos,endpos-startpos));
269 size_t colonPos =
keyValue.find(
':');
270 std::string strKey =
keyValue.substr(0,colonPos);
271 std::string strVal =
keyValue.substr(colonPos+1);
274 eventRangeMap[strKey]=strVal;
277 endpos = eventRange.find(
',',startpos);
280 std::string
keyValue(eventRange.substr(startpos));
281 size_t colonPos =
keyValue.find(
':');
282 std::string strKey =
keyValue.substr(0,colonPos);
283 std::string strVal =
keyValue.substr(colonPos+1);
286 eventRangeMap[strKey]=strVal;
288 if(eventRangeMap.find(
"eventRangeID")==eventRangeMap.end()
289 || eventRangeMap.find(
"startEvent")==eventRangeMap.end()
290 || eventRangeMap.find(
"lastEvent")==eventRangeMap.end()
291 || eventRangeMap.find(
"GUID")==eventRangeMap.end()) {
292 std::string
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange +
"\": Wrong format");
297 socket2Pilot->send(errorMessage,
errorStr.size());
302 std::map<std::string,std::string>::const_iterator
it = eventRangeMap.begin(),
303 itEnd = eventRangeMap.end();
309 std::string rangeID = eventRangeMap[
"eventRangeID"];
310 std::string
guid = eventRangeMap[
"GUID"];
311 int startEvent =
std::atoi(eventRangeMap[
"startEvent"].c_str());
312 int lastEvent =
std::atoi(eventRangeMap[
"lastEvent"].c_str());
315 || lastEvent < startEvent) {
316 std::string
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange +
"\": Wrong values of range fields");
321 socket2Pilot->send(errorMessage,
errorStr.size());
325 std::string message2ProcessorStr;
326 char* message2Processor(0);
328 std::ostringstream ostr;
330 if(eventRangeMap.find(
"PFN")!=eventRangeMap.end()) {
331 ostr <<
"," <<
"PFN:" << eventRangeMap[
"PFN"];
333 ostr <<
"," << eventRangeMap[
"startEvent"]
334 <<
"," << eventRangeMap[
"lastEvent"];
335 message2ProcessorStr = ostr.str();
340 ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors");
341 while(processorWaitRequest.empty()) {
342 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
346 ATH_MSG_INFO(
"One of the processors is ready for the next range");
348 workerPid =
std::atoi(processorWaitRequest.c_str());
356 message2Processor = (
char*)
malloc(message2ProcessorStr.size());
357 memcpy(message2Processor,message2ProcessorStr.data(),message2ProcessorStr.size());
358 socket2Processor->send(message2Processor,message2ProcessorStr.size());
363 processorWaitRequest.clear();
365 ATH_MSG_INFO(
"Sent response to the processor : " << message2ProcessorStr);
372 void* emptyMess4Processor(0);
373 if(!processorWaitRequest.empty()) {
375 emptyMess4Processor =
malloc(1);
376 socket2Processor->send(emptyMess4Processor,1);
377 ATH_MSG_INFO(
"Set worker PID=" << workerPid <<
" free");
378 processorWaitRequest.clear();
383 while(processorWaitRequest.empty()) {
384 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
393 workerPid =
std::atoi(processorWaitRequest.c_str());
398 emptyMess4Processor =
malloc(1);
399 socket2Processor->send(emptyMess4Processor,1);
400 ATH_MSG_INFO(
"Set worker PID=" << workerPid <<
" free");
401 ATH_MSG_INFO(
"Still " << procReportPending <<
" pending reports");
402 processorWaitRequest.clear();
411 if(
m_appMgr->finalize().isFailure()) {
412 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
419 *(
int*)(outwork->
data) = (all_ok?0:1);
426 delete socket2Processor;
428 delete socketFactory;
438 *(
int*)(outwork->
data) = 0;
450 if(
str.empty())
return;
454 while(
str[
i]==
' ')
i--;
455 if(
i)
str.resize(
i+1);
462 if(
str.starts_with(
"u\'")) {
464 if(
str.ends_with(
"\'")) {
468 else if(
str.starts_with(
"\"")) {
470 if(
str.ends_with(
"\"")) {
477 , yampl::ISocket* socket2Pilot
478 ,
int& procReportPending)
480 void* processor_request(0);
481 std::string strPeerId;
482 ssize_t processorRequestSize = socket2Processor->tryRecv(processor_request,0,strPeerId);
484 if(processorRequestSize==-1)
return std::string(
"");
486 ATH_MSG_INFO(
"Processor reported event range processing error");
492 errorStr+=std::string(
"Not found in the input file");
495 errorStr+=std::string(
"Seek failed");
498 errorStr+=std::string(
"Failed to process event range");
501 errorStr+=std::string(
"Failed to make output file");
504 errorStr+=std::string(
"Failed to set input file");
511 socket2Pilot->send(errorMessage,
errorStr.size());
513 ATH_MSG_INFO(
"Error reported to the pilot. Reports pending: " << procReportPending);
514 return std::string(
"");
516 std::string strProcessorRequest((
const char*)processor_request,processorRequestSize);
517 ATH_MSG_INFO(
"Received request from a processor: " << strProcessorRequest);
519 if(strProcessorRequest.starts_with(
"/")) {
520 void* outpFileNameMessage =
malloc(strProcessorRequest.size());
521 memcpy(outpFileNameMessage,strProcessorRequest.data(),strProcessorRequest.size());
522 socket2Pilot->send(outpFileNameMessage,strProcessorRequest.size());
524 ATH_MSG_INFO(
"Output file reported to the pilot. Reports pending: " << procReportPending);
525 return std::string(
"");
527 return strProcessorRequest;
531 , yampl::ISocket* socket2Pilot
532 ,
int& procReportPending)
545 socket2Pilot->send(errorMessage,
errorStr.size());