186{
187 ATH_MSG_INFO(
"Exec function in the AthenaMP Token Scatterer PID=" << getpid());
188
189 yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
190
191 yampl::ISocket* socket2Pilot = socketFactory->createClientSocket(yampl::Channel(
m_eventRangeChannel.value(),yampl::LOCAL),yampl::MOVE_DATA);
192 ATH_MSG_INFO(
"Created CLIENT socket for communicating Event Ranges with the Pilot");
193
195 yampl::ISocket* socket2Processor = socketFactory->createServerSocket(yampl::Channel(socket2ProcessorName,yampl::LOCAL),yampl::MOVE_DATA);
196 ATH_MSG_INFO(
"Created SERVER socket to token processors: " << socket2ProcessorName);
197
198 bool all_ok=true;
199 int procReportPending(0);
200
201 AthenaInterprocess::SharedQueue* sharedFailedPidQueue(0);
203 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Failed PID Queue");
204 all_ok=false;
205 }
206
207
208 std::string strReady("Ready for events");
209 std::string strStopProcessing("No more events");
210 std::string processorWaitRequest("");
211 int workerPid{-1};
212
214
215 while(all_ok) {
216
218 ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors ... ");
219 while(processorWaitRequest.empty()) {
220 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
222 usleep(1000);
223 }
224 ATH_MSG_INFO(
"One of the processors is ready for the next range");
225
226 workerPid = std::atoi(processorWaitRequest.c_str());
230 }
231 }
232
233
235 memcpy(ready_message,strReady.data(),strReady.size());
236 socket2Pilot->send(ready_message,strReady.size());
237 void* eventRangeMessage;
238 std::string strPeerId;
239 ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage,strPeerId);
240 std::string eventRange((const char*)eventRangeMessage,eventRangeSize);
241 size_t carRet = eventRange.find('\n');
242 if(carRet!=std::string::npos)
243 eventRange.resize(carRet);
244
245
246 if(eventRange.find(strStopProcessing)!=std::string::npos) {
247 ATH_MSG_INFO(
"Stopped the loop. Last message from the Event Range Channel: " << eventRange);
248 break;
249 }
250 ATH_MSG_INFO(
"Got Event Range from the pilot: " << eventRange);
251
252
253
254
255 if(eventRange.starts_with( "[{")) eventRange=eventRange.substr(2);
256 if(eventRange.ends_with("}]")) eventRange.resize(eventRange.size()-2);
257
258 std::map<std::string,std::string> eventRangeMap;
259 size_t startpos(0);
260 size_t endpos = eventRange.find(',');
261 while(endpos!=std::string::npos) {
262
263 std::string
keyValue(eventRange.substr(startpos,endpos-startpos));
264 size_t colonPos =
keyValue.find(
':');
265 std::string strKey =
keyValue.substr(0,colonPos);
266 std::string strVal =
keyValue.substr(colonPos+1);
269 eventRangeMap[strKey]=std::move(strVal);
270
271 startpos = endpos+1;
272 endpos = eventRange.find(',',startpos);
273 }
274
275 std::string
keyValue(eventRange.substr(startpos));
276 size_t colonPos =
keyValue.find(
':');
277 std::string strKey =
keyValue.substr(0,colonPos);
278 std::string strVal =
keyValue.substr(colonPos+1);
281 eventRangeMap[strKey]=std::move(strVal);
282
283 if(eventRangeMap.find("eventRangeID")==eventRangeMap.end()
284 || eventRangeMap.find("startEvent")==eventRangeMap.end()
285 || eventRangeMap.find("lastEvent")==eventRangeMap.end()
286 || eventRangeMap.find("GUID")==eventRangeMap.end()) {
287 std::string
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange +
"\": Wrong format");
292 socket2Pilot->send(errorMessage,
errorStr.size());
293 continue;
294 }
295 else {
297 std::map<std::string,std::string>::const_iterator
it = eventRangeMap.begin(),
298 itEnd = eventRangeMap.end();
302 }
303
304 std::string rangeID = eventRangeMap["eventRangeID"];
305 std::string
guid = eventRangeMap[
"GUID"];
306 int startEvent = std::atoi(eventRangeMap["startEvent"].c_str());
307 int lastEvent = std::atoi(eventRangeMap["lastEvent"].c_str());
308 if(rangeID.empty()
310 || lastEvent < startEvent) {
311 std::string
errorStr(
"ERR_ATHENAMP_PARSE \"" + eventRange +
"\": Wrong values of range fields");
316 socket2Pilot->send(errorMessage,
errorStr.size());
317 continue;
318 }
319
320 std::string message2ProcessorStr;
321 char* message2Processor(0);
322
323 std::ostringstream ostr;
324 ostr << rangeID;
325 if(eventRangeMap.find("PFN")!=eventRangeMap.end()) {
326 ostr << "," << "PFN:" << eventRangeMap["PFN"];
327 }
328 ostr << "," << eventRangeMap["startEvent"]
329 << "," << eventRangeMap["lastEvent"];
330 message2ProcessorStr = ostr.str();
331
332
333
335 ATH_MSG_DEBUG(
"Waiting for event range request from one of the processors");
336 while(processorWaitRequest.empty()) {
337 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
339 usleep(1000);
340 }
341 ATH_MSG_INFO(
"One of the processors is ready for the next range");
342
343 workerPid = std::atoi(processorWaitRequest.c_str());
347 }
348 }
349
350
352 memcpy(message2Processor,message2ProcessorStr.data(),message2ProcessorStr.size());
353 socket2Processor->send(message2Processor,message2ProcessorStr.size());
354 procReportPending++;
355
356
358 processorWaitRequest.clear();
359
360 ATH_MSG_INFO(
"Sent response to the processor : " << message2ProcessorStr);
361 }
362
363 if(all_ok) {
364
365
366
367 void* emptyMess4Processor(0);
368 if(!processorWaitRequest.empty()) {
369
371 socket2Processor->send(emptyMess4Processor,1);
372 ATH_MSG_INFO(
"Set worker PID=" << workerPid <<
" free");
373 processorWaitRequest.clear();
374 }
375 bool endLoop{false};
376 while(true) {
378 while(processorWaitRequest.empty()) {
379 processorWaitRequest =
getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
381 endLoop = true;
382 break;
383 }
384 usleep(1000);
385 }
386 if(endLoop) break;
387
388 workerPid = std::atoi(processorWaitRequest.c_str());
392 }
394 socket2Processor->send(emptyMess4Processor,1);
395 ATH_MSG_INFO(
"Set worker PID=" << workerPid <<
" free");
396 ATH_MSG_INFO(
"Still " << procReportPending <<
" pending reports");
397 processorWaitRequest.clear();
398 }
399 }
400
403 all_ok=false;
404 }
405 else {
406 if(
m_appMgr->finalize().isFailure()) {
407 std::cerr << "Unable to finalize AppMgr" << std::endl;
408 all_ok=false;
409 }
410 }
411
412 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
414 *(int*)(outwork->data) = (all_ok?0:1);
415 outwork->size =
sizeof(
int);
416
417
418
419
420
421 delete socket2Processor;
422 delete socket2Pilot;
423 delete socketFactory;
424
425 return outwork;
426}
void trimRangeStrings(std::string &)
pid_t pollFailedPidQueue(AthenaInterprocess::SharedQueue *sharedFailedPidQueue, yampl::ISocket *socket2Pilot, int &procReportPending)
Pid2RangeID m_pid2RangeID
Gaudi::Property< std::string > m_eventRangeChannel
std::string getNewRangeRequest(yampl::ISocket *socket2Processor, yampl::ISocket *socket2Pilot, int &procReportPending)
Gaudi::Property< bool > m_doCaching
Gaudi::Property< std::string > m_processorChannel
retrieve(aClass, aKey=None)