ATLAS Offline Software
Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes | List of all members
EvtRangeScatterer Class Referencefinalabstract

#include <EvtRangeScatterer.h>

Inheritance diagram for EvtRangeScatterer:
Collaboration diagram for EvtRangeScatterer:

Public Member Functions

 EvtRangeScatterer (const std::string &type, const std::string &name, const IInterface *parent)
 
virtual ~EvtRangeScatterer () override
 
virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
 
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
 
virtual void subProcessLogs (std::vector< std::string > &) override
 
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkbootstrap_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkexec_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkfin_func () override
 
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t &pid) override
 
virtual void reportSubprocessStatuses () override
 
virtual void useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override
 
virtual void setRandString (const std::string &randStr) override
 
virtual void setMaxEvt (int maxEvt) override
 
virtual void setMPRunStop (const AthenaInterprocess::IMPRunStop *runStop) override
 
virtual void killChildren () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > virtual operator() ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess std::unique_ptr< AthenaInterprocess::ScheduledWorkbootstrap_func ()=0
 
virtual std::unique_ptr< ScheduledWork > operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0
 

Protected Types

enum  ESRange_Status {
  ESRANGE_SUCCESS, ESRANGE_NOTFOUND, ESRANGE_SEEKFAILED, ESRANGE_PROCFAILED,
  ESRANGE_FILENOTMADE, ESRANGE_BADINPFILE
}
 
enum  Func_Flag { FUNC_BOOTSTRAP, FUNC_EXEC, FUNC_FIN }
 

Protected Member Functions

int mapAsyncFlag ATLAS_NOT_THREAD_SAFE (Func_Flag flag, pid_t pid=0)
 
int redirectLog (const std::string &rundir, bool addTimeStamp=true)
 
int updateIoReg (const std::string &rundir)
 
std::string fmterror (int errnum)
 
int reopenFds ()
 
int handleSavedPfc (const std::filesystem::path &dest_path)
 
void waitForSignal ()
 
IEvtSelector * evtSelector ()
 

Protected Attributes

int m_nprocs {-1}
 Number of workers spawned by the master process. More...
 
int m_maxEvt {-1}
 Maximum number of events assigned to the job. More...
 
std::string m_subprocTopDir
 Top run directory for subprocesses. More...
 
std::string m_subprocDirPrefix
 For ex. "worker__". More...
 
std::string m_evtSelName
 Name of the event selector. More...
 
AthenaInterprocess::ProcessGroupm_processGroup {nullptr}
 
const AthenaInterprocess::IMPRunStopm_mpRunStop {nullptr}
 
ServiceHandle< IEventProcessor > m_evtProcessor
 
ServiceHandle< IAppMgrUI > m_appMgr
 
ServiceHandle< IFileMgr > m_fileMgr
 
ServiceHandle< IIoComponentMgr > m_ioMgr
 
SmartIF< IEvtSelector > m_evtSelector
 
std::string m_fileMgrLog
 
std::shared_ptr< AthenaInterprocess::FdsRegistrym_fdsRegistry
 
std::string m_randStr
 
Gaudi::Property< bool > m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
 

Private Member Functions

 EvtRangeScatterer ()
 
 EvtRangeScatterer (const EvtRangeScatterer &)
 
EvtRangeScattereroperator= (const EvtRangeScatterer &)
 
void trimRangeStrings (std::string &)
 
std::string getNewRangeRequest (yampl::ISocket *socket2Processor, yampl::ISocket *socket2Pilot, int &procReportPending)
 
pid_t pollFailedPidQueue (AthenaInterprocess::SharedQueue *sharedFailedPidQueue, yampl::ISocket *socket2Pilot, int &procReportPending)
 
int reopenFd (int fd, const std::string &name)
 

Private Attributes

Gaudi::Property< std::string > m_processorChannel {this, "ProcessorChannel", ""}
 
Gaudi::Property< std::string > m_eventRangeChannel {this, "EventRangeChannel", ""}
 
Gaudi::Property< bool > m_doCaching {this, "DoCaching", false}
 
Pid2RangeID m_pid2RangeID
 

Detailed Description

Definition at line 18 of file EvtRangeScatterer.h.

Member Enumeration Documentation

◆ ESRange_Status

enum AthenaMPToolBase::ESRange_Status
protectedinherited
Enumerator
ESRANGE_SUCCESS 
ESRANGE_NOTFOUND 
ESRANGE_SEEKFAILED 
ESRANGE_PROCFAILED 
ESRANGE_FILENOTMADE 
ESRANGE_BADINPFILE 

Definition at line 58 of file AthenaMPToolBase.h.

◆ Func_Flag

enum AthenaMPToolBase::Func_Flag
protectedinherited
Enumerator
FUNC_BOOTSTRAP 
FUNC_EXEC 
FUNC_FIN 

Definition at line 67 of file AthenaMPToolBase.h.

67  {
69  , FUNC_EXEC
70  , FUNC_FIN
71  };

Constructor & Destructor Documentation

◆ EvtRangeScatterer() [1/3]

EvtRangeScatterer::EvtRangeScatterer ( const std::string &  type,
const std::string &  name,
const IInterface *  parent 
)

Definition at line 27 of file EvtRangeScatterer.cxx.

31 {
32  m_subprocDirPrefix = "range_scatterer";
33 }

◆ ~EvtRangeScatterer()

EvtRangeScatterer::~EvtRangeScatterer ( )
overridevirtual

Definition at line 35 of file EvtRangeScatterer.cxx.

36 {
37 }

◆ EvtRangeScatterer() [2/3]

EvtRangeScatterer::EvtRangeScatterer ( )
private

◆ EvtRangeScatterer() [3/3]

EvtRangeScatterer::EvtRangeScatterer ( const EvtRangeScatterer )
private

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE() [1/4]

virtual StatusCode exec EvtRangeScatterer::ATLAS_NOT_THREAD_SAFE ( )
overridevirtual

