Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes | List of all members
SharedHiveEvtQueueConsumer Class Reference [final] [abstract]

#include <SharedHiveEvtQueueConsumer.h>

Inheritance diagram for SharedHiveEvtQueueConsumer:
Collaboration diagram for SharedHiveEvtQueueConsumer:

Public Member Functions

 SharedHiveEvtQueueConsumer (const std::string &type, const std::string &name, const IInterface *parent)
 
virtual ~SharedHiveEvtQueueConsumer () override
 
virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
 
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
 
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t &pid) override
 
virtual void reportSubprocessStatuses () override
 
virtual void subProcessLogs (std::vector< std::string > &) override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func () override
 
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
 
virtual void useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override
 
virtual void setRandString (const std::string &randStr) override
 
virtual void setMaxEvt (int maxEvt) override
 
virtual void setMPRunStop (const AthenaInterprocess::IMPRunStop *runStop) override
 
virtual void killChildren () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > operator() ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ScheduledWork &)
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func ()=0
 
virtual std::unique_ptr< ScheduledWork > operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0
 

Protected Types

enum  ESRange_Status {
  ESRANGE_SUCCESS, ESRANGE_NOTFOUND, ESRANGE_SEEKFAILED, ESRANGE_PROCFAILED,
  ESRANGE_FILENOTMADE, ESRANGE_BADINPFILE
}
 
enum  Func_Flag { FUNC_BOOTSTRAP, FUNC_EXEC, FUNC_FIN }
 

Protected Member Functions

int mapAsyncFlag ATLAS_NOT_THREAD_SAFE (Func_Flag flag, pid_t pid=0)
 
int redirectLog (const std::string &rundir, bool addTimeStamp=true)
 
int updateIoReg (const std::string &rundir)
 
std::string fmterror (int errnum)
 
int reopenFds ()
 
int handleSavedPfc (const std::filesystem::path &dest_path)
 
void waitForSignal ()
 
IEvtSelector * evtSelector ()
 

Protected Attributes

int m_nprocs {-1}
 Number of workers spawned by the master process. More...
 
int m_maxEvt {-1}
 Maximum number of events assigned to the job. More...
 
std::string m_subprocTopDir
 Top run directory for subprocesses. More...
 
std::string m_subprocDirPrefix
 For ex. "worker__". More...
 
std::string m_evtSelName
 Name of the event selector. More...
 
AthenaInterprocess::ProcessGroup * m_processGroup {nullptr}
 
const AthenaInterprocess::IMPRunStop * m_mpRunStop {nullptr}
 
ServiceHandle< IEventProcessor > m_evtProcessor
 
ServiceHandle< IAppMgrUI > m_appMgr
 
ServiceHandle< IFileMgr > m_fileMgr
 
ServiceHandle< IIoComponentMgr > m_ioMgr
 
SmartIF< IEvtSelector > m_evtSelector
 
std::string m_fileMgrLog
 
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
 
std::string m_randStr
 
Gaudi::Property< bool > m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
 

Private Member Functions

StatusCode initHive ()
 
int decodeProcessResult ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ProcessResult *presult, bool doFinalize)
 
int reopenFd (int fd, const std::string &name)
 

Private Attributes

Gaudi::Property< int > m_nEventsBeforeFork
 
Gaudi::Property< bool > m_debug
 
Gaudi::Property< bool > m_useSharedWriter
 
int m_rankId {-1}
 
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
 
SmartIF< IDataShare > m_dataShare
 
SmartIF< IEvtSelectorSeek > m_evtSelSeek
 
IEvtSelector::Context * m_evtContext {}
 
AthenaInterprocess::SharedQueue * m_sharedEventQueue {}
 
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
 
std::map< pid_t, int > m_nProcessedEvents
 
std::queue< pid_t > m_finQueue
 
SmartIF< IScheduler > m_schedulerSvc
 

Detailed Description

Definition at line 22 of file SharedHiveEvtQueueConsumer.h.

Member Enumeration Documentation

◆ ESRange_Status

enum AthenaMPToolBase::ESRange_Status
protected, inherited
Enumerator
ESRANGE_SUCCESS 
ESRANGE_NOTFOUND 
ESRANGE_SEEKFAILED 
ESRANGE_PROCFAILED 
ESRANGE_FILENOTMADE 
ESRANGE_BADINPFILE 

Definition at line 58 of file AthenaMPToolBase.h.

◆ Func_Flag

enum AthenaMPToolBase::Func_Flag
protected, inherited
Enumerator
FUNC_BOOTSTRAP 
FUNC_EXEC 
FUNC_FIN 

Definition at line 67 of file AthenaMPToolBase.h.

67  {
69  , FUNC_EXEC
70  , FUNC_FIN
71  };

Constructor & Destructor Documentation

◆ SharedHiveEvtQueueConsumer()

SharedHiveEvtQueueConsumer::SharedHiveEvtQueueConsumer ( const std::string &  type,
const std::string &  name,
const IInterface *  parent 
)

Definition at line 42 of file SharedHiveEvtQueueConsumer.cxx.

46  , m_chronoStatSvc("ChronoStatSvc", name)
47 {
48  m_subprocDirPrefix = "worker_";
49 }

◆ ~SharedHiveEvtQueueConsumer()

SharedHiveEvtQueueConsumer::~SharedHiveEvtQueueConsumer ( )
override, virtual

Definition at line 53 of file SharedHiveEvtQueueConsumer.cxx.

54 {
55 }

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE() [1/5]

virtual StatusCode exec SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( )
override, virtual

◆ ATLAS_NOT_THREAD_SAFE() [2/5]

int decodeProcessResult SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( const AthenaInterprocess::ProcessResult *  presult,
bool  doFinalize 
)
private

◆ ATLAS_NOT_THREAD_SAFE() [3/5]

int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( Func_Flag  flag,
pid_t  pid = 0 
)
protected, inherited

◆ ATLAS_NOT_THREAD_SAFE() [4/5]

virtual int makePool SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( int  maxevt,
int  nprocs,
const std::string &  topdir 
)
override, virtual

◆ ATLAS_NOT_THREAD_SAFE() [5/5]

virtual StatusCode wait_once SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( pid_t &  pid)
override, virtual

Reimplemented from AthenaMPToolBase.

