Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes | List of all members
SharedWriterTool Class Referencefinalabstract

#include <SharedWriterTool.h>

Inheritance diagram for SharedWriterTool:
Collaboration diagram for SharedWriterTool:

Public Member Functions

 SharedWriterTool (const std::string &type, const std::string &name, const IInterface *parent)
 
virtual ~SharedWriterTool () override
 
virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
 
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
 
virtual void subProcessLogs (std::vector< std::string > &) override
 
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkbootstrap_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkexec_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkfin_func () override
 
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t &pid) override
 
virtual void reportSubprocessStatuses () override
 
virtual void useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override
 
virtual void setRandString (const std::string &randStr) override
 
virtual void setMaxEvt (int maxEvt) override
 
virtual void setMPRunStop (const AthenaInterprocess::IMPRunStop *runStop) override
 
virtual void killChildren () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > virtual operator() ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess std::unique_ptr< AthenaInterprocess::ScheduledWorkbootstrap_func ()=0
 
virtual std::unique_ptr< ScheduledWork > operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0
 

Protected Types

enum  ESRange_Status {
  ESRANGE_SUCCESS, ESRANGE_NOTFOUND, ESRANGE_SEEKFAILED, ESRANGE_PROCFAILED,
  ESRANGE_FILENOTMADE, ESRANGE_BADINPFILE
}
 
enum  Func_Flag { FUNC_BOOTSTRAP, FUNC_EXEC, FUNC_FIN }
 

Protected Member Functions

int mapAsyncFlag ATLAS_NOT_THREAD_SAFE (Func_Flag flag, pid_t pid=0)
 
int redirectLog (const std::string &rundir, bool addTimeStamp=true)
 
int updateIoReg (const std::string &rundir)
 
std::string fmterror (int errnum)
 
int reopenFds ()
 
int handleSavedPfc (const std::filesystem::path &dest_path)
 
void waitForSignal ()
 
IEvtSelector * evtSelector ()
 

Protected Attributes

int m_nprocs {-1}
 Number of workers spawned by the master process. More...
 
int m_maxEvt {-1}
 Maximum number of events assigned to the job. More...
 
std::string m_subprocTopDir
 Top run directory for subprocesses. More...
 
std::string m_subprocDirPrefix
 For ex. "worker__". More...
 
std::string m_evtSelName
 Name of the event selector. More...
 
AthenaInterprocess::ProcessGroupm_processGroup {nullptr}
 
const AthenaInterprocess::IMPRunStopm_mpRunStop {nullptr}
 
ServiceHandle< IEventProcessor > m_evtProcessor
 
ServiceHandle< IAppMgrUI > m_appMgr
 
ServiceHandle< IFileMgr > m_fileMgr
 
ServiceHandle< IIoComponentMgr > m_ioMgr
 
SmartIF< IEvtSelector > m_evtSelector
 
std::string m_fileMgrLog
 
std::shared_ptr< AthenaInterprocess::FdsRegistrym_fdsRegistry
 
std::string m_randStr
 
Gaudi::Property< bool > m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
 

Private Member Functions

 SharedWriterTool ()
 
 SharedWriterTool (const SharedWriterTool &)
 
SharedWriterTooloperator= (const SharedWriterTool &)
 
int reopenFd (int fd, const std::string &name)
 

Private Attributes

Gaudi::Property< bool > m_nMotherProcess
 
Gaudi::Property< bool > m_debug
 
int m_rankId
 
AthenaInterprocess::SharedQueuem_sharedRankQueue
 
SmartIF< IConversionSvc > m_cnvSvc
 

Detailed Description

Definition at line 12 of file SharedWriterTool.h.

Member Enumeration Documentation

◆ ESRange_Status

enum AthenaMPToolBase::ESRange_Status
protectedinherited
Enumerator
ESRANGE_SUCCESS 
ESRANGE_NOTFOUND 
ESRANGE_SEEKFAILED 
ESRANGE_PROCFAILED 
ESRANGE_FILENOTMADE 
ESRANGE_BADINPFILE 

Definition at line 58 of file AthenaMPToolBase.h.

◆ Func_Flag

enum AthenaMPToolBase::Func_Flag
protectedinherited
Enumerator
FUNC_BOOTSTRAP 
FUNC_EXEC 
FUNC_FIN 