◆ ATLAS_NOT_THREAD_SAFE() [2/4]

int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( Func_Flag  flag,
pid_t  pid = 0 
)
protectedinherited

◆ ATLAS_NOT_THREAD_SAFE() [3/4]

virtual int makePool EvtRangeScatterer::ATLAS_NOT_THREAD_SAFE ( int  maxevt,
int  nprocs,
const std::string &  topdir 
)
overridevirtual

◆ ATLAS_NOT_THREAD_SAFE() [4/4]

virtual StatusCode wait_once AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( pid_t pid)
overridevirtualinherited

◆ bootstrap_func() [1/2]

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeScatterer::bootstrap_func ( )
overridevirtual

Definition at line 109 of file EvtRangeScatterer.cxx.

110 {
111  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
112  outwork->data = CxxUtils::xmalloc(sizeof(int));
113  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
114  outwork->size = sizeof(int);
115 
116  // ...
117  // (possible) TODO: extend outwork with some error message, which will be eventually
118  // reported in the master proces
119  // ...
120 
121  // Reader dir: mkdir
122  std::filesystem::path reader_rundir(m_subprocTopDir);
123  reader_rundir /= std::filesystem::path(m_subprocDirPrefix);
124 
125  if(mkdir(reader_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
126  ATH_MSG_ERROR("Unable to make reader run directory: " << reader_rundir.string() << ". " << fmterror(errno));
127  return outwork;
128  }
129 
130  // Redirect logs
131  if(redirectLog(reader_rundir.string()))
132  return outwork;
133 
134  ATH_MSG_INFO("Logs redirected in the AthenaMP Event Range Scatterer PID=" << getpid());
135 
136  // Update Io Registry
137  if(updateIoReg(reader_rundir.string()))
138  return outwork;
139 
140  ATH_MSG_INFO("Io registry updated in the AthenaMP Event Range Scatterer PID=" << getpid());
141 
142  // _______________________ Handle saved PFC (if any) ______________________
143  std::filesystem::path abs_reader_rundir = std::filesystem::absolute(reader_rundir);
144  if(handleSavedPfc(abs_reader_rundir))
145  return outwork;
146 
147  // Reopen file descriptors
148  if(reopenFds())
149  return outwork;
150 
151  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP Event Range Scatterer PID=" << getpid());
152 
153  // ________________________ I/O reinit ________________________
154  if(!m_ioMgr->io_reinitialize().isSuccess()) {
155  ATH_MSG_ERROR("Failed to reinitialize I/O");
156  return outwork;
157  } else {
158  ATH_MSG_DEBUG("Successfully reinitialized I/O");
159  }
160 
161  // Start the event selector
162  SmartIF<IService> evtSelSvc(m_evtSelector);
163  if(!evtSelSvc) {
164  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
165  return outwork;
166  }
167  if(!evtSelSvc->start().isSuccess()) {
168  ATH_MSG_ERROR("Failed to restart the event selector");
169  return outwork;
170  } else {
171  ATH_MSG_DEBUG("Successfully restarted the event selector");
172  }
173 
174  // Reader dir: chdir
175  if(chdir(reader_rundir.string().c_str())==-1) {
176  ATH_MSG_ERROR("Failed to chdir to " << reader_rundir.string());
177  return outwork;
178  }
179 
180  // Declare success and return
181  *(int*)(outwork->data) = 0;
182  return outwork;
183 }

◆ bootstrap_func() [2/2]

virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> virtual operator () ATLAS_NOT_THREAD_SAFE ( const AthenaInterprocess std::unique_ptr<AthenaInterprocess::ScheduledWork> AthenaMPToolBase::bootstrap_func ( )
pure virtualinherited

◆ evtSelector()

IEvtSelector* AthenaMPToolBase::evtSelector ( )
inlineprotectedinherited

Definition at line 83 of file AthenaMPToolBase.h.

83 { return m_evtSelector; }

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeScatterer::exec_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 185 of file EvtRangeScatterer.cxx.

186 {
187  ATH_MSG_INFO("Exec function in the AthenaMP Token Scatterer PID=" << getpid());
188 
189  yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
190  // Create a socket to communicate with the Pilot
191  yampl::ISocket* socket2Pilot = socketFactory->createClientSocket(yampl::Channel(m_eventRangeChannel.value(),yampl::LOCAL),yampl::MOVE_DATA);
192  ATH_MSG_INFO("Created CLIENT socket for communicating Event Ranges with the Pilot");
193  // Create a socket to communicate with EvtRangeProcessors
194  std::string socket2ProcessorName = m_processorChannel.value() + std::string("_") + m_randStr;
195  yampl::ISocket* socket2Processor = socketFactory->createServerSocket(yampl::Channel(socket2ProcessorName,yampl::LOCAL),yampl::MOVE_DATA);
196  ATH_MSG_INFO("Created SERVER socket to token processors: " << socket2ProcessorName);
197 
198  bool all_ok=true;
199  int procReportPending(0); // Keep track of how many output files are yet to be reported by Token Processors
200 
201  AthenaInterprocess::SharedQueue* sharedFailedPidQueue(0);
202  if(detStore()->retrieve(sharedFailedPidQueue,"AthenaMPFailedPidQueue_"+m_randStr).isFailure()) {
203  ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Failed PID Queue");
204  all_ok=false;
205  }
206 
207  // Communication protocol with the Pilot
208  std::string strReady("Ready for events");
209  std::string strStopProcessing("No more events");
210  std::string processorWaitRequest("");
211  int workerPid{-1};
212 
213  ATH_MSG_INFO("Starting main loop");
214 
215  while(all_ok) {
216  // NO CACHING MODE: first get a request from one of the processors and only after that request the next event range from the pilot
217  if(!m_doCaching && processorWaitRequest.empty()) {
218  ATH_MSG_DEBUG("Waiting for event range request from one of the processors ... ");
219  while(processorWaitRequest.empty()) {
220  processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
221  pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
222  usleep(1000);
223  }
224  ATH_MSG_INFO("One of the processors is ready for the next range");
225  // Get PID from the request and Update m_pid2RangeID
226  workerPid = std::atoi(processorWaitRequest.c_str());
227  auto it = m_pid2RangeID.find(workerPid);
228  if(it!=m_pid2RangeID.end()) {
229  m_pid2RangeID.erase(it);
230  }
231  }
232 
233  // Signal the Pilot that AthenaMP is ready for event processing
234  void* ready_message = CxxUtils::xmalloc(strReady.size());
235  memcpy(ready_message,strReady.data(),strReady.size());
236  socket2Pilot->send(ready_message,strReady.size());
237  void* eventRangeMessage;
238  std::string strPeerId;
239  ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage,strPeerId);
240  std::string eventRange((const char*)eventRangeMessage,eventRangeSize);
241  size_t carRet = eventRange.find('\n');
242  if(carRet!=std::string::npos)
243  eventRange.resize(carRet);
244 
245  // Break the loop if no more ranges are expected
246  if(eventRange.find(strStopProcessing)!=std::string::npos) {
247  ATH_MSG_INFO("Stopped the loop. Last message from the Event Range Channel: " << eventRange);
248  break;
249  }
250  ATH_MSG_INFO("Got Event Range from the pilot: " << eventRange);
251 
252  // Parse the Event Range string
253  // Expected the following format: [{KEY:VALUE[,KEY:VALUE]}]
254  // First get rid of the leading '[{' and the trailing '}]'
255  if(eventRange.starts_with( "[{")) eventRange=eventRange.substr(2);
256  if(eventRange.ends_with("}]")) eventRange.resize(eventRange.size()-2);
257 
258  std::map<std::string,std::string> eventRangeMap;
259  size_t startpos(0);
260  size_t endpos = eventRange.find(',');
261  while(endpos!=std::string::npos) {
262  // Get the Key-Value pair
263  std::string keyValue(eventRange.substr(startpos,endpos-startpos));
264  size_t colonPos = keyValue.find(':');
265  std::string strKey = keyValue.substr(0,colonPos);
266  std::string strVal = keyValue.substr(colonPos+1);
267  trimRangeStrings(strKey);
268  trimRangeStrings(strVal);
269  eventRangeMap[strKey]=std::move(strVal);
270  // Next iteration
271  startpos = endpos+1;
272  endpos = eventRange.find(',',startpos);
273  }
274  // Get the final Key-Value pair
275  std::string keyValue(eventRange.substr(startpos));
276  size_t colonPos = keyValue.find(':');
277  std::string strKey = keyValue.substr(0,colonPos);
278  std::string strVal = keyValue.substr(colonPos+1);
279  trimRangeStrings(strKey);
280  trimRangeStrings(strVal);
281  eventRangeMap[strKey]=std::move(strVal);
282 
283  if(eventRangeMap.find("eventRangeID")==eventRangeMap.end()
284  || eventRangeMap.find("startEvent")==eventRangeMap.end()
285  || eventRangeMap.find("lastEvent")==eventRangeMap.end()
286  || eventRangeMap.find("GUID")==eventRangeMap.end()) {
287  std::string errorStr("ERR_ATHENAMP_PARSE \"" + eventRange + "\": Wrong format");
289  ATH_MSG_INFO("Ignoring this event range ");
290  void* errorMessage = CxxUtils::xmalloc(errorStr.size());
291  memcpy(errorMessage,errorStr.data(),errorStr.size());
292  socket2Pilot->send(errorMessage,errorStr.size());
293  continue;
294  }
295  else {
296  ATH_MSG_DEBUG("*** Decoded Event Range ***");
297  std::map<std::string,std::string>::const_iterator it = eventRangeMap.begin(),
298  itEnd = eventRangeMap.end();
299  for(;it!=itEnd;++it)
300  ATH_MSG_DEBUG(it->first << ":" << it->second);
301  ATH_MSG_DEBUG("*** ***");
302  }
303 
304  std::string rangeID = eventRangeMap["eventRangeID"];
305  std::string guid = eventRangeMap["GUID"];
306  int startEvent = std::atoi(eventRangeMap["startEvent"].c_str());
307  int lastEvent = std::atoi(eventRangeMap["lastEvent"].c_str());
308  if(rangeID.empty()
309  || guid.empty()
310  || lastEvent < startEvent) {
311  std::string errorStr("ERR_ATHENAMP_PARSE \"" + eventRange + "\": Wrong values of range fields");
313  ATH_MSG_INFO("Ignoring this event range ");
314  void* errorMessage = CxxUtils::xmalloc(errorStr.size());
315  memcpy(errorMessage,errorStr.data(),errorStr.size());
316  socket2Pilot->send(errorMessage,errorStr.size());
317  continue;
318  }
319 
320  std::string message2ProcessorStr;
321  char* message2Processor(0);
322 
323  std::ostringstream ostr;
324  ostr << rangeID;
325  if(eventRangeMap.find("PFN")!=eventRangeMap.end()) {
326  ostr << "," << "PFN:" << eventRangeMap["PFN"];
327  }
328  ostr << "," << eventRangeMap["startEvent"]
329  << "," << eventRangeMap["lastEvent"];
330  message2ProcessorStr = ostr.str();
331 
332  // CACHING MODE: first get an event range from the pilot, transform it into the tokens
333  // and only after that wait for a new range request by one of the processors
334  if(m_doCaching) {
335  ATH_MSG_DEBUG("Waiting for event range request from one of the processors");
336  while(processorWaitRequest.empty()) {
337  processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
338  pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
339  usleep(1000);
340  }
341  ATH_MSG_INFO("One of the processors is ready for the next range");
342  // Get PID from the request and Update m_pid2RangeID
343  workerPid = std::atoi(processorWaitRequest.c_str());
344  auto it = m_pid2RangeID.find(workerPid);
345  if(it!=m_pid2RangeID.end()) {
346  m_pid2RangeID.erase(it);
347  }
348  }
349 
350  // Send to the Processor: RangeID,evtToken[,evtToken]
351  message2Processor = (char*)CxxUtils::xmalloc(message2ProcessorStr.size());
352  memcpy(message2Processor,message2ProcessorStr.data(),message2ProcessorStr.size());
353  socket2Processor->send(message2Processor,message2ProcessorStr.size());
354  procReportPending++;
355 
356  // Get PID from the request and Update m_pid2RangeID
357  m_pid2RangeID[workerPid] = std::move(rangeID);
358  processorWaitRequest.clear();
359 
360  ATH_MSG_INFO("Sent response to the processor : " << message2ProcessorStr);
361  }
362 
363  if(all_ok) {
364  // We are done distributing event tokens.
365  // Tell the workers that the event processing is over
366  // i.e. send out m_nprocs empty messages
367  void* emptyMess4Processor(0);
368  if(!processorWaitRequest.empty()) {
369  // We already have one processor waiting for the answer
370  emptyMess4Processor = CxxUtils::xmalloc(1);
371  socket2Processor->send(emptyMess4Processor,1);
372  ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
373  processorWaitRequest.clear();
374  }
375  bool endLoop{false};
376  while(true) {
377  ATH_MSG_DEBUG("Going to set another processor free");
378  while(processorWaitRequest.empty()) {
379  processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
380  if(pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending)==-1) {
381  endLoop = true;
382  break;
383  }
384  usleep(1000);
385  }
386  if(endLoop) break;
387  // Remove worker from m_pid2RangeID
388  workerPid = std::atoi(processorWaitRequest.c_str());
389  auto it = m_pid2RangeID.find(workerPid);
390  if(it!=m_pid2RangeID.end()) {
391  m_pid2RangeID.erase(it);
392  }
393  emptyMess4Processor = CxxUtils::xmalloc(1);
394  socket2Processor->send(emptyMess4Processor,1);
395  ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
396  ATH_MSG_INFO("Still " << procReportPending << " pending reports");
397  processorWaitRequest.clear();
398  }
399  }
400 
401  if(m_appMgr->stop().isFailure()) {
402  ATH_MSG_ERROR("Unable to stop AppMgr");
403  all_ok=false;
404  }
405  else {
406  if(m_appMgr->finalize().isFailure()) {
407  std::cerr << "Unable to finalize AppMgr" << std::endl;
408  all_ok=false;
409  }
410  }
411 
412  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
413  outwork->data = CxxUtils::xmalloc(sizeof(int));
414  *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
415  outwork->size = sizeof(int);
416 
417  // ...
418  // (possible) TODO: extend outwork with some error message, which will be eventually
419  // reported in the master proces
420  // ...
421  delete socket2Processor;
422  delete socket2Pilot;
423  delete socketFactory;
424 
425  return outwork;
426 }

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeScatterer::fin_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 428 of file EvtRangeScatterer.cxx.