◆ bootstrap_func() [1/2]

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedHiveEvtQueueConsumer::bootstrap_func ( )
override, virtual

Definition at line 225 of file SharedHiveEvtQueueConsumer.cxx.

226 {
227  if (m_debug) {
228  ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
229  sigset_t mask, oldmask;
230 
232 
233  sigemptyset (&mask);
234  sigaddset (&mask, SIGUSR1);
235 
236  sigprocmask (SIG_BLOCK, &mask, &oldmask);
238  sigsuspend (&oldmask);
239  sigprocmask (SIG_UNBLOCK, &mask, NULL);
240  }
241 
242  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
243  outwork->data = CxxUtils::xmalloc(sizeof(int));
244  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
245  outwork->size = sizeof(int);
246 
247  // ...
248  // (possible) TODO: extend outwork with some error message, which will be eventually
249  // reported in the master proces
250  // ...
251 
252  // ________________________ Get IncidentSvc and fire PostFork ________________________
253  SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service("IncidentSvc"));
254  if (!p_incidentSvc) {
255  ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
256  return outwork;
257  }
258  p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
259 
260 
261  // ________________________ Get RankID ________________________
262  //
263  if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
264  ATH_MSG_ERROR("Unable to get rank ID!");
265  return outwork;
266  }
267  std::ostringstream workindex;
268  workindex<<m_rankId;
269 
270  // ________________________ Worker dir: mkdir ________________________
271  std::filesystem::path worker_rundir(m_subprocTopDir);
272  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
273  // TODO: this "worker_" can be made configurable too
274 
275  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
276  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
277  return outwork;
278  }
279 
280  // ________________________ Redirect logs ________________________
281  if(redirectLog(worker_rundir.string()))
282  return outwork;
283 
284  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
285 
286  // ________________________ Update Io Registry ____________________________
287  if(updateIoReg(worker_rundir.string()))
288  return outwork;
289 
290  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
291 
292  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
293  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
294  if(std::filesystem::is_regular_file("SimParams.db"))
295  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
296  if(std::filesystem::is_regular_file("DigitParams.db"))
297  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
298  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
299  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
300 
301  // _______________________ Handle saved PFC (if any) ______________________
302  if(handleSavedPfc(abs_worker_rundir))
303  return outwork;
304 
305  // ________________________ reopen descriptors ____________________________
306  if(reopenFds())
307  return outwork;
308 
309  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
310 
311  // ________________________ Make Shared Writer Client ________________________
312 
314  SmartIF<IProperty> propertyServer(m_dataShare);
315  if (!propertyServer || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
316  ATH_MSG_ERROR("Could not change AthenaPoolCnvSvc MakeClient Property");
317  return outwork;
318  } else {
319  ATH_MSG_DEBUG("Successfully made the conversion service a share client");
320  }
321  }
322 
323  // ________________________ I/O reinit ________________________
324  ATH_CHECK( m_ioMgr->io_reinitialize(), outwork );
325 
326  // ________________________ Event selector restart ________________________
327  SmartIF<IService> evtSelSvc(m_evtSelector);
328  ATH_CHECK( evtSelSvc.isValid(), outwork );
329  ATH_CHECK( evtSelSvc->start(), outwork );
330 
331  // ________________________ Worker dir: chdir ________________________
332  if(chdir(worker_rundir.string().c_str())==-1) {
333  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
334  return outwork;
335  }
336 
337  // ___________________ Fire UpdateAfterFork incident _________________
338  p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
339 
340  // Declare success and return
341  *(int*)(outwork->data) = 0;
342  return outwork;
343 }

◆ bootstrap_func() [2/2]

virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> AthenaMPToolBase::bootstrap_func ( )
pure virtual, inherited

◆ evtSelector()

IEvtSelector* AthenaMPToolBase::evtSelector ( )
inline, protected, inherited

Definition at line 83 of file AthenaMPToolBase.h.

83 { return m_evtSelector; }

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedHiveEvtQueueConsumer::exec_func ( )
override, virtual

Implements AthenaMPToolBase.

Definition at line 347 of file SharedHiveEvtQueueConsumer.cxx.

