10 #include "GaudiKernel/IEvtSelector.h"
11 #include "GaudiKernel/IIoComponentMgr.h"
12 #include "GaudiKernel/IFileMgr.h"
21 #include "yampl/SocketFactory.h"
28 ,
const std::string&
name
29 ,
const IInterface*
parent)
31 , m_processorChannel(
"")
32 , m_eventRangeChannel(
"")
50 if(!
sc.isSuccess())
return sc;
52 return StatusCode::SUCCESS;
57 return StatusCode::SUCCESS;
60 int EvtRangeScatterer::makePool(
int maxevt,
int nprocs,
const std::string& topdir)
65 ATH_MSG_ERROR(
"Invalid number of events requested: " << maxevt);
91 return StatusCode::FAILURE;
96 return StatusCode::FAILURE;
98 return StatusCode::SUCCESS;
106 filenames.push_back(reader_rundir.string()+std::string(
"/AthenaMP.log"));
119 *(
int*)(outwork->
data) = 1;
131 if(
mkdir(reader_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
132 ATH_MSG_ERROR(
"Unable to make reader run directory: " << reader_rundir.string() <<
". " <<
fmterror(errno));
140 ATH_MSG_INFO(
"Logs redirected in the AthenaMP Event Range Scatterer PID=" << getpid());
146 ATH_MSG_INFO(
"Io registry updated in the AthenaMP Event Range Scatterer PID=" << getpid());
157 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP Event Range Scatterer PID=" << getpid());
160 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
168 IService* evtSelSvc =
dynamic_cast<IService*
>(
m_evtSelector);
170 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
173 if(!evtSelSvc->start().isSuccess()) {
181 if(chdir(reader_rundir.string().c_str())==-1) {
182 ATH_MSG_ERROR(
"Failed to chdir to " << reader_rundir.string());
187 *(
int*)(outwork->
data) = 0;
193 ATH_MSG_INFO(
"Exec function in the AthenaMP Token Scatterer PID=" << getpid());
195 yampl::ISocketFactory* socketFactory =
new yampl::SocketFactory();
198 ATH_MSG_INFO(
"Created CLIENT socket for communicating Event Ranges with the Pilot");
201 yampl::ISocket* socket2Processor = socketFactory->createServerSocket(
yampl::Channel(socket2ProcessorName,yampl::LOCAL),yampl::MOVE_DATA);
202 ATH_MSG_INFO(
"Created SERVER socket to token processors: " << socket2ProcessorName);
205 int procReportPending(0);
209 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Failed PID Queue");
214 std::string strReady(
"Ready for events");
215 std::string strStopProcessing(
"No more events");
216 std::string processorWaitRequest(
"");
224 ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors ... ");
225 while(processorWaitRequest.empty()) {
226 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
230 ATH_MSG_INFO(
"One of the processors is ready for the next range");
232 workerPid =
std::atoi(processorWaitRequest.c_str());
240 void* ready_message =
malloc(strReady.size());
241 memcpy(ready_message,strReady.data(),strReady.size());
242 socket2Pilot->send(ready_message,strReady.size());
243 void* eventRangeMessage;
244 std::string strPeerId;
245 ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage,strPeerId);
246 std::string eventRange((
const char*)eventRangeMessage,eventRangeSize);
247 size_t carRet = eventRange.find(
'\n');
248 if(carRet!=std::string::npos)
249 eventRange.resize(carRet);
252 if(eventRange.find(strStopProcessing)!=std::string::npos) {
253 ATH_MSG_INFO(
"Stopped the loop. Last message from the Event Range Channel: " << eventRange);
256 ATH_MSG_INFO(
"Got Event Range from the pilot: " << eventRange);
264 std::map<std::string,std::string> eventRangeMap;
266 size_t endpos = eventRange.find(
',');
267 while(endpos!=std::string::npos) {
269 std::string
keyValue(eventRange.substr(startpos,endpos-startpos));
270 size_t colonPos =
keyValue.find(
':');
271 std::string strKey =
keyValue.substr(0,colonPos);
272 std::string strVal =
keyValue.substr(colonPos+1);
275 eventRangeMap[strKey]=strVal;
278 endpos = eventRange.find(
',',startpos);
281 std::string
keyValue(eventRange.substr(startpos));
282 size_t colonPos =
keyValue.find(
':');
283 std::string strKey =
keyValue.substr(0,colonPos);
284 std::string strVal =
keyValue.substr(colonPos+1);
287 eventRangeMap[strKey]=strVal;
289 if(eventRangeMap.find(
"eventRangeID")==eventRangeMap.end()
290 || eventRangeMap.find(
"startEvent")==eventRangeMap.end()
291 || eventRangeMap.find(
"lastEvent")==eventRangeMap.end()
292 || eventRangeMap.find(
"GUID")==eventRangeMap.end()) {
293 std::string
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange +
"\": Wrong format");
298 socket2Pilot->send(errorMessage,
errorStr.size());
303 std::map<std::string,std::string>::const_iterator
it = eventRangeMap.begin(),
304 itEnd = eventRangeMap.end();
310 std::string rangeID = eventRangeMap[
"eventRangeID"];
311 std::string
guid = eventRangeMap[
"GUID"];
312 int startEvent =
std::atoi(eventRangeMap[
"startEvent"].c_str());
313 int lastEvent =
std::atoi(eventRangeMap[
"lastEvent"].c_str());
316 || lastEvent < startEvent) {
317 std::string
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange +
"\": Wrong values of range fields");
322 socket2Pilot->send(errorMessage,
errorStr.size());
326 std::string message2ProcessorStr;
327 char* message2Processor(0);
329 std::ostringstream ostr;
331 if(eventRangeMap.find(
"PFN")!=eventRangeMap.end()) {
332 ostr <<
"," <<
"PFN:" << eventRangeMap[
"PFN"];
334 ostr <<
"," << eventRangeMap[
"startEvent"]
335 <<
"," << eventRangeMap[
"lastEvent"];
336 message2ProcessorStr = ostr.str();
341 ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors");
342 while(processorWaitRequest.empty()) {
343 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
347 ATH_MSG_INFO(
"One of the processors is ready for the next range");
349 workerPid =
std::atoi(processorWaitRequest.c_str());
357 message2Processor = (
char*)
malloc(message2ProcessorStr.size());
358 memcpy(message2Processor,message2ProcessorStr.data(),message2ProcessorStr.size());
359 socket2Processor->send(message2Processor,message2ProcessorStr.size());
364 processorWaitRequest.clear();
366 ATH_MSG_INFO(
"Sent response to the processor : " << message2ProcessorStr);
373 void* emptyMess4Processor(0);
374 if(!processorWaitRequest.empty()) {
376 emptyMess4Processor =
malloc(1);
377 socket2Processor->send(emptyMess4Processor,1);
378 ATH_MSG_INFO(
"Set worker PID=" << workerPid <<
" free");
379 processorWaitRequest.clear();
384 while(processorWaitRequest.empty()) {
385 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
394 workerPid =
std::atoi(processorWaitRequest.c_str());
399 emptyMess4Processor =
malloc(1);
400 socket2Processor->send(emptyMess4Processor,1);
401 ATH_MSG_INFO(
"Set worker PID=" << workerPid <<
" free");
402 ATH_MSG_INFO(
"Still " << procReportPending <<
" pending reports");
403 processorWaitRequest.clear();
412 if(
m_appMgr->finalize().isFailure()) {
413 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
420 *(
int*)(outwork->
data) = (all_ok?0:1);
427 delete socket2Processor;
429 delete socketFactory;
439 *(
int*)(outwork->
data) = 0;
451 if(
str.empty())
return;
455 while(
str[
i]==
' ')
i--;
456 if(
i)
str.resize(
i+1);
478 , yampl::ISocket* socket2Pilot
479 ,
int& procReportPending)
481 void* processor_request(0);
482 std::string strPeerId;
483 ssize_t processorRequestSize = socket2Processor->tryRecv(processor_request,0,strPeerId);
485 if(processorRequestSize==-1)
return std::string(
"");
487 ATH_MSG_INFO(
"Processor reported event range processing error");
493 errorStr+=std::string(
"Not found in the input file");
496 errorStr+=std::string(
"Seek failed");
499 errorStr+=std::string(
"Failed to process event range");
502 errorStr+=std::string(
"Failed to make output file");
505 errorStr+=std::string(
"Failed to set input file");
512 socket2Pilot->send(errorMessage,
errorStr.size());
514 ATH_MSG_INFO(
"Error reported to the pilot. Reports pending: " << procReportPending);
515 return std::string(
"");
517 std::string strProcessorRequest((
const char*)processor_request,processorRequestSize);
518 ATH_MSG_INFO(
"Received request from a processor: " << strProcessorRequest);
521 void* outpFileNameMessage =
malloc(strProcessorRequest.size());
522 memcpy(outpFileNameMessage,strProcessorRequest.data(),strProcessorRequest.size());
523 socket2Pilot->send(outpFileNameMessage,strProcessorRequest.size());
525 ATH_MSG_INFO(
"Output file reported to the pilot. Reports pending: " << procReportPending);
526 return std::string(
"");
528 return strProcessorRequest;
532 , yampl::ISocket* socket2Pilot
533 ,
int& procReportPending)
546 socket2Pilot->send(errorMessage,
errorStr.size());