429 {
430  // Dummy
431  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
432  outwork->data = CxxUtils::xmalloc(sizeof(int));
433  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
434  outwork->size = sizeof(int);
435  return outwork;
436 }

◆ finalize()

StatusCode EvtRangeScatterer::finalize ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 49 of file EvtRangeScatterer.cxx.

50 {
51  return StatusCode::SUCCESS;
52 }

◆ fmterror()

std::string AthenaMPToolBase::fmterror ( int  errnum)
protectedinherited

Definition at line 333 of file AthenaMPToolBase.cxx.

334 {
335  char buf[256];
336  strerror_r(errnum, buf, sizeof(buf));
337  return std::string(buf);
338 }

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr EvtRangeScatterer::generateOutputReport ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 103 of file EvtRangeScatterer.cxx.

104 {
106  return jobOutputs;
107 }

◆ getNewRangeRequest()

std::string EvtRangeScatterer::getNewRangeRequest ( yampl::ISocket *  socket2Processor,
yampl::ISocket *  socket2Pilot,
int &  procReportPending 
)
private

Definition at line 471 of file EvtRangeScatterer.cxx.

474 {
475  void* processor_request(0);
476  std::string strPeerId;
477  ssize_t processorRequestSize = socket2Processor->tryRecv(processor_request,0,strPeerId);
478 
479  if(processorRequestSize==-1) return std::string("");
480  if(processorRequestSize==sizeof(pid_t)+sizeof(AthenaMPToolBase::ESRange_Status)) {
481  ATH_MSG_INFO("Processor reported event range processing error");
482  pid_t pid = *((pid_t*)processor_request);
483  AthenaMPToolBase::ESRange_Status status = *reinterpret_cast<AthenaMPToolBase::ESRange_Status*>((pid_t*)processor_request+1);
484  std::string errorStr("ERR_ATHENAMP_PROCESS "+ m_pid2RangeID[pid] + ": ");
485  switch(status) {
487  errorStr+=std::string("Not found in the input file");
488  break;
490  errorStr+=std::string("Seek failed");
491  break;
493  errorStr+=std::string("Failed to process event range");
494  break;
496  errorStr+=std::string("Failed to make output file");
497  break;
499  errorStr+=std::string("Failed to set input file");
500  break;
501  default:
502  break;
503  }
504  void* errorMessage = CxxUtils::xmalloc(errorStr.size());
505  memcpy(errorMessage,errorStr.data(),errorStr.size());
506  socket2Pilot->send(errorMessage,errorStr.size());
507  procReportPending--;
508  ATH_MSG_INFO("Error reported to the pilot. Reports pending: " << procReportPending);
509  return std::string("");
510  }
511  std::string strProcessorRequest((const char*)processor_request,processorRequestSize);
512  ATH_MSG_INFO("Received request from a processor: " << strProcessorRequest);
513  // Decode the request. If it contains output file name then pass it over to the pilot and return empty string
514  if(strProcessorRequest.starts_with( "/")) {
515  void* outpFileNameMessage = CxxUtils::xmalloc(strProcessorRequest.size());
516  memcpy(outpFileNameMessage,strProcessorRequest.data(),strProcessorRequest.size());
517  socket2Pilot->send(outpFileNameMessage,strProcessorRequest.size());
518  procReportPending--;
519  ATH_MSG_INFO("Output file reported to the pilot. Reports pending: " << procReportPending);
520  return std::string("");
521  }
522  return strProcessorRequest;
523 }