348 {
349  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
350 
351  bool all_ok(true);
352 
353  if (!initHive().isSuccess()) {
354  ATH_MSG_FATAL("unable to initialize Hive");
355  all_ok = false;
356  }
357 
358  // Get the value of SkipEvent
359  int skipEvents(0);
360  SmartIF<IProperty> propertyServer(m_evtSelector);
361  if(propertyServer==0) {
362  ATH_MSG_ERROR("Unable to cast event selector to IProperty");
363  all_ok = false;
364  }
365  else {
366  std::string propertyName("SkipEvents");
367  IntegerProperty skipEventsProp(propertyName,skipEvents);
368  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
369  ATH_MSG_INFO("Event Selector does not have SkipEvents property");
370  }
371  else {
372  skipEvents = skipEventsProp.value();
373  }
374  }
375 
376  IHybridProcessorHelper* hybridHelper = dynamic_cast<IHybridProcessorHelper*>(m_evtProcessor.get());
377  if(!hybridHelper) {
378  ATH_MSG_FATAL("Failed to acquire IHybridProcessorHelper interface");
379  all_ok = false;
380  return std::unique_ptr<AthenaInterprocess::ScheduledWork>();
381  }
382  // Reset the application return code.
383  hybridHelper->resetAppReturnCode();
384 
385  int finishedEvts =0;
386  int createdEvts =0;
387  long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
388  long evtnumAndChunk(0);
389 // unsigned evtCounter(0);
390  int evtnum(0), chunkSize(1);
391 
392  ATH_MSG_INFO("Starting loop on events");
393 
394  StatusCode sc(StatusCode::SUCCESS);
395 
396  while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
397  ATH_MSG_DEBUG("Event queue is empty");
398  usleep(1000);
399  }
400  bool loop_ended = (evtnumAndChunk<0);
401  if(!loop_ended) {
402  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
403  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
404  evtnum = evtnumAndChunk & intmask;
405  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
406  hybridHelper->setCurrentEventNum(++evtnum);
407  }
408 
409  bool no_more_events = false;
410 
411  while(!loop_ended) {
412  ATH_MSG_DEBUG(" -> createdEvts: " << createdEvts);
413 
414  if(!hybridHelper->terminateLoop() // No scheduled loop termination
415  && !no_more_events // We are not yet done getting events
416  && m_schedulerSvc->freeSlots()>0) { // There are still free slots in the scheduler
417  ATH_MSG_DEBUG("createdEvts: " << createdEvts << ", freeslots: " << m_schedulerSvc->freeSlots());
418 
419  auto ctx = m_evtProcessor->createEventContext();
420  if(!ctx.valid()) {
421  sc = StatusCode::FAILURE;
422  }
423  else {
424  sc = m_evtProcessor->executeEvent(std::move(ctx));
425  }
426 
427  if (sc.isFailure()) {
428  ATH_MSG_ERROR("Terminating event processing loop due to errors");
429  loop_ended = true;
430  }
431  else {
432  ++createdEvts;
433  if(--chunkSize==0) {
434  // Fetch next chunk
435  while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
436  ATH_MSG_DEBUG("Event queue is empty");
437  usleep(1000);
438  }
439  if(evtnumAndChunk<0) {
440  no_more_events = true;
441  evtnumAndChunk *= -1;
442  ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
443  }
444  else {
445  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
446  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
447  evtnum = evtnumAndChunk & intmask;
448  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
449  }
450  }
451  // Advance to the next event
452  if(!no_more_events) {
453  hybridHelper->setCurrentEventNum(++evtnum);
454  }
455  }
456  }
457  else {
458  // all the events were created but not all finished or the slots were
459  // all busy: the scheduler should finish its job
460  ATH_MSG_DEBUG("Draining the scheduler");
461 
462  // Pull out of the scheduler the finished events
463  int ir = hybridHelper->drainScheduler(finishedEvts,true);
464  if(ir < 0) {
465  // some sort of error draining scheduler;
466  loop_ended = true;
467  sc = StatusCode::FAILURE;
468  }
469  else if(ir == 0) {
470  // no more events in scheduler
471  if(no_more_events) {
472  // We are done
473  loop_ended = true;
474  sc = StatusCode::SUCCESS;
475  }
476  }
477  else {
478  // keep going!
479  }
480  }
481  } // end main loop on finished events
482 
483  if(all_ok) {
484  if(m_evtProcessor->executeRun(0).isFailure()) {
485  ATH_MSG_ERROR("Could not finalize the Run");
486  all_ok=false;
487  } else {
488  if(m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+skipEvents).isFailure()) {
489  ATH_MSG_DEBUG("Seek past maxevt to " << evtnumAndChunk+skipEvents << " returned failure. As expected...");
490  }
491  }
492  }
493 
494  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
495 
496  // Return value: "ERRCODE|Func_Flag|NEvt"
497  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
498  void* outdata = CxxUtils::xmalloc(outsize);
499  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
501  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
502  memcpy((char*)outdata+sizeof(int)+sizeof(func),&createdEvts,sizeof(int));
503 
504  outwork->data = outdata;
505  outwork->size = outsize;
506  // ...
507  // (possible) TODO: extend outwork with some error message, which will be eventually
508  // reported in the master proces
509  // ...
510  return outwork;
511 }

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedHiveEvtQueueConsumer::fin_func ( )
override, virtual

Implements AthenaMPToolBase.

Definition at line 516 of file SharedHiveEvtQueueConsumer.cxx.

517 {
518  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
519 
520  bool all_ok(true);
521 
522  if(m_appMgr->stop().isFailure()) {
523  ATH_MSG_ERROR("Unable to stop AppMgr");
524  all_ok=false;
525  } else {
526  if(m_appMgr->finalize().isFailure()) {
527  std::cerr << "Unable to finalize AppMgr" << std::endl;
528  all_ok=false;
529  }
530  }
531 
532  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
533 
534  // Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
535  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
536  void* outdata = CxxUtils::xmalloc(outsize);
537  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
539  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
540  int nEvt = -1;
541  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
542 
543  outwork->data = outdata;
544  outwork->size = outsize;
545 
546  return outwork;
547 }

◆ finalize()

StatusCode SharedHiveEvtQueueConsumer::finalize ( )
override, virtual

Reimplemented from AthenaMPToolBase.

Definition at line 86 of file SharedHiveEvtQueueConsumer.cxx.

87 {
88  if (m_evtContext) {
89  ATH_CHECK( evtSelector()->releaseContext (m_evtContext) );
90  m_evtContext = nullptr;
91  }
92 
93  return StatusCode::SUCCESS;
94 }

◆ fmterror()

std::string AthenaMPToolBase::fmterror ( int  errnum)
protected, inherited

Definition at line 333 of file AthenaMPToolBase.cxx.

334 {
335  char buf[256];
336  strerror_r(errnum, buf, sizeof(buf));
337  return std::string(buf);
338 }

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr AthenaMPToolBase::generateOutputReport ( )
override, virtual, inherited

Reimplemented in EvtRangeProcessor, EvtRangeScatterer, and SharedWriterTool.

Definition at line 119 of file AthenaMPToolBase.cxx.