Definition at line 67 of file AthenaMPToolBase.h.

67  {
69  , FUNC_EXEC
70  , FUNC_FIN
71  };

Constructor & Destructor Documentation

◆ SharedWriterTool() [1/3]

SharedWriterTool::SharedWriterTool ( const std::string &  type,
const std::string &  name,
const IInterface *  parent 
)

Definition at line 18 of file SharedWriterTool.cxx.

22  , m_rankId(0)
23  , m_sharedRankQueue(nullptr)
24 {
25  m_subprocDirPrefix = "shared_writer";
26 }

◆ ~SharedWriterTool()

SharedWriterTool::~SharedWriterTool ( )
overridevirtual

Definition at line 28 of file SharedWriterTool.cxx.

29 {
30 }

◆ SharedWriterTool() [2/3]

SharedWriterTool::SharedWriterTool ( )
private

◆ SharedWriterTool() [3/3]

SharedWriterTool::SharedWriterTool ( const SharedWriterTool )
private

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE() [1/4]

virtual StatusCode exec SharedWriterTool::ATLAS_NOT_THREAD_SAFE ( )
overridevirtual

◆ ATLAS_NOT_THREAD_SAFE() [2/4]

int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( Func_Flag  flag,
pid_t  pid = 0 
)
protectedinherited

◆ ATLAS_NOT_THREAD_SAFE() [3/4]

virtual int makePool SharedWriterTool::ATLAS_NOT_THREAD_SAFE ( int  maxevt,
int  nprocs,
const std::string &  topdir 
)
overridevirtual

◆ ATLAS_NOT_THREAD_SAFE() [4/4]

virtual StatusCode wait_once AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( pid_t pid)
overridevirtualinherited

◆ bootstrap_func() [1/2]

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedWriterTool::bootstrap_func ( )
overridevirtual

Definition at line 134 of file SharedWriterTool.cxx.

135 {
136  // It's possible to debug SharedWriter just like any other AthenaMP worker.
137  // The following procedure provides a minimal explanation on how this can be achieved:
138  //
139  // Terminal #1:
140  // * Run athena w/ debugging enabled, e.g. athena.py --debugWorker --stdcmalloc --nprocs=8 [...]
141  // * In this mode, workers will be stopped after fork(), waiting for SIGUSR1 to be resumed
142  // * Find the PID of the worker to be debugged (printed by the job in stdout)
143  //
144  // Terminal #2:
145  // * Attach gdb to the relevant worker, i.e. gdb python PID
146  // * Once the symbols are loaded, one can perform any gdb action such as setting breakpoints etc.
147  // * Once ready, send SIGUSR1 to the worker to resume work, i.e. signal SIGUSR1 (in gdb)
148  //
149  // Terminal #3:
150  // * Send SIGUSR1 to the remaining workers (easiest to use htop)
151  //
152  // However, note that sometimes Shared I/O infrastructure struggles with timing problems,
153  // such as server/client(s) starting/stopping too early/later. Debugging can change this
154  // behavior so please keep this in mind.
155  if(m_debug) waitForSignal();
156 
157  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
158  outwork->data = malloc(sizeof(int));
159  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
160  outwork->size = sizeof(int);
161 
162  // ...
163  // (possible) TODO: extend outwork with some error message, which will be eventually
164  // reported in the master proces
165  // ...
166 
167  // ________________________ Get RankID ________________________
168  //
170  ATH_MSG_ERROR("Unable to get rank ID!");
171  return outwork;
172  }
173  // Writer dir: mkdir
174  std::filesystem::path writer_rundir(m_subprocTopDir);
175  writer_rundir /= std::filesystem::path(m_subprocDirPrefix);
176 
177  if(mkdir(writer_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
178  ATH_MSG_ERROR("Unable to make writer run directory: " << writer_rundir.string() << ". " << fmterror(errno));
179  return outwork;
180  }
181 
182  // __________ Redirect logs unless we want to attach debugger ____________
183  if(!m_debug) {
184  if(redirectLog(writer_rundir.string()))
185  return outwork;
186 
187  ATH_MSG_INFO("Logs redirected in the AthenaMP Shared Writer PID=" << getpid());
188  }
189 
190  // Update Io Registry
191  if(updateIoReg(writer_rundir.string()))
192  return outwork;
193 
194  ATH_MSG_INFO("Io registry updated in the AthenaMP Shared Writer PID=" << getpid());
195 
196  // _______________________ Handle saved PFC (if any) ______________________
197  std::filesystem::path abs_writer_rundir = std::filesystem::absolute(writer_rundir);
198  if(handleSavedPfc(abs_writer_rundir))
199  return outwork;
200 
201  // Reopen file descriptors
202  if(reopenFds())
203  return outwork;
204 
205  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP Shared Writer PID=" << getpid());
206 
207  // Try to initialize AthenaRootSharedWriterSvc early on
208  SmartIF<IAthenaSharedWriterSvc> sharedWriterSvc(serviceLocator()->service("AthenaRootSharedWriterSvc"));
209  if(!sharedWriterSvc) {
210  ATH_MSG_WARNING("Error retrieving AthenaRootSharedWriterSvc from SharedWriterTool::bootstrap_func()");
211  }
212 
213  // Use IDataShare to make ConversionSvc a Share Server
214  SmartIF<IDataShare> cnvSvc(m_cnvSvc);
215  if (!cnvSvc || !cnvSvc->makeServer(-m_nprocs - 1 - 1024 * m_rankId).isSuccess()) {
216  ATH_MSG_ERROR("Failed to make the conversion service a share server");
217  return outwork;
218  }
219  else {
220  ATH_MSG_DEBUG("Successfully made the conversion service a share server");
221  }
222 
223  // ________________________ I/O reinit ________________________
224  if(!m_ioMgr->io_reinitialize().isSuccess()) {
225  ATH_MSG_ERROR("Failed to reinitialize I/O");
226  return outwork;
227  } else {
228  ATH_MSG_DEBUG("Successfully reinitialized I/O");
229  }
230 
231  // Writer dir: chdir
232  if(chdir(writer_rundir.string().c_str())==-1) {
233  ATH_MSG_ERROR("Failed to chdir to " << writer_rundir.string());
234  return outwork;
235  }
236 
237  // Declare success and return
238  *(int*)(outwork->data) = 0;
239  return outwork;
240 }