◆ handleSavedPfc()

int AthenaMPToolBase::handleSavedPfc ( const std::filesystem::path &  dest_path)
protectedinherited

Definition at line 396 of file AthenaMPToolBase.cxx.

397 {
398  if(std::filesystem::is_regular_file("PoolFileCatalog.xml.AthenaMP-saved"))
399  COPY_FILE_HACK("PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+"/PoolFileCatalog.xml");
400  return 0;
401 }

◆ initialize()

StatusCode EvtRangeScatterer::initialize ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 39 of file EvtRangeScatterer.cxx.

40 {
41  ATH_MSG_DEBUG("In initialize");
42 
44  if(!sc.isSuccess()) return sc;
45 
46  return StatusCode::SUCCESS;
47 }

◆ killChildren()

void AthenaMPToolBase::killChildren ( )
overridevirtualinherited

Definition at line 201 of file AthenaMPToolBase.cxx.

202 {
204  kill(child.getProcessID(),SIGKILL);
205  }
206 }

◆ operator()

virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator ( ) const &
pure virtualinherited

◆ operator=()

◆ pollFailedPidQueue()

pid_t EvtRangeScatterer::pollFailedPidQueue ( AthenaInterprocess::SharedQueue sharedFailedPidQueue,
yampl::ISocket *  socket2Pilot,
int &  procReportPending 
)
private