120 {
122 
123  if(m_fileMgrLog.empty()) {
124  ATH_MSG_WARNING(name() << " cannot make output report because FileMgr has not been configured to write log file!");
125  }
126  else {
127  // Collect output files made by the workers
128  std::string line;
129  for(int i=0;i<m_nprocs;++i) {
130  // Get the name of FileMgr log
131  std::ostringstream workindex;
132  workindex<<i;
134  logFilePath /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
135  std::filesystem::path logFile(logFilePath);
137  if(!(std::filesystem::exists(logFile)&&std::filesystem::is_regular_file(logFile))) {
138  ATH_MSG_WARNING(logFile.string() << " either does not exist or is not a regular file. Skipping");
139  continue;
140  }
141 
142  ATH_MSG_DEBUG("FileMgr log file (" << i << ") " << logFile);
143 
144  std::ifstream inpStream(logFile.string().c_str());
145  std::set<std::string> reportedFiles; // Don't report the same file twice
146  while(!inpStream.eof()) {
147  std::getline(inpStream,line);
148  if(line.find("WRITE")!=std::string::npos) {
149  // Parse the entry
150  size_t startpos(0);
151  std::vector<std::string> entries;
152  while(startpos<line.size()) {
153  while(line[startpos]==' ')
154  startpos++;
155 
156  size_t endpos = line.find(' ',startpos);
157  if(endpos==std::string::npos) endpos = line.size();
158  entries.push_back(line.substr(startpos,endpos-startpos));
159  startpos=endpos+1;
160  }
161 
162  // enties[0] is filename
163  std::filesystem::path filenamePath(entries[0]);
164  std::filesystem::path basename = filenamePath.filename();
165  if(reportedFiles.find(basename.string())==reportedFiles.end())
166  reportedFiles.insert(basename.string());
167  else
168  continue;
169  std::filesystem::path absolutename = basename.is_absolute() ? basename : std::filesystem::absolute(std::filesystem::path(logFilePath)/=basename);
170  AthenaMP::AllWorkerOutputsIterator it1 = jobOutputs->find(basename.string());
171  if(it1==jobOutputs->end()) {
172  (*jobOutputs)[basename.string()] = AthenaMP::SingleWorkerOutputs();
173  (*jobOutputs)[basename.string()].reserve(m_nprocs);
174  }
175 
176  AthenaMP::WorkerOutput newOutput;
177  newOutput.filename = absolutename.string();
178  newOutput.technology = entries[1];
179  newOutput.description = entries[2];
180  newOutput.access_mode = entries[3];
181  newOutput.shared = (line.find("SHARED")!=std::string::npos);
182 
183  (*jobOutputs)[basename.string()].push_back(newOutput);
184  }
185  }
186  }
187  }
188  return jobOutputs;
189 }

◆ handleSavedPfc()

int AthenaMPToolBase::handleSavedPfc ( const std::filesystem::path &  dest_path)
protected, inherited

Definition at line 396 of file AthenaMPToolBase.cxx.

397 {
398  if(std::filesystem::is_regular_file("PoolFileCatalog.xml.AthenaMP-saved"))
399  COPY_FILE_HACK("PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+"/PoolFileCatalog.xml");
400  return 0;
401 }

◆ initHive()

StatusCode SharedHiveEvtQueueConsumer::initHive ( )
private

Definition at line 615 of file SharedHiveEvtQueueConsumer.cxx.

615  {
616 
617  if (m_evtProcessor.release().isFailure()) {
618  ATH_MSG_INFO("could not release old EventProcessor ");
619  }
620 
621  ISvcManager* pISM(dynamic_cast<ISvcManager*>(serviceLocator().get()));
622  if (pISM == 0) {
623  ATH_MSG_ERROR("initHive: Could not get SvcManager");
624  } else {
625  if (pISM->removeService(m_evtProcessor.name()).isFailure()) {
626  ATH_MSG_ERROR("initHive: could not remove " << m_evtProcessor.name()
627  << " from SvcManager");
628  }
629  }
630 
631  m_evtProcessor = ServiceHandle<IEventProcessor>("AthenaMtesEventLoopMgr",name());
632 
633  if (m_evtProcessor.retrieve().isFailure()) {
634  ATH_MSG_ERROR("could not setup " << m_evtProcessor.typeAndName());
635  return StatusCode::FAILURE;
636  }
637 
638  m_schedulerSvc = serviceLocator()->service("AvalancheSchedulerSvc");
639 
640  // m_whiteboard = serviceLocator()->service(m_whiteboardName);
641  // if( !m_whiteboard.isValid() ) {
642  // ATH_MSG_FATAL( "Error retrieving " << m_whiteboardName
643  // << " interface IHiveWhiteBoard." );
644  // return StatusCode::FAILURE;
645  // }
646 
647  // m_schedulerSvc = serviceLocator()->service(m_schedulerName);
648  // if ( !m_schedulerSvc.isValid()){
649  // ATH_MSG_FATAL( "Error retrieving SchedulerSvc interface ISchedulerSvc." );
650  // return StatusCode::FAILURE;
651  // }
652  // // Setup algorithm resource pool
653  // m_algResourcePool = serviceLocator()->service("AlgResourcePool");
654  // if( !m_algResourcePool.isValid() ) {
655  // ATH_MSG_FATAL ("Error retrieving AlgResourcePool");
656  // return StatusCode::FAILURE;
657  // }
658 
659  // sc = m_eventStore.retrieve();
660  // if( !sc.isSuccess() ) {
661  // ATH_MSG_FATAL("Error retrieving pointer to StoreGateSvc");
662  // return sc;
663  // }
664 
665 
666  return StatusCode::SUCCESS;
667 
668 }

◆ initialize()

StatusCode SharedHiveEvtQueueConsumer::initialize ( )
override, virtual

Reimplemented from AthenaMPToolBase.

Definition at line 59 of file SharedHiveEvtQueueConsumer.cxx.

60 {
61  ATH_MSG_DEBUG("In initialize");
62 
64 
65  m_evtSelSeek = serviceLocator()->service(m_evtSelName);
66  ATH_CHECK( m_evtSelSeek.isValid() );
67  ATH_CHECK( evtSelector()->createContext (m_evtContext) );
68 
69  ATH_CHECK(m_chronoStatSvc.retrieve());
70 
71  SmartIF<IConversionSvc> cnvSvc(serviceLocator()->service("AthenaPoolCnvSvc"));
72  m_dataShare = SmartIF<IDataShare>(cnvSvc);
73  if(!m_dataShare) {
74  if(m_useSharedWriter) {
75  ATH_MSG_ERROR("Error retrieving AthenaPoolCnvSvc " << cnvSvc);
76  return StatusCode::FAILURE;
77  }
78  }
79 
80  return StatusCode::SUCCESS;
81 }

◆ killChildren()

void AthenaMPToolBase::killChildren ( )
override, virtual, inherited

Definition at line 201 of file AthenaMPToolBase.cxx.

202 {
204  kill(child.getProcessID(),SIGKILL);
205  }
206 }

◆ operator()

virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator() ATLAS_NOT_THREAD_SAFE ( const ScheduledWork & )
pure virtual, inherited

◆ redirectLog()

int AthenaMPToolBase::redirectLog ( const std::string &  rundir,
bool  addTimeStamp = true 
)
protected, inherited

Definition at line 269 of file AthenaMPToolBase.cxx.

