![Logo](../../ATLAS-Logo-Square-Blue-RGB.png) |
ATLAS Offline Software
|
#include <SharedHiveEvtQueueConsumer.h>
|
| SharedHiveEvtQueueConsumer (const std::string &type, const std::string &name, const IInterface *parent) |
|
virtual | ~SharedHiveEvtQueueConsumer () override |
|
virtual StatusCode | initialize () override |
|
virtual StatusCode | finalize () override |
|
virtual int makePool | ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override |
|
virtual StatusCode exec | ATLAS_NOT_THREAD_SAFE () override |
|
virtual StatusCode wait_once | ATLAS_NOT_THREAD_SAFE (pid_t &pid) override |
|
virtual void | reportSubprocessStatuses () override |
|
virtual void | subProcessLogs (std::vector< std::string > &) override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | bootstrap_func () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | exec_func () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | fin_func () override |
|
virtual AthenaMP::AllWorkerOutputs_ptr | generateOutputReport () override |
|
virtual void | useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override |
|
virtual void | setRandString (const std::string &randStr) override |
|
virtual void | killChildren () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > virtual operator() ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess std::unique_ptr< AthenaInterprocess::ScheduledWork > | bootstrap_func ()=0 |
|
ServiceHandle< StoreGateSvc > & | evtStore () |
| The standard StoreGateSvc (event store) Returns (kind of) a pointer to the StoreGateSvc . More...
|
|
const ServiceHandle< StoreGateSvc > & | evtStore () const |
| The standard StoreGateSvc (event store) Returns (kind of) a pointer to the StoreGateSvc . More...
|
|
const ServiceHandle< StoreGateSvc > & | detStore () const |
| The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc . More...
|
|
virtual StatusCode | sysInitialize () override |
| Perform system initialization for an algorithm. More...
|
|
virtual StatusCode | sysStart () override |
| Handle START transition. More...
|
|
virtual std::vector< Gaudi::DataHandle * > | inputHandles () const override |
| Return this algorithm's input handles. More...
|
|
virtual std::vector< Gaudi::DataHandle * > | outputHandles () const override |
| Return this algorithm's output handles. More...
|
|
Gaudi::Details::PropertyBase & | declareProperty (Gaudi::Property< T > &t) |
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleKey &hndl, const std::string &doc, const SG::VarHandleKeyType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleBase &hndl, const std::string &doc, const SG::VarHandleType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleKeyArray &hndArr, const std::string &doc, const SG::VarHandleKeyArrayType &) |
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, T &property, const std::string &doc, const SG::NotHandleType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, T &property, const std::string &doc="none") |
| Declare a new Gaudi property. More...
|
|
void | updateVHKA (Gaudi::Details::PropertyBase &) |
|
MsgStream & | msg () const |
|
MsgStream & | msg (const MSG::Level lvl) const |
|
bool | msgLvl (const MSG::Level lvl) const |
|
virtual std::unique_ptr< ScheduledWork > | operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0 |
|
◆ StoreGateSvc_t
◆ ESRange_Status
Enumerator |
---|
ESRANGE_SUCCESS | |
ESRANGE_NOTFOUND | |
ESRANGE_SEEKFAILED | |
ESRANGE_PROCFAILED | |
ESRANGE_FILENOTMADE | |
ESRANGE_BADINPFILE | |
Definition at line 56 of file AthenaMPToolBase.h.
◆ Func_Flag
◆ SharedHiveEvtQueueConsumer() [1/3]
SharedHiveEvtQueueConsumer::SharedHiveEvtQueueConsumer |
( |
const std::string & |
type, |
|
|
const std::string & |
name, |
|
|
const IInterface * |
parent |
|
) |
| |
◆ ~SharedHiveEvtQueueConsumer()
SharedHiveEvtQueueConsumer::~SharedHiveEvtQueueConsumer |
( |
| ) |
|
|
overridevirtual |
◆ SharedHiveEvtQueueConsumer() [2/3]
SharedHiveEvtQueueConsumer::SharedHiveEvtQueueConsumer |
( |
| ) |
|
|
private |
◆ SharedHiveEvtQueueConsumer() [3/3]
◆ ATLAS_NOT_THREAD_SAFE() [1/5]
virtual StatusCode exec SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
| ) |
|
|
overridevirtual |
◆ ATLAS_NOT_THREAD_SAFE() [2/5]
◆ ATLAS_NOT_THREAD_SAFE() [3/5]
int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE |
( |
Func_Flag |
flag, |
|
|
pid_t |
pid = 0 |
|
) |
| |
|
protectedinherited |
◆ ATLAS_NOT_THREAD_SAFE() [4/5]
virtual int makePool SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
int |
maxevt, |
|
|
int |
nprocs, |
|
|
const std::string & |
topdir |
|
) |
| |
|
overridevirtual |
◆ ATLAS_NOT_THREAD_SAFE() [5/5]
virtual StatusCode wait_once SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
pid_t & |
pid | ) |
|
|
overridevirtual |
◆ bootstrap_func() [1/2]
Definition at line 238 of file SharedHiveEvtQueueConsumer.cxx.
241 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
249 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
251 sigsuspend (&oldmask);
252 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
256 outwork->data =
malloc(
sizeof(
int));
257 *(
int*)(outwork->data) = 1;
258 outwork->size =
sizeof(
int);
266 IIncidentSvc* p_incidentSvc(0);
267 if(!serviceLocator()->service(
"IncidentSvc", p_incidentSvc).isSuccess()) {
271 p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
280 std::ostringstream workindex;
288 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
289 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
297 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
303 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
307 if(std::filesystem::is_regular_file(
"SimParams.db"))
308 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
309 if(std::filesystem::is_regular_file(
"DigitParams.db"))
310 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
311 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
312 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
322 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
327 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_dataShare);
328 if (propertyServer==0 || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
329 ATH_MSG_ERROR(
"Could not change AthenaPoolCnvSvc MakeClient Property");
332 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
337 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
345 IService* evtSelSvc =
dynamic_cast<IService*
>(
m_evtSelector);
347 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
350 if(!evtSelSvc->start().isSuccess()) {
358 if(chdir(worker_rundir.string().c_str())==-1) {
359 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
367 *(
int*)(outwork->data) = 0;
◆ bootstrap_func() [2/2]
◆ declareGaudiProperty() [1/4]
specialization for handling Gaudi::Property<SG::VarHandleKeyArray>
Definition at line 170 of file AthCommonDataStore.h.
175 hndl.documentation());
◆ declareGaudiProperty() [2/4]
specialization for handling Gaudi::Property<SG::VarHandleKey>
Definition at line 156 of file AthCommonDataStore.h.
161 hndl.documentation());
◆ declareGaudiProperty() [3/4]
specialization for handling Gaudi::Property<SG::VarHandleBase>
Definition at line 184 of file AthCommonDataStore.h.
189 hndl.documentation());
◆ declareGaudiProperty() [4/4]
◆ declareProperty() [1/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
hndl | Object holding the property value. |
doc | Documentation string for the property. |
This is the version for types that derive from SG::VarHandleBase
. The property value object is put on the input and output lists as appropriate; then we forward to the base class.
Definition at line 245 of file AthCommonDataStore.h.
250 this->declare(hndl.
vhKey());
251 hndl.
vhKey().setOwner(
this);
◆ declareProperty() [2/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
hndl | Object holding the property value. |
doc | Documentation string for the property. |
This is the version for types that derive from SG::VarHandleKey
. The property value object is put on the input and output lists as appropriate; then we forward to the base class.
Definition at line 221 of file AthCommonDataStore.h.
◆ declareProperty() [3/6]
◆ declareProperty() [4/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
property | Object holding the property value. |
doc | Documentation string for the property. |
This is the generic version, for types that do not derive from SG::VarHandleKey
. It just forwards to the base class version of declareProperty
.
Definition at line 333 of file AthCommonDataStore.h.
◆ declareProperty() [5/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
property | Object holding the property value. |
doc | Documentation string for the property. |
This dispatches to either the generic declareProperty
or the one for VarHandle/Key/KeyArray.
Definition at line 352 of file AthCommonDataStore.h.
◆ declareProperty() [6/6]
◆ detStore()
◆ evtSelector()
IEvtSelector* AthenaMPToolBase::evtSelector |
( |
| ) |
|
|
inlineprotectedinherited |
◆ evtStore() [1/2]
◆ evtStore() [2/2]
◆ exec_func()
Implements AthenaMPToolBase.
Definition at line 373 of file SharedHiveEvtQueueConsumer.cxx.
375 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
386 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_evtSelector);
387 if(propertyServer==0) {
392 std::string propertyName(
"SkipEvents");
393 IntegerProperty skipEventsProp(propertyName,
skipEvents);
394 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
395 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
404 ATH_MSG_FATAL(
"Failed to acquire IHybridProcessorHelper interface");
406 return std::unique_ptr<AthenaInterprocess::ScheduledWork>();
413 long intmask =
pow(0x100,
sizeof(
int))-1;
414 long evtnumAndChunk(0);
426 bool loop_ended = (evtnumAndChunk<0);
428 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
430 evtnum = evtnumAndChunk & intmask;
435 bool no_more_events =
false;
447 sc = StatusCode::FAILURE;
453 if (
sc.isFailure()) {
454 ATH_MSG_ERROR(
"Terminating event processing loop due to errors");
465 if(evtnumAndChunk<0) {
466 no_more_events =
true;
467 evtnumAndChunk *= -1;
468 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
471 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
473 evtnum = evtnumAndChunk & intmask;
478 if(!no_more_events) {
493 sc = StatusCode::FAILURE;
500 sc = StatusCode::SUCCESS;
525 *(
int*)(
outdata) = (all_ok?0:1);
527 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
528 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&createdEvts,
sizeof(
int));
531 outwork->size = outsize;
◆ extraDeps_update_handler()
Add StoreName to extra input/output deps as needed.
use the logic of the VarHandleKey to parse the DataObjID keys supplied via the ExtraInputs and ExtraOuputs Properties to add the StoreName if it's not explicitly given
◆ fin_func()
Implements AthenaMPToolBase.
Definition at line 542 of file SharedHiveEvtQueueConsumer.cxx.
544 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
552 if(
m_appMgr->finalize().isFailure()) {
553 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
563 *(
int*)(
outdata) = (all_ok?0:1);
565 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
567 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
570 outwork->size = outsize;
◆ finalize()
StatusCode SharedHiveEvtQueueConsumer::finalize |
( |
| ) |
|
|
overridevirtual |
◆ fmterror()
std::string AthenaMPToolBase::fmterror |
( |
int |
errnum | ) |
|
|
protectedinherited |
Definition at line 362 of file AthenaMPToolBase.cxx.
365 strerror_r(errnum, buf,
sizeof(buf));
366 return std::string(buf);
◆ generateOutputReport()
Implements IAthenaMPTool.
Reimplemented in EvtRangeProcessor, EvtRangeScatterer, SharedEvtQueueProvider, and SharedWriterTool.
Definition at line 128 of file AthenaMPToolBase.cxx.
133 ATH_MSG_WARNING(
name() <<
" cannot make output report because FileMgr has not been configured to write log file!");
140 std::ostringstream workindex;
153 std::ifstream inpStream(
logFile.string().c_str());
154 std::set<std::string> reportedFiles;
155 while(!inpStream.eof()) {
156 std::getline(inpStream,
line);
157 if(
line.find(
"WRITE")!=std::string::npos) {
160 std::vector<std::string>
entries;
161 while(startpos<
line.size()) {
162 while(
line[startpos]==
' ')
165 size_t endpos =
line.find(
' ',startpos);
166 if(endpos==std::string::npos) endpos =
line.size();
167 entries.push_back(
line.substr(startpos,endpos-startpos));
174 if(reportedFiles.find(
basename.string())==reportedFiles.end())
175 reportedFiles.insert(
basename.string());
180 if(
it1==jobOutputs->end()) {
186 newOutput.
filename = absolutename.string();
190 newOutput.
shared = (
line.find(
"SHARED")!=std::string::npos);
192 (*jobOutputs)[
basename.string()].push_back(newOutput);
◆ handleSavedPfc()
int AthenaMPToolBase::handleSavedPfc |
( |
const std::filesystem::path & |
dest_path | ) |
|
|
protectedinherited |
Definition at line 425 of file AthenaMPToolBase.cxx.
427 if(std::filesystem::is_regular_file(
"PoolFileCatalog.xml.AthenaMP-saved"))
428 COPY_FILE_HACK(
"PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+
"/PoolFileCatalog.xml");
◆ initHive()
StatusCode SharedHiveEvtQueueConsumer::initHive |
( |
| ) |
|
|
private |
Definition at line 641 of file SharedHiveEvtQueueConsumer.cxx.
647 ISvcManager* pISM(
dynamic_cast<ISvcManager*
>(serviceLocator().
get()));
653 <<
" from SvcManager");
661 return StatusCode::FAILURE;
664 m_schedulerSvc = serviceLocator()->service(
"AvalancheSchedulerSvc");
692 return StatusCode::SUCCESS;
◆ initialize()
StatusCode SharedHiveEvtQueueConsumer::initialize |
( |
| ) |
|
|
overridevirtual |
Reimplemented from AthenaMPToolBase.
Definition at line 65 of file SharedHiveEvtQueueConsumer.cxx.
76 return StatusCode::FAILURE;
82 IConversionSvc* cnvSvc = 0;
83 sc = serviceLocator()->service(
"AthenaPoolCnvSvc",cnvSvc);
87 ATH_MSG_ERROR(
"Error retrieving AthenaPoolCnvSvc " << cnvSvc);
88 return StatusCode::FAILURE;
92 return StatusCode::SUCCESS;
◆ inputHandles()
Return this algorithm's input handles.
We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.
◆ interfaceID()
static const InterfaceID& IAthenaMPTool::interfaceID |
( |
| ) |
|
|
inlinestaticinherited |
◆ killChildren()
void AthenaMPToolBase::killChildren |
( |
| ) |
|
|
overridevirtualinherited |
◆ msg() [1/2]
◆ msg() [2/2]
◆ msgLvl()
◆ operator()
virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator |
( |
| ) |
const & |
|
pure virtualinherited |
◆ operator=()
◆ outputHandles()
Return this algorithm's output handles.
We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.
◆ redirectLog()
int AthenaMPToolBase::redirectLog |
( |
const std::string & |
rundir, |
|
|
bool |
addTimeStamp = true |
|
) |
| |
|
protectedinherited |
Definition at line 278 of file AthenaMPToolBase.cxx.
281 int dup2result1(0), dup2result2(0);
283 int newout =
open(std::string(
rundir+
"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
288 dup2result1 = dup2(newout, STDOUT_FILENO);
289 dup2result2 = dup2(newout, STDERR_FILENO);
290 TEMP_FAILURE_RETRY(close(newout));
291 if(dup2result1==-1) {
295 if(dup2result2==-1) {
301 IMessageSvc* messageSvc(0);
302 StatusCode sc = serviceLocator()->service(
"MessageSvc",messageSvc);
308 IProperty* propertyServer =
dynamic_cast<IProperty*
>(messageSvc);
309 if(propertyServer==0) {
314 std::string propertyName(
"Format");
315 std::string oldFormat(
"");
316 StringProperty formatProp(propertyName,oldFormat);
317 sc = propertyServer->getProperty(&formatProp);
322 oldFormat = formatProp.value();
323 if(oldFormat.find(
"%t")==std::string::npos) {
325 std::string newFormat(
"%t " + oldFormat);
326 StringProperty newFormatProp(propertyName,newFormat);
327 if(propertyServer->setProperty(newFormatProp).isFailure()) {
328 ATH_MSG_ERROR(
"Unable to set new Format property on the Message Service");
333 ATH_MSG_DEBUG(
"MsgSvc format already contains timestamps. Nothing to be done");
◆ renounce()
◆ renounceArray()
◆ reopenFd()
int AthenaMPToolBase::reopenFd |
( |
int |
fd, |
|
|
const std::string & |
name |
|
) |
| |
|
privateinherited |
Definition at line 448 of file AthenaMPToolBase.cxx.
451 int old_openflags = fcntl(
fd,F_GETFL,0);
452 switch(old_openflags & O_ACCMODE) {
467 int old_descflags = fcntl(
fd,F_GETFD,0);
468 off_t oldpos = lseek(
fd,0,SEEK_CUR);
484 if(lseek(newfd,oldpos,SEEK_SET)==-1){
486 TEMP_FAILURE_RETRY(close(newfd));
489 TEMP_FAILURE_RETRY(close(
fd));
490 if(dup2(newfd,
fd)==-1) {
491 ATH_MSG_ERROR(
"When re-opening file descriptors unable to duplicate descriptor for " <<
name <<
". " <<
fmterror(errno));
492 TEMP_FAILURE_RETRY(close(newfd));
495 if(fcntl(
fd,F_SETFD,old_descflags)==-1) {
496 ATH_MSG_ERROR(
"When re-opening file descriptors unable to set descriptor flags for " <<
name <<
". " <<
fmterror(errno));
497 TEMP_FAILURE_RETRY(close(newfd));
500 TEMP_FAILURE_RETRY(close(newfd));
◆ reopenFds()
int AthenaMPToolBase::reopenFds |
( |
| ) |
|
|
protectedinherited |
Definition at line 369 of file AthenaMPToolBase.cxx.
377 std::vector<const Io::FileAttr*> filemgrFiles;
378 std::vector<const Io::FileAttr*>::const_iterator itFile;
379 unsigned filenum =
m_fileMgr->getFiles(filemgrFiles);
380 if(filenum!=filemgrFiles.size())
381 ATH_MSG_WARNING(
"getFiles returned " << filenum <<
" while vector size is " << filemgrFiles.size());
383 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
385 const std::string&
filename = (**itFile).name();
392 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " <<
filename);
404 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
405 ATH_MSG_DEBUG(
"The file from FdsRegistry " << regEntry.name <<
" was registered with FileMgr. Skip reopening");
408 ATH_MSG_WARNING(
"The file " << regEntry.name <<
" has not been registered with the FileMgr!");
410 if(regEntry.fd==-1) {
412 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
416 if(
reopenFd(regEntry.fd,regEntry.name))
419 fdLog.insert(regEntry.fd);
◆ reportSubprocessStatuses()
void SharedHiveEvtQueueConsumer::reportSubprocessStatuses |
( |
| ) |
|
|
overridevirtual |
◆ setRandString()
void AthenaMPToolBase::setRandString |
( |
const std::string & |
randStr | ) |
|
|
overridevirtualinherited |
◆ subProcessLogs()
void SharedHiveEvtQueueConsumer::subProcessLogs |
( |
std::vector< std::string > & |
filenames | ) |
|
|
overridevirtual |
◆ sysInitialize()
◆ sysStart()
Handle START transition.
We override this in order to make sure that conditions handle keys can cache a pointer to the conditions container.
◆ updateIoReg()
int AthenaMPToolBase::updateIoReg |
( |
const std::string & |
rundir | ) |
|
|
protectedinherited |
◆ updateVHKA()
◆ useFdsRegistry()
◆ waitForSignal()
void AthenaMPToolBase::waitForSignal |
( |
| ) |
|
|
protectedinherited |
Definition at line 432 of file AthenaMPToolBase.cxx.
434 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
442 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
444 sigsuspend (&oldmask);
445 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
◆ m_appMgr
◆ m_chronoStatSvc
ServiceHandle<IChronoStatSvc> SharedHiveEvtQueueConsumer::m_chronoStatSvc |
|
private |
◆ m_dataShare
IDataShare* SharedHiveEvtQueueConsumer::m_dataShare {} |
|
private |
◆ m_debug
Gaudi::Property<bool> SharedHiveEvtQueueConsumer::m_debug |
|
private |
Initial value:{
this, "Debug", false,
"Perform extra debugging if true. The default is false."}
Definition at line 61 of file SharedHiveEvtQueueConsumer.h.
◆ m_detStore
◆ m_evtContext
IEvtSelector::Context* SharedHiveEvtQueueConsumer::m_evtContext {} |
|
private |
◆ m_evtProcessor
◆ m_evtSelector
IEvtSelector* AthenaMPToolBase::m_evtSelector |
|
protectedinherited |
◆ m_evtSelName
std::string AthenaMPToolBase::m_evtSelName |
|
protectedinherited |
◆ m_evtSelSeek
◆ m_evtStore
◆ m_fdsRegistry
◆ m_fileMgr
◆ m_fileMgrLog
std::string AthenaMPToolBase::m_fileMgrLog |
|
protectedinherited |
◆ m_finQueue
std::queue<pid_t> SharedHiveEvtQueueConsumer::m_finQueue |
|
private |
◆ m_ioMgr
◆ m_isPileup
Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"} |
|
protectedinherited |
◆ m_nEventsBeforeFork
Gaudi::Property<int> SharedHiveEvtQueueConsumer::m_nEventsBeforeFork |
|
private |
Initial value:{
this, "EventsBeforeFork", 0,
"The number of events before forking the workers. The default is 0."}
Definition at line 57 of file SharedHiveEvtQueueConsumer.h.
◆ m_nProcessedEvents
std::map<pid_t,int> SharedHiveEvtQueueConsumer::m_nProcessedEvents |
|
private |
◆ m_nprocs
int AthenaMPToolBase::m_nprocs |
|
protectedinherited |
◆ m_processGroup
◆ m_randStr
std::string AthenaMPToolBase::m_randStr |
|
protectedinherited |
◆ m_rankId
int SharedHiveEvtQueueConsumer::m_rankId {} |
|
private |
◆ m_schedulerSvc
SmartIF<IScheduler> SharedHiveEvtQueueConsumer::m_schedulerSvc |
|
private |
◆ m_sharedEventQueue
◆ m_sharedRankQueue
◆ m_subprocDirPrefix
std::string AthenaMPToolBase::m_subprocDirPrefix |
|
protectedinherited |
◆ m_subprocTopDir
std::string AthenaMPToolBase::m_subprocTopDir |
|
protectedinherited |
◆ m_useSharedWriter
Gaudi::Property<bool> SharedHiveEvtQueueConsumer::m_useSharedWriter |
|
private |
Initial value:{
this, "UseSharedWriter", false,
"Use SharedWriter to merge worker outputs on-the-fly if true. The default is false."}
Definition at line 65 of file SharedHiveEvtQueueConsumer.h.
◆ m_varHandleArraysDeclared
◆ m_vhka
The documentation for this class was generated from the following files:
const std::vector< ProcessStatus > & getStatuses() const
virtual void setCurrentEventNum(int num)=0
path
python interpreter configuration --------------------------------------—
def mkdir(path, recursive=True)
SmartIF< IScheduler > m_schedulerSvc
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
const std::vector< Process > & getChildren() const
StoreGateSvc_t m_evtStore
Pointer to StoreGate (event store by default)
std::vector< SG::VarHandleKeyArray * > m_vhka
bool try_receive_basic(T &)
IEvtSelectorSeek * m_evtSelSeek
AthenaInterprocess::SharedQueue * m_sharedRankQueue
AllWorkerOutputs::iterator AllWorkerOutputsIterator
std::vector< WorkerOutput > SingleWorkerOutputs
virtual void setOwner(IDataHandleHolder *o)=0
#define COPY_FILE_HACK(_src, _dest)
Abstract interface for sharing data.
std::map< pid_t, int > m_nProcessedEvents
Helper interface for implementing hybrid MP+MT. Used by the Hybrid Shared Event Queue Consumer MP too...
AthenaInterprocess::SharedQueue * m_sharedEventQueue
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
std::vector< HWIdentifier >::iterator it1
::StatusCode StatusCode
StatusCode definition for legacy code.
StoreGateSvc_t m_detStore
Pointer to StoreGate (detector store by default)
virtual void renounce()=0
std::conditional< std::is_base_of< SG::VarHandleKeyArray, T >::value, VarHandleKeyArrayType, type2 >::type type
IEvtSelector::Context * m_evtContext
std::atomic< bool > sig_done
int ir
counter of the current depth
#define ATH_MSG_WARNING(x)
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
virtual int drainScheduler(int &finishedEvts, bool report)=0
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
virtual void resetAppReturnCode()=0
SG::VarHandleKey & vhKey()
Return a non-const reference to the HandleKey.
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Gaudi::Details::PropertyBase & declareGaudiProperty(Gaudi::Property< T > &hndl, const SG::VarHandleKeyType &)
specialization for handling Gaudi::Property<SG::VarHandleKey>
Gaudi::Property< bool > m_useSharedWriter
Gaudi::Property< bool > m_debug
virtual bool terminateLoop()=0