Definition at line 525 of file EvtRangeScatterer.cxx.

528 {
529  pid_t pid{0};
530  if(sharedFailedPidQueue->try_receive_basic<pid_t>(pid)
531  && pid!=-1) {
532  ATH_MSG_INFO("Procesor with PID=" << pid << " has failed!");
533  auto itPid = m_pid2RangeID.find(pid);
534  if(itPid!=m_pid2RangeID.end()) {
535  ATH_MSG_WARNING("The failed RangeID = " << m_pid2RangeID[pid] << " will be reported to Pilot");
536 
537  std::string errorStr("ERR_ATHENAMP_PROCESS " + m_pid2RangeID[pid] + ": Failed to process event range");
538  void* errorMessage = CxxUtils::xmalloc(errorStr.size());
539  memcpy(errorMessage,errorStr.data(),errorStr.size());
540  socket2Pilot->send(errorMessage,errorStr.size());
541  --procReportPending;
542  m_pid2RangeID.erase(pid);
543  }
544  ATH_MSG_INFO("Reports pending: " << procReportPending);
545  }
546  return pid;
547 }

◆ redirectLog()

int AthenaMPToolBase::redirectLog ( const std::string &  rundir,
bool  addTimeStamp = true 
)
protectedinherited

Definition at line 269 of file AthenaMPToolBase.cxx.