◆ bootstrap_func() [2/2]

virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> virtual operator () ATLAS_NOT_THREAD_SAFE ( const AthenaInterprocess std::unique_ptr<AthenaInterprocess::ScheduledWork> AthenaMPToolBase::bootstrap_func ( )
pure virtualinherited

◆ evtSelector()

IEvtSelector* AthenaMPToolBase::evtSelector ( )
inlineprotectedinherited

Definition at line 83 of file AthenaMPToolBase.h.

83 { return m_evtSelector; }

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedWriterTool::exec_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 242 of file SharedWriterTool.cxx.

243 {
244  ATH_MSG_INFO("Exec function in the AthenaMP Shared Writer PID=" << getpid());
245  bool all_ok=true;
246 
247  SmartIF<IAthenaSharedWriterSvc> sharedWriterSvc(serviceLocator()->service("AthenaRootSharedWriterSvc"));
248  if(!sharedWriterSvc) {
249  ATH_MSG_ERROR("Error retrieving AthenaRootSharedWriterSvc");
250  all_ok=false;
251  } else if(!sharedWriterSvc->share(m_nprocs, m_nMotherProcess.value()).isSuccess()) {
252  ATH_MSG_ERROR("Exec function could not share data");
253  all_ok=false;
254  }
255  AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_cnvSvc.get());
256  if (cnvSvc == 0 || !cnvSvc->disconnectOutput("").isSuccess()) {
257  ATH_MSG_ERROR("Exec function could not disconnectOutput");
258  all_ok=false;
259  }
260 
261  if(m_appMgr->stop().isFailure()) {
262  ATH_MSG_ERROR("Unable to stop AppMgr");
263  all_ok=false;
264  }
265  else {
266  if(m_appMgr->finalize().isFailure()) {
267  std::cerr << "Unable to finalize AppMgr" << std::endl;
268  all_ok=false;
269  }
270  }
271 
272  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
273  outwork->data = malloc(sizeof(int));
274  *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
275  outwork->size = sizeof(int);
276 
277  // ...
278  // (possible) TODO: extend outwork with some error message, which will be eventually
279  // reported in the master proces
280  // ...
281  return outwork;
282 }

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedWriterTool::fin_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 284 of file SharedWriterTool.cxx.