270 {
271  // Redirect both stdout and stderr to the same file AthenaMP.log
272  int dup2result1(0), dup2result2(0);
273 
274  int newout = open(std::string(rundir+"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
275  if(newout==-1) {
276  ATH_MSG_ERROR("Unable to open log file in the run directory. " << fmterror(errno));
277  return -1;
278  }
279  dup2result1 = dup2(newout, STDOUT_FILENO);
280  dup2result2 = dup2(newout, STDERR_FILENO);
281  TEMP_FAILURE_RETRY(close(newout));
282  if(dup2result1==-1) {
283  ATH_MSG_ERROR("Unable to redirect standard output. " << fmterror(errno));
284  return -1;
285  }
286  if(dup2result2==-1) {
287  ATH_MSG_ERROR("Unable to redirect standard error. " << fmterror(errno));
288  return -1;
289  }
290 
291  if(addTimeStamp) {
292  SmartIF<IProperty> propertyServer(msgSvc());
293  if(propertyServer==0) {
294  ATH_MSG_ERROR("Unable to cast message svc to IProperty");
295  return -1;
296  }
297 
298  std::string propertyName("Format");
299  std::string oldFormat("");
300  StringProperty formatProp(propertyName,oldFormat);
301  StatusCode sc = propertyServer->getProperty(&formatProp);
302  if(sc.isFailure()) {
303  ATH_MSG_WARNING("Message Service does not have Format property");
304  }
305  else {
306  oldFormat = formatProp.value();
307  if(oldFormat.find("%t")==std::string::npos) {
308  // Add time stamps
309  std::string newFormat("%t " + oldFormat);
310  StringProperty newFormatProp(propertyName,newFormat);
311  ATH_CHECK(propertyServer->setProperty(newFormatProp), -1);
312  }
313  else {
314  ATH_MSG_DEBUG("MsgSvc format already contains timestamps. Nothing to be done");
315  }
316  }
317  }
318 
319  return 0;
320 }

◆ reopenFd()

int AthenaMPToolBase::reopenFd ( int  fd,
const std::string &  name 
)
private, inherited

Definition at line 419 of file AthenaMPToolBase.cxx.

420 {
421  ATH_MSG_DEBUG("Attempting to reopen descriptor for " << name);
422  int old_openflags = fcntl(fd,F_GETFL,0);
423  switch(old_openflags & O_ACCMODE) {
424  case O_RDONLY: {
425  ATH_MSG_DEBUG("The File Access Mode is RDONLY");
426  break;
427  }
428  case O_WRONLY: {
429  ATH_MSG_DEBUG("The File Access Mode is WRONLY");
430  break;
431  }
432  case O_RDWR: {
433  ATH_MSG_DEBUG("The File Access Mode is RDWR");
434  break;
435  }
436  }
437 
438  int old_descflags = fcntl(fd,F_GETFD,0);
439  off_t oldpos = lseek(fd,0,SEEK_CUR);
440  if(oldpos==-1) {
441  if(errno==ESPIPE) {
442  ATH_MSG_WARNING("Dealing with PIPE. Skipping ... (FIXME!)");
443  }
444  else {
445  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on " << name << ". " << fmterror(errno));
446  return -1;
447  }
448  }
449  else {
450  Io::Fd newfd = open(name.c_str(),old_openflags);
451  if(newfd==-1) {
452  ATH_MSG_ERROR("When re-opening file descriptors unable to open " << name << " for reading. " << fmterror(errno));
453  return -1;
454  }
455  if(lseek(newfd,oldpos,SEEK_SET)==-1){
456  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on the newly opened " << name << ". " << fmterror(errno));
457  TEMP_FAILURE_RETRY(close(newfd));
458  return -1;
459  }
460  TEMP_FAILURE_RETRY(close(fd));
461  if(dup2(newfd,fd)==-1) {
462  ATH_MSG_ERROR("When re-opening file descriptors unable to duplicate descriptor for " << name << ". " << fmterror(errno));
463  TEMP_FAILURE_RETRY(close(newfd));
464  return -1;
465  }
466  if(fcntl(fd,F_SETFD,old_descflags)==-1) {
467  ATH_MSG_ERROR("When re-opening file descriptors unable to set descriptor flags for " << name << ". " << fmterror(errno));
468  TEMP_FAILURE_RETRY(close(newfd));
469  return -1;
470  }
471  TEMP_FAILURE_RETRY(close(newfd));
472  }
473  return 0;
474 }

◆ reopenFds()

int AthenaMPToolBase::reopenFds ( )
protected, inherited

Definition at line 340 of file AthenaMPToolBase.cxx.

341 {
342  // Reopen file descriptors.
343  // First go over all open files, which have been registered with the FileMgr
344  // Then also check the FdsRegistry, in case it contains some files not registered with the FileMgr
345  std::set<int> fdLog;
346 
347  // Query the FileMgr contents
348  std::vector<const Io::FileAttr*> filemgrFiles;
349  std::vector<const Io::FileAttr*>::const_iterator itFile;
350  unsigned filenum = m_fileMgr->getFiles(filemgrFiles); // Get attributes for open files only. We don't care about closed ones at this point
351  if(filenum!=filemgrFiles.size())
352  ATH_MSG_WARNING("getFiles returned " << filenum << " while vector size is " << filemgrFiles.size());
353 
354  for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
355  ATH_MSG_DEBUG("* " << **itFile);
356  const std::string& filename = (**itFile).name();
357  Io::Fd fd = (**itFile).fd();
358 
359  if(fd==-1) {
360  // It is legal to have fd=-1 for remote inputs
361  // On the other hand, these inputs should not remain open after fork. The issue being tracked at ATEAM-434.
362  // So, this hopefully is a temporary patch
363  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " << filename);
364  continue;
365  }
366 
367  if(reopenFd(fd,filename))
368  return -1;
369 
370  fdLog.insert(fd);
371  }
372 
373  // Check the FdsRegistry
374  for(const AthenaInterprocess::FdsRegistryEntry& regEntry : *m_fdsRegistry) {
375  if(fdLog.find(regEntry.fd)!=fdLog.end()) {
376  ATH_MSG_DEBUG("The file from FdsRegistry " << regEntry.name << " was registered with FileMgr. Skip reopening");
377  }
378  else {
379  ATH_MSG_WARNING("The file " << regEntry.name << " has not been registered with the FileMgr!");
380 
381  if(regEntry.fd==-1) {
382  // Same protection as the one above
383  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
384  continue;
385  }
386 
387  if(reopenFd(regEntry.fd,regEntry.name))
388  return -1;
389 
390  fdLog.insert(regEntry.fd);
391  }
392  }
393  return 0;
394 }

◆ reportSubprocessStatuses()

void SharedHiveEvtQueueConsumer::reportSubprocessStatuses ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 190 of file SharedHiveEvtQueueConsumer.cxx.

191 {
192  ATH_MSG_INFO("Statuses of event processors");
193  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
194  for(size_t i=0; i<statuses.size(); ++i) {
195  // Get the number of events processed by this worker
196  std::map<pid_t,int>::const_iterator it = m_nProcessedEvents.find(statuses[i].pid);
197  msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
198  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
199  << ". Number of events processed: ";
200  if(it==m_nProcessedEvents.end())
201  msg(MSG::INFO) << "N/A" << endmsg;
202  else
203  msg(MSG::INFO) << it->second << endmsg;
204  }
205 }

◆ setMaxEvt()

virtual void AthenaMPToolBase::setMaxEvt ( int  maxEvt)
inlineoverridevirtualinherited

Definition at line 44 of file AthenaMPToolBase.h.

44 {m_maxEvt=maxEvt;}

◆ setMPRunStop()

virtual void AthenaMPToolBase::setMPRunStop ( const AthenaInterprocess::IMPRunStop * runStop)
inlineoverridevirtualinherited

Definition at line 45 of file AthenaMPToolBase.h.

45 {m_mpRunStop=runStop;}

◆ setRandString()

void AthenaMPToolBase::setRandString ( const std::string &  randStr)
overridevirtualinherited

Definition at line 196 of file AthenaMPToolBase.cxx.

197 {
198  m_randStr = randStr;
199 }

◆ subProcessLogs()

void SharedHiveEvtQueueConsumer::subProcessLogs ( std::vector< std::string > &  filenames)
overridevirtual

Definition at line 210 of file SharedHiveEvtQueueConsumer.cxx.

211 {
212  filenames.clear();
213  for(int i=0; i<m_nprocs; ++i) {
214  std::ostringstream workerIndex;
215  workerIndex << i;
216  std::filesystem::path worker_rundir(m_subprocTopDir);
217  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
218  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
219  }
220 }

◆ updateIoReg()

int AthenaMPToolBase::updateIoReg ( const std::string &  rundir)
protectedinherited

Definition at line 322 of file AthenaMPToolBase.cxx.

323 {
324  ATH_CHECK(m_ioMgr.retrieve(), -1);
325 
326  // update the IoRegistry for the new workdir - make sure we use absolute path
327  std::filesystem::path abs_rundir = std::filesystem::absolute(rundir);
328  ATH_CHECK(m_ioMgr->io_update_all(abs_rundir.string()), -1);
329 
330  return 0;
331 }

◆ useFdsRegistry()

void AthenaMPToolBase::useFdsRegistry ( std::shared_ptr< AthenaInterprocess::FdsRegistry > registry)
overridevirtualinherited

Definition at line 191 of file AthenaMPToolBase.cxx.

192 {
193  m_fdsRegistry = registry;
194 }

◆ waitForSignal()

void AthenaMPToolBase::waitForSignal ( )
protectedinherited

Definition at line 403 of file AthenaMPToolBase.cxx.

404 {
405  ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
406  sigset_t mask, oldmask;
407 
408  signal(SIGUSR1, AthenaMPToolBase_d::pauseForDebug);
409 
410  sigemptyset (&mask);
411  sigaddset (&mask, SIGUSR1);
412 
413  sigprocmask (SIG_BLOCK, &mask, &oldmask);
414  while (!AthenaMPToolBase_d::sig_done)
415  sigsuspend (&oldmask);
416  sigprocmask (SIG_UNBLOCK, &mask, NULL);
417 }

Member Data Documentation

◆ m_appMgr

ServiceHandle<IAppMgrUI> AthenaMPToolBase::m_appMgr
protectedinherited

Definition at line 95 of file AthenaMPToolBase.h.

◆ m_chronoStatSvc

ServiceHandle<IChronoStatSvc> SharedHiveEvtQueueConsumer::m_chronoStatSvc
private

Definition at line 72 of file SharedHiveEvtQueueConsumer.h.

◆ m_dataShare

SmartIF<IDataShare> SharedHiveEvtQueueConsumer::m_dataShare
private

Definition at line 73 of file SharedHiveEvtQueueConsumer.h.

◆ m_debug

Gaudi::Property<bool> SharedHiveEvtQueueConsumer::m_debug
private
Initial value:
{
this, "Debug", false,
"Perform extra debugging if true. The default is false."}

Definition at line 61 of file SharedHiveEvtQueueConsumer.h.

◆ m_evtContext

IEvtSelector::Context* SharedHiveEvtQueueConsumer::m_evtContext {}
private

Definition at line 75 of file SharedHiveEvtQueueConsumer.h.

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthenaMPToolBase::m_evtProcessor
protectedinherited

Definition at line 94 of file AthenaMPToolBase.h.

◆ m_evtSelector

SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector
protectedinherited

Definition at line 98 of file AthenaMPToolBase.h.

◆ m_evtSelName

std::string AthenaMPToolBase::m_evtSelName
protectedinherited

Name of the event selector.

Definition at line 89 of file AthenaMPToolBase.h.

◆ m_evtSelSeek

SmartIF<IEvtSelectorSeek> SharedHiveEvtQueueConsumer::m_evtSelSeek
private

Definition at line 74 of file SharedHiveEvtQueueConsumer.h.

◆ m_fdsRegistry

std::shared_ptr<AthenaInterprocess::FdsRegistry> AthenaMPToolBase::m_fdsRegistry
protectedinherited

Definition at line 100 of file AthenaMPToolBase.h.

◆ m_fileMgr

ServiceHandle<IFileMgr> AthenaMPToolBase::m_fileMgr
protectedinherited

Definition at line 96 of file AthenaMPToolBase.h.

◆ m_fileMgrLog

std::string AthenaMPToolBase::m_fileMgrLog
protectedinherited

Definition at line 99 of file AthenaMPToolBase.h.

◆ m_finQueue

std::queue<pid_t> SharedHiveEvtQueueConsumer::m_finQueue
private

Definition at line 81 of file SharedHiveEvtQueueConsumer.h.

◆ m_ioMgr

ServiceHandle<IIoComponentMgr> AthenaMPToolBase::m_ioMgr
protectedinherited

Definition at line 97 of file AthenaMPToolBase.h.

◆ m_isPileup

Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
protectedinherited

Definition at line 103 of file AthenaMPToolBase.h.

◆ m_maxEvt

int AthenaMPToolBase::m_maxEvt {-1}
protectedinherited

Maximum number of events assigned to the job.

Definition at line 86 of file AthenaMPToolBase.h.

◆ m_mpRunStop

const AthenaInterprocess::IMPRunStop* AthenaMPToolBase::m_mpRunStop {nullptr}
protectedinherited

Definition at line 92 of file AthenaMPToolBase.h.

◆ m_nEventsBeforeFork

Gaudi::Property<int> SharedHiveEvtQueueConsumer::m_nEventsBeforeFork
private
Initial value:
{
this, "EventsBeforeFork", 0,
"The number of events before forking the workers. The default is 0."}

Definition at line 57 of file SharedHiveEvtQueueConsumer.h.

◆ m_nProcessedEvents

std::map<pid_t,int> SharedHiveEvtQueueConsumer::m_nProcessedEvents
private

Definition at line 80 of file SharedHiveEvtQueueConsumer.h.

◆ m_nprocs

int AthenaMPToolBase::m_nprocs {-1}
protectedinherited

Number of workers spawned by the master process.

Definition at line 85 of file AthenaMPToolBase.h.

◆ m_processGroup

AthenaInterprocess::ProcessGroup* AthenaMPToolBase::m_processGroup {nullptr}
protectedinherited

Definition at line 91 of file AthenaMPToolBase.h.

◆ m_randStr

std::string AthenaMPToolBase::m_randStr
protectedinherited

Definition at line 101 of file AthenaMPToolBase.h.

◆ m_rankId

int SharedHiveEvtQueueConsumer::m_rankId {-1}
private

Definition at line 70 of file SharedHiveEvtQueueConsumer.h.

◆ m_schedulerSvc

SmartIF<IScheduler> SharedHiveEvtQueueConsumer::m_schedulerSvc
private

Definition at line 83 of file SharedHiveEvtQueueConsumer.h.

◆ m_sharedEventQueue

AthenaInterprocess::SharedQueue* SharedHiveEvtQueueConsumer::m_sharedEventQueue {}
private

Definition at line 77 of file SharedHiveEvtQueueConsumer.h.

◆ m_sharedRankQueue

std::unique_ptr<AthenaInterprocess::SharedQueue> SharedHiveEvtQueueConsumer::m_sharedRankQueue
private

Definition at line 78 of file SharedHiveEvtQueueConsumer.h.

◆ m_subprocDirPrefix

std::string AthenaMPToolBase::m_subprocDirPrefix
protectedinherited

For ex. "worker__".

Definition at line 88 of file AthenaMPToolBase.h.

◆ m_subprocTopDir

std::string AthenaMPToolBase::m_subprocTopDir
protectedinherited

Top run directory for subprocesses.

Definition at line 87 of file AthenaMPToolBase.h.

◆ m_useSharedWriter

Gaudi::Property<bool> SharedHiveEvtQueueConsumer::m_useSharedWriter
private
Initial value:
{
this, "UseSharedWriter", false,
"Use SharedWriter to merge worker outputs on-the-fly if true. The default is false."}

Definition at line 65 of file SharedHiveEvtQueueConsumer.h.


The documentation for this class was generated from the following files:
python.Dso.registry
registry
Definition: Control/AthenaServices/python/Dso.py:159
python.DQPostProcessMod.rundir
def rundir(fname)
Definition: DQPostProcessMod.py:116
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
IHybridProcessorHelper::setCurrentEventNum
virtual void setCurrentEventNum(int num)=0
athena.path
path
Python interpreter configuration.
Definition: athena.py:128
AthenaMPToolBase::ESRANGE_BADINPFILE
@ ESRANGE_BADINPFILE
Definition: AthenaMPToolBase.h:64
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
collListGuids.line
string line
Definition: collListGuids.py:77
AthenaMP::WorkerOutput::description
std::string description
Definition: IAthenaMPTool.h:24
SharedHiveEvtQueueConsumer::m_schedulerSvc
SmartIF< IScheduler > m_schedulerSvc
Definition: SharedHiveEvtQueueConsumer.h:83
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:91
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
AthenaInterprocess::UpdateAfterFork
Definition: Incidents.h:22
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:101
AthenaMPToolBase::m_nprocs
int m_nprocs
Number of workers spawned by the master process.
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:70
AthenaInterprocess::ProcessGroup::getChildren
const std::vector< Process > & getChildren() const
Definition: ProcessGroup.cxx:197
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
skel.it
it
Definition: skel.GENtoEVGEN.py:407
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:333
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Name of the event selector.
Definition: AthenaMPToolBase.h:89
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
AthenaMPToolBase::ESRANGE_SEEKFAILED
@ ESRANGE_SEEKFAILED
Definition: AthenaMPToolBase.h:61
AthenaMPToolBase::m_fileMgrLog
std::string m_fileMgrLog
Definition: AthenaMPToolBase.h:99
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
python.utils.AtlRunQueryLookup.mask
string mask
Definition: AtlRunQueryLookup.py:460
AthenaMP::AllWorkerOutputsIterator
AllWorkerOutputs::iterator AllWorkerOutputsIterator
Definition: IAthenaMPTool.h:30
AthenaMP::SingleWorkerOutputs
std::vector< WorkerOutput > SingleWorkerOutputs
Definition: IAthenaMPTool.h:28
Fd
IIoSvc::Fd Fd
Definition: IoSvc.cxx:22
sigaddset
#define sigaddset(x, y)
Definition: SealSignal.h:84
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:69
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:56
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
SharedHiveEvtQueueConsumer::m_nProcessedEvents
std::map< pid_t, int > m_nProcessedEvents
Definition: SharedHiveEvtQueueConsumer.h:80
IHybridProcessorHelper
Helper interface for implementing hybrid MP+MT. Used by the Hybrid Shared Event Queue Consumer MP tool.
Definition: IHybridProcessorHelper.h:16
sigset_t
int sigset_t
Definition: SealSignal.h:80
SharedHiveEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedHiveEvtQueueConsumer.h:77
SharedHiveEvtQueueConsumer::m_evtSelSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Definition: SharedHiveEvtQueueConsumer.h:74
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:98
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:95
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
SharedHiveEvtQueueConsumer::m_sharedRankQueue
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
Definition: SharedHiveEvtQueueConsumer.h:78
StdJOSetup.msgSvc
msgSvc
Provide convenience handles for various services.
Definition: StdJOSetup.py:36
lumiFormat.i
int i
Definition: lumiFormat.py:85
python.DecayParser.buf
buf
print ("=> [%s]" % cmd)
Definition: DecayParser.py:27
SharedHiveEvtQueueConsumer::m_dataShare
SmartIF< IDataShare > m_dataShare
Definition: SharedHiveEvtQueueConsumer.h:73
DetDescrDictionaryDict::it1
std::vector< HWIdentifier >::iterator it1
Definition: DetDescrDictionaryDict.h:17
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:67
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
python.LArMinBiasAlgConfig.int
int
Definition: LArMinBiasAlgConfig.py:59
AthenaMPToolBase::reopenFd
int reopenFd(int fd, const std::string &name)
Definition: AthenaMPToolBase.cxx:419
SharedHiveEvtQueueConsumer_d::pauseForDebug
void pauseForDebug(int)
Definition: SharedHiveEvtQueueConsumer.cxx:35
AthenaMPToolBase::AthenaMPToolBase
AthenaMPToolBase()
AthenaMP::WorkerOutput::shared
bool shared
Definition: IAthenaMPTool.h:26
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
AthenaMPToolBase::evtSelector
IEvtSelector * evtSelector()
Definition: AthenaMPToolBase.h:83
test_pyathena.parent
parent
Definition: test_pyathena.py:15
AthenaMPToolBase_d::pauseForDebug
void pauseForDebug(int)
Definition: AthenaMPToolBase.cxx:28
AthenaMPToolBase::ESRANGE_SUCCESS
@ ESRANGE_SUCCESS
Definition: AthenaMPToolBase.h:59
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaMP::WorkerOutput::access_mode
std::string access_mode
Definition: IAthenaMPTool.h:25
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:322
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:48
AthenaMPToolBase::m_mpRunStop
const AthenaInterprocess::IMPRunStop * m_mpRunStop
Definition: AthenaMPToolBase.h:92
ReadFromCoolCompare.fd
fd
Definition: ReadFromCoolCompare.py:196
SharedHiveEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedHiveEvtQueueConsumer.h:75
AthenaMPToolBase::ESRANGE_PROCFAILED
@ ESRANGE_PROCFAILED
Definition: AthenaMPToolBase.h:62
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:269
AthenaInterprocess::Process
Definition: Process.h:17
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:67
SharedHiveEvtQueueConsumer_d::sig_done
std::atomic< bool > sig_done
Definition: SharedHiveEvtQueueConsumer.cxx:34
grepfile.filenames
list filenames
Definition: grepfile.py:34
Trk::open
@ open
Definition: BinningType.h:40
ir
int ir
counter of the current depth
Definition: fastadd.cxx:49
AthenaMPToolBase::m_maxEvt
int m_maxEvt
Maximum number of events assigned to the job.
Definition: AthenaMPToolBase.h:86
SharedHiveEvtQueueConsumer::initHive
StatusCode initHive()
Definition: SharedHiveEvtQueueConsumer.cxx:615
SharedHiveEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedHiveEvtQueueConsumer.h:70
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaMP::WorkerOutput::technology
std::string technology
Definition: IAthenaMPTool.h:23
AthenaInterprocess::FdsRegistryEntry
Definition: FdsRegistry.h:13
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
SharedHiveEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedHiveEvtQueueConsumer.h:72
AthenaMP::WorkerOutput
Definition: IAthenaMPTool.h:21
IHybridProcessorHelper::drainScheduler
virtual int drainScheduler(int &finishedEvts, bool report)=0
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:24
IHybridProcessorHelper::resetAppReturnCode
virtual void resetAppReturnCode()=0
entries
double entries
Definition: listroot.cxx:49
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
For ex. "worker__".
Definition: AthenaMPToolBase.h:88
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Top run directory for subprocesses.
Definition: AthenaMPToolBase.h:87
python.dummyaccess.exists
def exists(filename)
Definition: dummyaccess.py:9
AthenaMPToolBase::ESRANGE_NOTFOUND
@ ESRANGE_NOTFOUND
Definition: AthenaMPToolBase.h:60
CxxUtils::xmalloc
void * xmalloc(size_t size)
Trapping version of malloc.
Definition: xmalloc.cxx:31
pow
constexpr int pow(int base, int exp) noexcept
Definition: ap_fixedTest.cxx:15
jetMakeRefSamples.logFile
string logFile
Definition: jetMakeRefSamples.py:57
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:396
AthenaMPToolBase::m_fileMgr
ServiceHandle< IFileMgr > m_fileMgr
Definition: AthenaMPToolBase.h:96
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:29
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
SharedHiveEvtQueueConsumer::m_useSharedWriter
Gaudi::Property< bool > m_useSharedWriter
Definition: SharedHiveEvtQueueConsumer.h:65
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:68
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:340
SharedHiveEvtQueueConsumer::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedHiveEvtQueueConsumer.h:61
sigemptyset
#define sigemptyset(x)
Definition: SealSignal.h:82
AthenaMPToolBase::ESRANGE_FILENOTMADE
@ ESRANGE_FILENOTMADE
Definition: AthenaMPToolBase.h:63
AthenaMPToolBase::m_fdsRegistry
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
Definition: AthenaMPToolBase.h:100
AthenaMP::WorkerOutput::filename
std::string filename
Definition: IAthenaMPTool.h:22
AthenaMPToolBase_d::sig_done
std::atomic< bool > sig_done
Definition: AthenaMPToolBase.cxx:27
IHybridProcessorHelper::terminateLoop
virtual bool terminateLoop()=0
ServiceHandle< IEventProcessor >
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:94
beamspotman.basename
basename
Definition: beamspotman.py:640