Loading [MathJax]/extensions/tex2jax.js
 |
ATLAS Offline Software
|
#include <SharedHiveEvtQueueConsumer.h>
◆ ESRange_Status
Enumerator |
---|
ESRANGE_SUCCESS | |
ESRANGE_NOTFOUND | |
ESRANGE_SEEKFAILED | |
ESRANGE_PROCFAILED | |
ESRANGE_FILENOTMADE | |
ESRANGE_BADINPFILE | |
Definition at line 58 of file AthenaMPToolBase.h.
◆ Func_Flag
◆ SharedHiveEvtQueueConsumer()
SharedHiveEvtQueueConsumer::SharedHiveEvtQueueConsumer |
( |
const std::string & |
type, |
|
|
const std::string & |
name, |
|
|
const IInterface * |
parent |
|
) |
| |
◆ ~SharedHiveEvtQueueConsumer()
SharedHiveEvtQueueConsumer::~SharedHiveEvtQueueConsumer |
( |
| ) |
|
|
override virtual |
◆ ATLAS_NOT_THREAD_SAFE() [1/5]
virtual StatusCode exec SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
| ) |
|
|
override virtual |
◆ ATLAS_NOT_THREAD_SAFE() [2/5]
◆ ATLAS_NOT_THREAD_SAFE() [3/5]
int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE |
( |
Func_Flag |
flag, |
|
|
pid_t |
pid = 0 |
|
) |
| |
|
protected inherited |
◆ ATLAS_NOT_THREAD_SAFE() [4/5]
virtual int makePool SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
int |
maxevt, |
|
|
int |
nprocs, |
|
|
const std::string & |
topdir |
|
) |
| |
|
override virtual |
◆ ATLAS_NOT_THREAD_SAFE() [5/5]
virtual StatusCode wait_once SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
pid_t & |
pid | ) |
|
|
override virtual |
◆ bootstrap_func() [1/2]
Definition at line 225 of file SharedHiveEvtQueueConsumer.cxx.
228 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
236 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
238 sigsuspend (&oldmask);
239 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
244 *(
int*)(outwork->data) = 1;
245 outwork->size =
sizeof(
int);
253 SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service(
"IncidentSvc"));
254 if (!p_incidentSvc) {
258 p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
267 std::ostringstream workindex;
275 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
276 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
284 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
290 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
294 if(std::filesystem::is_regular_file(
"SimParams.db"))
295 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
296 if(std::filesystem::is_regular_file(
"DigitParams.db"))
297 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
298 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
299 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
309 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
315 if (!propertyServer || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
316 ATH_MSG_ERROR(
"Could not change AthenaPoolCnvSvc MakeClient Property");
319 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
328 ATH_CHECK( evtSelSvc.isValid(), outwork );
329 ATH_CHECK( evtSelSvc->start(), outwork );
332 if(chdir(worker_rundir.string().c_str())==-1) {
333 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
341 *(
int*)(outwork->data) = 0;
◆ bootstrap_func() [2/2]
◆ evtSelector()
IEvtSelector* AthenaMPToolBase::evtSelector |
( |
| ) |
|
|
inline protected inherited |
◆ exec_func()
Implements AthenaMPToolBase.
Definition at line 347 of file SharedHiveEvtQueueConsumer.cxx.
349 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
361 if(propertyServer==0) {
366 std::string propertyName(
"SkipEvents");
367 IntegerProperty skipEventsProp(propertyName,
skipEvents);
368 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
369 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
378 ATH_MSG_FATAL(
"Failed to acquire IHybridProcessorHelper interface");
380 return std::unique_ptr<AthenaInterprocess::ScheduledWork>();
387 long intmask =
pow(0x100,
sizeof(
int))-1;
388 long evtnumAndChunk(0);
390 int evtnum(0), chunkSize(1);
400 bool loop_ended = (evtnumAndChunk<0);
402 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
403 chunkSize = evtnumAndChunk >> (
sizeof(
int)*8);
404 evtnum = evtnumAndChunk & intmask;
405 ATH_MSG_INFO(
"Received from the queue: event num=" << evtnum <<
" chunk size=" << chunkSize);
409 bool no_more_events =
false;
421 sc = StatusCode::FAILURE;
427 if (
sc.isFailure()) {
428 ATH_MSG_ERROR(
"Terminating event processing loop due to errors");
439 if(evtnumAndChunk<0) {
440 no_more_events =
true;
441 evtnumAndChunk *= -1;
442 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
445 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
446 chunkSize = evtnumAndChunk >> (
sizeof(
int)*8);
447 evtnum = evtnumAndChunk & intmask;
448 ATH_MSG_INFO(
"Received from the queue: event num=" << evtnum <<
" chunk size=" << chunkSize);
452 if(!no_more_events) {
467 sc = StatusCode::FAILURE;
474 sc = StatusCode::SUCCESS;
499 *(
int*)(
outdata) = (all_ok?0:1);
501 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
502 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&createdEvts,
sizeof(
int));
505 outwork->size = outsize;
◆ fin_func()
Implements AthenaMPToolBase.
Definition at line 516 of file SharedHiveEvtQueueConsumer.cxx.
518 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
526 if(
m_appMgr->finalize().isFailure()) {
527 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
537 *(
int*)(
outdata) = (all_ok?0:1);
539 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
541 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
544 outwork->size = outsize;
◆ finalize()
StatusCode SharedHiveEvtQueueConsumer::finalize |
( |
| ) |
|
|
override virtual |
◆ fmterror()
std::string AthenaMPToolBase::fmterror |
( |
int |
errnum | ) |
|
|
protected inherited |
Definition at line 333 of file AthenaMPToolBase.cxx.
336 strerror_r(errnum, buf,
sizeof(buf));
337 return std::string(buf);
◆ generateOutputReport()
Reimplemented in EvtRangeProcessor, EvtRangeScatterer, and SharedWriterTool.
Definition at line 119 of file AthenaMPToolBase.cxx.
124 ATH_MSG_WARNING(
name() <<
" cannot make output report because FileMgr has not been configured to write log file!");
131 std::ostringstream workindex;
144 std::ifstream inpStream(
logFile.string().c_str());
145 std::set<std::string> reportedFiles;
146 while(!inpStream.eof()) {
147 std::getline(inpStream,
line);
148 if(
line.find(
"WRITE")!=std::string::npos) {
151 std::vector<std::string>
entries;
152 while(startpos<
line.size()) {
153 while(
line[startpos]==
' ')
156 size_t endpos =
line.find(
' ',startpos);
157 if(endpos==std::string::npos) endpos =
line.size();
158 entries.push_back(
line.substr(startpos,endpos-startpos));
165 if(reportedFiles.find(
basename.string())==reportedFiles.end())
166 reportedFiles.insert(
basename.string());
171 if(
it1==jobOutputs->end()) {
177 newOutput.
filename = absolutename.string();
181 newOutput.
shared = (
line.find(
"SHARED")!=std::string::npos);
183 (*jobOutputs)[
basename.string()].push_back(newOutput);
◆ handleSavedPfc()
int AthenaMPToolBase::handleSavedPfc |
( |
const std::filesystem::path & |
dest_path | ) |
|
|
protected inherited |
Definition at line 396 of file AthenaMPToolBase.cxx.
398 if(std::filesystem::is_regular_file(
"PoolFileCatalog.xml.AthenaMP-saved"))
399 COPY_FILE_HACK(
"PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+
"/PoolFileCatalog.xml");
◆ initHive()
StatusCode SharedHiveEvtQueueConsumer::initHive |
( |
| ) |
|
|
private |
Definition at line 615 of file SharedHiveEvtQueueConsumer.cxx.
621 ISvcManager* pISM(
dynamic_cast<ISvcManager*
>(serviceLocator().
get()));
627 <<
" from SvcManager");
635 return StatusCode::FAILURE;
638 m_schedulerSvc = serviceLocator()->service(
"AvalancheSchedulerSvc");
666 return StatusCode::SUCCESS;
◆ initialize()
StatusCode SharedHiveEvtQueueConsumer::initialize |
( |
| ) |
|
|
override virtual |
Reimplemented from AthenaMPToolBase.
Definition at line 59 of file SharedHiveEvtQueueConsumer.cxx.
71 SmartIF<IConversionSvc> cnvSvc(serviceLocator()->service(
"AthenaPoolCnvSvc"));
75 ATH_MSG_ERROR(
"Error retrieving AthenaPoolCnvSvc " << cnvSvc);
76 return StatusCode::FAILURE;
80 return StatusCode::SUCCESS;
◆ killChildren()
void AthenaMPToolBase::killChildren |
( |
| ) |
|
|
override virtual inherited |
◆ operator()
virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator |
( |
| ) |
const & |
|
pure virtual inherited |
◆ redirectLog()
int AthenaMPToolBase::redirectLog |
( |
const std::string & |
rundir, |
|
|
bool |
addTimeStamp = true |
|
) |
| |
|
protected inherited |
Definition at line 269 of file AthenaMPToolBase.cxx.
272 int dup2result1(0), dup2result2(0);
274 int newout =
open(std::string(
rundir+
"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
279 dup2result1 = dup2(newout, STDOUT_FILENO);
280 dup2result2 = dup2(newout, STDERR_FILENO);
281 TEMP_FAILURE_RETRY(close(newout));
282 if(dup2result1==-1) {
286 if(dup2result2==-1) {
292 SmartIF<IProperty> propertyServer(
msgSvc());
293 if(propertyServer==0) {
298 std::string propertyName(
"Format");
299 std::string oldFormat(
"");
300 StringProperty formatProp(propertyName,oldFormat);
301 StatusCode sc = propertyServer->getProperty(&formatProp);
306 oldFormat = formatProp.value();
307 if(oldFormat.find(
"%t")==std::string::npos) {
309 std::string newFormat(
"%t " + oldFormat);
310 StringProperty newFormatProp(propertyName,newFormat);
311 ATH_CHECK(propertyServer->setProperty(newFormatProp), -1);
314 ATH_MSG_DEBUG(
"MsgSvc format already contains timestamps. Nothing to be done");
◆ reopenFd()
int AthenaMPToolBase::reopenFd |
( |
int |
fd, |
|
|
const std::string & |
name |
|
) |
| |
|
private inherited |
Definition at line 419 of file AthenaMPToolBase.cxx.
422 int old_openflags = fcntl(
fd,F_GETFL,0);
423 switch(old_openflags & O_ACCMODE) {
438 int old_descflags = fcntl(
fd,F_GETFD,0);
439 off_t oldpos = lseek(
fd,0,SEEK_CUR);
455 if(lseek(newfd,oldpos,SEEK_SET)==-1){
457 TEMP_FAILURE_RETRY(close(newfd));
460 TEMP_FAILURE_RETRY(close(
fd));
461 if(dup2(newfd,
fd)==-1) {
462 ATH_MSG_ERROR(
"When re-opening file descriptors unable to duplicate descriptor for " <<
name <<
". " <<
fmterror(errno));
463 TEMP_FAILURE_RETRY(close(newfd));
466 if(fcntl(
fd,F_SETFD,old_descflags)==-1) {
467 ATH_MSG_ERROR(
"When re-opening file descriptors unable to set descriptor flags for " <<
name <<
". " <<
fmterror(errno));
468 TEMP_FAILURE_RETRY(close(newfd));
471 TEMP_FAILURE_RETRY(close(newfd));
◆ reopenFds()
int AthenaMPToolBase::reopenFds |
( |
| ) |
|
|
protected inherited |
Definition at line 340 of file AthenaMPToolBase.cxx.
348 std::vector<const Io::FileAttr*> filemgrFiles;
349 std::vector<const Io::FileAttr*>::const_iterator itFile;
350 unsigned filenum =
m_fileMgr->getFiles(filemgrFiles);
351 if(filenum!=filemgrFiles.size())
352 ATH_MSG_WARNING(
"getFiles returned " << filenum <<
" while vector size is " << filemgrFiles.size());
354 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
356 const std::string&
filename = (**itFile).name();
363 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " <<
filename);
375 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
376 ATH_MSG_DEBUG(
"The file from FdsRegistry " << regEntry.name <<
" was registered with FileMgr. Skip reopening");
379 ATH_MSG_WARNING(
"The file " << regEntry.name <<
" has not been registered with the FileMgr!");
381 if(regEntry.fd==-1) {
383 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
387 if(
reopenFd(regEntry.fd,regEntry.name))
390 fdLog.insert(regEntry.fd);
◆ reportSubprocessStatuses()
void SharedHiveEvtQueueConsumer::reportSubprocessStatuses |
( |
| ) |
|
|
override virtual |
◆ setMaxEvt()
virtual void AthenaMPToolBase::setMaxEvt |
( |
int |
maxEvt | ) |
|
|
inline override virtual inherited |
◆ setMPRunStop()
|
inline override virtual inherited |
◆ setRandString()
void AthenaMPToolBase::setRandString |
( |
const std::string & |
randStr | ) |
|
|
override virtual inherited |
◆ subProcessLogs()
void SharedHiveEvtQueueConsumer::subProcessLogs |
( |
std::vector< std::string > & |
filenames | ) |
|
|
override virtual |
◆ updateIoReg()
int AthenaMPToolBase::updateIoReg |
( |
const std::string & |
rundir | ) |
|
|
protected inherited |
◆ useFdsRegistry()
◆ waitForSignal()
void AthenaMPToolBase::waitForSignal |
( |
| ) |
|
|
protected inherited |
Definition at line 403 of file AthenaMPToolBase.cxx.
405 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
413 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
415 sigsuspend (&oldmask);
416 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
◆ m_appMgr
◆ m_chronoStatSvc
ServiceHandle<IChronoStatSvc> SharedHiveEvtQueueConsumer::m_chronoStatSvc |
|
private |
◆ m_dataShare
SmartIF<IDataShare> SharedHiveEvtQueueConsumer::m_dataShare |
|
private |
◆ m_debug
Gaudi::Property<bool> SharedHiveEvtQueueConsumer::m_debug |
|
private |
Initial value:{
this, "Debug", false,
"Perform extra debugging if true. The default is false."}
Definition at line 61 of file SharedHiveEvtQueueConsumer.h.
◆ m_evtContext
IEvtSelector::Context* SharedHiveEvtQueueConsumer::m_evtContext {} |
|
private |
◆ m_evtProcessor
◆ m_evtSelector
SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector |
|
protected inherited |
◆ m_evtSelName
std::string AthenaMPToolBase::m_evtSelName |
|
protected inherited |
◆ m_evtSelSeek
◆ m_fdsRegistry
◆ m_fileMgr
◆ m_fileMgrLog
std::string AthenaMPToolBase::m_fileMgrLog |
|
protected inherited |
◆ m_finQueue
std::queue<pid_t> SharedHiveEvtQueueConsumer::m_finQueue |
|
private |
◆ m_ioMgr
◆ m_isPileup
Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"} |
|
protected inherited |
◆ m_maxEvt
int AthenaMPToolBase::m_maxEvt {-1} |
|
protected inherited |
◆ m_mpRunStop
◆ m_nEventsBeforeFork
Gaudi::Property<int> SharedHiveEvtQueueConsumer::m_nEventsBeforeFork |
|
private |
Initial value:{
this, "EventsBeforeFork", 0,
"The number of events before forking the workers. The default is 0."}
Definition at line 57 of file SharedHiveEvtQueueConsumer.h.
◆ m_nProcessedEvents
std::map<pid_t,int> SharedHiveEvtQueueConsumer::m_nProcessedEvents |
|
private |
◆ m_nprocs
int AthenaMPToolBase::m_nprocs {-1} |
|
protected inherited |
◆ m_processGroup
◆ m_randStr
std::string AthenaMPToolBase::m_randStr |
|
protected inherited |
◆ m_rankId
int SharedHiveEvtQueueConsumer::m_rankId {-1} |
|
private |
◆ m_schedulerSvc
SmartIF<IScheduler> SharedHiveEvtQueueConsumer::m_schedulerSvc |
|
private |
◆ m_sharedEventQueue
◆ m_sharedRankQueue
◆ m_subprocDirPrefix
std::string AthenaMPToolBase::m_subprocDirPrefix |
|
protected inherited |
◆ m_subprocTopDir
std::string AthenaMPToolBase::m_subprocTopDir |
|
protected inherited |
◆ m_useSharedWriter
Gaudi::Property<bool> SharedHiveEvtQueueConsumer::m_useSharedWriter |
|
private |
Initial value:{
this, "UseSharedWriter", false,
"Use SharedWriter to merge worker outputs on-the-fly if true. The default is false."}
Definition at line 65 of file SharedHiveEvtQueueConsumer.h.
The documentation for this class was generated from the following files:
const std::vector< ProcessStatus > & getStatuses() const
virtual void setCurrentEventNum(int num)=0
path
python interpreter configuration
def mkdir(path, recursive=True)
SmartIF< IScheduler > m_schedulerSvc
const std::vector< Process > & getChildren() const
bool try_receive_basic(T &)
AllWorkerOutputs::iterator AllWorkerOutputsIterator
std::vector< WorkerOutput > SingleWorkerOutputs
#define COPY_FILE_HACK(_src, _dest)
std::map< pid_t, int > m_nProcessedEvents
Helper interface for implementing hybrid MP+MT. Used by the Hybrid Shared Event Queue Consumer MP tool.
AthenaInterprocess::SharedQueue * m_sharedEventQueue
SmartIF< IEvtSelectorSeek > m_evtSelSeek
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
msgSvc
Provide convenience handles for various services.
SmartIF< IDataShare > m_dataShare
std::vector< HWIdentifier >::iterator it1
::StatusCode StatusCode
StatusCode definition for legacy code.
IEvtSelector::Context * m_evtContext
std::atomic< bool > sig_done
int ir
counter of the current depth
#define ATH_MSG_WARNING(x)
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
virtual int drainScheduler(int &finishedEvts, bool report)=0
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
virtual void resetAppReturnCode()=0
void * xmalloc(size_t size)
Trapping version of malloc.
constexpr int pow(int base, int exp) noexcept
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Gaudi::Property< bool > m_useSharedWriter
Gaudi::Property< bool > m_debug
virtual bool terminateLoop()=0