270 {
271  // Redirect both stdout and stderr to the same file AthenaMP.log
272  int dup2result1(0), dup2result2(0);
273 
274  int newout = open(std::string(rundir+"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
275  if(newout==-1) {
276  ATH_MSG_ERROR("Unable to open log file in the run directory. " << fmterror(errno));
277  return -1;
278  }
279  dup2result1 = dup2(newout, STDOUT_FILENO);
280  dup2result2 = dup2(newout, STDERR_FILENO);
281  TEMP_FAILURE_RETRY(close(newout));
282  if(dup2result1==-1) {
283  ATH_MSG_ERROR("Unable to redirect standard output. " << fmterror(errno));
284  return -1;
285  }
286  if(dup2result2==-1) {
287  ATH_MSG_ERROR("Unable to redirect standard error. " << fmterror(errno));
288  return -1;
289  }
290 
291  if(addTimeStamp) {
292  SmartIF<IProperty> propertyServer(msgSvc());
293  if(propertyServer==0) {
294  ATH_MSG_ERROR("Unable to cast message svc to IProperty");
295  return -1;
296  }
297 
298  std::string propertyName("Format");
299  std::string oldFormat("");
300  StringProperty formatProp(propertyName,oldFormat);
301  StatusCode sc = propertyServer->getProperty(&formatProp);
302  if(sc.isFailure()) {
303  ATH_MSG_WARNING("Message Service does not have Format property");
304  }
305  else {
306  oldFormat = formatProp.value();
307  if(oldFormat.find("%t")==std::string::npos) {
308  // Add time stamps
309  std::string newFormat("%t " + oldFormat);
310  StringProperty newFormatProp(std::move(propertyName),newFormat);
311  ATH_CHECK(propertyServer->setProperty(newFormatProp), -1);
312  }
313  else {
314  ATH_MSG_DEBUG("MsgSvc format already contains timestamps. Nothing to be done");
315  }
316  }
317  }
318 
319  return 0;
320 }

◆ reopenFd()

int AthenaMPToolBase::reopenFd ( int  fd,
const std::string &  name 
)
privateinherited

Definition at line 419 of file AthenaMPToolBase.cxx.

420 {
421  ATH_MSG_DEBUG("Attempting to reopen descriptor for " << name);
422  int old_openflags = fcntl(fd,F_GETFL,0);
423  switch(old_openflags & O_ACCMODE) {
424  case O_RDONLY: {
425  ATH_MSG_DEBUG("The File Access Mode is RDONLY");
426  break;
427  }
428  case O_WRONLY: {
429  ATH_MSG_DEBUG("The File Access Mode is WRONLY");
430  break;
431  }
432  case O_RDWR: {
433  ATH_MSG_DEBUG("The File Access Mode is RDWR");
434  break;
435  }
436  }
437 
438  int old_descflags = fcntl(fd,F_GETFD,0);
439  off_t oldpos = lseek(fd,0,SEEK_CUR);
440  if(oldpos==-1) {
441  if(errno==ESPIPE) {
442  ATH_MSG_WARNING("Dealing with PIPE. Skipping ... (FIXME!)");
443  }
444  else {
445  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on " << name << ". " << fmterror(errno));
446  return -1;
447  }
448  }
449  else {
450  Io::Fd newfd = open(name.c_str(),old_openflags);
451  if(newfd==-1) {
452  ATH_MSG_ERROR("When re-opening file descriptors unable to open " << name << " for reading. " << fmterror(errno));
453  return -1;
454  }
455  if(lseek(newfd,oldpos,SEEK_SET)==-1){
456  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on the newly opened " << name << ". " << fmterror(errno));
457  TEMP_FAILURE_RETRY(close(newfd));
458  return -1;
459  }
460  TEMP_FAILURE_RETRY(close(fd));
461  if(dup2(newfd,fd)==-1) {
462  ATH_MSG_ERROR("When re-opening file descriptors unable to duplicate descriptor for " << name << ". " << fmterror(errno));
463  TEMP_FAILURE_RETRY(close(newfd));
464  return -1;
465  }
466  if(fcntl(fd,F_SETFD,old_descflags)==-1) {
467  ATH_MSG_ERROR("When re-opening file descriptors unable to set descriptor flags for " << name << ". " << fmterror(errno));
468  TEMP_FAILURE_RETRY(close(newfd));
469  return -1;
470  }
471  TEMP_FAILURE_RETRY(close(newfd));
472  }
473  return 0;
474 }

◆ reopenFds()

int AthenaMPToolBase::reopenFds ( )
protectedinherited

Definition at line 340 of file AthenaMPToolBase.cxx.

341 {
342  // Reopen file descriptors.
343  // First go over all open files, which have been registered with the FileMgr
344  // Then also check the FdsRegistry, in case it contains some files not registered with the FileMgr
345  std::set<int> fdLog;
346 
347  // Query the FileMgr contents
348  std::vector<const Io::FileAttr*> filemgrFiles;
349  std::vector<const Io::FileAttr*>::const_iterator itFile;
350  unsigned filenum = m_fileMgr->getFiles(filemgrFiles); // Get attributes for open files only. We don't care about closed ones at this point
351  if(filenum!=filemgrFiles.size())
352  ATH_MSG_WARNING("getFiles returned " << filenum << " while vector size is " << filemgrFiles.size());
353 
354  for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
355  ATH_MSG_DEBUG("* " << **itFile);
356  const std::string& filename = (**itFile).name();
357  Io::Fd fd = (**itFile).fd();
358 
359  if(fd==-1) {
360  // It is legal to have fd=-1 for remote inputs
361  // On the other hand, these inputs should not remain open after fork. The issue being tracked at ATEAM-434.
362  // So, this hopefully is a temporary patch
363  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " << filename);
364  continue;
365  }
366 
367  if(reopenFd(fd,filename))
368  return -1;
369 
370  fdLog.insert(fd);
371  }
372 
373  // Check the FdsRegistry
374  for(const AthenaInterprocess::FdsRegistryEntry& regEntry : *m_fdsRegistry) {
375  if(fdLog.find(regEntry.fd)!=fdLog.end()) {
376  ATH_MSG_DEBUG("The file from FdsRegistry " << regEntry.name << " was registered with FileMgr. Skip reopening");
377  }
378  else {
379  ATH_MSG_WARNING("The file " << regEntry.name << " has not been registered with the FileMgr!");
380 
381  if(regEntry.fd==-1) {
382  // Same protection as the one above
383  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
384  continue;
385  }
386 
387  if(reopenFd(regEntry.fd,regEntry.name))
388  return -1;
389 
390  fdLog.insert(regEntry.fd);
391  }
392  }
393  return 0;
394 }

◆ reportSubprocessStatuses()

void AthenaMPToolBase::reportSubprocessStatuses ( )
overridevirtualinherited

Reimplemented in EvtRangeProcessor, SharedEvtQueueConsumer, and SharedHiveEvtQueueConsumer.

Definition at line 111 of file AthenaMPToolBase.cxx.

112 {
113  ATH_MSG_INFO("Statuses of sub-processes");
114  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
115  for(size_t i=0; i<statuses.size(); ++i)
116  ATH_MSG_INFO("*** Process PID=" << statuses[i].pid << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS"));
117 }

◆ setMaxEvt()

virtual void AthenaMPToolBase::setMaxEvt ( int  maxEvt)
inlineoverridevirtualinherited

Definition at line 44 of file AthenaMPToolBase.h.

44 {m_maxEvt=maxEvt;}

◆ setMPRunStop()

virtual void AthenaMPToolBase::setMPRunStop ( const AthenaInterprocess::IMPRunStop runStop)
inlineoverridevirtualinherited

Definition at line 45 of file AthenaMPToolBase.h.

45 {m_mpRunStop=runStop;}

◆ setRandString()

void AthenaMPToolBase::setRandString ( const std::string &  randStr)
overridevirtualinherited

Definition at line 196 of file AthenaMPToolBase.cxx.

197 {
198  m_randStr = randStr;
199 }

◆ subProcessLogs()

void EvtRangeScatterer::subProcessLogs ( std::vector< std::string > &  filenames)
overridevirtual

Definition at line 95 of file EvtRangeScatterer.cxx.

96 {
97  filenames.clear();
100  filenames.push_back(reader_rundir.string()+std::string("/AthenaMP.log"));
101 }

◆ trimRangeStrings()

void EvtRangeScatterer::trimRangeStrings ( std::string &  str)
private

Definition at line 438 of file EvtRangeScatterer.cxx.

439 {
440  size_t i(0);
441  // get rid of leading spaces
442  while(i<str.size() && str[i]==' ') i++;
443  if(i) str = str.substr(i);
444 
445  if(str.empty()) return; // Corner case: string consists only of spaces
446 
447  // get rid of trailing spaces
448  i=str.size()-1;
449  while(str[i]==' ') i--;
450  if(i) str.resize(i+1);
451 
452  // the string might be enclosed by either
453  // "u\'" and "\'"
454  // or
455  // "\"" and "\""
456  // Get rid of them!
457  if(str.starts_with( "u\'")) {
458  str = str.substr(2);
459  if(str.ends_with( "\'")) {
460  str.resize(str.size()-1);
461  }
462  }
463  else if(str.starts_with( "\"")) {
464  str = str.substr(1);
465  if(str.ends_with( "\"")) {
466  str.resize(str.size()-1);
467  }
468  }
469 }

◆ updateIoReg()

int AthenaMPToolBase::updateIoReg ( const std::string &  rundir)
protectedinherited

Definition at line 322 of file AthenaMPToolBase.cxx.

323 {
324  ATH_CHECK(m_ioMgr.retrieve(), -1);
325 
326  // update the IoRegistry for the new workdir - make sure we use absolute path
328  ATH_CHECK(m_ioMgr->io_update_all(abs_rundir.string()), -1);
329 
330  return 0;
331 }

◆ useFdsRegistry()

void AthenaMPToolBase::useFdsRegistry ( std::shared_ptr< AthenaInterprocess::FdsRegistry registry)
overridevirtualinherited

Definition at line 191 of file AthenaMPToolBase.cxx.

192 {
193  m_fdsRegistry = std::move(registry);
194 }

◆ waitForSignal()

void AthenaMPToolBase::waitForSignal ( )
protectedinherited

Definition at line 403 of file AthenaMPToolBase.cxx.

404 {
405  ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
406  sigset_t mask, oldmask;
407 
409 
410  sigemptyset (&mask);
411  sigaddset (&mask, SIGUSR1);
412 
413  sigprocmask (SIG_BLOCK, &mask, &oldmask);
415  sigsuspend (&oldmask);
416  sigprocmask (SIG_UNBLOCK, &mask, NULL);
417 }

Member Data Documentation

◆ m_appMgr

ServiceHandle<IAppMgrUI> AthenaMPToolBase::m_appMgr
protectedinherited

Definition at line 95 of file AthenaMPToolBase.h.

◆ m_doCaching

Gaudi::Property<bool> EvtRangeScatterer::m_doCaching {this, "DoCaching", false}
private

Definition at line 65 of file EvtRangeScatterer.h.

◆ m_eventRangeChannel

Gaudi::Property<std::string> EvtRangeScatterer::m_eventRangeChannel {this, "EventRangeChannel", ""}
private

Definition at line 64 of file EvtRangeScatterer.h.

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthenaMPToolBase::m_evtProcessor
protectedinherited

Definition at line 94 of file AthenaMPToolBase.h.

◆ m_evtSelector

SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector
protectedinherited

Definition at line 98 of file AthenaMPToolBase.h.

◆ m_evtSelName

std::string AthenaMPToolBase::m_evtSelName
protectedinherited

Name of the event selector.

Definition at line 89 of file AthenaMPToolBase.h.

◆ m_fdsRegistry

std::shared_ptr<AthenaInterprocess::FdsRegistry> AthenaMPToolBase::m_fdsRegistry
protectedinherited

Definition at line 100 of file AthenaMPToolBase.h.

◆ m_fileMgr

ServiceHandle<IFileMgr> AthenaMPToolBase::m_fileMgr
protectedinherited

Definition at line 96 of file AthenaMPToolBase.h.

◆ m_fileMgrLog

std::string AthenaMPToolBase::m_fileMgrLog
protectedinherited

Definition at line 99 of file AthenaMPToolBase.h.

◆ m_ioMgr

ServiceHandle<IIoComponentMgr> AthenaMPToolBase::m_ioMgr
protectedinherited

Definition at line 97 of file AthenaMPToolBase.h.

◆ m_isPileup

Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
protectedinherited

Definition at line 103 of file AthenaMPToolBase.h.

◆ m_maxEvt

int AthenaMPToolBase::m_maxEvt {-1}
protectedinherited

Maximum number of events assigned to the job.

Definition at line 86 of file AthenaMPToolBase.h.

◆ m_mpRunStop

const AthenaInterprocess::IMPRunStop* AthenaMPToolBase::m_mpRunStop {nullptr}
protectedinherited

Definition at line 92 of file AthenaMPToolBase.h.

◆ m_nprocs

int AthenaMPToolBase::m_nprocs {-1}
protectedinherited

Number of workers spawned by the master process.

Definition at line 85 of file AthenaMPToolBase.h.

◆ m_pid2RangeID

Pid2RangeID EvtRangeScatterer::m_pid2RangeID
private

Definition at line 67 of file EvtRangeScatterer.h.

◆ m_processGroup

AthenaInterprocess::ProcessGroup* AthenaMPToolBase::m_processGroup {nullptr}
protectedinherited

Definition at line 91 of file AthenaMPToolBase.h.

◆ m_processorChannel

Gaudi::Property<std::string> EvtRangeScatterer::m_processorChannel {this, "ProcessorChannel", ""}
private

Definition at line 63 of file EvtRangeScatterer.h.

◆ m_randStr

std::string AthenaMPToolBase::m_randStr
protectedinherited

Definition at line 101 of file AthenaMPToolBase.h.

◆ m_subprocDirPrefix

std::string AthenaMPToolBase::m_subprocDirPrefix
protectedinherited

For ex. "worker__".

Definition at line 88 of file AthenaMPToolBase.h.

◆ m_subprocTopDir

std::string AthenaMPToolBase::m_subprocTopDir
protectedinherited

Top run directory for subprocesses.

Definition at line 87 of file AthenaMPToolBase.h.


The documentation for this class was generated from the following files:
python.PyKernel.retrieve
def retrieve(aClass, aKey=None)
Definition: PyKernel.py:110
python.Dso.registry
registry
Definition: Control/AthenaServices/python/Dso.py:158
python.DQPostProcessMod.rundir
def rundir(fname)
Definition: DQPostProcessMod.py:115
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
EvtRangeScatterer::getNewRangeRequest
std::string getNewRangeRequest(yampl::ISocket *socket2Processor, yampl::ISocket *socket2Pilot, int &procReportPending)
Definition: EvtRangeScatterer.cxx:471
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
AthenaMPToolBase::ESRANGE_BADINPFILE
@ ESRANGE_BADINPFILE
Definition: AthenaMPToolBase.h:64
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:15
LArBadChanBlobUtils::Channel
Identifier32::value_type Channel
Definition: LArBadChanBlobUtils.h:24
EvtRangeScatterer::pollFailedPidQueue
pid_t pollFailedPidQueue(AthenaInterprocess::SharedQueue *sharedFailedPidQueue, yampl::ISocket *socket2Pilot, int &procReportPending)
Definition: EvtRangeScatterer.cxx:525
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:91
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:101
AthenaMPToolBase::ESRange_Status
ESRange_Status
Definition: AthenaMPToolBase.h:58
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:70
AthenaInterprocess::ProcessGroup::getChildren
const std::vector< Process > & getChildren() const
Definition: ProcessGroup.cxx:197
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
skel.it
it
Definition: skel.GENtoEVGEN.py:407
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:333
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
EvtRangeScatterer::trimRangeStrings
void trimRangeStrings(std::string &)
Definition: EvtRangeScatterer.cxx:438
athena.exitcode
int exitcode
Definition: athena.py:161
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
AthenaMPToolBase::ESRANGE_SEEKFAILED
@ ESRANGE_SEEKFAILED
Definition: AthenaMPToolBase.h:61
EvtRangeScatterer::m_processorChannel
Gaudi::Property< std::string > m_processorChannel
Definition: EvtRangeScatterer.h:63
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
python.utils.AtlRunQueryLookup.mask
string mask
Definition: AtlRunQueryLookup.py:459
Fd
IIoSvc::Fd Fd
Definition: IoSvc.cxx:22
sigaddset
#define sigaddset(x, y)
Definition: SealSignal.h:84
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:69
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
sigset_t
int sigset_t
Definition: SealSignal.h:80
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:98
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
RunTileMonitoring.keyValue
keyValue
Definition: RunTileMonitoring.py:150
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:95
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
StdJOSetup.msgSvc
msgSvc
Provide convenience handles for various services.
Definition: StdJOSetup.py:36
lumiFormat.i
int i
Definition: lumiFormat.py:85
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaMPToolBase::reopenFd
int reopenFd(int fd, const std::string &name)
Definition: AthenaMPToolBase.cxx:419
AthenaMPToolBase::AthenaMPToolBase
AthenaMPToolBase()
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
AthenaMPToolBase_d::pauseForDebug
void pauseForDebug(int)
Definition: AthenaMPToolBase.cxx:28
AthenaMPToolBase::ESRANGE_SUCCESS
@ ESRANGE_SUCCESS
Definition: AthenaMPToolBase.h:59
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:322
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:48
EvtRangeScatterer::m_eventRangeChannel
Gaudi::Property< std::string > m_eventRangeChannel
Definition: EvtRangeScatterer.h:64
AthenaMPToolBase::m_mpRunStop
const AthenaInterprocess::IMPRunStop * m_mpRunStop
Definition: AthenaMPToolBase.h:92
pool_uuid.guid
guid
Definition: pool_uuid.py:112
ReadFromCoolCompare.fd
fd
Definition: ReadFromCoolCompare.py:196
python.PyKernel.detStore
detStore
Definition: PyKernel.py:41
AthenaMPToolBase::ESRANGE_PROCFAILED
@ ESRANGE_PROCFAILED
Definition: AthenaMPToolBase.h:62
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
checkRpcDigits.errorStr
string errorStr
Definition: checkRpcDigits.py:182
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:269
AthenaInterprocess::Process
Definition: Process.h:17
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:67
grepfile.filenames
list filenames
Definition: grepfile.py:34
Trk::open
@ open
Definition: BinningType.h:40
AthenaMPToolBase::m_maxEvt
int m_maxEvt
Maximum number of events assigned to the job.
Definition: AthenaMPToolBase.h:86
EvtRangeScatterer::m_pid2RangeID
Pid2RangeID m_pid2RangeID
Definition: EvtRangeScatterer.h:67
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaInterprocess::FdsRegistryEntry
Definition: FdsRegistry.h:13
EvtRangeScatterer::m_doCaching
Gaudi::Property< bool > m_doCaching
Definition: EvtRangeScatterer.h:65
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:23
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
str
Definition: BTagTrackIpAccessor.cxx:11
merge.status
status
Definition: merge.py:16
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
For ex. "worker__".
Definition: AthenaMPToolBase.h:88
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Top run directory for subprocesses.
Definition: AthenaMPToolBase.h:87
AthenaMPToolBase::ESRANGE_NOTFOUND
@ ESRANGE_NOTFOUND
Definition: AthenaMPToolBase.h:60
CxxUtils::xmalloc
void * xmalloc(size_t size)
Trapping version of malloc.
Definition: xmalloc.cxx:31
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:396
AthenaMPToolBase::m_fileMgr
ServiceHandle< IFileMgr > m_fileMgr
Definition: AthenaMPToolBase.h:96
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:29
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:68
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:340
sigemptyset
#define sigemptyset(x)
Definition: SealSignal.h:82
AthenaMPToolBase::ESRANGE_FILENOTMADE
@ ESRANGE_FILENOTMADE
Definition: AthenaMPToolBase.h:63
AthenaMPToolBase::m_fdsRegistry
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
Definition: AthenaMPToolBase.h:100
AthenaMPToolBase_d::sig_done
std::atomic< bool > sig_done
Definition: AthenaMPToolBase.cxx:27