285 {
286  // Dummy
287  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
288  outwork->data = malloc(sizeof(int));
289  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
290  outwork->size = sizeof(int);
291  return outwork;
292 }

◆ finalize()

StatusCode SharedWriterTool::finalize ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 43 of file SharedWriterTool.cxx.

44 {
45  ATH_MSG_DEBUG("In finalize");
46 
47  delete m_sharedRankQueue;
48  return StatusCode::SUCCESS;
49 }

◆ fmterror()

std::string AthenaMPToolBase::fmterror ( int  errnum)
protectedinherited

Definition at line 332 of file AthenaMPToolBase.cxx.

333 {
334  char buf[256];
335  strerror_r(errnum, buf, sizeof(buf));
336  return std::string(buf);
337 }

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr SharedWriterTool::generateOutputReport ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 128 of file SharedWriterTool.cxx.

129 {
131  return jobOutputs;
132 }

◆ handleSavedPfc()

int AthenaMPToolBase::handleSavedPfc ( const std::filesystem::path &  dest_path)
protectedinherited

Definition at line 395 of file AthenaMPToolBase.cxx.

396 {
397  if(std::filesystem::is_regular_file("PoolFileCatalog.xml.AthenaMP-saved"))
398  COPY_FILE_HACK("PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+"/PoolFileCatalog.xml");
399  return 0;
400 }

◆ initialize()

StatusCode SharedWriterTool::initialize ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 32 of file SharedWriterTool.cxx.

33 {
34  ATH_MSG_DEBUG("In initialize");
35 
37  m_cnvSvc = serviceLocator()->service("AthenaPoolCnvSvc");
38  ATH_CHECK(m_cnvSvc.isValid());
39 
40  return StatusCode::SUCCESS;
41 }

◆ killChildren()

void AthenaMPToolBase::killChildren ( )
overridevirtualinherited

Definition at line 200 of file AthenaMPToolBase.cxx.

201 {
203  kill(child.getProcessID(),SIGKILL);
204  }
205 }

◆ operator()

virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator ( ) const &
pure virtualinherited

◆ operator=()

◆ redirectLog()

int AthenaMPToolBase::redirectLog ( const std::string &  rundir,
bool  addTimeStamp = true 
)
protectedinherited

Definition at line 268 of file AthenaMPToolBase.cxx.

