ATLAS Offline Software
Loading...
Searching...
No Matches
EvtRangeScatterer Class Reference (final, abstract)

#include <EvtRangeScatterer.h>

Inheritance diagram for EvtRangeScatterer:
Collaboration diagram for EvtRangeScatterer:

Public Member Functions

 EvtRangeScatterer (const std::string &type, const std::string &name, const IInterface *parent)
virtual ~EvtRangeScatterer () override
virtual StatusCode initialize () override
virtual StatusCode finalize () override
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
virtual void subProcessLogs (std::vector< std::string > &) override
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func () override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func () override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func () override
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t &pid) override
virtual void reportSubprocessStatuses () override
virtual void useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override
virtual void setRandString (const std::string &randStr) override
virtual void setMaxEvt (int maxEvt) override
virtual void setMPRunStop (const AthenaInterprocess::IMPRunStop *runStop) override
virtual void killChildren () override
virtual std::unique_ptr< ScheduledWork > operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0

Protected Types

enum  ESRange_Status {
  ESRANGE_SUCCESS , ESRANGE_NOTFOUND , ESRANGE_SEEKFAILED , ESRANGE_PROCFAILED ,
  ESRANGE_FILENOTMADE , ESRANGE_BADINPFILE
}
enum  Func_Flag { FUNC_BOOTSTRAP , FUNC_EXEC , FUNC_FIN }

Protected Member Functions

int mapAsyncFlag ATLAS_NOT_THREAD_SAFE (Func_Flag flag, pid_t pid=0)
int redirectLog (const std::string &rundir, bool addTimeStamp=true)
int updateIoReg (const std::string &rundir)
std::string fmterror (int errnum)
int reopenFds ()
int handleSavedPfc (const std::filesystem::path &dest_path)
void waitForSignal ()
IEvtSelector * evtSelector ()

Protected Attributes

int m_nprocs {-1}
 Number of workers spawned by the master process.
int m_maxEvt {-1}
 Maximum number of events assigned to the job.
std::string m_subprocTopDir
 Top run directory for subprocesses.
std::string m_subprocDirPrefix
 For ex. "worker__".
std::string m_evtSelName
 Name of the event selector.
AthenaInterprocess::ProcessGroup * m_processGroup {nullptr}
const AthenaInterprocess::IMPRunStop * m_mpRunStop {nullptr}
ServiceHandle< IEventProcessor > m_evtProcessor
ServiceHandle< IAppMgrUI > m_appMgr
ServiceHandle< IFileMgr > m_fileMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
SmartIF< IEvtSelector > m_evtSelector
std::string m_fileMgrLog
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
std::string m_randStr
Gaudi::Property< bool > m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}

Private Member Functions

 EvtRangeScatterer ()
 EvtRangeScatterer (const EvtRangeScatterer &)
EvtRangeScatterer & operator= (const EvtRangeScatterer &)
void trimRangeStrings (std::string &)
std::string getNewRangeRequest (yampl::ISocket *socket2Processor, yampl::ISocket *socket2Pilot, int &procReportPending)
pid_t pollFailedPidQueue (AthenaInterprocess::SharedQueue *sharedFailedPidQueue, yampl::ISocket *socket2Pilot, int &procReportPending)
int reopenFd (int fd, const std::string &name)

Private Attributes

Gaudi::Property< std::string > m_processorChannel {this, "ProcessorChannel", ""}
Gaudi::Property< std::string > m_eventRangeChannel {this, "EventRangeChannel", ""}
Gaudi::Property< bool > m_doCaching {this, "DoCaching", false}
Pid2RangeID m_pid2RangeID

Detailed Description

Definition at line 18 of file EvtRangeScatterer.h.

Member Enumeration Documentation

◆ ESRange_Status

enum AthenaMPToolBase::ESRange_Status
protected, inherited
Enumerator
ESRANGE_SUCCESS 
ESRANGE_NOTFOUND 
ESRANGE_SEEKFAILED 
ESRANGE_PROCFAILED 
ESRANGE_FILENOTMADE 
ESRANGE_BADINPFILE 

Definition at line 58 of file AthenaMPToolBase.h.

◆ Func_Flag

enum AthenaMPToolBase::Func_Flag
protected, inherited
Enumerator
FUNC_BOOTSTRAP 
FUNC_EXEC 
FUNC_FIN 

Definition at line 67 of file AthenaMPToolBase.h.

Constructor & Destructor Documentation

◆ EvtRangeScatterer() [1/3]

EvtRangeScatterer::EvtRangeScatterer ( const std::string & type,
const std::string & name,
const IInterface * parent )

Definition at line 27 of file EvtRangeScatterer.cxx.

30 : AthenaMPToolBase(type,name,parent)
31{
32 m_subprocDirPrefix = "range_scatterer";
33}
std::string m_subprocDirPrefix
For ex. "worker__".

◆ ~EvtRangeScatterer()

EvtRangeScatterer::~EvtRangeScatterer ( )
override, virtual

Definition at line 35 of file EvtRangeScatterer.cxx.

36{
37}

◆ EvtRangeScatterer() [2/3]

EvtRangeScatterer::EvtRangeScatterer ( )
private

◆ EvtRangeScatterer() [3/3]

EvtRangeScatterer::EvtRangeScatterer ( const EvtRangeScatterer & )
private

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE() [1/4]

int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( Func_Flag flag,
pid_t pid = 0 )
protected, inherited

◆ ATLAS_NOT_THREAD_SAFE() [2/4]

virtual StatusCode wait_once AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( pid_t & pid)
override, virtual, inherited

◆ ATLAS_NOT_THREAD_SAFE() [3/4]

virtual StatusCode exec EvtRangeScatterer::ATLAS_NOT_THREAD_SAFE ( )
override, virtual

◆ ATLAS_NOT_THREAD_SAFE() [4/4]

virtual int makePool EvtRangeScatterer::ATLAS_NOT_THREAD_SAFE ( int maxevt,
int nprocs,
const std::string & topdir )
override, virtual

◆ bootstrap_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeScatterer::bootstrap_func ( )
override, virtual

Implements AthenaMPToolBase.

Definition at line 109 of file EvtRangeScatterer.cxx.

110{
111 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
112 outwork->data = CxxUtils::xmalloc(sizeof(int));
113 *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
114 outwork->size = sizeof(int);
115
116 // ...
117 // (possible) TODO: extend outwork with some error message, which will be eventually
118 // reported in the master proces
119 // ...
120
121 // Reader dir: mkdir
122 std::filesystem::path reader_rundir(m_subprocTopDir);
123 reader_rundir /= std::filesystem::path(m_subprocDirPrefix);
124
125 if(mkdir(reader_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
126 ATH_MSG_ERROR("Unable to make reader run directory: " << reader_rundir.string() << ". " << fmterror(errno));
127 return outwork;
128 }
129
130 // Redirect logs
131 if(redirectLog(reader_rundir.string()))
132 return outwork;
133
134 ATH_MSG_INFO("Logs redirected in the AthenaMP Event Range Scatterer PID=" << getpid());
135
136 // Update Io Registry
137 if(updateIoReg(reader_rundir.string()))
138 return outwork;
139
140 ATH_MSG_INFO("Io registry updated in the AthenaMP Event Range Scatterer PID=" << getpid());
141
142 // _______________________ Handle saved PFC (if any) ______________________
143 std::filesystem::path abs_reader_rundir = std::filesystem::absolute(reader_rundir);
144 if(handleSavedPfc(abs_reader_rundir))
145 return outwork;
146
147 // Reopen file descriptors
148 if(reopenFds())
149 return outwork;
150
151 ATH_MSG_INFO("File descriptors re-opened in the AthenaMP Event Range Scatterer PID=" << getpid());
152
153 // ________________________ I/O reinit ________________________
154 if(!m_ioMgr->io_reinitialize().isSuccess()) {
155 ATH_MSG_ERROR("Failed to reinitialize I/O");
156 return outwork;
157 } else {
158 ATH_MSG_DEBUG("Successfully reinitialized I/O");
159 }
160
161 // Start the event selector
162 SmartIF<IService> evtSelSvc(m_evtSelector);
163 if(!evtSelSvc) {
164 ATH_MSG_ERROR("Failed to dyncast event selector to IService");
165 return outwork;
166 }
167 if(!evtSelSvc->start().isSuccess()) {
168 ATH_MSG_ERROR("Failed to restart the event selector");
169 return outwork;
170 } else {
171 ATH_MSG_DEBUG("Successfully restarted the event selector");
172 }
173
174 // Reader dir: chdir
175 if(chdir(reader_rundir.string().c_str())==-1) {
176 ATH_MSG_ERROR("Failed to chdir to " << reader_rundir.string());
177 return outwork;
178 }
179
180 // Declare success and return
181 *(int*)(outwork->data) = 0;
182 return outwork;
183}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_DEBUG(x)
std::string m_subprocTopDir
Top run directory for subprocesses.
int handleSavedPfc(const std::filesystem::path &dest_path)
int updateIoReg(const std::string &rundir)
SmartIF< IEvtSelector > m_evtSelector
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
ServiceHandle< IIoComponentMgr > m_ioMgr
std::string fmterror(int errnum)
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
mkdir(path, recursive=True)

◆ evtSelector()

IEvtSelector * AthenaMPToolBase::evtSelector ( )
inline, protected, inherited

Definition at line 83 of file AthenaMPToolBase.h.

83{ return m_evtSelector; }

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeScatterer::exec_func ( )
override, virtual

Implements AthenaMPToolBase.

Definition at line 185 of file EvtRangeScatterer.cxx.

186{
187 ATH_MSG_INFO("Exec function in the AthenaMP Token Scatterer PID=" << getpid());
188
189 yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
190 // Create a socket to communicate with the Pilot
191 yampl::ISocket* socket2Pilot = socketFactory->createClientSocket(yampl::Channel(m_eventRangeChannel.value(),yampl::LOCAL),yampl::MOVE_DATA);
192 ATH_MSG_INFO("Created CLIENT socket for communicating Event Ranges with the Pilot");
193 // Create a socket to communicate with EvtRangeProcessors
194 std::string socket2ProcessorName = m_processorChannel.value() + std::string("_") + m_randStr;
195 yampl::ISocket* socket2Processor = socketFactory->createServerSocket(yampl::Channel(socket2ProcessorName,yampl::LOCAL),yampl::MOVE_DATA);
196 ATH_MSG_INFO("Created SERVER socket to token processors: " << socket2ProcessorName);
197
198 bool all_ok=true;
199 int procReportPending(0); // Keep track of how many output files are yet to be reported by Token Processors
200
201 AthenaInterprocess::SharedQueue* sharedFailedPidQueue(0);
202 if(detStore()->retrieve(sharedFailedPidQueue,"AthenaMPFailedPidQueue_"+m_randStr).isFailure()) {
203 ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Failed PID Queue");
204 all_ok=false;
205 }
206
207 // Communication protocol with the Pilot
208 std::string strReady("Ready for events");
209 std::string strStopProcessing("No more events");
210 std::string processorWaitRequest("");
211 int workerPid{-1};
212
213 ATH_MSG_INFO("Starting main loop");
214
215 while(all_ok) {
216 // NO CACHING MODE: first get a request from one of the processors and only after that request the next event range from the pilot
217 if(!m_doCaching && processorWaitRequest.empty()) {
218 ATH_MSG_DEBUG("Waiting for event range request from one of the processors ... ");
219 while(processorWaitRequest.empty()) {
220 processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
221 pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
222 usleep(1000);
223 }
224 ATH_MSG_INFO("One of the processors is ready for the next range");
225 // Get PID from the request and Update m_pid2RangeID
226 workerPid = std::atoi(processorWaitRequest.c_str());
227 auto it = m_pid2RangeID.find(workerPid);
228 if(it!=m_pid2RangeID.end()) {
229 m_pid2RangeID.erase(it);
230 }
231 }
232
233 // Signal the Pilot that AthenaMP is ready for event processing
234 void* ready_message = CxxUtils::xmalloc(strReady.size());
235 memcpy(ready_message,strReady.data(),strReady.size());
236 socket2Pilot->send(ready_message,strReady.size());
237 void* eventRangeMessage;
238 std::string strPeerId;
239 ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage,strPeerId);
240 std::string eventRange((const char*)eventRangeMessage,eventRangeSize);
241 size_t carRet = eventRange.find('\n');
242 if(carRet!=std::string::npos)
243 eventRange.resize(carRet);
244
245 // Break the loop if no more ranges are expected
246 if(eventRange.find(strStopProcessing)!=std::string::npos) {
247 ATH_MSG_INFO("Stopped the loop. Last message from the Event Range Channel: " << eventRange);
248 break;
249 }
250 ATH_MSG_INFO("Got Event Range from the pilot: " << eventRange);
251
252 // Parse the Event Range string
253 // Expected the following format: [{KEY:VALUE[,KEY:VALUE]}]
254 // First get rid of the leading '[{' and the trailing '}]'
255 if(eventRange.starts_with( "[{")) eventRange=eventRange.substr(2);
256 if(eventRange.ends_with("}]")) eventRange.resize(eventRange.size()-2);
257
258 std::map<std::string,std::string> eventRangeMap;
259 size_t startpos(0);
260 size_t endpos = eventRange.find(',');
261 while(endpos!=std::string::npos) {
262 // Get the Key-Value pair
263 std::string keyValue(eventRange.substr(startpos,endpos-startpos));
264 size_t colonPos = keyValue.find(':');
265 std::string strKey = keyValue.substr(0,colonPos);
266 std::string strVal = keyValue.substr(colonPos+1);
267 trimRangeStrings(strKey);
268 trimRangeStrings(strVal);
269 eventRangeMap[strKey]=std::move(strVal);
270 // Next iteration
271 startpos = endpos+1;
272 endpos = eventRange.find(',',startpos);
273 }
274 // Get the final Key-Value pair
275 std::string keyValue(eventRange.substr(startpos));
276 size_t colonPos = keyValue.find(':');
277 std::string strKey = keyValue.substr(0,colonPos);
278 std::string strVal = keyValue.substr(colonPos+1);
279 trimRangeStrings(strKey);
280 trimRangeStrings(strVal);
281 eventRangeMap[strKey]=std::move(strVal);
282
283 if(eventRangeMap.find("eventRangeID")==eventRangeMap.end()
284 || eventRangeMap.find("startEvent")==eventRangeMap.end()
285 || eventRangeMap.find("lastEvent")==eventRangeMap.end()
286 || eventRangeMap.find("GUID")==eventRangeMap.end()) {
287 std::string errorStr("ERR_ATHENAMP_PARSE \"" + eventRange + "\": Wrong format");
288 ATH_MSG_ERROR(errorStr);
289 ATH_MSG_INFO("Ignoring this event range ");
290 void* errorMessage = CxxUtils::xmalloc(errorStr.size());
291 memcpy(errorMessage,errorStr.data(),errorStr.size());
292 socket2Pilot->send(errorMessage,errorStr.size());
293 continue;
294 }
295 else {
296 ATH_MSG_DEBUG("*** Decoded Event Range ***");
297 std::map<std::string,std::string>::const_iterator it = eventRangeMap.begin(),
298 itEnd = eventRangeMap.end();
299 for(;it!=itEnd;++it)
300 ATH_MSG_DEBUG(it->first << ":" << it->second);
301 ATH_MSG_DEBUG("*** ***");
302 }
303
304 std::string rangeID = eventRangeMap["eventRangeID"];
305 std::string guid = eventRangeMap["GUID"];
306 int startEvent = std::atoi(eventRangeMap["startEvent"].c_str());
307 int lastEvent = std::atoi(eventRangeMap["lastEvent"].c_str());
308 if(rangeID.empty()
309 || guid.empty()
310 || lastEvent < startEvent) {
311 std::string errorStr("ERR_ATHENAMP_PARSE \"" + eventRange + "\": Wrong values of range fields");
312 ATH_MSG_ERROR(errorStr);
313 ATH_MSG_INFO("Ignoring this event range ");
314 void* errorMessage = CxxUtils::xmalloc(errorStr.size());
315 memcpy(errorMessage,errorStr.data(),errorStr.size());
316 socket2Pilot->send(errorMessage,errorStr.size());
317 continue;
318 }
319
320 std::string message2ProcessorStr;
321 char* message2Processor(0);
322
323 std::ostringstream ostr;
324 ostr << rangeID;
325 if(eventRangeMap.find("PFN")!=eventRangeMap.end()) {
326 ostr << "," << "PFN:" << eventRangeMap["PFN"];
327 }
328 ostr << "," << eventRangeMap["startEvent"]
329 << "," << eventRangeMap["lastEvent"];
330 message2ProcessorStr = ostr.str();
331
332 // CACHING MODE: first get an event range from the pilot, transform it into the tokens
333 // and only after that wait for a new range request by one of the processors
334 if(m_doCaching) {
335 ATH_MSG_DEBUG("Waiting for event range request from one of the processors");
336 while(processorWaitRequest.empty()) {
337 processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
338 pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
339 usleep(1000);
340 }
341 ATH_MSG_INFO("One of the processors is ready for the next range");
342 // Get PID from the request and Update m_pid2RangeID
343 workerPid = std::atoi(processorWaitRequest.c_str());
344 auto it = m_pid2RangeID.find(workerPid);
345 if(it!=m_pid2RangeID.end()) {
346 m_pid2RangeID.erase(it);
347 }
348 }
349
350 // Send to the Processor: RangeID,evtToken[,evtToken]
351 message2Processor = (char*)CxxUtils::xmalloc(message2ProcessorStr.size());
352 memcpy(message2Processor,message2ProcessorStr.data(),message2ProcessorStr.size());
353 socket2Processor->send(message2Processor,message2ProcessorStr.size());
354 procReportPending++;
355
356 // Get PID from the request and Update m_pid2RangeID
357 m_pid2RangeID[workerPid] = std::move(rangeID);
358 processorWaitRequest.clear();
359
360 ATH_MSG_INFO("Sent response to the processor : " << message2ProcessorStr);
361 }
362
363 if(all_ok) {
364 // We are done distributing event tokens.
365 // Tell the workers that the event processing is over
366 // i.e. send out m_nprocs empty messages
367 void* emptyMess4Processor(0);
368 if(!processorWaitRequest.empty()) {
369 // We already have one processor waiting for the answer
370 emptyMess4Processor = CxxUtils::xmalloc(1);
371 socket2Processor->send(emptyMess4Processor,1);
372 ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
373 processorWaitRequest.clear();
374 }
375 bool endLoop{false};
376 while(true) {
377 ATH_MSG_DEBUG("Going to set another processor free");
378 while(processorWaitRequest.empty()) {
379 processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
380 if(pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending)==-1) {
381 endLoop = true;
382 break;
383 }
384 usleep(1000);
385 }
386 if(endLoop) break;
387 // Remove worker from m_pid2RangeID
388 workerPid = std::atoi(processorWaitRequest.c_str());
389 auto it = m_pid2RangeID.find(workerPid);
390 if(it!=m_pid2RangeID.end()) {
391 m_pid2RangeID.erase(it);
392 }
393 emptyMess4Processor = CxxUtils::xmalloc(1);
394 socket2Processor->send(emptyMess4Processor,1);
395 ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
396 ATH_MSG_INFO("Still " << procReportPending << " pending reports");
397 processorWaitRequest.clear();
398 }
399 }
400
401 if(m_appMgr->stop().isFailure()) {
402 ATH_MSG_ERROR("Unable to stop AppMgr");
403 all_ok=false;
404 }
405 else {
406 if(m_appMgr->finalize().isFailure()) {
407 std::cerr << "Unable to finalize AppMgr" << std::endl;
408 all_ok=false;
409 }
410 }
411
412 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
413 outwork->data = CxxUtils::xmalloc(sizeof(int));
414 *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
415 outwork->size = sizeof(int);
416
417 // ...
418 // (possible) TODO: extend outwork with some error message, which will be eventually
419 // reported in the master proces
420 // ...
421 delete socket2Processor;
422 delete socket2Pilot;
423 delete socketFactory;
424
425 return outwork;
426}
ServiceHandle< IAppMgrUI > m_appMgr
void trimRangeStrings(std::string &)
pid_t pollFailedPidQueue(AthenaInterprocess::SharedQueue *sharedFailedPidQueue, yampl::ISocket *socket2Pilot, int &procReportPending)
Pid2RangeID m_pid2RangeID
Gaudi::Property< std::string > m_eventRangeChannel
std::string getNewRangeRequest(yampl::ISocket *socket2Processor, yampl::ISocket *socket2Pilot, int &procReportPending)
Gaudi::Property< bool > m_doCaching
Gaudi::Property< std::string > m_processorChannel
retrieve(aClass, aKey=None)
Definition PyKernel.py:110

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeScatterer::fin_func ( )
override, virtual

Implements AthenaMPToolBase.

Definition at line 428 of file EvtRangeScatterer.cxx.

429{
430 // Dummy
431 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
432 outwork->data = CxxUtils::xmalloc(sizeof(int));
433 *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
434 outwork->size = sizeof(int);
435 return outwork;
436}

◆ finalize()

StatusCode EvtRangeScatterer::finalize ( )
override, virtual

Reimplemented from AthenaMPToolBase.

Definition at line 49 of file EvtRangeScatterer.cxx.

50{
51 return StatusCode::SUCCESS;
52}

◆ fmterror()

std::string AthenaMPToolBase::fmterror ( int errnum)
protected, inherited

Definition at line 333 of file AthenaMPToolBase.cxx.

334{
335 char buf[256];
336 strerror_r(errnum, buf, sizeof(buf));
337 return std::string(buf);
338}

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr EvtRangeScatterer::generateOutputReport ( )
override, virtual

Reimplemented from AthenaMPToolBase.

Definition at line 103 of file EvtRangeScatterer.cxx.

104{
106 return jobOutputs;
107}
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr

◆ getNewRangeRequest()

std::string EvtRangeScatterer::getNewRangeRequest ( yampl::ISocket * socket2Processor,
yampl::ISocket * socket2Pilot,
int & procReportPending )
private

Definition at line 471 of file EvtRangeScatterer.cxx.

474{
475 void* processor_request(0);
476 std::string strPeerId;
477 ssize_t processorRequestSize = socket2Processor->tryRecv(processor_request,0,strPeerId);
478
479 if(processorRequestSize==-1) return std::string("");
480 if(processorRequestSize==sizeof(pid_t)+sizeof(AthenaMPToolBase::ESRange_Status)) {
481 ATH_MSG_INFO("Processor reported event range processing error");
482 pid_t pid = *((pid_t*)processor_request);
483 AthenaMPToolBase::ESRange_Status status = *reinterpret_cast<AthenaMPToolBase::ESRange_Status*>((pid_t*)processor_request+1);
484 std::string errorStr("ERR_ATHENAMP_PROCESS "+ m_pid2RangeID[pid] + ": ");
485 switch(status) {
487 errorStr+=std::string("Not found in the input file");
488 break;
490 errorStr+=std::string("Seek failed");
491 break;
493 errorStr+=std::string("Failed to process event range");
494 break;
496 errorStr+=std::string("Failed to make output file");
497 break;
499 errorStr+=std::string("Failed to set input file");
500 break;
501 default:
502 break;
503 }
504 void* errorMessage = CxxUtils::xmalloc(errorStr.size());
505 memcpy(errorMessage,errorStr.data(),errorStr.size());
506 socket2Pilot->send(errorMessage,errorStr.size());
507 procReportPending--;
508 ATH_MSG_INFO("Error reported to the pilot. Reports pending: " << procReportPending);
509 return std::string("");
510 }
511 std::string strProcessorRequest((const char*)processor_request,processorRequestSize);
512 ATH_MSG_INFO("Received request from a processor: " << strProcessorRequest);
513 // Decode the request. If it contains output file name then pass it over to the pilot and return empty string
514 if(strProcessorRequest.starts_with( "/")) {
515 void* outpFileNameMessage = CxxUtils::xmalloc(strProcessorRequest.size());
516 memcpy(outpFileNameMessage,strProcessorRequest.data(),strProcessorRequest.size());
517 socket2Pilot->send(outpFileNameMessage,strProcessorRequest.size());
518 procReportPending--;
519 ATH_MSG_INFO("Output file reported to the pilot. Reports pending: " << procReportPending);
520 return std::string("");
521 }
522 return strProcessorRequest;
523}
int32_t pid_t
status
Definition merge.py:16

◆ handleSavedPfc()

int AthenaMPToolBase::handleSavedPfc ( const std::filesystem::path & dest_path)
protected, inherited

Definition at line 396 of file AthenaMPToolBase.cxx.

397{
398 if(std::filesystem::is_regular_file("PoolFileCatalog.xml.AthenaMP-saved"))
399 COPY_FILE_HACK("PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+"/PoolFileCatalog.xml");
400 return 0;
401}
#define COPY_FILE_HACK(_src, _dest)

◆ initialize()

StatusCode EvtRangeScatterer::initialize ( )
override, virtual

Reimplemented from AthenaMPToolBase.

Definition at line 39 of file EvtRangeScatterer.cxx.

40{
41 ATH_MSG_DEBUG("In initialize");
42
44 if(!sc.isSuccess()) return sc;
45
46 return StatusCode::SUCCESS;
47}
static Double_t sc
virtual StatusCode initialize() override
::StatusCode StatusCode
StatusCode definition for legacy code.

◆ killChildren()

void AthenaMPToolBase::killChildren ( )
override, virtual, inherited

Definition at line 201 of file AthenaMPToolBase.cxx.

202{
203 for(const AthenaInterprocess::Process& child : m_processGroup->getChildren()) {
204 kill(child.getProcessID(),SIGKILL);
205 }
206}
AthenaInterprocess::ProcessGroup * m_processGroup

◆ operator()

virtual std::unique_ptr< ScheduledWork > AthenaInterprocess::IMessageDecoder::operator() ATLAS_NOT_THREAD_SAFE ( const ScheduledWork & )
pure virtual, inherited

◆ operator=()

◆ pollFailedPidQueue()

pid_t EvtRangeScatterer::pollFailedPidQueue ( AthenaInterprocess::SharedQueue * sharedFailedPidQueue,
yampl::ISocket * socket2Pilot,
int & procReportPending )
private

Definition at line 525 of file EvtRangeScatterer.cxx.

528{
529 pid_t pid{0};
530 if(sharedFailedPidQueue->try_receive_basic<pid_t>(pid)
531 && pid!=-1) {
532 ATH_MSG_INFO("Procesor with PID=" << pid << " has failed!");
533 auto itPid = m_pid2RangeID.find(pid);
534 if(itPid!=m_pid2RangeID.end()) {
535 ATH_MSG_WARNING("The failed RangeID = " << m_pid2RangeID[pid] << " will be reported to Pilot");
536
537 std::string errorStr("ERR_ATHENAMP_PROCESS " + m_pid2RangeID[pid] + ": Failed to process event range");
538 void* errorMessage = CxxUtils::xmalloc(errorStr.size());
539 memcpy(errorMessage,errorStr.data(),errorStr.size());
540 socket2Pilot->send(errorMessage,errorStr.size());
541 --procReportPending;
542 m_pid2RangeID.erase(pid);
543 }
544 ATH_MSG_INFO("Reports pending: " << procReportPending);
545 }
546 return pid;
547}
#define ATH_MSG_WARNING(x)

◆ redirectLog()

int AthenaMPToolBase::redirectLog ( const std::string & rundir,
bool addTimeStamp = true )
protected, inherited

Definition at line 269 of file AthenaMPToolBase.cxx.

270{
271 // Redirect both stdout and stderr to the same file AthenaMP.log
272 int dup2result1(0), dup2result2(0);
273
274 int newout = open(std::string(rundir+"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
275 if(newout==-1) {
276 ATH_MSG_ERROR("Unable to open log file in the run directory. " << fmterror(errno));
277 return -1;
278 }
279 dup2result1 = dup2(newout, STDOUT_FILENO);
280 dup2result2 = dup2(newout, STDERR_FILENO);
281 TEMP_FAILURE_RETRY(close(newout));
282 if(dup2result1==-1) {
283 ATH_MSG_ERROR("Unable to redirect standard output. " << fmterror(errno));
284 return -1;
285 }
286 if(dup2result2==-1) {
287 ATH_MSG_ERROR("Unable to redirect standard error. " << fmterror(errno));
288 return -1;
289 }
290
291 if(addTimeStamp) {
292 SmartIF<IProperty> propertyServer(msgSvc());
293 if(propertyServer==0) {
294 ATH_MSG_ERROR("Unable to cast message svc to IProperty");
295 return -1;
296 }
297
298 std::string propertyName("Format");
299 std::string oldFormat("");
300 StringProperty formatProp(propertyName,oldFormat);
301 StatusCode sc = propertyServer->getProperty(&formatProp);
302 if(sc.isFailure()) {
303 ATH_MSG_WARNING("Message Service does not have Format property");
304 }
305 else {
306 oldFormat = formatProp.value();
307 if(oldFormat.find("%t")==std::string::npos) {
308 // Add time stamps
309 std::string newFormat("%t " + oldFormat);
310 StringProperty newFormatProp(std::move(propertyName),newFormat);
311 ATH_CHECK(propertyServer->setProperty(newFormatProp), -1);
312 }
313 else {
314 ATH_MSG_DEBUG("MsgSvc format already contains timestamps. Nothing to be done");
315 }
316 }
317 }
318
319 return 0;
320}
#define ATH_CHECK
Evaluate an expression and check for errors.
msgSvc
Provide convenience handles for various services.
Definition StdJOSetup.py:36
@ open
Definition BinningType.h:40

◆ reopenFd()

int AthenaMPToolBase::reopenFd ( int fd,
const std::string & name )
private, inherited

Definition at line 419 of file AthenaMPToolBase.cxx.

420{
421 ATH_MSG_DEBUG("Attempting to reopen descriptor for " << name);
422 int old_openflags = fcntl(fd,F_GETFL,0);
423 switch(old_openflags & O_ACCMODE) {
424 case O_RDONLY: {
425 ATH_MSG_DEBUG("The File Access Mode is RDONLY");
426 break;
427 }
428 case O_WRONLY: {
429 ATH_MSG_DEBUG("The File Access Mode is WRONLY");
430 break;
431 }
432 case O_RDWR: {
433 ATH_MSG_DEBUG("The File Access Mode is RDWR");
434 break;
435 }
436 }
437
438 int old_descflags = fcntl(fd,F_GETFD,0);
439 off_t oldpos = lseek(fd,0,SEEK_CUR);
440 if(oldpos==-1) {
441 if(errno==ESPIPE) {
442 ATH_MSG_WARNING("Dealing with PIPE. Skipping ... (FIXME!)");
443 }
444 else {
445 ATH_MSG_ERROR("When re-opening file descriptors lseek failed on " << name << ". " << fmterror(errno));
446 return -1;
447 }
448 }
449 else {
450 Io::Fd newfd = open(name.c_str(),old_openflags);
451 if(newfd==-1) {
452 ATH_MSG_ERROR("When re-opening file descriptors unable to open " << name << " for reading. " << fmterror(errno));
453 return -1;
454 }
455 if(lseek(newfd,oldpos,SEEK_SET)==-1){
456 ATH_MSG_ERROR("When re-opening file descriptors lseek failed on the newly opened " << name << ". " << fmterror(errno));
457 TEMP_FAILURE_RETRY(close(newfd));
458 return -1;
459 }
460 TEMP_FAILURE_RETRY(close(fd));
461 if(dup2(newfd,fd)==-1) {
462 ATH_MSG_ERROR("When re-opening file descriptors unable to duplicate descriptor for " << name << ". " << fmterror(errno));
463 TEMP_FAILURE_RETRY(close(newfd));
464 return -1;
465 }
466 if(fcntl(fd,F_SETFD,old_descflags)==-1) {
467 ATH_MSG_ERROR("When re-opening file descriptors unable to set descriptor flags for " << name << ". " << fmterror(errno));
468 TEMP_FAILURE_RETRY(close(newfd));
469 return -1;
470 }
471 TEMP_FAILURE_RETRY(close(newfd));
472 }
473 return 0;
474}

◆ reopenFds()

int AthenaMPToolBase::reopenFds ( )
protected, inherited

Definition at line 340 of file AthenaMPToolBase.cxx.

341{
342 // Reopen file descriptors.
343 // First go over all open files, which have been registered with the FileMgr
344 // Then also check the FdsRegistry, in case it contains some files not registered with the FileMgr
345 std::set<int> fdLog;
346
347 // Query the FileMgr contents
348 std::vector<const Io::FileAttr*> filemgrFiles;
349 std::vector<const Io::FileAttr*>::const_iterator itFile;
350 unsigned filenum = m_fileMgr->getFiles(filemgrFiles); // Get attributes for open files only. We don't care about closed ones at this point
351 if(filenum!=filemgrFiles.size())
352 ATH_MSG_WARNING("getFiles returned " << filenum << " while vector size is " << filemgrFiles.size());
353
354 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
355 ATH_MSG_DEBUG("* " << **itFile);
356 const std::string& filename = (**itFile).name();
357 Io::Fd fd = (**itFile).fd();
358
359 if(fd==-1) {
360 // It is legal to have fd=-1 for remote inputs
361 // On the other hand, these inputs should not remain open after fork. The issue being tracked at ATEAM-434.
362 // So, this hopefully is a temporary patch
363 ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " << filename);
364 continue;
365 }
366
367 if(reopenFd(fd,filename))
368 return -1;
369
370 fdLog.insert(fd);
371 }
372
373 // Check the FdsRegistry
374 for(const AthenaInterprocess::FdsRegistryEntry& regEntry : *m_fdsRegistry) {
375 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
376 ATH_MSG_DEBUG("The file from FdsRegistry " << regEntry.name << " was registered with FileMgr. Skip reopening");
377 }
378 else {
379 ATH_MSG_WARNING("The file " << regEntry.name << " has not been registered with the FileMgr!");
380
381 if(regEntry.fd==-1) {
382 // Same protection as the one above
383 ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
384 continue;
385 }
386
387 if(reopenFd(regEntry.fd,regEntry.name))
388 return -1;
389
390 fdLog.insert(regEntry.fd);
391 }
392 }
393 return 0;
394}
int reopenFd(int fd, const std::string &name)
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
ServiceHandle< IFileMgr > m_fileMgr

◆ reportSubprocessStatuses()

void AthenaMPToolBase::reportSubprocessStatuses ( )
override virtual inherited

Reimplemented in EvtRangeProcessor, SharedEvtQueueConsumer, and SharedHiveEvtQueueConsumer.

Definition at line 111 of file AthenaMPToolBase.cxx.

112{
113 ATH_MSG_INFO("Statuses of sub-processes");
114 const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
115 for(size_t i=0; i<statuses.size(); ++i)
116 ATH_MSG_INFO("*** Process PID=" << statuses[i].pid << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS"));
117}
list statuses

◆ setMaxEvt()

virtual void AthenaMPToolBase::setMaxEvt ( int maxEvt)
inline override virtual inherited

Definition at line 44 of file AthenaMPToolBase.h.

44{m_maxEvt=maxEvt;}
int m_maxEvt
Maximum number of events assigned to the job.

◆ setMPRunStop()

virtual void AthenaMPToolBase::setMPRunStop ( const AthenaInterprocess::IMPRunStop * runStop)
inline override virtual inherited

Definition at line 45 of file AthenaMPToolBase.h.

45{m_mpRunStop=runStop;}
const AthenaInterprocess::IMPRunStop * m_mpRunStop

◆ setRandString()

void AthenaMPToolBase::setRandString ( const std::string & randStr)
override virtual inherited

Definition at line 196 of file AthenaMPToolBase.cxx.

197{
198 m_randStr = randStr;
199}

◆ subProcessLogs()

void EvtRangeScatterer::subProcessLogs ( std::vector< std::string > & filenames)
override virtual

Definition at line 95 of file EvtRangeScatterer.cxx.

96{
97 filenames.clear();
98 std::filesystem::path reader_rundir(m_subprocTopDir);
99 reader_rundir/= std::filesystem::path(m_subprocDirPrefix);
100 filenames.push_back(reader_rundir.string()+std::string("/AthenaMP.log"));
101}
list filenames
Definition grepfile.py:34

◆ trimRangeStrings()

void EvtRangeScatterer::trimRangeStrings ( std::string & str)
private

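Definition at line 438 of file EvtRangeScatterer.cxx.

Note: the trailing-space trim is guarded by `if(i)`. When the only non-space character left after the leading-space trim is the first one (e.g. `"a  "`), `i` reaches 0 and the resize is skipped, so the trailing spaces are not removed.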

439{
440 size_t i(0);
441 // get rid of leading spaces
442 while(i<str.size() && str[i]==' ') i++;
443 if(i) str = str.substr(i);
444
445 if(str.empty()) return; // Corner case: string consists only of spaces
446
447 // get rid of trailing spaces
448 i=str.size()-1;
449 while(str[i]==' ') i--;
450 if(i) str.resize(i+1);
451
452 // the string might be enclosed by either
453 // "u\'" and "\'"
454 // or
455 // "\"" and "\""
456 // Get rid of them!
457 if(str.starts_with( "u\'")) {
458 str = str.substr(2);
459 if(str.ends_with( "\'")) {
460 str.resize(str.size()-1);
461 }
462 }
463 else if(str.starts_with( "\"")) {
464 str = str.substr(1);
465 if(str.ends_with( "\"")) {
466 str.resize(str.size()-1);
467 }
468 }
469}

◆ updateIoReg()

int AthenaMPToolBase::updateIoReg ( const std::string & rundir)
protected inherited

Definition at line 322 of file AthenaMPToolBase.cxx.

323{
324 ATH_CHECK(m_ioMgr.retrieve(), -1);
325
326 // update the IoRegistry for the new workdir - make sure we use absolute path
327 std::filesystem::path abs_rundir = std::filesystem::absolute(rundir);
328 ATH_CHECK(m_ioMgr->io_update_all(abs_rundir.string()), -1);
329
330 return 0;
331}

◆ useFdsRegistry()

void AthenaMPToolBase::useFdsRegistry ( std::shared_ptr< AthenaInterprocess::FdsRegistry > registry)
override virtual inherited

Definition at line 191 of file AthenaMPToolBase.cxx.

192{
193 m_fdsRegistry = std::move(registry);
194}

◆ waitForSignal()

void AthenaMPToolBase::waitForSignal ( )
protected inherited

Definition at line 403 of file AthenaMPToolBase.cxx.

404{
405 ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
406 sigset_t mask, oldmask;
407
409
410 sigemptyset (&mask);
411 sigaddset (&mask, SIGUSR1);
412
413 sigprocmask (SIG_BLOCK, &mask, &oldmask);
415 sigsuspend (&oldmask);
416 sigprocmask (SIG_UNBLOCK, &mask, NULL);
417}
#define sigemptyset(x)
Definition SealSignal.h:82
#define sigaddset(x, y)
Definition SealSignal.h:84
int sigset_t
Definition SealSignal.h:80
std::atomic< bool > sig_done

Member Data Documentation

◆ m_appMgr

ServiceHandle<IAppMgrUI> AthenaMPToolBase::m_appMgr
protected inherited

Definition at line 95 of file AthenaMPToolBase.h.

◆ m_doCaching

Gaudi::Property<bool> EvtRangeScatterer::m_doCaching {this, "DoCaching", false}
private

Definition at line 65 of file EvtRangeScatterer.h.

65{this, "DoCaching", false};

◆ m_eventRangeChannel

Gaudi::Property<std::string> EvtRangeScatterer::m_eventRangeChannel {this, "EventRangeChannel", ""}
private

Definition at line 64 of file EvtRangeScatterer.h.

64{this, "EventRangeChannel", ""};

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthenaMPToolBase::m_evtProcessor
protected inherited

Definition at line 94 of file AthenaMPToolBase.h.

◆ m_evtSelector

SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector
protected inherited

Definition at line 98 of file AthenaMPToolBase.h.

◆ m_evtSelName

std::string AthenaMPToolBase::m_evtSelName
protected inherited

Name of the event selector.

Definition at line 89 of file AthenaMPToolBase.h.

◆ m_fdsRegistry

std::shared_ptr<AthenaInterprocess::FdsRegistry> AthenaMPToolBase::m_fdsRegistry
protected inherited

Definition at line 100 of file AthenaMPToolBase.h.

◆ m_fileMgr

ServiceHandle<IFileMgr> AthenaMPToolBase::m_fileMgr
protected inherited

Definition at line 96 of file AthenaMPToolBase.h.

◆ m_fileMgrLog

std::string AthenaMPToolBase::m_fileMgrLog
protected inherited

Definition at line 99 of file AthenaMPToolBase.h.

◆ m_ioMgr

ServiceHandle<IIoComponentMgr> AthenaMPToolBase::m_ioMgr
protected inherited

Definition at line 97 of file AthenaMPToolBase.h.

◆ m_isPileup

Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
protected inherited

Definition at line 103 of file AthenaMPToolBase.h.

103{this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"};

◆ m_maxEvt

int AthenaMPToolBase::m_maxEvt {-1}
protected inherited

Maximum number of events assigned to the job.

Definition at line 86 of file AthenaMPToolBase.h.

86{-1};

◆ m_mpRunStop

const AthenaInterprocess::IMPRunStop* AthenaMPToolBase::m_mpRunStop {nullptr}
protected inherited

Definition at line 92 of file AthenaMPToolBase.h.

92{nullptr};

◆ m_nprocs

int AthenaMPToolBase::m_nprocs {-1}
protected inherited

Number of workers spawned by the master process.

Definition at line 85 of file AthenaMPToolBase.h.

85{-1};

◆ m_pid2RangeID

Pid2RangeID EvtRangeScatterer::m_pid2RangeID
private

Definition at line 67 of file EvtRangeScatterer.h.

◆ m_processGroup

AthenaInterprocess::ProcessGroup* AthenaMPToolBase::m_processGroup {nullptr}
protected inherited

Definition at line 91 of file AthenaMPToolBase.h.

91{nullptr};

◆ m_processorChannel

Gaudi::Property<std::string> EvtRangeScatterer::m_processorChannel {this, "ProcessorChannel", ""}
private

Definition at line 63 of file EvtRangeScatterer.h.

63{this, "ProcessorChannel", ""};

◆ m_randStr

std::string AthenaMPToolBase::m_randStr
protected inherited

Definition at line 101 of file AthenaMPToolBase.h.

◆ m_subprocDirPrefix

std::string AthenaMPToolBase::m_subprocDirPrefix
protected inherited

For example, "worker__".

Definition at line 88 of file AthenaMPToolBase.h.

◆ m_subprocTopDir

std::string AthenaMPToolBase::m_subprocTopDir
protected inherited

Top run directory for subprocesses.

Definition at line 87 of file AthenaMPToolBase.h.


The documentation for this class was generated from the following files: