![Logo](../../ATLAS-Logo-Square-Blue-RGB.png) |
ATLAS Offline Software
|
#include <SharedEvtQueueConsumer.h>
|
| SharedEvtQueueConsumer (const std::string &type, const std::string &name, const IInterface *parent) |
|
virtual | ~SharedEvtQueueConsumer () override |
|
virtual StatusCode | initialize () override |
|
virtual StatusCode | finalize () override |
|
virtual int makePool | ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override |
|
virtual StatusCode exec | ATLAS_NOT_THREAD_SAFE () override |
|
virtual StatusCode wait_once | ATLAS_NOT_THREAD_SAFE (pid_t &pid) override |
|
virtual void | reportSubprocessStatuses () override |
|
virtual void | subProcessLogs (std::vector< std::string > &) override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | bootstrap_func () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | exec_func () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | fin_func () override |
|
virtual AthenaMP::AllWorkerOutputs_ptr | generateOutputReport () override |
|
virtual void | useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override |
|
virtual void | setRandString (const std::string &randStr) override |
|
virtual void | killChildren () override |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | operator() ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ScheduledWork &) |
|
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > | bootstrap_func ()=0 |
|
ServiceHandle< StoreGateSvc > & | evtStore () |
| The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc. More...
|
|
const ServiceHandle< StoreGateSvc > & | evtStore () const |
| The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc. More...
|
|
const ServiceHandle< StoreGateSvc > & | detStore () const |
| The standard StoreGateSvc/DetectorStore. Returns (kind of) a pointer to the StoreGateSvc. More...
|
|
virtual StatusCode | sysInitialize () override |
| Perform system initialization for an algorithm. More...
|
|
virtual StatusCode | sysStart () override |
| Handle START transition. More...
|
|
virtual std::vector< Gaudi::DataHandle * > | inputHandles () const override |
| Return this algorithm's input handles. More...
|
|
virtual std::vector< Gaudi::DataHandle * > | outputHandles () const override |
| Return this algorithm's output handles. More...
|
|
Gaudi::Details::PropertyBase & | declareProperty (Gaudi::Property< T > &t) |
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleKey &hndl, const std::string &doc, const SG::VarHandleKeyType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleBase &hndl, const std::string &doc, const SG::VarHandleType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, SG::VarHandleKeyArray &hndArr, const std::string &doc, const SG::VarHandleKeyArrayType &) |
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, T &property, const std::string &doc, const SG::NotHandleType &) |
| Declare a new Gaudi property. More...
|
|
Gaudi::Details::PropertyBase * | declareProperty (const std::string &name, T &property, const std::string &doc="none") |
| Declare a new Gaudi property. More...
|
|
void | updateVHKA (Gaudi::Details::PropertyBase &) |
|
MsgStream & | msg () const |
|
MsgStream & | msg (const MSG::Level lvl) const |
|
bool | msgLvl (const MSG::Level lvl) const |
|
virtual std::unique_ptr< ScheduledWork > | operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0 |
|
Definition at line 21 of file SharedEvtQueueConsumer.h.
◆ StoreGateSvc_t
◆ TimeValType
◆ ESRange_Status
Enumerator |
---|
ESRANGE_SUCCESS | |
ESRANGE_NOTFOUND | |
ESRANGE_SEEKFAILED | |
ESRANGE_PROCFAILED | |
ESRANGE_FILENOTMADE | |
ESRANGE_BADINPFILE | |
Definition at line 56 of file AthenaMPToolBase.h.
◆ Func_Flag
◆ SharedEvtQueueConsumer() [1/3]
SharedEvtQueueConsumer::SharedEvtQueueConsumer |
( |
const std::string & |
type, |
|
|
const std::string & |
name, |
|
|
const IInterface * |
parent |
|
) |
| |
◆ ~SharedEvtQueueConsumer()
SharedEvtQueueConsumer::~SharedEvtQueueConsumer |
( |
| ) |
|
|
overridevirtual |
◆ SharedEvtQueueConsumer() [2/3]
SharedEvtQueueConsumer::SharedEvtQueueConsumer |
( |
| ) |
|
|
private |
◆ SharedEvtQueueConsumer() [3/3]
◆ ATLAS_NOT_THREAD_SAFE() [1/5]
virtual StatusCode exec SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
| ) |
|
|
overridevirtual |
◆ ATLAS_NOT_THREAD_SAFE() [2/5]
◆ ATLAS_NOT_THREAD_SAFE() [3/5]
int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE |
( |
Func_Flag |
flag, |
|
|
pid_t |
pid = 0 |
|
) |
| |
|
protectedinherited |
◆ ATLAS_NOT_THREAD_SAFE() [4/5]
virtual int makePool SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
int |
maxevt, |
|
|
int |
nprocs, |
|
|
const std::string & |
topdir |
|
) |
| |
|
overridevirtual |
◆ ATLAS_NOT_THREAD_SAFE() [5/5]
virtual StatusCode wait_once SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE |
( |
pid_t & |
pid | ) |
|
|
overridevirtual |
◆ bootstrap_func() [1/2]
Definition at line 285 of file SharedEvtQueueConsumer.cxx.
290 outwork->data =
malloc(
sizeof(
int));
291 *(
int*)(outwork->data) = 1;
292 outwork->size =
sizeof(
int);
296 std::map<IService*,int> bkgEvtSelectors;
299 for(IService* ptrSvc : serviceLocator()->getServices()) {
300 IEvtSelector* evtsel =
dynamic_cast<IEvtSelector*
>(ptrSvc);
308 ATH_MSG_ERROR(
"Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->
name());
313 bkgEvtSelectors.emplace(ptrSvc,0);
326 IIncidentSvc* p_incidentSvc(0);
327 if(!serviceLocator()->service(
"IncidentSvc", p_incidentSvc).isSuccess()) {
331 p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
340 std::ostringstream workindex;
348 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
349 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
358 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
365 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
369 if(std::filesystem::is_regular_file(
"SimParams.db"))
370 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
371 if(std::filesystem::is_regular_file(
"DigitParams.db"))
372 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
373 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
374 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
384 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
390 ATH_MSG_ERROR(
"Failed to make the event selector a share client");
394 ATH_MSG_DEBUG(
"Successfully made the event selector a share client");
399 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_dataShare);
400 if (propertyServer==0 || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
401 ATH_MSG_ERROR(
"Could not change AthenaPoolCnvSvc MakeClient Property");
405 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
410 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
420 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_evtSelector);
421 if(!propertyServer) {
426 std::string propertyName(
"SkipEvents");
428 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
429 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
438 IService* evtSelSvc =
dynamic_cast<IService*
>(
m_evtSelector);
440 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
443 if(!evtSelSvc->start().isSuccess()) {
457 ATH_MSG_ERROR(
"Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
468 for(
auto [evtsel,curEvt] : bkgEvtSelectors) {
469 if(evtsel->start().isSuccess()) {
473 ATH_MSG_ERROR(
"Failed to seek to " << curEvt <<
" in the BKG Event Selector " << evtsel->name());
479 ATH_MSG_ERROR(
"Failed to restart BKG Event Selector " << evtsel->name());
494 if(
line.empty())
continue;
498 int rank = std::stoi(
line,&
idx);
500 msg(MSG::INFO) <<
"This worker will proces the following events #";
503 int evtnum = std::stoi(
line,&
idx);
505 msg(MSG::INFO) <<
" " << evtnum;
523 if(chdir(worker_rundir.string().c_str())==-1) {
524 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
532 *(
int*)(outwork->data) = 0;
◆ bootstrap_func() [2/2]
◆ declareGaudiProperty() [1/4]
specialization for handling Gaudi::Property<SG::VarHandleKeyArray>
Definition at line 170 of file AthCommonDataStore.h.
175 hndl.documentation());
◆ declareGaudiProperty() [2/4]
specialization for handling Gaudi::Property<SG::VarHandleKey>
Definition at line 156 of file AthCommonDataStore.h.
161 hndl.documentation());
◆ declareGaudiProperty() [3/4]
specialization for handling Gaudi::Property<SG::VarHandleBase>
Definition at line 184 of file AthCommonDataStore.h.
189 hndl.documentation());
◆ declareGaudiProperty() [4/4]
◆ declareProperty() [1/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
hndl | Object holding the property value. |
doc | Documentation string for the property. |
This is the version for types that derive from SG::VarHandleBase. The property value object is put on the input and output lists as appropriate; then we forward to the base class.
Definition at line 245 of file AthCommonDataStore.h.
250 this->declare(hndl.
vhKey());
251 hndl.
vhKey().setOwner(
this);
◆ declareProperty() [2/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
hndl | Object holding the property value. |
doc | Documentation string for the property. |
This is the version for types that derive from SG::VarHandleKey. The property value object is put on the input and output lists as appropriate; then we forward to the base class.
Definition at line 221 of file AthCommonDataStore.h.
◆ declareProperty() [3/6]
◆ declareProperty() [4/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
property | Object holding the property value. |
doc | Documentation string for the property. |
This is the generic version, for types that do not derive from SG::VarHandleKey. It just forwards to the base class version of declareProperty.
Definition at line 333 of file AthCommonDataStore.h.
◆ declareProperty() [5/6]
Declare a new Gaudi property.
- Parameters
-
name | Name of the property. |
property | Object holding the property value. |
doc | Documentation string for the property. |
This dispatches to either the generic declareProperty or the one for VarHandle/Key/KeyArray.
Definition at line 352 of file AthCommonDataStore.h.
◆ declareProperty() [6/6]
◆ detStore()
◆ evtSelector()
IEvtSelector* AthenaMPToolBase::evtSelector |
( |
| ) |
|
|
inlineprotectedinherited |
◆ evtStore() [1/2]
◆ evtStore() [2/2]
◆ exec_func()
Implements AthenaMPToolBase.
Definition at line 536 of file SharedEvtQueueConsumer.cxx.
538 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
542 long intmask =
pow(0x100,
sizeof(
int))-1;
544 int nEventsProcessed(0);
545 long evtnumAndChunk(0);
547 unsigned evtCounter(0);
561 while(evtnumAndChunk>0) {
566 evtnumAndChunk *= -1;
569 System::ProcessTime time_start = System::getProcessTime();
573 bool firstOrder(
true);
585 evtnum = *predefinedEvt;
587 fs << (firstOrder?
":":
",") << evtnum;
590 ATH_MSG_INFO(
"Read event number from the orders file: " << evtnum);
599 if(evtnumAndChunk<=0) {
600 evtnumAndChunk *= -1;
601 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
604 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
606 evtnum = evtnumAndChunk & intmask;
611 fs << (firstOrder?
":":
",") << evtnum+
i;
665 System::ProcessTime time_delta = System::getProcessTime() - time_start;
666 TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
692 *(
int*)(
outdata) = (all_ok?0:1);
694 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
695 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEventsProcessed,
sizeof(
int));
696 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsedTime,
sizeof(elapsedTime));
698 outwork->size = outsize;
◆ extraDeps_update_handler()
Add StoreName to extra input/output deps as needed.
Use the logic of the VarHandleKey to parse the DataObjID keys supplied via the ExtraInputs and ExtraOutputs Properties to add the StoreName if it's not explicitly given.
◆ fin_func()
Implements AthenaMPToolBase.
Definition at line 706 of file SharedEvtQueueConsumer.cxx.
708 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
717 if(
m_appMgr->finalize().isFailure()) {
718 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
728 *(
int*)(
outdata) = (all_ok?0:1);
730 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
732 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
734 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsed,
sizeof(elapsed));
737 outwork->size = outsize;
◆ finalize()
StatusCode SharedEvtQueueConsumer::finalize |
( |
| ) |
|
|
overridevirtual |
Reimplemented from AthenaMPToolBase.
Definition at line 121 of file SharedEvtQueueConsumer.cxx.
131 srand((
unsigned)
time(0));
132 std::ostringstream randname;
134 std::string ordersFileBak =
m_eventOrdersFile+std::string(
"-bak-")+randname.str();
139 std::filesystem::rename(ordersFile,ordersFileBakpath);
145 std::ostringstream workerIndex;
149 std::string ordersFileWorker(worker_rundir.string()+std::string(
"/")+
m_eventOrdersFile);
150 ATH_MSG_INFO(
"Processing " << ordersFileWorker <<
" ...");
151 std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
153 while(fs_worker.good()) {
154 std::getline(fs_worker,
line);
168 return StatusCode::SUCCESS;
◆ fmterror()
std::string AthenaMPToolBase::fmterror |
( |
int |
errnum | ) |
|
|
protectedinherited |
Definition at line 362 of file AthenaMPToolBase.cxx.
365 strerror_r(errnum, buf,
sizeof(buf));
366 return std::string(buf);
◆ generateOutputReport()
Implements IAthenaMPTool.
Reimplemented in EvtRangeProcessor, EvtRangeScatterer, SharedEvtQueueProvider, and SharedWriterTool.
Definition at line 128 of file AthenaMPToolBase.cxx.
133 ATH_MSG_WARNING(
name() <<
" cannot make output report because FileMgr has not been configured to write log file!");
140 std::ostringstream workindex;
153 std::ifstream inpStream(
logFile.string().c_str());
154 std::set<std::string> reportedFiles;
155 while(!inpStream.eof()) {
156 std::getline(inpStream,
line);
157 if(
line.find(
"WRITE")!=std::string::npos) {
160 std::vector<std::string>
entries;
161 while(startpos<
line.size()) {
162 while(
line[startpos]==
' ')
165 size_t endpos =
line.find(
' ',startpos);
166 if(endpos==std::string::npos) endpos =
line.size();
167 entries.push_back(
line.substr(startpos,endpos-startpos));
174 if(reportedFiles.find(
basename.string())==reportedFiles.end())
175 reportedFiles.insert(
basename.string());
180 if(
it1==jobOutputs->end()) {
186 newOutput.
filename = absolutename.string();
190 newOutput.
shared = (
line.find(
"SHARED")!=std::string::npos);
192 (*jobOutputs)[
basename.string()].push_back(newOutput);
◆ handleSavedPfc()
int AthenaMPToolBase::handleSavedPfc |
( |
const std::filesystem::path & |
dest_path | ) |
|
|
protectedinherited |
Definition at line 425 of file AthenaMPToolBase.cxx.
427 if(std::filesystem::is_regular_file(
"PoolFileCatalog.xml.AthenaMP-saved"))
428 COPY_FILE_HACK(
"PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+
"/PoolFileCatalog.xml");
◆ initialize()
StatusCode SharedEvtQueueConsumer::initialize |
( |
| ) |
|
|
overridevirtual |
Reimplemented from AthenaMPToolBase.
Definition at line 73 of file SharedEvtQueueConsumer.cxx.
84 ATH_MSG_ERROR(
"Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
85 return StatusCode::FAILURE;
99 return StatusCode::FAILURE;
105 IConversionSvc* cnvSvc{
nullptr};
106 sc = serviceLocator()->service(
"AthenaPoolCnvSvc",cnvSvc);
110 ATH_MSG_ERROR(
"Error retrieving AthenaPoolCnvSvc " << cnvSvc);
111 return StatusCode::FAILURE;
118 return StatusCode::SUCCESS;
◆ inputHandles()
Return this algorithm's input handles.
We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.
◆ interfaceID()
static const InterfaceID& IAthenaMPTool::interfaceID |
( |
| ) |
|
|
inlinestaticinherited |
◆ killChildren()
void AthenaMPToolBase::killChildren |
( |
| ) |
|
|
overridevirtualinherited |
◆ msg() [1/2]
◆ msg() [2/2]
◆ msgLvl()
◆ operator()
virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator |
( |
| ) |
const & |
|
pure virtualinherited |
◆ operator=()
◆ outputHandles()
Return this algorithm's output handles.
We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.
◆ redirectLog()
int AthenaMPToolBase::redirectLog |
( |
const std::string & |
rundir, |
|
|
bool |
addTimeStamp = true |
|
) |
| |
|
protectedinherited |
Definition at line 278 of file AthenaMPToolBase.cxx.
281 int dup2result1(0), dup2result2(0);
283 int newout =
open(std::string(
rundir+
"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
288 dup2result1 = dup2(newout, STDOUT_FILENO);
289 dup2result2 = dup2(newout, STDERR_FILENO);
290 TEMP_FAILURE_RETRY(close(newout));
291 if(dup2result1==-1) {
295 if(dup2result2==-1) {
301 IMessageSvc* messageSvc(0);
302 StatusCode sc = serviceLocator()->service(
"MessageSvc",messageSvc);
308 IProperty* propertyServer =
dynamic_cast<IProperty*
>(messageSvc);
309 if(propertyServer==0) {
314 std::string propertyName(
"Format");
315 std::string oldFormat(
"");
316 StringProperty formatProp(propertyName,oldFormat);
317 sc = propertyServer->getProperty(&formatProp);
322 oldFormat = formatProp.value();
323 if(oldFormat.find(
"%t")==std::string::npos) {
325 std::string newFormat(
"%t " + oldFormat);
326 StringProperty newFormatProp(propertyName,newFormat);
327 if(propertyServer->setProperty(newFormatProp).isFailure()) {
328 ATH_MSG_ERROR(
"Unable to set new Format property on the Message Service");
333 ATH_MSG_DEBUG(
"MsgSvc format already contains timestamps. Nothing to be done");
◆ renounce()
◆ renounceArray()
◆ reopenFd()
int AthenaMPToolBase::reopenFd |
( |
int |
fd, |
|
|
const std::string & |
name |
|
) |
| |
|
privateinherited |
Definition at line 448 of file AthenaMPToolBase.cxx.
451 int old_openflags = fcntl(
fd,F_GETFL,0);
452 switch(old_openflags & O_ACCMODE) {
467 int old_descflags = fcntl(
fd,F_GETFD,0);
468 off_t oldpos = lseek(
fd,0,SEEK_CUR);
484 if(lseek(newfd,oldpos,SEEK_SET)==-1){
486 TEMP_FAILURE_RETRY(close(newfd));
489 TEMP_FAILURE_RETRY(close(
fd));
490 if(dup2(newfd,
fd)==-1) {
491 ATH_MSG_ERROR(
"When re-opening file descriptors unable to duplicate descriptor for " <<
name <<
". " <<
fmterror(errno));
492 TEMP_FAILURE_RETRY(close(newfd));
495 if(fcntl(
fd,F_SETFD,old_descflags)==-1) {
496 ATH_MSG_ERROR(
"When re-opening file descriptors unable to set descriptor flags for " <<
name <<
". " <<
fmterror(errno));
497 TEMP_FAILURE_RETRY(close(newfd));
500 TEMP_FAILURE_RETRY(close(newfd));
◆ reopenFds()
int AthenaMPToolBase::reopenFds |
( |
| ) |
|
|
protectedinherited |
Definition at line 369 of file AthenaMPToolBase.cxx.
377 std::vector<const Io::FileAttr*> filemgrFiles;
378 std::vector<const Io::FileAttr*>::const_iterator itFile;
379 unsigned filenum =
m_fileMgr->getFiles(filemgrFiles);
380 if(filenum!=filemgrFiles.size())
381 ATH_MSG_WARNING(
"getFiles returned " << filenum <<
" while vector size is " << filemgrFiles.size());
383 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
385 const std::string&
filename = (**itFile).name();
392 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " <<
filename);
404 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
405 ATH_MSG_DEBUG(
"The file from FdsRegistry " << regEntry.name <<
" was registered with FileMgr. Skip reopening");
408 ATH_MSG_WARNING(
"The file " << regEntry.name <<
" has not been registered with the FileMgr!");
410 if(regEntry.fd==-1) {
412 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
416 if(
reopenFd(regEntry.fd,regEntry.name))
419 fdLog.insert(regEntry.fd);
◆ reportSubprocessStatuses()
void SharedEvtQueueConsumer::reportSubprocessStatuses |
( |
| ) |
|
|
overridevirtual |
◆ setRandString()
void AthenaMPToolBase::setRandString |
( |
const std::string & |
randStr | ) |
|
|
overridevirtualinherited |
◆ subProcessLogs()
void SharedEvtQueueConsumer::subProcessLogs |
( |
std::vector< std::string > & |
filenames | ) |
|
|
overridevirtual |
◆ sysInitialize()
◆ sysStart()
Handle START transition.
We override this in order to make sure that conditions handle keys can cache a pointer to the conditions container.
◆ updateIoReg()
int AthenaMPToolBase::updateIoReg |
( |
const std::string & |
rundir | ) |
|
|
protectedinherited |
◆ updateVHKA()
◆ useFdsRegistry()
◆ waitForSignal()
void AthenaMPToolBase::waitForSignal |
( |
| ) |
|
|
protectedinherited |
Definition at line 432 of file AthenaMPToolBase.cxx.
434 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
442 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
444 sigsuspend (&oldmask);
445 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
◆ m_appMgr
◆ m_chronoStatSvc
ServiceHandle<IChronoStatSvc> SharedEvtQueueConsumer::m_chronoStatSvc |
|
private |
◆ m_dataShare
◆ m_debug
bool SharedEvtQueueConsumer::m_debug |
|
private |
◆ m_detStore
◆ m_eventOrders
std::vector<int> SharedEvtQueueConsumer::m_eventOrders |
|
private |
◆ m_eventOrdersFile
std::string SharedEvtQueueConsumer::m_eventOrdersFile |
|
private |
◆ m_eventStat
◆ m_evtContext
IEvtSelector::Context* SharedEvtQueueConsumer::m_evtContext |
|
private |
◆ m_evtProcessor
◆ m_evtSeek
◆ m_evtSelector
IEvtSelector* AthenaMPToolBase::m_evtSelector |
|
protectedinherited |
◆ m_evtSelName
std::string AthenaMPToolBase::m_evtSelName |
|
protectedinherited |
◆ m_evtSelSeek
◆ m_evtShare
◆ m_evtStore
◆ m_fdsRegistry
◆ m_fileMgr
◆ m_fileMgrLog
std::string AthenaMPToolBase::m_fileMgrLog |
|
protectedinherited |
◆ m_finQueue
std::queue<pid_t> SharedEvtQueueConsumer::m_finQueue |
|
private |
◆ m_ioMgr
◆ m_isPileup
Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"} |
|
protectedinherited |
◆ m_isRoundRobin
bool SharedEvtQueueConsumer::m_isRoundRobin |
|
private |
◆ m_masterPid
pid_t SharedEvtQueueConsumer::m_masterPid |
|
private |
◆ m_nEventsBeforeFork
int SharedEvtQueueConsumer::m_nEventsBeforeFork |
|
private |
◆ m_nprocs
int AthenaMPToolBase::m_nprocs |
|
protectedinherited |
◆ m_nSkipEvents
int SharedEvtQueueConsumer::m_nSkipEvents |
|
private |
◆ m_processGroup
◆ m_randStr
std::string AthenaMPToolBase::m_randStr |
|
protectedinherited |
◆ m_rankId
int SharedEvtQueueConsumer::m_rankId |
|
private |
◆ m_readEventOrders
bool SharedEvtQueueConsumer::m_readEventOrders |
|
private |
◆ m_sharedEventQueue
◆ m_sharedRankQueue
◆ m_subprocDirPrefix
std::string AthenaMPToolBase::m_subprocDirPrefix |
|
protectedinherited |
◆ m_subprocTopDir
std::string AthenaMPToolBase::m_subprocTopDir |
|
protectedinherited |
◆ m_useSharedReader
bool SharedEvtQueueConsumer::m_useSharedReader |
|
private |
◆ m_useSharedWriter
bool SharedEvtQueueConsumer::m_useSharedWriter |
|
private |
◆ m_varHandleArraysDeclared
◆ m_vhka
The documentation for this class was generated from the following files:
virtual StatusCode makeClient(int num)=0
Make this a client.
const std::vector< ProcessStatus > & getStatuses() const
path
python interpreter configuration
def mkdir(path, recursive=True)
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
const std::vector< Process > & getChildren() const
StoreGateSvc_t m_evtStore
Pointer to StoreGate (event store by default)
std::vector< SG::VarHandleKeyArray * > m_vhka
bool try_receive_basic(T &)
virtual StatusCode seek(int evtnum)=0
Seek to a given event number.
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
AllWorkerOutputs::iterator AllWorkerOutputsIterator
std::vector< WorkerOutput > SingleWorkerOutputs
virtual void setOwner(IDataHandleHolder *o)=0
#define COPY_FILE_HACK(_src, _dest)
Abstract interface for sharing data.
virtual StatusCode share(int evtnum)=0
Request to share a given event.
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
virtual int curEvent(const IEvtSelector::Context &c) const =0
return the current event number.
std::vector< HWIdentifier >::iterator it1
::StatusCode StatusCode
StatusCode definition for legacy code.
std::vector< int > m_eventOrders
StoreGateSvc_t m_detStore
Pointer to StoreGate (detector store by default)
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
virtual void renounce()=0
std::conditional< std::is_base_of< SG::VarHandleKeyArray, T >::value, VarHandleKeyArrayType, type2 >::type type
AthenaInterprocess::SharedQueue * m_sharedEventQueue
IEvtSelectorSeek * m_evtSelSeek
def time(flags, cells_name, *args, **kw)
#define ATH_MSG_WARNING(x)
IEvtSelector::Context * m_evtContext
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
SG::VarHandleKey & vhKey()
Return a non-const reference to the HandleKey.
Abstract interface for seeking within an event stream.
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Gaudi::Details::PropertyBase & declareGaudiProperty(Gaudi::Property< T > &hndl, const SG::VarHandleKeyType &)
specialization for handling Gaudi::Property<SG::VarHandleKey>
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Abstract interface for seeking for an event selector.
System::ProcessTime::TimeValueType TimeValType
std::string m_eventOrdersFile