269 {
270  // Redirect both stdout and stderr to the same file AthenaMP.log
271  int dup2result1(0), dup2result2(0);
272 
273  int newout = open(std::string(rundir+"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
274  if(newout==-1) {
275  ATH_MSG_ERROR("Unable to open log file in the run directory. " << fmterror(errno));
276  return -1;
277  }
278  dup2result1 = dup2(newout, STDOUT_FILENO);
279  dup2result2 = dup2(newout, STDERR_FILENO);
280  TEMP_FAILURE_RETRY(close(newout));
281  if(dup2result1==-1) {
282  ATH_MSG_ERROR("Unable to redirect standard output. " << fmterror(errno));
283  return -1;
284  }
285  if(dup2result2==-1) {
286  ATH_MSG_ERROR("Unable to redirect standard error. " << fmterror(errno));
287  return -1;
288  }
289 
290  if(addTimeStamp) {
291  SmartIF<IProperty> propertyServer(msgSvc());
292  if(propertyServer==0) {
293  ATH_MSG_ERROR("Unable to cast message svc to IProperty");
294  return -1;
295  }
296 
297  std::string propertyName("Format");
298  std::string oldFormat("");
299  StringProperty formatProp(propertyName,oldFormat);
300  StatusCode sc = propertyServer->getProperty(&formatProp);
301  if(sc.isFailure()) {
302  ATH_MSG_WARNING("Message Service does not have Format property");
303  }
304  else {
305  oldFormat = formatProp.value();
306  if(oldFormat.find("%t")==std::string::npos) {
307  // Add time stamps
308  std::string newFormat("%t " + oldFormat);
309  StringProperty newFormatProp(propertyName,newFormat);
310  ATH_CHECK(propertyServer->setProperty(newFormatProp), -1);
311  }
312  else {
313  ATH_MSG_DEBUG("MsgSvc format already contains timestamps. Nothing to be done");
314  }
315  }
316  }
317 
318  return 0;
319 }

◆ reopenFd()

int AthenaMPToolBase::reopenFd ( int  fd,
const std::string &  name 
)
privateinherited

Definition at line 418 of file AthenaMPToolBase.cxx.

419 {
420  ATH_MSG_DEBUG("Attempting to reopen descriptor for " << name);
421  int old_openflags = fcntl(fd,F_GETFL,0);
422  switch(old_openflags & O_ACCMODE) {
423  case O_RDONLY: {
424  ATH_MSG_DEBUG("The File Access Mode is RDONLY");
425  break;
426  }
427  case O_WRONLY: {
428  ATH_MSG_DEBUG("The File Access Mode is WRONLY");
429  break;
430  }
431  case O_RDWR: {
432  ATH_MSG_DEBUG("The File Access Mode is RDWR");
433  break;
434  }
435  }
436 
437  int old_descflags = fcntl(fd,F_GETFD,0);
438  off_t oldpos = lseek(fd,0,SEEK_CUR);
439  if(oldpos==-1) {
440  if(errno==ESPIPE) {
441  ATH_MSG_WARNING("Dealing with PIPE. Skipping ... (FIXME!)");
442  }
443  else {
444  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on " << name << ". " << fmterror(errno));
445  return -1;
446  }
447  }
448  else {
449  Io::Fd newfd = open(name.c_str(),old_openflags);
450  if(newfd==-1) {
451  ATH_MSG_ERROR("When re-opening file descriptors unable to open " << name << " for reading. " << fmterror(errno));
452  return -1;
453  }
454  if(lseek(newfd,oldpos,SEEK_SET)==-1){
455  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on the newly opened " << name << ". " << fmterror(errno));
456  TEMP_FAILURE_RETRY(close(newfd));
457  return -1;
458  }
459  TEMP_FAILURE_RETRY(close(fd));
460  if(dup2(newfd,fd)==-1) {
461  ATH_MSG_ERROR("When re-opening file descriptors unable to duplicate descriptor for " << name << ". " << fmterror(errno));
462  TEMP_FAILURE_RETRY(close(newfd));
463  return -1;
464  }
465  if(fcntl(fd,F_SETFD,old_descflags)==-1) {
466  ATH_MSG_ERROR("When re-opening file descriptors unable to set descriptor flags for " << name << ". " << fmterror(errno));
467  TEMP_FAILURE_RETRY(close(newfd));
468  return -1;
469  }
470  TEMP_FAILURE_RETRY(close(newfd));
471  }
472  return 0;
473 }

◆ reopenFds()

int AthenaMPToolBase::reopenFds ( )
protectedinherited

Definition at line 339 of file AthenaMPToolBase.cxx.

340 {
341  // Reopen file descriptors.
342  // First go over all open files, which have been registered with the FileMgr
343  // Then also check the FdsRegistry, in case it contains some files not registered with the FileMgr
344  std::set<int> fdLog;
345 
346  // Query the FileMgr contents
347  std::vector<const Io::FileAttr*> filemgrFiles;
348  std::vector<const Io::FileAttr*>::const_iterator itFile;
349  unsigned filenum = m_fileMgr->getFiles(filemgrFiles); // Get attributes for open files only. We don't care about closed ones at this point
350  if(filenum!=filemgrFiles.size())
351  ATH_MSG_WARNING("getFiles returned " << filenum << " while vector size is " << filemgrFiles.size());
352 
353  for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
354  ATH_MSG_DEBUG("* " << **itFile);
355  const std::string& filename = (**itFile).name();
356  Io::Fd fd = (**itFile).fd();
357 
358  if(fd==-1) {
359  // It is legal to have fd=-1 for remote inputs
360  // On the other hand, these inputs should not remain open after fork. The issue being tracked at ATEAM-434.
361  // So, this hopefully is a temporary patch
362  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " << filename);
363  continue;
364  }
365 
366  if(reopenFd(fd,filename))
367  return -1;
368 
369  fdLog.insert(fd);
370  }
371 
372  // Check the FdsRegistry
373  for(const AthenaInterprocess::FdsRegistryEntry& regEntry : *m_fdsRegistry) {
374  if(fdLog.find(regEntry.fd)!=fdLog.end()) {
375  ATH_MSG_DEBUG("The file from FdsRegistry " << regEntry.name << " was registered with FileMgr. Skip reopening");
376  }
377  else {
378  ATH_MSG_WARNING("The file " << regEntry.name << " has not been registered with the FileMgr!");
379 
380  if(regEntry.fd==-1) {
381  // Same protection as the one above
382  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
383  continue;
384  }
385 
386  if(reopenFd(regEntry.fd,regEntry.name))
387  return -1;
388 
389  fdLog.insert(regEntry.fd);
390  }
391  }
392  return 0;
393 }

◆ reportSubprocessStatuses()

void AthenaMPToolBase::reportSubprocessStatuses ( )
overridevirtualinherited

Reimplemented in EvtRangeProcessor, SharedEvtQueueConsumer, and SharedHiveEvtQueueConsumer.

Definition at line 110 of file AthenaMPToolBase.cxx.

111 {
112  ATH_MSG_INFO("Statuses of sub-processes");
113  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
114  for(size_t i=0; i<statuses.size(); ++i)
115  ATH_MSG_INFO("*** Process PID=" << statuses[i].pid << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS"));
116 }

◆ setMaxEvt()

virtual void AthenaMPToolBase::setMaxEvt ( int  maxEvt)
inlineoverridevirtualinherited

Definition at line 44 of file AthenaMPToolBase.h.

44 {m_maxEvt=maxEvt;}

◆ setMPRunStop()

virtual void AthenaMPToolBase::setMPRunStop ( const AthenaInterprocess::IMPRunStop runStop)
inlineoverridevirtualinherited

Definition at line 45 of file AthenaMPToolBase.h.

45 {m_mpRunStop=runStop;}

◆ setRandString()

void AthenaMPToolBase::setRandString ( const std::string &  randStr)
overridevirtualinherited

Definition at line 195 of file AthenaMPToolBase.cxx.

196 {
197  m_randStr = randStr;
198 }

◆ subProcessLogs()

void SharedWriterTool::subProcessLogs ( std::vector< std::string > &  filenames)
overridevirtual

Definition at line 120 of file SharedWriterTool.cxx.

121 {
122  filenames.clear();
123  std::filesystem::path writer_rundir(m_subprocTopDir);
124  writer_rundir/= std::filesystem::path(m_subprocDirPrefix);
125  filenames.push_back(writer_rundir.string()+std::string("/AthenaMP.log"));
126 }

◆ updateIoReg()

int AthenaMPToolBase::updateIoReg ( const std::string &  rundir)
protectedinherited

Definition at line 321 of file AthenaMPToolBase.cxx.

322 {
323  ATH_CHECK(m_ioMgr.retrieve(), -1);
324 
325  // update the IoRegistry for the new workdir - make sure we use absolute path
327  ATH_CHECK(m_ioMgr->io_update_all(abs_rundir.string()), -1);
328 
329  return 0;
330 }

◆ useFdsRegistry()

void AthenaMPToolBase::useFdsRegistry ( std::shared_ptr< AthenaInterprocess::FdsRegistry registry)
overridevirtualinherited

Definition at line 190 of file AthenaMPToolBase.cxx.

191 {
193 }

◆ waitForSignal()

void AthenaMPToolBase::waitForSignal ( )
protectedinherited

Definition at line 402 of file AthenaMPToolBase.cxx.

403 {
404  ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
405  sigset_t mask, oldmask;
406 
408 
409  sigemptyset (&mask);
410  sigaddset (&mask, SIGUSR1);
411 
412  sigprocmask (SIG_BLOCK, &mask, &oldmask);
414  sigsuspend (&oldmask);
415  sigprocmask (SIG_UNBLOCK, &mask, NULL);
416 }

Member Data Documentation

◆ m_appMgr

ServiceHandle<IAppMgrUI> AthenaMPToolBase::m_appMgr
protectedinherited

Definition at line 95 of file AthenaMPToolBase.h.

◆ m_cnvSvc

SmartIF<IConversionSvc> SharedWriterTool::m_cnvSvc
private

Definition at line 52 of file SharedWriterTool.h.

◆ m_debug

Gaudi::Property<bool> SharedWriterTool::m_debug
private
Initial value:
{
this, "Debug", false,
"Are we running in debug mode? The default is false"}

Definition at line 45 of file SharedWriterTool.h.

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthenaMPToolBase::m_evtProcessor
protectedinherited

Definition at line 94 of file AthenaMPToolBase.h.

◆ m_evtSelector

SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector
protectedinherited

Definition at line 98 of file AthenaMPToolBase.h.

◆ m_evtSelName

std::string AthenaMPToolBase::m_evtSelName
protectedinherited

Name of the event selector.

Definition at line 89 of file AthenaMPToolBase.h.

◆ m_fdsRegistry

std::shared_ptr<AthenaInterprocess::FdsRegistry> AthenaMPToolBase::m_fdsRegistry
protectedinherited

Definition at line 100 of file AthenaMPToolBase.h.

◆ m_fileMgr

ServiceHandle<IFileMgr> AthenaMPToolBase::m_fileMgr
protectedinherited

Definition at line 96 of file AthenaMPToolBase.h.

◆ m_fileMgrLog

std::string AthenaMPToolBase::m_fileMgrLog
protectedinherited

Definition at line 99 of file AthenaMPToolBase.h.

◆ m_ioMgr

ServiceHandle<IIoComponentMgr> AthenaMPToolBase::m_ioMgr
protectedinherited

Definition at line 97 of file AthenaMPToolBase.h.

◆ m_isPileup

Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
protectedinherited

Definition at line 103 of file AthenaMPToolBase.h.

◆ m_maxEvt

int AthenaMPToolBase::m_maxEvt {-1}
protectedinherited

Maximum number of events assigned to the job.

Definition at line 86 of file AthenaMPToolBase.h.

◆ m_mpRunStop

const AthenaInterprocess::IMPRunStop* AthenaMPToolBase::m_mpRunStop {nullptr}
protectedinherited

Definition at line 92 of file AthenaMPToolBase.h.

◆ m_nMotherProcess

Gaudi::Property<bool> SharedWriterTool::m_nMotherProcess
private
Initial value:
{
this, "MotherProcess", false,
"Expect mother process to write event data. The default is false."}

Definition at line 42 of file SharedWriterTool.h.

◆ m_nprocs

int AthenaMPToolBase::m_nprocs {-1}
protectedinherited

Number of workers spawned by the master process.

Definition at line 85 of file AthenaMPToolBase.h.

◆ m_processGroup

AthenaInterprocess::ProcessGroup* AthenaMPToolBase::m_processGroup {nullptr}
protectedinherited

Definition at line 91 of file AthenaMPToolBase.h.

◆ m_randStr

std::string AthenaMPToolBase::m_randStr
protectedinherited

Definition at line 101 of file AthenaMPToolBase.h.

◆ m_rankId

int SharedWriterTool::m_rankId
private

Definition at line 49 of file SharedWriterTool.h.

◆ m_sharedRankQueue

AthenaInterprocess::SharedQueue* SharedWriterTool::m_sharedRankQueue
private

Definition at line 51 of file SharedWriterTool.h.

◆ m_subprocDirPrefix

std::string AthenaMPToolBase::m_subprocDirPrefix
protectedinherited

For ex. "worker__".

Definition at line 88 of file AthenaMPToolBase.h.

◆ m_subprocTopDir

std::string AthenaMPToolBase::m_subprocTopDir
protectedinherited

Top run directory for subprocesses.

Definition at line 87 of file AthenaMPToolBase.h.


The documentation for this class was generated from the following files:
python.Dso.registry
registry
Definition: Control/AthenaServices/python/Dso.py:159
python.DQPostProcessMod.rundir
def rundir(fname)
Definition: DQPostProcessMod.py:116
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
SharedWriterTool::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: SharedWriterTool.h:51
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:402
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
AthenaMPToolBase::ESRANGE_BADINPFILE
@ ESRANGE_BADINPFILE
Definition: AthenaMPToolBase.h:64
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:91
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:101
AthenaMPToolBase::m_nprocs
int m_nprocs
Number of workers spawned by the master process.
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:70
AthenaInterprocess::ProcessGroup::getChildren
const std::vector< Process > & getChildren() const
Definition: ProcessGroup.cxx:197
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:332
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
athena.exitcode
int exitcode
Definition: athena.py:161
AthenaMPToolBase::ESRANGE_SEEKFAILED
@ ESRANGE_SEEKFAILED
Definition: AthenaMPToolBase.h:61
SharedWriterTool::m_rankId
int m_rankId
Definition: SharedWriterTool.h:49
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
python.utils.AtlRunQueryLookup.mask
string mask
Definition: AtlRunQueryLookup.py:460
Fd
IIoSvc::Fd Fd
Definition: IoSvc.cxx:22
sigaddset
#define sigaddset(x, y)
Definition: SealSignal.h:84
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:69
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
sigset_t
int sigset_t
Definition: SealSignal.h:80
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:98
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:95
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
StdJOSetup.msgSvc
msgSvc
Provide convenience handles for various services.
Definition: StdJOSetup.py:36
lumiFormat.i
int i
Definition: lumiFormat.py:85
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
python.LArMinBiasAlgConfig.int
int
Definition: LArMinBiasAlgConfig.py:59
AthenaMPToolBase::reopenFd
int reopenFd(int fd, const std::string &name)
Definition: AthenaMPToolBase.cxx:418
AthenaMPToolBase::AthenaMPToolBase
AthenaMPToolBase()
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
AthenaMPToolBase_d::pauseForDebug
void pauseForDebug(int)
Definition: AthenaMPToolBase.cxx:27
AthenaMPToolBase::ESRANGE_SUCCESS
@ ESRANGE_SUCCESS
Definition: AthenaMPToolBase.h:59
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:132
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:321
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:47
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
AthenaMPToolBase::m_mpRunStop
const AthenaInterprocess::IMPRunStop * m_mpRunStop
Definition: AthenaMPToolBase.h:92
SharedWriterTool::m_nMotherProcess
Gaudi::Property< bool > m_nMotherProcess
Definition: SharedWriterTool.h:42
ReadFromCoolCompare.fd
fd
Definition: ReadFromCoolCompare.py:196
SharedWriterTool::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedWriterTool.h:45
AthenaMPToolBase::ESRANGE_PROCFAILED
@ ESRANGE_PROCFAILED
Definition: AthenaMPToolBase.h:62
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:268
AthenaInterprocess::Process
Definition: Process.h:17
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:67
grepfile.filenames
list filenames
Definition: grepfile.py:34
Trk::open
@ open
Definition: BinningType.h:40
AthenaMPToolBase::m_maxEvt
int m_maxEvt
Maximum number of events assigned to the job.
Definition: AthenaMPToolBase.h:86
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaInterprocess::FdsRegistryEntry
Definition: FdsRegistry.h:13
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:24
AthCnvSvc::disconnectOutput
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
Definition: AthCnvSvc.cxx:408
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
For ex. "worker__".
Definition: AthenaMPToolBase.h:88
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Top run directory for subprocesses.
Definition: AthenaMPToolBase.h:87
AthenaMPToolBase::ESRANGE_NOTFOUND
@ ESRANGE_NOTFOUND
Definition: AthenaMPToolBase.h:60
SharedWriterTool::m_cnvSvc
SmartIF< IConversionSvc > m_cnvSvc
Definition: SharedWriterTool.h:52
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:395
AthenaMPToolBase::m_fileMgr
ServiceHandle< IFileMgr > m_fileMgr
Definition: AthenaMPToolBase.h:96
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:29
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:68
AthCnvSvc
Definition: AthCnvSvc.h:66
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:339
sigemptyset
#define sigemptyset(x)
Definition: SealSignal.h:82
AthenaMPToolBase::ESRANGE_FILENOTMADE
@ ESRANGE_FILENOTMADE
Definition: AthenaMPToolBase.h:63
AthenaMPToolBase::m_fdsRegistry
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
Definition: AthenaMPToolBase.h:100
AthenaMPToolBase_d::sig_done
std::atomic< bool > sig_done
Definition: AthenaMPToolBase.cxx:26