Loading [MathJax]/extensions/tex2jax.js
 |
ATLAS Offline Software
|
#include <SharedEvtQueueConsumer.h>
|
| SharedEvtQueueConsumer (const std::string &type, const std::string &name, const IInterface *parent) |
|
virtual | ~SharedEvtQueueConsumer () override |
|
virtual StatusCode | initialize () override |
|
virtual StatusCode | finalize () override |
|
virtual int makePool | ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override |
|
virtual StatusCode exec | ATLAS_NOT_THREAD_SAFE () override |
|
virtual StatusCode wait_once | ATLAS_NOT_THREAD_SAFE (pid_t &pid) override |
|
virtual void | reportSubprocessStatuses () override |
|
virtual void | subProcessLogs (std::vector< std::string > &) override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | bootstrap_func () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | exec_func () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | fin_func () override |
|
virtual AthenaMP::AllWorkerOutputs_ptr | generateOutputReport () override |
|
virtual void | useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override |
|
virtual void | setRandString (const std::string &randStr) override |
|
virtual void | setMaxEvt (int maxEvt) override |
|
virtual void | setMPRunStop (const AthenaInterprocess::IMPRunStop *runStop) override |
|
virtual void | killChildren () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | operator() ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ScheduledWork &) |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | bootstrap_func ()=0 |
|
ServiceHandle< StoreGateSvc > & | evtStore () |
| The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc. More...
|
|
const ServiceHandle< StoreGateSvc > & | evtStore () const |
| The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc. More...
|
|
const ServiceHandle< StoreGateSvc > & | detStore () const |
| The standard StoreGateSvc/DetectorStore. Returns (kind of) a pointer to the StoreGateSvc. More...
|
|
virtual StatusCode | sysInitialize () override |
| Perform system initialization for an algorithm. More...
|
|
virtual StatusCode | sysStart () override |
| Handle START transition. More...
|
|
virtual std::vector< Gaudi::DataHandle * > | inputHandles () const override |
| Return this algorithm's input handles. More...
|
|
virtual std::vector< Gaudi::DataHandle * > | outputHandles () const override |
| Return this algorithm's output handles. More...
|
|
Gaudi::Details::PropertyBase & | declareProperty (Gaudi::Property< T > &t) |
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleKey &hndl, const std::string &doc, const SG::VarHandleKeyType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleBase &hndl, const std::string &doc, const SG::VarHandleType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleKeyArray &hndArr, const std::string &doc, const SG::VarHandleKeyArrayType &) |
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, T &property, const std::string &doc, const SG::NotHandleType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, T &property, const std::string &doc="none") |
| Declare a new Gaudi property. More...
|
|
void | updateVHKA (Gaudi::Details::PropertyBase &) |
|
MsgStream & | msg () const |
|
MsgStream & | msg (const MSG::Level lvl) const |
|
bool | msgLvl (const MSG::Level lvl) const |
|
virtual std::unique_ptr< ScheduledWork > | operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0 |
|
Definition at line 21 of file SharedEvtQueueConsumer.h.
◆ StoreGateSvc_t
◆ TimeValType
◆ ESRange_Status
Enumerator |
---|
ESRANGE_SUCCESS | |
ESRANGE_NOTFOUND | |
ESRANGE_SEEKFAILED | |
ESRANGE_PROCFAILED | |
ESRANGE_FILENOTMADE | |
ESRANGE_BADINPFILE | |
Definition at line 59 of file AthenaMPToolBase.h.
◆ Func_Flag
◆ SharedEvtQueueConsumer() [1/3]
SharedEvtQueueConsumer::SharedEvtQueueConsumer |
( |
const std::string & |
type, |
|
|
const std::string & |
name, |
|
|
const IInterface * |
parent |
|
) |
| |
◆ ~SharedEvtQueueConsumer()
SharedEvtQueueConsumer::~SharedEvtQueueConsumer |
( |
| ) |
|
|
overridevirtual |
◆ SharedEvtQueueConsumer() [2/3]
SharedEvtQueueConsumer::SharedEvtQueueConsumer |
( |
| ) |
|
|
private |
◆ SharedEvtQueueConsumer() [3/3]
◆ ATLAS_NOT_THREAD_SAFE() [1/5]
virtual StatusCode exec SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
| ) |
|
|
overridevirtual |
◆ ATLAS_NOT_THREAD_SAFE() [2/5]
◆ ATLAS_NOT_THREAD_SAFE() [3/5]
int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE |
( |
Func_Flag |
flag, |
|
|
pid_t |
pid = 0 |
|
) |
| |
|
protectedinherited |
◆ ATLAS_NOT_THREAD_SAFE() [4/5]
virtual int makePool SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
int |
maxevt, |
|
|
int |
nprocs, |
|
|
const std::string & |
topdir |
|
) |
| |
|
overridevirtual |
◆ ATLAS_NOT_THREAD_SAFE() [5/5]
virtual StatusCode wait_once SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
pid_t & |
pid | ) |
|
|
overridevirtual |
◆ bootstrap_func() [1/2]
Definition at line 285 of file SharedEvtQueueConsumer.cxx.
290 outwork->data =
malloc(
sizeof(
int));
291 *(
int*)(outwork->data) = 1;
292 outwork->size =
sizeof(
int);
296 std::map<IService*,int> bkgEvtSelectors;
299 for(IService* ptrSvc : serviceLocator()->getServices()) {
300 IEvtSelector* evtsel =
dynamic_cast<IEvtSelector*
>(ptrSvc);
308 ATH_MSG_ERROR(
"Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->
name());
313 bkgEvtSelectors.emplace(ptrSvc,0);
326 SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service(
"IncidentSvc"));
331 p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
340 std::ostringstream workindex;
348 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
349 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
358 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
365 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
369 if(std::filesystem::is_regular_file(
"SimParams.db"))
370 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
371 if(std::filesystem::is_regular_file(
"DigitParams.db"))
372 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
373 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
374 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
384 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
390 ATH_MSG_ERROR(
"Failed to make the event selector a share client");
394 ATH_MSG_DEBUG(
"Successfully made the event selector a share client");
400 if (!propertyServer || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
401 ATH_MSG_ERROR(
"Could not change AthenaPoolCnvSvc MakeClient Property");
405 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
410 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
421 if(!propertyServer) {
426 std::string propertyName(
"SkipEvents");
428 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
429 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
440 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
443 if(!evtSelSvc->start().isSuccess()) {
458 ATH_MSG_ERROR(
"Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
469 for(
auto [evtsel,curEvt] : bkgEvtSelectors) {
470 if(evtsel->start().isSuccess()) {
472 SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
474 ATH_MSG_ERROR(
"Failed to seek to " << curEvt <<
" in the BKG Event Selector " << evtsel->name());
480 ATH_MSG_ERROR(
"Failed to restart BKG Event Selector " << evtsel->name());
495 if(
line.empty())
continue;
499 int rank = std::stoi(
line,&
idx);
501 msg(MSG::INFO) <<
"This worker will proces the following events #";
504 int evtnum = std::stoi(
line,&
idx);
506 msg(MSG::INFO) <<
" " << evtnum;
524 if(chdir(worker_rundir.string().c_str())==-1) {
525 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
533 *(
int*)(outwork->data) = 0;
◆ bootstrap_func() [2/2]
◆ declareGaudiProperty() [1/4]
specialization for handling Gaudi::Property<SG::VarHandleKeyArray>
Definition at line 170 of file AthCommonDataStore.h.
175 hndl.documentation());
◆ declareGaudiProperty() [2/4]
specialization for handling Gaudi::Property<SG::VarHandleKey>
Definition at line 156 of file AthCommonDataStore.h.
161 hndl.documentation());
◆ declareGaudiProperty() [3/4]
specialization for handling Gaudi::Property<SG::VarHandleBase>
Definition at line 184 of file AthCommonDataStore.h.
189 hndl.documentation());
◆ declareGaudiProperty() [4/4]
◆ declareProperty() [1/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
hndl | Object holding the property value. |
doc | Documentation string for the property. |
This is the version for types that derive from SG::VarHandleBase. The property value object is put on the input and output lists as appropriate; then we forward to the base class.
Definition at line 245 of file AthCommonDataStore.h.
250 this->declare(hndl.
vhKey());
251 hndl.
vhKey().setOwner(
this);
253 return PBASE::declareProperty(
name,hndl,
doc);
◆ declareProperty() [2/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
hndl | Object holding the property value. |
doc | Documentation string for the property. |
This is the version for types that derive from SG::VarHandleKey. The property value object is put on the input and output lists as appropriate; then we forward to the base class.
Definition at line 221 of file AthCommonDataStore.h.
229 return PBASE::declareProperty(
name,hndl,
doc);
◆ declareProperty() [3/6]
◆ declareProperty() [4/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
property | Object holding the property value. |
doc | Documentation string for the property. |
This is the generic version, for types that do not derive from SG::VarHandleKey. It just forwards to the base class version of declareProperty.
Definition at line 333 of file AthCommonDataStore.h.
338 return PBASE::declareProperty(
name, property,
doc);
◆ declareProperty() [5/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
property | Object holding the property value. |
doc | Documentation string for the property. |
This dispatches to either the generic declareProperty or the one for VarHandle/Key/KeyArray.
Definition at line 352 of file AthCommonDataStore.h.
◆ declareProperty() [6/6]
◆ detStore()
◆ evtSelector()
IEvtSelector* AthenaMPToolBase::evtSelector |
( |
| ) |
|
|
inlineprotectedinherited |
◆ evtStore() [1/2]
◆ evtStore() [2/2]
◆ exec_func()
Implements AthenaMPToolBase.
Definition at line 537 of file SharedEvtQueueConsumer.cxx.
539 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
543 long intmask =
pow(0x100,
sizeof(
int))-1;
545 int nEventsProcessed(0);
546 long evtnumAndChunk(0);
548 unsigned evtCounter(0);
549 int evtnum(0), chunkSize(1);
559 System::ProcessTime time_start = System::getProcessTime();
563 bool firstOrder(
true);
575 evtnum = *predefinedEvt;
577 fs << (firstOrder?
":":
",") << evtnum;
580 ATH_MSG_INFO(
"Read event number from the orders file: " << evtnum);
589 if(evtnumAndChunk<=0) {
590 evtnumAndChunk *= -1;
591 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
594 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
595 chunkSize = evtnumAndChunk >> (
sizeof(
int)*8);
596 evtnum = evtnumAndChunk & intmask;
597 ATH_MSG_INFO(
"Received from the queue: event num=" << evtnum <<
" chunk size=" << chunkSize);
600 for(
int i(0);
i<chunkSize;++
i) {
601 fs << (firstOrder?
":":
",") << evtnum+
i;
640 nEventsProcessed += chunkSize;
646 ATH_MSG_ERROR(
"Unable to process the chunk (" << evtnum <<
"," << evtnum+chunkSize-1 <<
")");
659 System::ProcessTime time_delta = System::getProcessTime() - time_start;
660 TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
686 *(
int*)(
outdata) = (all_ok?0:1);
688 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
689 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEventsProcessed,
sizeof(
int));
690 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsedTime,
sizeof(elapsedTime));
692 outwork->size = outsize;
◆ extraDeps_update_handler()
Add StoreName to extra input/output deps as needed.
Use the logic of the VarHandleKey to parse the DataObjID keys supplied via the ExtraInputs and ExtraOutputs Properties to add the StoreName if it's not explicitly given.
◆ fin_func()
Implements AthenaMPToolBase.
Definition at line 700 of file SharedEvtQueueConsumer.cxx.
702 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
711 if(
m_appMgr->finalize().isFailure()) {
712 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
722 *(
int*)(
outdata) = (all_ok?0:1);
724 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
726 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
728 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsed,
sizeof(elapsed));
731 outwork->size = outsize;
◆ finalize()
StatusCode SharedEvtQueueConsumer::finalize |
( |
| ) |
|
|
overridevirtual |
Reimplemented from AthenaMPToolBase.
Definition at line 121 of file SharedEvtQueueConsumer.cxx.
131 srand((
unsigned)
time(0));
132 std::ostringstream randname;
134 std::string ordersFileBak =
m_eventOrdersFile+std::string(
"-bak-")+randname.str();
139 std::filesystem::rename(ordersFile,ordersFileBakpath);
145 std::ostringstream workerIndex;
149 std::string ordersFileWorker(worker_rundir.string()+std::string(
"/")+
m_eventOrdersFile);
150 ATH_MSG_INFO(
"Processing " << ordersFileWorker <<
" ...");
151 std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
153 while(fs_worker.good()) {
154 std::getline(fs_worker,
line);
168 return StatusCode::SUCCESS;
◆ fmterror()
std::string AthenaMPToolBase::fmterror |
( |
int |
errnum | ) |
|
|
protectedinherited |
Definition at line 359 of file AthenaMPToolBase.cxx.
362 strerror_r(errnum, buf,
sizeof(buf));
363 return std::string(buf);
◆ generateOutputReport()
Implements IAthenaMPTool.
Reimplemented in EvtRangeProcessor, EvtRangeScatterer, SharedEvtQueueProvider, and SharedWriterTool.
Definition at line 132 of file AthenaMPToolBase.cxx.
137 ATH_MSG_WARNING(
name() <<
" cannot make output report because FileMgr has not been configured to write log file!");
144 std::ostringstream workindex;
157 std::ifstream inpStream(
logFile.string().c_str());
158 std::set<std::string> reportedFiles;
159 while(!inpStream.eof()) {
160 std::getline(inpStream,
line);
161 if(
line.find(
"WRITE")!=std::string::npos) {
164 std::vector<std::string>
entries;
165 while(startpos<
line.size()) {
166 while(
line[startpos]==
' ')
169 size_t endpos =
line.find(
' ',startpos);
170 if(endpos==std::string::npos) endpos =
line.size();
171 entries.push_back(
line.substr(startpos,endpos-startpos));
178 if(reportedFiles.find(
basename.string())==reportedFiles.end())
179 reportedFiles.insert(
basename.string());
184 if(
it1==jobOutputs->end()) {
190 newOutput.
filename = absolutename.string();
194 newOutput.
shared = (
line.find(
"SHARED")!=std::string::npos);
196 (*jobOutputs)[
basename.string()].push_back(newOutput);
◆ handleSavedPfc()
int AthenaMPToolBase::handleSavedPfc |
( |
const std::filesystem::path & |
dest_path | ) |
|
|
protectedinherited |
Definition at line 422 of file AthenaMPToolBase.cxx.
424 if(std::filesystem::is_regular_file(
"PoolFileCatalog.xml.AthenaMP-saved"))
425 COPY_FILE_HACK(
"PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+
"/PoolFileCatalog.xml");
◆ initialize()
StatusCode SharedEvtQueueConsumer::initialize |
( |
| ) |
|
|
overridevirtual |
Reimplemented from AthenaMPToolBase.
Definition at line 73 of file SharedEvtQueueConsumer.cxx.
84 ATH_MSG_ERROR(
"Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
85 return StatusCode::FAILURE;
100 return StatusCode::FAILURE;
107 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service(
"AthenaPoolCnvSvc"));
111 return StatusCode::FAILURE;
118 return StatusCode::SUCCESS;
◆ inputHandles()
Return this algorithm's input handles.
We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.
◆ interfaceID()
static const InterfaceID& IAthenaMPTool::interfaceID |
( |
| ) |
|
|
inlinestaticinherited |
◆ killChildren()
void AthenaMPToolBase::killChildren |
( |
| ) |
|
|
overridevirtualinherited |
◆ msg() [1/2]
◆ msg() [2/2]
◆ msgLvl()
◆ operator()
virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator |
( |
| ) |
const & |
|
pure virtualinherited |
◆ operator=()
◆ outputHandles()
Return this algorithm's output handles.
We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.
◆ redirectLog()
int AthenaMPToolBase::redirectLog |
( |
const std::string & |
rundir, |
|
|
bool |
addTimeStamp = true |
|
) |
| |
|
protectedinherited |
Definition at line 282 of file AthenaMPToolBase.cxx.
285 int dup2result1(0), dup2result2(0);
287 int newout =
open(std::string(
rundir+
"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
292 dup2result1 = dup2(newout, STDOUT_FILENO);
293 dup2result2 = dup2(newout, STDERR_FILENO);
294 TEMP_FAILURE_RETRY(close(newout));
295 if(dup2result1==-1) {
299 if(dup2result2==-1) {
305 SmartIF<IProperty> propertyServer(
msgSvc());
306 if(propertyServer==0) {
311 std::string propertyName(
"Format");
312 std::string oldFormat(
"");
313 StringProperty formatProp(propertyName,oldFormat);
314 StatusCode sc = propertyServer->getProperty(&formatProp);
319 oldFormat = formatProp.value();
320 if(oldFormat.find(
"%t")==std::string::npos) {
322 std::string newFormat(
"%t " + oldFormat);
323 StringProperty newFormatProp(propertyName,newFormat);
324 if(propertyServer->setProperty(newFormatProp).isFailure()) {
325 ATH_MSG_ERROR(
"Unable to set new Format property on the Message Service");
330 ATH_MSG_DEBUG(
"MsgSvc format already contains timestamps. Nothing to be done");
◆ renounce()
◆ renounceArray()
◆ reopenFd()
int AthenaMPToolBase::reopenFd |
( |
int |
fd, |
|
|
const std::string & |
name |
|
) |
| |
|
privateinherited |
Definition at line 445 of file AthenaMPToolBase.cxx.
448 int old_openflags = fcntl(
fd,F_GETFL,0);
449 switch(old_openflags & O_ACCMODE) {
464 int old_descflags = fcntl(
fd,F_GETFD,0);
465 off_t oldpos = lseek(
fd,0,SEEK_CUR);
481 if(lseek(newfd,oldpos,SEEK_SET)==-1){
483 TEMP_FAILURE_RETRY(close(newfd));
486 TEMP_FAILURE_RETRY(close(
fd));
487 if(dup2(newfd,
fd)==-1) {
488 ATH_MSG_ERROR(
"When re-opening file descriptors unable to duplicate descriptor for " <<
name <<
". " <<
fmterror(errno));
489 TEMP_FAILURE_RETRY(close(newfd));
492 if(fcntl(
fd,F_SETFD,old_descflags)==-1) {
493 ATH_MSG_ERROR(
"When re-opening file descriptors unable to set descriptor flags for " <<
name <<
". " <<
fmterror(errno));
494 TEMP_FAILURE_RETRY(close(newfd));
497 TEMP_FAILURE_RETRY(close(newfd));
◆ reopenFds()
int AthenaMPToolBase::reopenFds |
( |
| ) |
|
|
protectedinherited |
Definition at line 366 of file AthenaMPToolBase.cxx.
374 std::vector<const Io::FileAttr*> filemgrFiles;
375 std::vector<const Io::FileAttr*>::const_iterator itFile;
376 unsigned filenum =
m_fileMgr->getFiles(filemgrFiles);
377 if(filenum!=filemgrFiles.size())
378 ATH_MSG_WARNING(
"getFiles returned " << filenum <<
" while vector size is " << filemgrFiles.size());
380 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
382 const std::string&
filename = (**itFile).name();
389 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " <<
filename);
401 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
402 ATH_MSG_DEBUG(
"The file from FdsRegistry " << regEntry.name <<
" was registered with FileMgr. Skip reopening");
405 ATH_MSG_WARNING(
"The file " << regEntry.name <<
" has not been registered with the FileMgr!");
407 if(regEntry.fd==-1) {
409 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
413 if(
reopenFd(regEntry.fd,regEntry.name))
416 fdLog.insert(regEntry.fd);
◆ reportSubprocessStatuses()
void SharedEvtQueueConsumer::reportSubprocessStatuses |
( |
| ) |
|
|
overridevirtual |
◆ setMaxEvt()
virtual void AthenaMPToolBase::setMaxEvt |
( |
int |
maxEvt | ) |
|
|
inlineoverridevirtualinherited |
◆ setMPRunStop()
|
inlineoverridevirtualinherited |
◆ setRandString()
void AthenaMPToolBase::setRandString |
( |
const std::string & |
randStr | ) |
|
|
overridevirtualinherited |
◆ subProcessLogs()
void SharedEvtQueueConsumer::subProcessLogs |
( |
std::vector< std::string > & |
filenames | ) |
|
|
overridevirtual |
◆ sysInitialize()
◆ sysStart()
Handle START transition.
We override this in order to make sure that conditions handle keys can cache a pointer to the conditions container.
◆ updateIoReg()
int AthenaMPToolBase::updateIoReg |
( |
const std::string & |
rundir | ) |
|
|
protectedinherited |
◆ updateVHKA()
◆ useFdsRegistry()
◆ waitForSignal()
void AthenaMPToolBase::waitForSignal |
( |
| ) |
|
|
protectedinherited |
Definition at line 429 of file AthenaMPToolBase.cxx.
431 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
439 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
441 sigsuspend (&oldmask);
442 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
◆ m_appMgr
◆ m_chronoStatSvc
ServiceHandle<IChronoStatSvc> SharedEvtQueueConsumer::m_chronoStatSvc |
|
private |
◆ m_dataShare
SmartIF<IDataShare> SharedEvtQueueConsumer::m_dataShare |
|
private |
◆ m_debug
bool SharedEvtQueueConsumer::m_debug |
|
private |
◆ m_detStore
◆ m_eventOrders
std::vector<int> SharedEvtQueueConsumer::m_eventOrders |
|
private |
◆ m_eventOrdersFile
std::string SharedEvtQueueConsumer::m_eventOrdersFile |
|
private |
◆ m_eventStat
◆ m_evtContext
IEvtSelector::Context* SharedEvtQueueConsumer::m_evtContext |
|
private |
◆ m_evtProcessor
◆ m_evtSeek
SmartIF<IEventSeek> SharedEvtQueueConsumer::m_evtSeek |
|
private |
◆ m_evtSelector
SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector |
|
protectedinherited |
◆ m_evtSelName
std::string AthenaMPToolBase::m_evtSelName |
|
protectedinherited |
◆ m_evtSelSeek
◆ m_evtShare
SmartIF<IEventShare> SharedEvtQueueConsumer::m_evtShare |
|
private |
◆ m_evtStore
◆ m_fdsRegistry
◆ m_fileMgr
◆ m_fileMgrLog
std::string AthenaMPToolBase::m_fileMgrLog |
|
protectedinherited |
◆ m_finQueue
std::queue<pid_t> SharedEvtQueueConsumer::m_finQueue |
|
private |
◆ m_ioMgr
◆ m_isPileup
Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"} |
|
protectedinherited |
◆ m_isRoundRobin
bool SharedEvtQueueConsumer::m_isRoundRobin |
|
private |
◆ m_masterPid
pid_t SharedEvtQueueConsumer::m_masterPid |
|
private |
◆ m_maxEvt
int AthenaMPToolBase::m_maxEvt |
|
protectedinherited |
◆ m_mpRunStop
◆ m_nEventsBeforeFork
int SharedEvtQueueConsumer::m_nEventsBeforeFork |
|
private |
◆ m_nprocs
int AthenaMPToolBase::m_nprocs |
|
protectedinherited |
◆ m_nSkipEvents
int SharedEvtQueueConsumer::m_nSkipEvents |
|
private |
◆ m_processGroup
◆ m_randStr
std::string AthenaMPToolBase::m_randStr |
|
protectedinherited |
◆ m_rankId
int SharedEvtQueueConsumer::m_rankId |
|
private |
◆ m_readEventOrders
bool SharedEvtQueueConsumer::m_readEventOrders |
|
private |
◆ m_sharedEventQueue
◆ m_sharedRankQueue
◆ m_subprocDirPrefix
std::string AthenaMPToolBase::m_subprocDirPrefix |
|
protectedinherited |
◆ m_subprocTopDir
std::string AthenaMPToolBase::m_subprocTopDir |
|
protectedinherited |
◆ m_useSharedReader
bool SharedEvtQueueConsumer::m_useSharedReader |
|
private |
◆ m_useSharedWriter
bool SharedEvtQueueConsumer::m_useSharedWriter |
|
private |
◆ m_varHandleArraysDeclared
◆ m_vhka
The documentation for this class was generated from the following files:
const std::vector< ProcessStatus > & getStatuses() const
path
Python interpreter configuration.
def mkdir(path, recursive=True)
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
SmartIF< IEventSeek > m_evtSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
const std::vector< Process > & getChildren() const
StoreGateSvc_t m_evtStore
Pointer to StoreGate (event store by default)
std::vector< SG::VarHandleKeyArray * > m_vhka
bool try_receive_basic(T &)
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
AllWorkerOutputs::iterator AllWorkerOutputsIterator
std::vector< WorkerOutput > SingleWorkerOutputs
virtual void setOwner(IDataHandleHolder *o)=0
#define COPY_FILE_HACK(_src, _dest)
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
virtual int curEvent(const IEvtSelector::Context &c) const =0
Return the current event number.
msgSvc
Provide convenience handles for various services.
SmartIF< IDataShare > m_dataShare
std::vector< HWIdentifier >::iterator it1
::StatusCode StatusCode
StatusCode definition for legacy code.
std::vector< int > m_eventOrders
StoreGateSvc_t m_detStore
Pointer to StoreGate (detector store by default)
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
virtual void renounce()=0
std::conditional< std::is_base_of< SG::VarHandleKeyArray, T >::value, VarHandleKeyArrayType, type2 >::type type
SmartIF< IEventShare > m_evtShare
AthenaInterprocess::SharedQueue * m_sharedEventQueue
virtual bool stopScheduled() const =0
def time(flags, cells_name, *args, **kw)
#define ATH_MSG_WARNING(x)
IEvtSelector::Context * m_evtContext
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
SG::VarHandleKey & vhKey()
Return a non-const reference to the HandleKey.
constexpr int pow(int base, int exp) noexcept
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Gaudi::Details::PropertyBase & declareGaudiProperty(Gaudi::Property< T > &hndl, const SG::VarHandleKeyType &)
specialization for handling Gaudi::Property<SG::VarHandleKey>
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Abstract interface for seeking for an event selector.
System::ProcessTime::TimeValueType TimeValType
std::string m_eventOrdersFile