187 ATH_MSG_INFO(
"Exec function in the AthenaMP Token Scatterer PID=" << getpid());
189 yampl::ISocketFactory* socketFactory =
new yampl::SocketFactory();
192 ATH_MSG_INFO(
"Created CLIENT socket for communicating Event Ranges with the Pilot");
195 yampl::ISocket* socket2Processor = socketFactory->createServerSocket(
yampl::Channel(socket2ProcessorName,yampl::LOCAL),yampl::MOVE_DATA);
196 ATH_MSG_INFO(
"Created SERVER socket to token processors: " << socket2ProcessorName);
199 int procReportPending(0);
203 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Failed PID Queue");
208 std::string strReady(
"Ready for events");
209 std::string strStopProcessing(
"No more events");
210 std::string processorWaitRequest(
"");
218 ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors ... ");
219 while(processorWaitRequest.empty()) {
220 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
224 ATH_MSG_INFO(
"One of the processors is ready for the next range");
226 workerPid =
std::atoi(processorWaitRequest.c_str());
235 memcpy(ready_message,strReady.data(),strReady.size());
236 socket2Pilot->send(ready_message,strReady.size());
237 void* eventRangeMessage;
238 std::string strPeerId;
239 ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage,strPeerId);
240 std::string eventRange((
const char*)eventRangeMessage,eventRangeSize);
241 size_t carRet = eventRange.find(
'\n');
242 if(carRet!=std::string::npos)
243 eventRange.resize(carRet);
246 if(eventRange.find(strStopProcessing)!=std::string::npos) {
247 ATH_MSG_INFO(
"Stopped the loop. Last message from the Event Range Channel: " << eventRange);
250 ATH_MSG_INFO(
"Got Event Range from the pilot: " << eventRange);
255 if(eventRange.starts_with(
"[{")) eventRange=eventRange.substr(2);
256 if(eventRange.ends_with(
"}]")) eventRange.resize(eventRange.size()-2);
258 std::map<std::string,std::string> eventRangeMap;
260 size_t endpos = eventRange.find(
',');
261 while(endpos!=std::string::npos) {
263 std::string
keyValue(eventRange.substr(startpos,endpos-startpos));
264 size_t colonPos =
keyValue.find(
':');
265 std::string strKey =
keyValue.substr(0,colonPos);
266 std::string strVal =
keyValue.substr(colonPos+1);
269 eventRangeMap[strKey]=std::move(strVal);
272 endpos = eventRange.find(
',',startpos);
275 std::string
keyValue(eventRange.substr(startpos));
276 size_t colonPos =
keyValue.find(
':');
277 std::string strKey =
keyValue.substr(0,colonPos);
278 std::string strVal =
keyValue.substr(colonPos+1);
281 eventRangeMap[strKey]=std::move(strVal);
283 if(eventRangeMap.find(
"eventRangeID")==eventRangeMap.end()
284 || eventRangeMap.find(
"startEvent")==eventRangeMap.end()
285 || eventRangeMap.find(
"lastEvent")==eventRangeMap.end()
286 || eventRangeMap.find(
"GUID")==eventRangeMap.end()) {
287 std::string
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange +
"\": Wrong format");
292 socket2Pilot->send(errorMessage,
errorStr.size());
297 std::map<std::string,std::string>::const_iterator
it = eventRangeMap.begin(),
298 itEnd = eventRangeMap.end();
304 std::string rangeID = eventRangeMap[
"eventRangeID"];
305 std::string
guid = eventRangeMap[
"GUID"];
306 int startEvent =
std::atoi(eventRangeMap[
"startEvent"].c_str());
307 int lastEvent =
std::atoi(eventRangeMap[
"lastEvent"].c_str());
310 || lastEvent < startEvent) {
311 std::string
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange +
"\": Wrong values of range fields");
316 socket2Pilot->send(errorMessage,
errorStr.size());
320 std::string message2ProcessorStr;
321 char* message2Processor(0);
323 std::ostringstream ostr;
325 if(eventRangeMap.find(
"PFN")!=eventRangeMap.end()) {
326 ostr <<
"," <<
"PFN:" << eventRangeMap[
"PFN"];
328 ostr <<
"," << eventRangeMap[
"startEvent"]
329 <<
"," << eventRangeMap[
"lastEvent"];
330 message2ProcessorStr = ostr.str();
335 ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors");
336 while(processorWaitRequest.empty()) {
337 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
341 ATH_MSG_INFO(
"One of the processors is ready for the next range");
343 workerPid =
std::atoi(processorWaitRequest.c_str());
352 memcpy(message2Processor,message2ProcessorStr.data(),message2ProcessorStr.size());
353 socket2Processor->send(message2Processor,message2ProcessorStr.size());
358 processorWaitRequest.clear();
360 ATH_MSG_INFO(
"Sent response to the processor : " << message2ProcessorStr);
367 void* emptyMess4Processor(0);
368 if(!processorWaitRequest.empty()) {
371 socket2Processor->send(emptyMess4Processor,1);
372 ATH_MSG_INFO(
"Set worker PID=" << workerPid <<
" free");
373 processorWaitRequest.clear();
378 while(processorWaitRequest.empty()) {
379 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
388 workerPid =
std::atoi(processorWaitRequest.c_str());
394 socket2Processor->send(emptyMess4Processor,1);
395 ATH_MSG_INFO(
"Set worker PID=" << workerPid <<
" free");
396 ATH_MSG_INFO(
"Still " << procReportPending <<
" pending reports");
397 processorWaitRequest.clear();
406 if(
m_appMgr->finalize().isFailure()) {
407 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
414 *(
int*)(outwork->data) = (all_ok?0:1);
415 outwork->size =
sizeof(
int);
421 delete socket2Processor;
423 delete socketFactory;