Loading [MathJax]/jax/output/SVG/config.js
|
ATLAS Offline Software
|
#include <SharedEvtQueueConsumer.h>
|
| SharedEvtQueueConsumer (const std::string &type, const std::string &name, const IInterface *parent) |
|
virtual | ~SharedEvtQueueConsumer () override |
|
virtual StatusCode | initialize () override |
|
virtual StatusCode | finalize () override |
|
virtual int makePool | ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override |
|
virtual StatusCode exec | ATLAS_NOT_THREAD_SAFE () override |
|
virtual StatusCode wait_once | ATLAS_NOT_THREAD_SAFE (pid_t &pid) override |
|
virtual void | reportSubprocessStatuses () override |
|
virtual void | subProcessLogs (std::vector< std::string > &) override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | bootstrap_func () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | exec_func () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | fin_func () override |
|
virtual AthenaMP::AllWorkerOutputs_ptr | generateOutputReport () override |
|
virtual void | useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override |
|
virtual void | setRandString (const std::string &randStr) override |
|
virtual void | killChildren () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > virtual operator() ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess std::unique_ptr< AthenaInterprocess::ScheduledWork > | bootstrap_func ()=0 |
|
ServiceHandle< StoreGateSvc > & | evtStore () |
| The standard StoreGateSvc (event store) Returns (kind of) a pointer to the StoreGateSvc . More...
|
|
const ServiceHandle< StoreGateSvc > & | evtStore () const |
| The standard StoreGateSvc (event store) Returns (kind of) a pointer to the StoreGateSvc . More...
|
|
const ServiceHandle< StoreGateSvc > & | detStore () const |
| The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc . More...
|
|
virtual StatusCode | sysInitialize () override |
| Perform system initialization for an algorithm. More...
|
|
virtual StatusCode | sysStart () override |
| Handle START transition. More...
|
|
virtual std::vector< Gaudi::DataHandle * > | inputHandles () const override |
| Return this algorithm's input handles. More...
|
|
virtual std::vector< Gaudi::DataHandle * > | outputHandles () const override |
| Return this algorithm's output handles. More...
|
|
Gaudi::Details::PropertyBase & | declareProperty (Gaudi::Property< T > &t) |
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleKey &hndl, const std::string &doc, const SG::VarHandleKeyType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleBase &hndl, const std::string &doc, const SG::VarHandleType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleKeyArray &hndArr, const std::string &doc, const SG::VarHandleKeyArrayType &) |
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, T &property, const std::string &doc, const SG::NotHandleType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, T &property, const std::string &doc="none") |
| Declare a new Gaudi property. More...
|
|
void | updateVHKA (Gaudi::Details::PropertyBase &) |
|
MsgStream & | msg () const |
|
MsgStream & | msg (const MSG::Level lvl) const |
|
bool | msgLvl (const MSG::Level lvl) const |
|
virtual std::unique_ptr< ScheduledWork > | operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0 |
|
Definition at line 21 of file SharedEvtQueueConsumer.h.
◆ StoreGateSvc_t
◆ TimeValType
◆ ESRange_Status
Enumerator |
---|
ESRANGE_SUCCESS | |
ESRANGE_NOTFOUND | |
ESRANGE_SEEKFAILED | |
ESRANGE_PROCFAILED | |
ESRANGE_FILENOTMADE | |
ESRANGE_BADINPFILE | |
Definition at line 56 of file AthenaMPToolBase.h.
◆ Func_Flag
◆ SharedEvtQueueConsumer() [1/3]
SharedEvtQueueConsumer::SharedEvtQueueConsumer |
( |
const std::string & |
type, |
|
|
const std::string & |
name, |
|
|
const IInterface * |
parent |
|
) |
| |
◆ ~SharedEvtQueueConsumer()
SharedEvtQueueConsumer::~SharedEvtQueueConsumer |
( |
| ) |
|
|
overridevirtual |
◆ SharedEvtQueueConsumer() [2/3]
SharedEvtQueueConsumer::SharedEvtQueueConsumer |
( |
| ) |
|
|
private |
◆ SharedEvtQueueConsumer() [3/3]
◆ ATLAS_NOT_THREAD_SAFE() [1/5]
virtual StatusCode exec SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
| ) |
|
|
overridevirtual |
◆ ATLAS_NOT_THREAD_SAFE() [2/5]
◆ ATLAS_NOT_THREAD_SAFE() [3/5]
int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE |
( |
Func_Flag |
flag, |
|
|
pid_t |
pid = 0 |
|
) |
| |
|
protectedinherited |
◆ ATLAS_NOT_THREAD_SAFE() [4/5]
virtual int makePool SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
int |
maxevt, |
|
|
int |
nprocs, |
|
|
const std::string & |
topdir |
|
) |
| |
|
overridevirtual |
◆ ATLAS_NOT_THREAD_SAFE() [5/5]
virtual StatusCode wait_once SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
pid_t & |
pid | ) |
|
|
overridevirtual |
◆ bootstrap_func() [1/2]
Definition at line 285 of file SharedEvtQueueConsumer.cxx.
290 outwork->data =
malloc(
sizeof(
int));
291 *(
int*)(outwork->data) = 1;
292 outwork->size =
sizeof(
int);
296 std::map<IService*,int> bkgEvtSelectors;
299 for(IService* ptrSvc : serviceLocator()->getServices()) {
300 IEvtSelector* evtsel =
dynamic_cast<IEvtSelector*
>(ptrSvc);
308 ATH_MSG_ERROR(
"Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->
name());
313 bkgEvtSelectors.emplace(ptrSvc,0);
326 SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service(
"IncidentSvc"));
331 p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
340 std::ostringstream workindex;
348 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
349 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
358 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
365 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
369 if(std::filesystem::is_regular_file(
"SimParams.db"))
370 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
371 if(std::filesystem::is_regular_file(
"DigitParams.db"))
372 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
373 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
374 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
384 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
390 ATH_MSG_ERROR(
"Failed to make the event selector a share client");
394 ATH_MSG_DEBUG(
"Successfully made the event selector a share client");
400 if (!propertyServer || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
401 ATH_MSG_ERROR(
"Could not change AthenaPoolCnvSvc MakeClient Property");
405 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
410 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
421 if(!propertyServer) {
426 std::string propertyName(
"SkipEvents");
428 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
429 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
440 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
443 if(!evtSelSvc->start().isSuccess()) {
458 ATH_MSG_ERROR(
"Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
469 for(
auto [evtsel,curEvt] : bkgEvtSelectors) {
470 if(evtsel->start().isSuccess()) {
472 SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
474 ATH_MSG_ERROR(
"Failed to seek to " << curEvt <<
" in the BKG Event Selector " << evtsel->name());
480 ATH_MSG_ERROR(
"Failed to restart BKG Event Selector " << evtsel->name());
495 if(
line.empty())
continue;
499 int rank = std::stoi(
line,&
idx);
501 msg(MSG::INFO) <<
"This worker will proces the following events #";
504 int evtnum = std::stoi(
line,&
idx);
506 msg(MSG::INFO) <<
" " << evtnum;
524 if(chdir(worker_rundir.string().c_str())==-1) {
525 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
533 *(
int*)(outwork->data) = 0;
◆ bootstrap_func() [2/2]
◆ declareGaudiProperty() [1/4]
specialization for handling Gaudi::Property<SG::VarHandleKeyArray>
Definition at line 170 of file AthCommonDataStore.h.
175 hndl.documentation());
◆ declareGaudiProperty() [2/4]
specialization for handling Gaudi::Property<SG::VarHandleKey>
Definition at line 156 of file AthCommonDataStore.h.
161 hndl.documentation());
◆ declareGaudiProperty() [3/4]
specialization for handling Gaudi::Property<SG::VarHandleBase>
Definition at line 184 of file AthCommonDataStore.h.
189 hndl.documentation());
◆ declareGaudiProperty() [4/4]
◆ declareProperty() [1/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
hndl | Object holding the property value. |
doc | Documentation string for the property. |
This is the version for types that derive from SG::VarHandleBase
. The property value object is put on the input and output lists as appropriate; then we forward to the base class.
Definition at line 245 of file AthCommonDataStore.h.
250 this->declare(hndl.
vhKey());
251 hndl.
vhKey().setOwner(
this);
253 return PBASE::declareProperty(
name,hndl,
doc);
◆ declareProperty() [2/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
hndl | Object holding the property value. |
doc | Documentation string for the property. |
This is the version for types that derive from SG::VarHandleKey
. The property value object is put on the input and output lists as appropriate; then we forward to the base class.
Definition at line 221 of file AthCommonDataStore.h.
229 return PBASE::declareProperty(
name,hndl,
doc);
◆ declareProperty() [3/6]
◆ declareProperty() [4/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
property | Object holding the property value. |
doc | Documentation string for the property. |
This is the generic version, for types that do not derive from SG::VarHandleKey
. It just forwards to the base class version of declareProperty
.
Definition at line 333 of file AthCommonDataStore.h.
338 return PBASE::declareProperty(
name, property,
doc);
◆ declareProperty() [5/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
property | Object holding the property value. |
doc | Documentation string for the property. |
This dispatches to either the generic declareProperty
or the one for VarHandle/Key/KeyArray.
Definition at line 352 of file AthCommonDataStore.h.
◆ declareProperty() [6/6]
◆ detStore()
◆ evtSelector()
IEvtSelector* AthenaMPToolBase::evtSelector |
( |
| ) |
|
|
inlineprotectedinherited |
◆ evtStore() [1/2]
◆ evtStore() [2/2]
◆ exec_func()
Implements AthenaMPToolBase.
Definition at line 537 of file SharedEvtQueueConsumer.cxx.
539 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
543 long intmask =
pow(0x100,
sizeof(
int))-1;
545 int nEventsProcessed(0);
546 long evtnumAndChunk(0);
548 unsigned evtCounter(0);
562 while(evtnumAndChunk>0) {
567 evtnumAndChunk *= -1;
570 System::ProcessTime time_start = System::getProcessTime();
574 bool firstOrder(
true);
586 evtnum = *predefinedEvt;
588 fs << (firstOrder?
":":
",") << evtnum;
591 ATH_MSG_INFO(
"Read event number from the orders file: " << evtnum);
600 if(evtnumAndChunk<=0) {
601 evtnumAndChunk *= -1;
602 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
605 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
607 evtnum = evtnumAndChunk & intmask;
612 fs << (firstOrder?
":":
",") << evtnum+
i;
666 System::ProcessTime time_delta = System::getProcessTime() - time_start;
667 TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
693 *(
int*)(
outdata) = (all_ok?0:1);
695 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
696 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEventsProcessed,
sizeof(
int));
697 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsedTime,
sizeof(elapsedTime));
699 outwork->size = outsize;
◆ extraDeps_update_handler()
Add StoreName to extra input/output deps as needed.
use the logic of the VarHandleKey to parse the DataObjID keys supplied via the ExtraInputs and ExtraOuputs Properties to add the StoreName if it's not explicitly given
◆ fin_func()
Implements AthenaMPToolBase.
Definition at line 707 of file SharedEvtQueueConsumer.cxx.
709 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
718 if(
m_appMgr->finalize().isFailure()) {
719 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
729 *(
int*)(
outdata) = (all_ok?0:1);
731 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
733 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
735 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsed,
sizeof(elapsed));
738 outwork->size = outsize;
◆ finalize()
StatusCode SharedEvtQueueConsumer::finalize |
( |
| ) |
|
|
overridevirtual |
Reimplemented from AthenaMPToolBase.
Definition at line 121 of file SharedEvtQueueConsumer.cxx.
131 srand((
unsigned)
time(0));
132 std::ostringstream randname;
134 std::string ordersFileBak =
m_eventOrdersFile+std::string(
"-bak-")+randname.str();
139 std::filesystem::rename(ordersFile,ordersFileBakpath);
145 std::ostringstream workerIndex;
149 std::string ordersFileWorker(worker_rundir.string()+std::string(
"/")+
m_eventOrdersFile);
150 ATH_MSG_INFO(
"Processing " << ordersFileWorker <<
" ...");
151 std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
153 while(fs_worker.good()) {
154 std::getline(fs_worker,
line);
168 return StatusCode::SUCCESS;
◆ fmterror()
std::string AthenaMPToolBase::fmterror |
( |
int |
errnum | ) |
|
|
protectedinherited |
Definition at line 358 of file AthenaMPToolBase.cxx.
361 strerror_r(errnum, buf,
sizeof(buf));
362 return std::string(buf);
◆ generateOutputReport()
Implements IAthenaMPTool.
Reimplemented in EvtRangeProcessor, EvtRangeScatterer, SharedEvtQueueProvider, and SharedWriterTool.
Definition at line 131 of file AthenaMPToolBase.cxx.
136 ATH_MSG_WARNING(
name() <<
" cannot make output report because FileMgr has not been configured to write log file!");
143 std::ostringstream workindex;
156 std::ifstream inpStream(
logFile.string().c_str());
157 std::set<std::string> reportedFiles;
158 while(!inpStream.eof()) {
159 std::getline(inpStream,
line);
160 if(
line.find(
"WRITE")!=std::string::npos) {
163 std::vector<std::string>
entries;
164 while(startpos<
line.size()) {
165 while(
line[startpos]==
' ')
168 size_t endpos =
line.find(
' ',startpos);
169 if(endpos==std::string::npos) endpos =
line.size();
170 entries.push_back(
line.substr(startpos,endpos-startpos));
177 if(reportedFiles.find(
basename.string())==reportedFiles.end())
178 reportedFiles.insert(
basename.string());
183 if(
it1==jobOutputs->end()) {
189 newOutput.
filename = absolutename.string();
193 newOutput.
shared = (
line.find(
"SHARED")!=std::string::npos);
195 (*jobOutputs)[
basename.string()].push_back(newOutput);
◆ handleSavedPfc()
int AthenaMPToolBase::handleSavedPfc |
( |
const std::filesystem::path & |
dest_path | ) |
|
|
protectedinherited |
Definition at line 421 of file AthenaMPToolBase.cxx.
423 if(std::filesystem::is_regular_file(
"PoolFileCatalog.xml.AthenaMP-saved"))
424 COPY_FILE_HACK(
"PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+
"/PoolFileCatalog.xml");
◆ initialize()
StatusCode SharedEvtQueueConsumer::initialize |
( |
| ) |
|
|
overridevirtual |
Reimplemented from AthenaMPToolBase.
Definition at line 73 of file SharedEvtQueueConsumer.cxx.
84 ATH_MSG_ERROR(
"Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
85 return StatusCode::FAILURE;
100 return StatusCode::FAILURE;
107 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service(
"AthenaPoolCnvSvc"));
111 return StatusCode::FAILURE;
118 return StatusCode::SUCCESS;
◆ inputHandles()
Return this algorithm's input handles.
We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.
◆ interfaceID()
static const InterfaceID& IAthenaMPTool::interfaceID |
( |
| ) |
|
|
inlinestaticinherited |
◆ killChildren()
void AthenaMPToolBase::killChildren |
( |
| ) |
|
|
overridevirtualinherited |
◆ msg() [1/2]
◆ msg() [2/2]
◆ msgLvl()
◆ operator()
virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator |
( |
| ) |
const & |
|
pure virtualinherited |
◆ operator=()
◆ outputHandles()
Return this algorithm's output handles.
We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.
◆ redirectLog()
int AthenaMPToolBase::redirectLog |
( |
const std::string & |
rundir, |
|
|
bool |
addTimeStamp = true |
|
) |
| |
|
protectedinherited |
Definition at line 281 of file AthenaMPToolBase.cxx.
284 int dup2result1(0), dup2result2(0);
286 int newout =
open(std::string(
rundir+
"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
291 dup2result1 = dup2(newout, STDOUT_FILENO);
292 dup2result2 = dup2(newout, STDERR_FILENO);
293 TEMP_FAILURE_RETRY(close(newout));
294 if(dup2result1==-1) {
298 if(dup2result2==-1) {
304 SmartIF<IProperty> propertyServer(
msgSvc());
305 if(propertyServer==0) {
310 std::string propertyName(
"Format");
311 std::string oldFormat(
"");
312 StringProperty formatProp(propertyName,oldFormat);
313 StatusCode sc = propertyServer->getProperty(&formatProp);
318 oldFormat = formatProp.value();
319 if(oldFormat.find(
"%t")==std::string::npos) {
321 std::string newFormat(
"%t " + oldFormat);
322 StringProperty newFormatProp(propertyName,newFormat);
323 if(propertyServer->setProperty(newFormatProp).isFailure()) {
324 ATH_MSG_ERROR(
"Unable to set new Format property on the Message Service");
329 ATH_MSG_DEBUG(
"MsgSvc format already contains timestamps. Nothing to be done");
◆ renounce()
◆ renounceArray()
◆ reopenFd()
int AthenaMPToolBase::reopenFd |
( |
int |
fd, |
|
|
const std::string & |
name |
|
) |
| |
|
privateinherited |
Definition at line 444 of file AthenaMPToolBase.cxx.
447 int old_openflags = fcntl(
fd,F_GETFL,0);
448 switch(old_openflags & O_ACCMODE) {
463 int old_descflags = fcntl(
fd,F_GETFD,0);
464 off_t oldpos = lseek(
fd,0,SEEK_CUR);
480 if(lseek(newfd,oldpos,SEEK_SET)==-1){
482 TEMP_FAILURE_RETRY(close(newfd));
485 TEMP_FAILURE_RETRY(close(
fd));
486 if(dup2(newfd,
fd)==-1) {
487 ATH_MSG_ERROR(
"When re-opening file descriptors unable to duplicate descriptor for " <<
name <<
". " <<
fmterror(errno));
488 TEMP_FAILURE_RETRY(close(newfd));
491 if(fcntl(
fd,F_SETFD,old_descflags)==-1) {
492 ATH_MSG_ERROR(
"When re-opening file descriptors unable to set descriptor flags for " <<
name <<
". " <<
fmterror(errno));
493 TEMP_FAILURE_RETRY(close(newfd));
496 TEMP_FAILURE_RETRY(close(newfd));
◆ reopenFds()
int AthenaMPToolBase::reopenFds |
( |
| ) |
|
|
protectedinherited |
Definition at line 365 of file AthenaMPToolBase.cxx.
373 std::vector<const Io::FileAttr*> filemgrFiles;
374 std::vector<const Io::FileAttr*>::const_iterator itFile;
375 unsigned filenum =
m_fileMgr->getFiles(filemgrFiles);
376 if(filenum!=filemgrFiles.size())
377 ATH_MSG_WARNING(
"getFiles returned " << filenum <<
" while vector size is " << filemgrFiles.size());
379 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
381 const std::string&
filename = (**itFile).name();
388 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " <<
filename);
400 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
401 ATH_MSG_DEBUG(
"The file from FdsRegistry " << regEntry.name <<
" was registered with FileMgr. Skip reopening");
404 ATH_MSG_WARNING(
"The file " << regEntry.name <<
" has not been registered with the FileMgr!");
406 if(regEntry.fd==-1) {
408 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
412 if(
reopenFd(regEntry.fd,regEntry.name))
415 fdLog.insert(regEntry.fd);
◆ reportSubprocessStatuses()
void SharedEvtQueueConsumer::reportSubprocessStatuses |
( |
| ) |
|
|
overridevirtual |
◆ setRandString()
void AthenaMPToolBase::setRandString |
( |
const std::string & |
randStr | ) |
|
|
overridevirtualinherited |
◆ subProcessLogs()
void SharedEvtQueueConsumer::subProcessLogs |
( |
std::vector< std::string > & |
filenames | ) |
|
|
overridevirtual |
◆ sysInitialize()
◆ sysStart()
Handle START transition.
We override this in order to make sure that conditions handle keys can cache a pointer to the conditions container.
◆ updateIoReg()
int AthenaMPToolBase::updateIoReg |
( |
const std::string & |
rundir | ) |
|
|
protectedinherited |
◆ updateVHKA()
◆ useFdsRegistry()
◆ waitForSignal()
void AthenaMPToolBase::waitForSignal |
( |
| ) |
|
|
protectedinherited |
Definition at line 428 of file AthenaMPToolBase.cxx.
430 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
438 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
440 sigsuspend (&oldmask);
441 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
◆ m_appMgr
◆ m_chronoStatSvc
ServiceHandle<IChronoStatSvc> SharedEvtQueueConsumer::m_chronoStatSvc |
|
private |
◆ m_dataShare
SmartIF<IDataShare> SharedEvtQueueConsumer::m_dataShare |
|
private |
◆ m_debug
bool SharedEvtQueueConsumer::m_debug |
|
private |
◆ m_detStore
◆ m_eventOrders
std::vector<int> SharedEvtQueueConsumer::m_eventOrders |
|
private |
◆ m_eventOrdersFile
std::string SharedEvtQueueConsumer::m_eventOrdersFile |
|
private |
◆ m_eventStat
◆ m_evtContext
IEvtSelector::Context* SharedEvtQueueConsumer::m_evtContext |
|
private |
◆ m_evtProcessor
◆ m_evtSeek
SmartIF<IEventSeek> SharedEvtQueueConsumer::m_evtSeek |
|
private |
◆ m_evtSelector
SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector |
|
protectedinherited |
◆ m_evtSelName
std::string AthenaMPToolBase::m_evtSelName |
|
protectedinherited |
◆ m_evtSelSeek
◆ m_evtShare
SmartIF<IEventShare> SharedEvtQueueConsumer::m_evtShare |
|
private |
◆ m_evtStore
◆ m_fdsRegistry
◆ m_fileMgr
◆ m_fileMgrLog
std::string AthenaMPToolBase::m_fileMgrLog |
|
protectedinherited |
◆ m_finQueue
std::queue<pid_t> SharedEvtQueueConsumer::m_finQueue |
|
private |
◆ m_ioMgr
◆ m_isPileup
Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"} |
|
protectedinherited |
◆ m_isRoundRobin
bool SharedEvtQueueConsumer::m_isRoundRobin |
|
private |
◆ m_masterPid
pid_t SharedEvtQueueConsumer::m_masterPid |
|
private |
◆ m_nEventsBeforeFork
int SharedEvtQueueConsumer::m_nEventsBeforeFork |
|
private |
◆ m_nprocs
int AthenaMPToolBase::m_nprocs |
|
protectedinherited |
◆ m_nSkipEvents
int SharedEvtQueueConsumer::m_nSkipEvents |
|
private |
◆ m_processGroup
◆ m_randStr
std::string AthenaMPToolBase::m_randStr |
|
protectedinherited |
◆ m_rankId
int SharedEvtQueueConsumer::m_rankId |
|
private |
◆ m_readEventOrders
bool SharedEvtQueueConsumer::m_readEventOrders |
|
private |
◆ m_sharedEventQueue
◆ m_sharedRankQueue
◆ m_subprocDirPrefix
std::string AthenaMPToolBase::m_subprocDirPrefix |
|
protectedinherited |
◆ m_subprocTopDir
std::string AthenaMPToolBase::m_subprocTopDir |
|
protectedinherited |
◆ m_useSharedReader
bool SharedEvtQueueConsumer::m_useSharedReader |
|
private |
◆ m_useSharedWriter
bool SharedEvtQueueConsumer::m_useSharedWriter |
|
private |
◆ m_varHandleArraysDeclared
◆ m_vhka
The documentation for this class was generated from the following files:
const std::vector< ProcessStatus > & getStatuses() const
path
python interpreter configuration --------------------------------------—
def mkdir(path, recursive=True)
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
SmartIF< IEventSeek > m_evtSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
const std::vector< Process > & getChildren() const
StoreGateSvc_t m_evtStore
Pointer to StoreGate (event store by default)
std::vector< SG::VarHandleKeyArray * > m_vhka
bool try_receive_basic(T &)
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
AllWorkerOutputs::iterator AllWorkerOutputsIterator
std::vector< WorkerOutput > SingleWorkerOutputs
virtual void setOwner(IDataHandleHolder *o)=0
#define COPY_FILE_HACK(_src, _dest)
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
virtual int curEvent(const IEvtSelector::Context &c) const =0
return the current event number.
msgSvc
Provide convenience handles for various services.
SmartIF< IDataShare > m_dataShare
std::vector< HWIdentifier >::iterator it1
::StatusCode StatusCode
StatusCode definition for legacy code.
std::vector< int > m_eventOrders
StoreGateSvc_t m_detStore
Pointer to StoreGate (detector store by default)
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
virtual void renounce()=0
std::conditional< std::is_base_of< SG::VarHandleKeyArray, T >::value, VarHandleKeyArrayType, type2 >::type type
SmartIF< IEventShare > m_evtShare
AthenaInterprocess::SharedQueue * m_sharedEventQueue
def time(flags, cells_name, *args, **kw)
#define ATH_MSG_WARNING(x)
IEvtSelector::Context * m_evtContext
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
SG::VarHandleKey & vhKey()
Return a non-const reference to the HandleKey.
constexpr int pow(int base, int exp) noexcept
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Gaudi::Details::PropertyBase & declareGaudiProperty(Gaudi::Property< T > &hndl, const SG::VarHandleKeyType &)
specialization for handling Gaudi::Property<SG::VarHandleKey>
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Abstract interface for seeking for an event selector.
System::ProcessTime::TimeValueType TimeValType
std::string m_eventOrdersFile