Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Public Member Functions | Static Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes | Private Types | Private Member Functions | Private Attributes | List of all members
EvtRangeScatterer Class Reference final abstract

#include <EvtRangeScatterer.h>

Inheritance diagram for EvtRangeScatterer:
Collaboration diagram for EvtRangeScatterer:

Public Member Functions

 EvtRangeScatterer (const std::string &type, const std::string &name, const IInterface *parent)
 
virtual ~EvtRangeScatterer () override
 
virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
 
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
 
virtual void subProcessLogs (std::vector< std::string > &) override
 
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func () override
 
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t &pid) override
 
virtual void reportSubprocessStatuses () override
 
virtual void useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override
 
virtual void setRandString (const std::string &randStr) override
 
virtual void setMaxEvt (int maxEvt) override
 
virtual void setMPRunStop (const AthenaInterprocess::IMPRunStop *runStop) override
 
virtual void killChildren () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > virtual operator() ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess std::unique_ptr< AthenaInterprocess::ScheduledWorkbootstrap_func ()=0
 
ServiceHandle< StoreGateSvc > & evtStore ()
 The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc. More...
 
const ServiceHandle< StoreGateSvc > & evtStore () const
 The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc. More...
 
const ServiceHandle< StoreGateSvc > & detStore () const
 The standard StoreGateSvc/DetectorStore. Returns (kind of) a pointer to the StoreGateSvc. More...
 
virtual StatusCode sysInitialize () override
 Perform system initialization for an algorithm. More...
 
virtual StatusCode sysStart () override
 Handle START transition. More...
 
virtual std::vector< Gaudi::DataHandle * > inputHandles () const override
 Return this algorithm's input handles. More...
 
virtual std::vector< Gaudi::DataHandle * > outputHandles () const override
 Return this algorithm's output handles. More...
 
Gaudi::Details::PropertyBase & declareProperty (Gaudi::Property< T > &t)
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, SG::VarHandleKey &hndl, const std::string &doc, const SG::VarHandleKeyType &)
 Declare a new Gaudi property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, SG::VarHandleBase &hndl, const std::string &doc, const SG::VarHandleType &)
 Declare a new Gaudi property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, SG::VarHandleKeyArray &hndArr, const std::string &doc, const SG::VarHandleKeyArrayType &)
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, T &property, const std::string &doc, const SG::NotHandleType &)
 Declare a new Gaudi property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, T &property, const std::string &doc="none")
 Declare a new Gaudi property. More...
 
void updateVHKA (Gaudi::Details::PropertyBase &)
 
MsgStream & msg () const
 
MsgStream & msg (const MSG::Level lvl) const
 
bool msgLvl (const MSG::Level lvl) const
 
virtual std::unique_ptr< ScheduledWork > operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0
 

Static Public Member Functions

static const InterfaceID & interfaceID ()
 

Protected Types

enum  ESRange_Status {
  ESRANGE_SUCCESS, ESRANGE_NOTFOUND, ESRANGE_SEEKFAILED, ESRANGE_PROCFAILED,
  ESRANGE_FILENOTMADE, ESRANGE_BADINPFILE
}
 
enum  Func_Flag { FUNC_BOOTSTRAP, FUNC_EXEC, FUNC_FIN }
 

Protected Member Functions

int mapAsyncFlag ATLAS_NOT_THREAD_SAFE (Func_Flag flag, pid_t pid=0)
 
int redirectLog (const std::string &rundir, bool addTimeStamp=true)
 
int updateIoReg (const std::string &rundir)
 
std::string fmterror (int errnum)
 
int reopenFds ()
 
int handleSavedPfc (const std::filesystem::path &dest_path)
 
void waitForSignal ()
 
IEvtSelector * evtSelector ()
 
void renounceArray (SG::VarHandleKeyArray &handlesArray)
 remove all handles from I/O resolution More...
 
std::enable_if_t< std::is_void_v< std::result_of_t< decltype(&T::renounce)(T)> > &&!std::is_base_of_v< SG::VarHandleKeyArray, T > &&std::is_base_of_v< Gaudi::DataHandle, T >, void > renounce (T &h)
 
void extraDeps_update_handler (Gaudi::Details::PropertyBase &ExtraDeps)
 Add StoreName to extra input/output deps as needed. More...
 

Protected Attributes

int m_nprocs
 
int m_maxEvt
 
std::string m_subprocTopDir
 
std::string m_subprocDirPrefix
 
std::string m_evtSelName
 
AthenaInterprocess::ProcessGroup * m_processGroup
 
const AthenaInterprocess::IMPRunStop * m_mpRunStop {nullptr}
 
ServiceHandle< IEventProcessor > m_evtProcessor
 
ServiceHandle< IAppMgrUI > m_appMgr
 
ServiceHandle< IFileMgr > m_fileMgr
 
ServiceHandle< IIoComponentMgr > m_ioMgr
 
SmartIF< IEvtSelector > m_evtSelector
 
std::string m_fileMgrLog
 
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
 
std::string m_randStr
 
Gaudi::Property< bool > m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
 

Private Types

typedef ServiceHandle< StoreGateSvc > StoreGateSvc_t
 

Private Member Functions

 EvtRangeScatterer ()
 
 EvtRangeScatterer (const EvtRangeScatterer &)
 
EvtRangeScatterer & operator= (const EvtRangeScatterer &)
 
void trimRangeStrings (std::string &)
 
std::string getNewRangeRequest (yampl::ISocket *socket2Processor, yampl::ISocket *socket2Pilot, int &procReportPending)
 
pid_t pollFailedPidQueue (AthenaInterprocess::SharedQueue *sharedFailedPidQueue, yampl::ISocket *socket2Pilot, int &procReportPending)
 
int reopenFd (int fd, const std::string &name)
 
Gaudi::Details::PropertyBase & declareGaudiProperty (Gaudi::Property< T > &hndl, const SG::VarHandleKeyType &)
 specialization for handling Gaudi::Property<SG::VarHandleKey> More...
 
Gaudi::Details::PropertyBase & declareGaudiProperty (Gaudi::Property< T > &hndl, const SG::VarHandleKeyArrayType &)
 specialization for handling Gaudi::Property<SG::VarHandleKeyArray> More...
 
Gaudi::Details::PropertyBase & declareGaudiProperty (Gaudi::Property< T > &hndl, const SG::VarHandleType &)
 specialization for handling Gaudi::Property<SG::VarHandleBase> More...
 
Gaudi::Details::PropertyBase & declareGaudiProperty (Gaudi::Property< T > &t, const SG::NotHandleType &)
 specialization for handling everything that's not a Gaudi::Property<SG::VarHandleKey> or a <SG::VarHandleKeyArray> More...
 

Private Attributes

StringProperty m_processorChannel
 
StringProperty m_eventRangeChannel
 
bool m_doCaching
 
Pid2RangeID m_pid2RangeID
 
StoreGateSvc_t m_evtStore
 Pointer to StoreGate (event store by default) More...
 
StoreGateSvc_t m_detStore
 Pointer to StoreGate (detector store by default) More...
 
std::vector< SG::VarHandleKeyArray * > m_vhka
 
bool m_varHandleArraysDeclared
 

Detailed Description

Definition at line 18 of file EvtRangeScatterer.h.

Member Typedef Documentation

◆ StoreGateSvc_t

typedef ServiceHandle<StoreGateSvc> AthCommonDataStore< AthCommonMsg< AlgTool > >::StoreGateSvc_t
privateinherited

Definition at line 388 of file AthCommonDataStore.h.

Member Enumeration Documentation

◆ ESRange_Status

enum AthenaMPToolBase::ESRange_Status
protectedinherited
Enumerator
ESRANGE_SUCCESS 
ESRANGE_NOTFOUND 
ESRANGE_SEEKFAILED 
ESRANGE_PROCFAILED 
ESRANGE_FILENOTMADE 
ESRANGE_BADINPFILE 

Definition at line 59 of file AthenaMPToolBase.h.

◆ Func_Flag

enum AthenaMPToolBase::Func_Flag
protectedinherited
Enumerator
FUNC_BOOTSTRAP 
FUNC_EXEC 
FUNC_FIN 

Definition at line 68 of file AthenaMPToolBase.h.

68  enum Func_Flag {
69  FUNC_BOOTSTRAP
70  , FUNC_EXEC
71  , FUNC_FIN
72  };

Constructor & Destructor Documentation

◆ EvtRangeScatterer() [1/3]

EvtRangeScatterer::EvtRangeScatterer ( const std::string &  type,
const std::string &  name,
const IInterface *  parent 
)

Definition at line 26 of file EvtRangeScatterer.cxx.

30  , m_processorChannel("")
32  , m_doCaching(false)
33 {
34  m_subprocDirPrefix = "range_scatterer";
35  declareProperty("ProcessorChannel", m_processorChannel);
36  declareProperty("EventRangeChannel", m_eventRangeChannel);
37  declareProperty("DoCaching",m_doCaching);
38 }

◆ ~EvtRangeScatterer()

EvtRangeScatterer::~EvtRangeScatterer ( )
overridevirtual

Definition at line 40 of file EvtRangeScatterer.cxx.

41 {
42 }

◆ EvtRangeScatterer() [2/3]

EvtRangeScatterer::EvtRangeScatterer ( )
private

◆ EvtRangeScatterer() [3/3]

EvtRangeScatterer::EvtRangeScatterer ( const EvtRangeScatterer & )
private

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE() [1/4]

virtual StatusCode exec EvtRangeScatterer::ATLAS_NOT_THREAD_SAFE ( )
overridevirtual

Implements IAthenaMPTool.

◆ ATLAS_NOT_THREAD_SAFE() [2/4]

int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( Func_Flag  flag,
pid_t  pid = 0 
)
protectedinherited

◆ ATLAS_NOT_THREAD_SAFE() [3/4]

virtual int makePool EvtRangeScatterer::ATLAS_NOT_THREAD_SAFE ( int  maxevt,
int  nprocs,
const std::string &  topdir 
)
overridevirtual

Implements IAthenaMPTool.

◆ ATLAS_NOT_THREAD_SAFE() [4/4]

virtual StatusCode wait_once AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( pid_t & pid )
overridevirtualinherited

◆ bootstrap_func() [1/2]

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeScatterer::bootstrap_func ( )
overridevirtual

Definition at line 114 of file EvtRangeScatterer.cxx.

115 {
116  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
117  outwork->data = malloc(sizeof(int));
118  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
119  outwork->size = sizeof(int);
120 
121  // ...
122  // (possible) TODO: extend outwork with some error message, which will be eventually
123  // reported in the master process
124  // ...
125 
126  // Reader dir: mkdir
127  std::filesystem::path reader_rundir(m_subprocTopDir);
128  reader_rundir /= std::filesystem::path(m_subprocDirPrefix);
129 
130  if(mkdir(reader_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
131  ATH_MSG_ERROR("Unable to make reader run directory: " << reader_rundir.string() << ". " << fmterror(errno));
132  return outwork;
133  }
134 
135  // Redirect logs
136  if(redirectLog(reader_rundir.string()))
137  return outwork;
138 
139  ATH_MSG_INFO("Logs redirected in the AthenaMP Event Range Scatterer PID=" << getpid());
140 
141  // Update Io Registry
142  if(updateIoReg(reader_rundir.string()))
143  return outwork;
144 
145  ATH_MSG_INFO("Io registry updated in the AthenaMP Event Range Scatterer PID=" << getpid());
146 
147  // _______________________ Handle saved PFC (if any) ______________________
148  std::filesystem::path abs_reader_rundir = std::filesystem::absolute(reader_rundir);
149  if(handleSavedPfc(abs_reader_rundir))
150  return outwork;
151 
152  // Reopen file descriptors
153  if(reopenFds())
154  return outwork;
155 
156  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP Event Range Scatterer PID=" << getpid());
157 
158  // ________________________ I/O reinit ________________________
159  if(!m_ioMgr->io_reinitialize().isSuccess()) {
160  ATH_MSG_ERROR("Failed to reinitialize I/O");
161  return outwork;
162  } else {
163  ATH_MSG_DEBUG("Successfully reinitialized I/O");
164  }
165 
166  // Start the event selector
167  SmartIF<IService> evtSelSvc(m_evtSelector);
168  if(!evtSelSvc) {
169  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
170  return outwork;
171  }
172  if(!evtSelSvc->start().isSuccess()) {
173  ATH_MSG_ERROR("Failed to restart the event selector");
174  return outwork;
175  } else {
176  ATH_MSG_DEBUG("Successfully restarted the event selector");
177  }
178 
179  // Reader dir: chdir
180  if(chdir(reader_rundir.string().c_str())==-1) {
181  ATH_MSG_ERROR("Failed to chdir to " << reader_rundir.string());
182  return outwork;
183  }
184 
185  // Declare success and return
186  *(int*)(outwork->data) = 0;
187  return outwork;
188 }

◆ bootstrap_func() [2/2]

virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> virtual operator () ATLAS_NOT_THREAD_SAFE ( const AthenaInterprocess std::unique_ptr<AthenaInterprocess::ScheduledWork> AthenaMPToolBase::bootstrap_func ( )
pure virtualinherited

◆ declareGaudiProperty() [1/4]

Gaudi::Details::PropertyBase& AthCommonDataStore< AthCommonMsg< AlgTool > >::declareGaudiProperty ( Gaudi::Property< T > &  hndl,
const SG::VarHandleKeyArrayType & 
)
inlineprivateinherited

specialization for handling Gaudi::Property<SG::VarHandleKeyArray>

Definition at line 170 of file AthCommonDataStore.h.

172  {
173  return *AthCommonDataStore<PBASE>::declareProperty(hndl.name(),
174  hndl.value(),
175  hndl.documentation());
176 
177  }

◆ declareGaudiProperty() [2/4]

Gaudi::Details::PropertyBase& AthCommonDataStore< AthCommonMsg< AlgTool > >::declareGaudiProperty ( Gaudi::Property< T > &  hndl,
const SG::VarHandleKeyType & 
)
inlineprivateinherited

specialization for handling Gaudi::Property<SG::VarHandleKey>

Definition at line 156 of file AthCommonDataStore.h.

158  {
159  return *AthCommonDataStore<PBASE>::declareProperty(hndl.name(),
160  hndl.value(),
161  hndl.documentation());
162 
163  }

◆ declareGaudiProperty() [3/4]

Gaudi::Details::PropertyBase& AthCommonDataStore< AthCommonMsg< AlgTool > >::declareGaudiProperty ( Gaudi::Property< T > &  hndl,
const SG::VarHandleType & 
)
inlineprivateinherited

specialization for handling Gaudi::Property<SG::VarHandleBase>

Definition at line 184 of file AthCommonDataStore.h.

186  {
187  return *AthCommonDataStore<PBASE>::declareProperty(hndl.name(),
188  hndl.value(),
189  hndl.documentation());
190  }

◆ declareGaudiProperty() [4/4]

Gaudi::Details::PropertyBase& AthCommonDataStore< AthCommonMsg< AlgTool > >::declareGaudiProperty ( Gaudi::Property< T > &  t,
const SG::NotHandleType & 
)
inlineprivateinherited

specialization for handling everything that's not a Gaudi::Property<SG::VarHandleKey> or a <SG::VarHandleKeyArray>

Definition at line 199 of file AthCommonDataStore.h.

200  {
201  return PBASE::declareProperty(t);
202  }

◆ declareProperty() [1/6]

Gaudi::Details::PropertyBase* AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( const std::string &  name,
SG::VarHandleBase &  hndl,
const std::string &  doc,
const SG::VarHandleType & 
)
inlineinherited

Declare a new Gaudi property.

Parameters
name: Name of the property.
hndl: Object holding the property value.
doc: Documentation string for the property.

This is the version for types that derive from SG::VarHandleBase. The property value object is put on the input and output lists as appropriate; then we forward to the base class.

Definition at line 245 of file AthCommonDataStore.h.

249  {
250  this->declare(hndl.vhKey());
251  hndl.vhKey().setOwner(this);
252 
253  return PBASE::declareProperty(name,hndl,doc);
254  }

◆ declareProperty() [2/6]

Gaudi::Details::PropertyBase* AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( const std::string &  name,
SG::VarHandleKey &  hndl,
const std::string &  doc,
const SG::VarHandleKeyType & 
)
inlineinherited

Declare a new Gaudi property.

Parameters
name: Name of the property.
hndl: Object holding the property value.
doc: Documentation string for the property.

This is the version for types that derive from SG::VarHandleKey. The property value object is put on the input and output lists as appropriate; then we forward to the base class.

Definition at line 221 of file AthCommonDataStore.h.

225  {
226  this->declare(hndl);
227  hndl.setOwner(this);
228 
229  return PBASE::declareProperty(name,hndl,doc);
230  }

◆ declareProperty() [3/6]

Gaudi::Details::PropertyBase* AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( const std::string &  name,
SG::VarHandleKeyArray &  hndArr,
const std::string &  doc,
const SG::VarHandleKeyArrayType & 
)
inlineinherited

Definition at line 259 of file AthCommonDataStore.h.

263  {
264 
265  // std::ostringstream ost;
266  // ost << Algorithm::name() << " VHKA declareProp: " << name
267  // << " size: " << hndArr.keys().size()
268  // << " mode: " << hndArr.mode()
269  // << " vhka size: " << m_vhka.size()
270  // << "\n";
271  // debug() << ost.str() << endmsg;
272 
273  hndArr.setOwner(this);
274  m_vhka.push_back(&hndArr);
275 
276  Gaudi::Details::PropertyBase* p = PBASE::declareProperty(name, hndArr, doc);
277  if (p != 0) {
278  p->declareUpdateHandler(&AthCommonDataStore<PBASE>::updateVHKA, this);
279  } else {
280  ATH_MSG_ERROR("unable to call declareProperty on VarHandleKeyArray "
281  << name);
282  }
283 
284  return p;
285 
286  }

◆ declareProperty() [4/6]

Gaudi::Details::PropertyBase* AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( const std::string &  name,
T &  property,
const std::string &  doc,
const SG::NotHandleType & 
)
inlineinherited

Declare a new Gaudi property.

Parameters
name: Name of the property.
property: Object holding the property value.
doc: Documentation string for the property.

This is the generic version, for types that do not derive from SG::VarHandleKey. It just forwards to the base class version of declareProperty.

Definition at line 333 of file AthCommonDataStore.h.

337  {
338  return PBASE::declareProperty(name, property, doc);
339  }

◆ declareProperty() [5/6]

Gaudi::Details::PropertyBase* AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( const std::string &  name,
T &  property,
const std::string &  doc = "none" 
)
inlineinherited

Declare a new Gaudi property.

Parameters
name: Name of the property.
property: Object holding the property value.
doc: Documentation string for the property.

This dispatches to either the generic declareProperty or the one for VarHandle/Key/KeyArray.

Definition at line 352 of file AthCommonDataStore.h.

355  {
356  typedef typename SG::HandleClassifier<T>::type htype;
357  return declareProperty (name, property, doc, htype());
358  }

◆ declareProperty() [6/6]

Gaudi::Details::PropertyBase& AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( Gaudi::Property< T > &  t)
inlineinherited

Definition at line 145 of file AthCommonDataStore.h.

145  {
146  typedef typename SG::HandleClassifier<T>::type htype;
148  }

◆ detStore()

const ServiceHandle<StoreGateSvc>& AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore ( ) const
inlineinherited

The standard StoreGateSvc/DetectorStore. Returns (kind of) a pointer to the StoreGateSvc.

Definition at line 95 of file AthCommonDataStore.h.

95 { return m_detStore; }

◆ evtSelector()

IEvtSelector* AthenaMPToolBase::evtSelector ( )
inlineprotectedinherited

Definition at line 84 of file AthenaMPToolBase.h.

84 { return m_evtSelector; }

◆ evtStore() [1/2]

ServiceHandle<StoreGateSvc>& AthCommonDataStore< AthCommonMsg< AlgTool > >::evtStore ( )
inlineinherited

The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc.

Definition at line 85 of file AthCommonDataStore.h.

85 { return m_evtStore; }

◆ evtStore() [2/2]

const ServiceHandle<StoreGateSvc>& AthCommonDataStore< AthCommonMsg< AlgTool > >::evtStore ( ) const
inlineinherited

The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc.

Definition at line 90 of file AthCommonDataStore.h.

90 { return m_evtStore; }

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeScatterer::exec_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 190 of file EvtRangeScatterer.cxx.

191 {
192  ATH_MSG_INFO("Exec function in the AthenaMP Token Scatterer PID=" << getpid());
193 
194  yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
195  // Create a socket to communicate with the Pilot
196  yampl::ISocket* socket2Pilot = socketFactory->createClientSocket(yampl::Channel(m_eventRangeChannel.value(),yampl::LOCAL),yampl::MOVE_DATA);
197  ATH_MSG_INFO("Created CLIENT socket for communicating Event Ranges with the Pilot");
198  // Create a socket to communicate with EvtRangeProcessors
199  std::string socket2ProcessorName = m_processorChannel.value() + std::string("_") + m_randStr;
200  yampl::ISocket* socket2Processor = socketFactory->createServerSocket(yampl::Channel(socket2ProcessorName,yampl::LOCAL),yampl::MOVE_DATA);
201  ATH_MSG_INFO("Created SERVER socket to token processors: " << socket2ProcessorName);
202 
203  bool all_ok=true;
204  int procReportPending(0); // Keep track of how many output files are yet to be reported by Token Processors
205 
206  AthenaInterprocess::SharedQueue* sharedFailedPidQueue(0);
207  if(detStore()->retrieve(sharedFailedPidQueue,"AthenaMPFailedPidQueue_"+m_randStr).isFailure()) {
208  ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Failed PID Queue");
209  all_ok=false;
210  }
211 
212  // Communication protocol with the Pilot
213  std::string strReady("Ready for events");
214  std::string strStopProcessing("No more events");
215  std::string processorWaitRequest("");
216  int workerPid{-1};
217 
218  ATH_MSG_INFO("Starting main loop");
219 
220  while(all_ok) {
221  // NO CACHING MODE: first get a request from one of the processors and only after that request the next event range from the pilot
222  if(!m_doCaching && processorWaitRequest.empty()) {
223  ATH_MSG_DEBUG("Waiting for event range request from one of the processors ... ");
224  while(processorWaitRequest.empty()) {
225  processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
226  pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
227  usleep(1000);
228  }
229  ATH_MSG_INFO("One of the processors is ready for the next range");
230  // Get PID from the request and Update m_pid2RangeID
231  workerPid = std::atoi(processorWaitRequest.c_str());
232  auto it = m_pid2RangeID.find(workerPid);
233  if(it!=m_pid2RangeID.end()) {
234  m_pid2RangeID.erase(it);
235  }
236  }
237 
238  // Signal the Pilot that AthenaMP is ready for event processing
239  void* ready_message = malloc(strReady.size());
240  memcpy(ready_message,strReady.data(),strReady.size());
241  socket2Pilot->send(ready_message,strReady.size());
242  void* eventRangeMessage;
243  std::string strPeerId;
244  ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage,strPeerId);
245  std::string eventRange((const char*)eventRangeMessage,eventRangeSize);
246  size_t carRet = eventRange.find('\n');
247  if(carRet!=std::string::npos)
248  eventRange.resize(carRet);
249 
250  // Break the loop if no more ranges are expected
251  if(eventRange.find(strStopProcessing)!=std::string::npos) {
252  ATH_MSG_INFO("Stopped the loop. Last message from the Event Range Channel: " << eventRange);
253  break;
254  }
255  ATH_MSG_INFO("Got Event Range from the pilot: " << eventRange);
256 
257  // Parse the Event Range string
258  // Expected the following format: [{KEY:VALUE[,KEY:VALUE]}]
259  // First get rid of the leading '[{' and the trailing '}]'
260  if(eventRange.starts_with( "[{")) eventRange=eventRange.substr(2);
261  if(eventRange.ends_with("}]")) eventRange.resize(eventRange.size()-2);
262 
263  std::map<std::string,std::string> eventRangeMap;
264  size_t startpos(0);
265  size_t endpos = eventRange.find(',');
266  while(endpos!=std::string::npos) {
267  // Get the Key-Value pair
268  std::string keyValue(eventRange.substr(startpos,endpos-startpos));
269  size_t colonPos = keyValue.find(':');
270  std::string strKey = keyValue.substr(0,colonPos);
271  std::string strVal = keyValue.substr(colonPos+1);
272  trimRangeStrings(strKey);
273  trimRangeStrings(strVal);
274  eventRangeMap[strKey]=strVal;
275  // Next iteration
276  startpos = endpos+1;
277  endpos = eventRange.find(',',startpos);
278  }
279  // Get the final Key-Value pair
280  std::string keyValue(eventRange.substr(startpos));
281  size_t colonPos = keyValue.find(':');
282  std::string strKey = keyValue.substr(0,colonPos);
283  std::string strVal = keyValue.substr(colonPos+1);
284  trimRangeStrings(strKey);
285  trimRangeStrings(strVal);
286  eventRangeMap[strKey]=strVal;
287 
288  if(eventRangeMap.find("eventRangeID")==eventRangeMap.end()
289  || eventRangeMap.find("startEvent")==eventRangeMap.end()
290  || eventRangeMap.find("lastEvent")==eventRangeMap.end()
291  || eventRangeMap.find("GUID")==eventRangeMap.end()) {
292  std::string errorStr("ERR_ATHENAMP_PARSE \"" + eventRange + "\": Wrong format");
294  ATH_MSG_INFO("Ignoring this event range ");
295  void* errorMessage = malloc(errorStr.size());
296  memcpy(errorMessage,errorStr.data(),errorStr.size());
297  socket2Pilot->send(errorMessage,errorStr.size());
298  continue;
299  }
300  else {
301  ATH_MSG_DEBUG("*** Decoded Event Range ***");
302  std::map<std::string,std::string>::const_iterator it = eventRangeMap.begin(),
303  itEnd = eventRangeMap.end();
304  for(;it!=itEnd;++it)
305  ATH_MSG_DEBUG(it->first << ":" << it->second);
306  ATH_MSG_DEBUG("*** ***");
307  }
308 
309  std::string rangeID = eventRangeMap["eventRangeID"];
310  std::string guid = eventRangeMap["GUID"];
311  int startEvent = std::atoi(eventRangeMap["startEvent"].c_str());
312  int lastEvent = std::atoi(eventRangeMap["lastEvent"].c_str());
313  if(rangeID.empty()
314  || guid.empty()
315  || lastEvent < startEvent) {
316  std::string errorStr("ERR_ATHENAMP_PARSE \"" + eventRange + "\": Wrong values of range fields");
318  ATH_MSG_INFO("Ignoring this event range ");
319  void* errorMessage = malloc(errorStr.size());
320  memcpy(errorMessage,errorStr.data(),errorStr.size());
321  socket2Pilot->send(errorMessage,errorStr.size());
322  continue;
323  }
324 
325  std::string message2ProcessorStr;
326  char* message2Processor(0);
327 
328  std::ostringstream ostr;
329  ostr << rangeID;
330  if(eventRangeMap.find("PFN")!=eventRangeMap.end()) {
331  ostr << "," << "PFN:" << eventRangeMap["PFN"];
332  }
333  ostr << "," << eventRangeMap["startEvent"]
334  << "," << eventRangeMap["lastEvent"];
335  message2ProcessorStr = ostr.str();
336 
337  // CACHING MODE: first get an event range from the pilot, transform it into the tokens
338  // and only after that wait for a new range request by one of the processors
339  if(m_doCaching) {
340  ATH_MSG_DEBUG("Waiting for event range request from one of the processors");
341  while(processorWaitRequest.empty()) {
342  processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
343  pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
344  usleep(1000);
345  }
346  ATH_MSG_INFO("One of the processors is ready for the next range");
347  // Get PID from the request and Update m_pid2RangeID
348  workerPid = std::atoi(processorWaitRequest.c_str());
349  auto it = m_pid2RangeID.find(workerPid);
350  if(it!=m_pid2RangeID.end()) {
351  m_pid2RangeID.erase(it);
352  }
353  }
354 
355  // Send to the Processor: RangeID,evtToken[,evtToken]
356  message2Processor = (char*)malloc(message2ProcessorStr.size());
357  memcpy(message2Processor,message2ProcessorStr.data(),message2ProcessorStr.size());
358  socket2Processor->send(message2Processor,message2ProcessorStr.size());
359  procReportPending++;
360 
361  // Get PID from the request and Update m_pid2RangeID
362  m_pid2RangeID[workerPid] = rangeID;
363  processorWaitRequest.clear();
364 
365  ATH_MSG_INFO("Sent response to the processor : " << message2ProcessorStr);
366  }
367 
368  if(all_ok) {
369  // We are done distributing event tokens.
370  // Tell the workers that the event processing is over
371  // i.e. send out m_nprocs empty messages
372  void* emptyMess4Processor(0);
373  if(!processorWaitRequest.empty()) {
374  // We already have one processor waiting for the answer
375  emptyMess4Processor = malloc(1);
376  socket2Processor->send(emptyMess4Processor,1);
377  ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
378  processorWaitRequest.clear();
379  }
380  bool endLoop{false};
381  while(true) {
382  ATH_MSG_DEBUG("Going to set another processor free");
383  while(processorWaitRequest.empty()) {
384  processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
385  if(pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending)==-1) {
386  endLoop = true;
387  break;
388  }
389  usleep(1000);
390  }
391  if(endLoop) break;
392  // Remove worker from m_pid2RangeID
393  workerPid = std::atoi(processorWaitRequest.c_str());
394  auto it = m_pid2RangeID.find(workerPid);
395  if(it!=m_pid2RangeID.end()) {
396  m_pid2RangeID.erase(it);
397  }
398  emptyMess4Processor = malloc(1);
399  socket2Processor->send(emptyMess4Processor,1);
400  ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
401  ATH_MSG_INFO("Still " << procReportPending << " pending reports");
402  processorWaitRequest.clear();
403  }
404  }
405 
406  if(m_appMgr->stop().isFailure()) {
407  ATH_MSG_ERROR("Unable to stop AppMgr");
408  all_ok=false;
409  }
410  else {
411  if(m_appMgr->finalize().isFailure()) {
412  std::cerr << "Unable to finalize AppMgr" << std::endl;
413  all_ok=false;
414  }
415  }
416 
417  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
418  outwork->data = malloc(sizeof(int));
419  *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
420  outwork->size = sizeof(int);
421 
422  // ...
423  // (possible) TODO: extend outwork with some error message, which will be eventually
424  // reported in the master process
425  // ...
426  delete socket2Processor;
427  delete socket2Pilot;
428  delete socketFactory;
429 
430  return outwork;
431 }

◆ extraDeps_update_handler()

void AthCommonDataStore< AthCommonMsg< AlgTool > >::extraDeps_update_handler ( Gaudi::Details::PropertyBase &  ExtraDeps)
protectedinherited

Add StoreName to extra input/output deps as needed.

Use the logic of the VarHandleKey to parse the DataObjID keys supplied via the ExtraInputs and ExtraOutputs Properties to add the StoreName if it's not explicitly given.

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeScatterer::fin_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 433 of file EvtRangeScatterer.cxx.

434 {
435  // Dummy
436  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
437  outwork->data = malloc(sizeof(int));
438  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
439  outwork->size = sizeof(int);
440  return outwork;
441 }

◆ finalize()

StatusCode EvtRangeScatterer::finalize ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 54 of file EvtRangeScatterer.cxx.

55 {
56  return StatusCode::SUCCESS;
57 }

◆ fmterror()

std::string AthenaMPToolBase::fmterror ( int  errnum)
protectedinherited

Definition at line 359 of file AthenaMPToolBase.cxx.

360 {
361  char buf[256];
362  strerror_r(errnum, buf, sizeof(buf));
363  return std::string(buf);
364 }

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr EvtRangeScatterer::generateOutputReport ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 108 of file EvtRangeScatterer.cxx.

109 {
111  return jobOutputs;
112 }

◆ getNewRangeRequest()

std::string EvtRangeScatterer::getNewRangeRequest ( yampl::ISocket *  socket2Processor,
yampl::ISocket *  socket2Pilot,
int &  procReportPending 
)
private

Definition at line 476 of file EvtRangeScatterer.cxx.

479 {
480  void* processor_request(0);
481  std::string strPeerId;
482  ssize_t processorRequestSize = socket2Processor->tryRecv(processor_request,0,strPeerId);
483 
484  if(processorRequestSize==-1) return std::string("");
485  if(processorRequestSize==sizeof(pid_t)+sizeof(AthenaMPToolBase::ESRange_Status)) {
486  ATH_MSG_INFO("Processor reported event range processing error");
487  pid_t pid = *((pid_t*)processor_request);
489  std::string errorStr("ERR_ATHENAMP_PROCESS "+ m_pid2RangeID[pid] + ": ");
490  switch(status) {
492  errorStr+=std::string("Not found in the input file");
493  break;
495  errorStr+=std::string("Seek failed");
496  break;
498  errorStr+=std::string("Failed to process event range");
499  break;
501  errorStr+=std::string("Failed to make output file");
502  break;
504  errorStr+=std::string("Failed to set input file");
505  break;
506  default:
507  break;
508  }
509  void* errorMessage = malloc(errorStr.size());
510  memcpy(errorMessage,errorStr.data(),errorStr.size());
511  socket2Pilot->send(errorMessage,errorStr.size());
512  procReportPending--;
513  ATH_MSG_INFO("Error reported to the pilot. Reports pending: " << procReportPending);
514  return std::string("");
515  }
516  std::string strProcessorRequest((const char*)processor_request,processorRequestSize);
517  ATH_MSG_INFO("Received request from a processor: " << strProcessorRequest);
518  // Decode the request. If it contains output file name then pass it over to the pilot and return empty string
519  if(strProcessorRequest.starts_with( "/")) {
520  void* outpFileNameMessage = malloc(strProcessorRequest.size());
521  memcpy(outpFileNameMessage,strProcessorRequest.data(),strProcessorRequest.size());
522  socket2Pilot->send(outpFileNameMessage,strProcessorRequest.size());
523  procReportPending--;
524  ATH_MSG_INFO("Output file reported to the pilot. Reports pending: " << procReportPending);
525  return std::string("");
526  }
527  return strProcessorRequest;
528 }

◆ handleSavedPfc()

int AthenaMPToolBase::handleSavedPfc ( const std::filesystem::path &  dest_path)
protectedinherited

Definition at line 422 of file AthenaMPToolBase.cxx.

423 { // If "PoolFileCatalog.xml.AthenaMP-saved" (relative path) is a regular file, pass it to COPY_FILE_HACK with the target <dest_path>/PoolFileCatalog.xml.
424  if(std::filesystem::is_regular_file("PoolFileCatalog.xml.AthenaMP-saved"))
425  COPY_FILE_HACK("PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+"/PoolFileCatalog.xml"); // presumably performs the copy; macro comes from copy_file_icc_hack.h - confirm there
426  return 0; // always 0: no outcome of the macro is reported to the caller
427 }

◆ initialize()

StatusCode EvtRangeScatterer::initialize ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 44 of file EvtRangeScatterer.cxx.

45 {
46  ATH_MSG_DEBUG("In initialize");
47 
49  if(!sc.isSuccess()) return sc;
50 
51  return StatusCode::SUCCESS;
52 }

◆ inputHandles()

virtual std::vector<Gaudi::DataHandle*> AthCommonDataStore< AthCommonMsg< AlgTool > >::inputHandles ( ) const
overridevirtualinherited

Return this algorithm's input handles.

We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.

◆ interfaceID()

static const InterfaceID& IAthenaMPTool::interfaceID ( )
inlinestaticinherited

Definition at line 40 of file IAthenaMPTool.h.

40 { return IID_IAthenaMPTool; } // hands back a reference to the IID_IAthenaMPTool constant

◆ killChildren()

void AthenaMPToolBase::killChildren ( )
overridevirtualinherited

Implements IAthenaMPTool.

Definition at line 214 of file AthenaMPToolBase.cxx.

215 {
217  kill(child.getProcessID(),SIGKILL);
218  }
219 }

◆ msg() [1/2]

MsgStream& AthCommonMsg< AlgTool >::msg ( ) const
inlineinherited

Definition at line 24 of file AthCommonMsg.h.

24  { // Accessor for the message stream; simply forwards to msgStream().
25  return this->msgStream();
26  }

◆ msg() [2/2]

MsgStream& AthCommonMsg< AlgTool >::msg ( const MSG::Level  lvl) const
inlineinherited

Definition at line 27 of file AthCommonMsg.h.

27  { // Same accessor, with the requested level 'lvl' passed on to msgStream(lvl).
28  return this->msgStream(lvl);
29  }

◆ msgLvl()

bool AthCommonMsg< AlgTool >::msgLvl ( const MSG::Level  lvl) const
inlineinherited

Definition at line 30 of file AthCommonMsg.h.

30  { // Forwards the query for level 'lvl' to msgLevel(lvl) and returns its answer.
31  return this->msgLevel(lvl);
32  }

◆ operator()

virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator ( ) const &
pure virtualinherited

◆ operator=()

◆ outputHandles()

virtual std::vector<Gaudi::DataHandle*> AthCommonDataStore< AthCommonMsg< AlgTool > >::outputHandles ( ) const
overridevirtualinherited

Return this algorithm's output handles.

We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.

◆ pollFailedPidQueue()

pid_t EvtRangeScatterer::pollFailedPidQueue ( AthenaInterprocess::SharedQueue sharedFailedPidQueue,
yampl::ISocket *  socket2Pilot,
int &  procReportPending 
)
private

Definition at line 530 of file EvtRangeScatterer.cxx.

533 {
534  // Non-blocking poll of the failed-PID queue. Returns the PID that was read,
535  // or the initial value 0 when nothing was received (a received -1 is passed through as is).
536  pid_t failedPid{0};
537  const bool received = sharedFailedPidQueue->try_receive_basic<pid_t>(failedPid);
538  if(!received || failedPid==-1) return failedPid;
539 
540  ATH_MSG_INFO("Procesor with PID=" << failedPid << " has failed!");
541  const auto rangeEntry = m_pid2RangeID.find(failedPid);
542  if(rangeEntry!=m_pid2RangeID.end()) {
543  // A range was assigned to this process: tell the pilot that it failed
544  ATH_MSG_WARNING("The failed RangeID = " << rangeEntry->second << " will be reported to Pilot");
545 
546  const std::string report("ERR_ATHENAMP_PROCESS " + rangeEntry->second + ": Failed to process event range");
547  // NOTE(review): the buffer is not freed here - presumably send() takes ownership; confirm against yampl
548  void* payload = malloc(report.size());
549  memcpy(payload,report.data(),report.size());
550  socket2Pilot->send(payload,report.size());
551  --procReportPending;
552  m_pid2RangeID.erase(failedPid);
553  }
554  ATH_MSG_INFO("Reports pending: " << procReportPending);
555  return failedPid;
556 }

◆ redirectLog()

int AthenaMPToolBase::redirectLog ( const std::string &  rundir,
bool  addTimeStamp = true 
)
protectedinherited

Definition at line 282 of file AthenaMPToolBase.cxx.

283 {
284  // Redirect both stdout and stderr to the same file AthenaMP.log inside 'rundir'.
285  // With addTimeStamp set, "%t " is additionally prepended to the message service
286  // Format property unless it already contains "%t".
287  // Returns 0 on success and -1 on failure (a missing Format property only warns).
288  int newout = open(std::string(rundir+"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
289  if(newout==-1) {
290  const int openErrno = errno; // snapshot before the logging code can touch errno
291  ATH_MSG_ERROR("Unable to open log file in the run directory. " << fmterror(openErrno));
292  return -1;
293  }
294 
295  // errno describes only the most recent failing call. Each dup2() result is
296  // paired with its own errno snapshot; reading errno after the second dup2()
297  // and close() could report the reason of a different call.
298  const int dup2result1 = dup2(newout, STDOUT_FILENO);
299  const int dup2errno1 = errno; // only consulted when dup2result1==-1
300  const int dup2result2 = dup2(newout, STDERR_FILENO);
301  const int dup2errno2 = errno; // only consulted when dup2result2==-1
302  TEMP_FAILURE_RETRY(close(newout));
303  if(dup2result1==-1) {
304  ATH_MSG_ERROR("Unable to redirect standard output. " << fmterror(dup2errno1));
305  return -1;
306  }
307  if(dup2result2==-1) {
308  ATH_MSG_ERROR("Unable to redirect standard error. " << fmterror(dup2errno2));
309  return -1;
310  }
311 
312  if(addTimeStamp) {
313  SmartIF<IProperty> propertyServer(msgSvc());
314  if(propertyServer==0) {
315  ATH_MSG_ERROR("Unable to cast message svc to IProperty");
316  return -1;
317  }
318 
319  std::string propertyName("Format");
320  std::string oldFormat("");
321  StringProperty formatProp(propertyName,oldFormat);
322  StatusCode sc = propertyServer->getProperty(&formatProp);
323  if(sc.isFailure()) {
324  ATH_MSG_WARNING("Message Service does not have Format property");
325  }
326  else {
327  oldFormat = formatProp.value();
328  if(oldFormat.find("%t")==std::string::npos) {
329  // Add time stamps
330  std::string newFormat("%t " + oldFormat);
331  StringProperty newFormatProp(propertyName,newFormat);
332  if(propertyServer->setProperty(newFormatProp).isFailure()) {
333  ATH_MSG_ERROR("Unable to set new Format property on the Message Service");
334  return -1;
335  }
336  }
337  else {
338  ATH_MSG_DEBUG("MsgSvc format already contains timestamps. Nothing to be done");
339  }
340  }
341  }
342 
343  return 0;
344 }

◆ renounce()

std::enable_if_t<std::is_void_v<std::result_of_t<decltype(&T::renounce)(T)> > && !std::is_base_of_v<SG::VarHandleKeyArray, T> && std::is_base_of_v<Gaudi::DataHandle, T>, void> AthCommonDataStore< AthCommonMsg< AlgTool > >::renounce ( T &  h)
inlineprotectedinherited

Definition at line 380 of file AthCommonDataStore.h.

381  { // Calls renounce() on the handle itself first, then passes it to the base-class (PBASE) renounce.
382  h.renounce();
383  PBASE::renounce (h);
384  }

◆ renounceArray()

void AthCommonDataStore< AthCommonMsg< AlgTool > >::renounceArray ( SG::VarHandleKeyArray handlesArray)
inlineprotectedinherited

remove all handles from I/O resolution

Definition at line 364 of file AthCommonDataStore.h.

364  { // Delegates entirely to the array's own renounce().
365  handlesArray.renounce();
366  }

◆ reopenFd()

int AthenaMPToolBase::reopenFd ( int  fd,
const std::string &  name 
)
privateinherited

Definition at line 445 of file AthenaMPToolBase.cxx.

446 { // Opens 'name' again and installs the new descriptor under the number 'fd'. Returns 0 on success or when lseek reports ESPIPE, -1 on any other error.
447  ATH_MSG_DEBUG("Attempting to reopen descriptor for " << name);
448  int old_openflags = fcntl(fd,F_GETFL,0); // file status flags of the current descriptor; NOTE(review): a -1 result is not checked
449  switch(old_openflags & O_ACCMODE) { // the access mode is only logged, it does not change the flow
450  case O_RDONLY: {
451  ATH_MSG_DEBUG("The File Access Mode is RDONLY");
452  break;
453  }
454  case O_WRONLY: {
455  ATH_MSG_DEBUG("The File Access Mode is WRONLY");
456  break;
457  }
458  case O_RDWR: {
459  ATH_MSG_DEBUG("The File Access Mode is RDWR");
460  break;
461  }
462  }
463 
464  int old_descflags = fcntl(fd,F_GETFD,0); // descriptor flags, restored on the new descriptor further down
465  off_t oldpos = lseek(fd,0,SEEK_CUR); // zero-offset seek from the current position: yields the current offset
466  if(oldpos==-1) {
467  if(errno==ESPIPE) { // lseek is not applicable to this descriptor: warn and leave it as it is
468  ATH_MSG_WARNING("Dealing with PIPE. Skipping ... (FIXME!)");
469  }
470  else {
471  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on " << name << ". " << fmterror(errno));
472  return -1;
473  }
474  }
475  else {
476  Io::Fd newfd = open(name.c_str(),old_openflags); // open by name, reusing the flags read above
477  if(newfd==-1) {
478  ATH_MSG_ERROR("When re-opening file descriptors unable to open " << name << " for reading. " << fmterror(errno));
479  return -1;
480  }
481  if(lseek(newfd,oldpos,SEEK_SET)==-1){ // move the new descriptor to the offset the old one had
482  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on the newly opened " << name << ". " << fmterror(errno));
483  TEMP_FAILURE_RETRY(close(newfd));
484  return -1;
485  }
486  TEMP_FAILURE_RETRY(close(fd)); // release the old descriptor so that its number can be reused
487  if(dup2(newfd,fd)==-1) { // make 'fd' refer to the newly opened file
488  ATH_MSG_ERROR("When re-opening file descriptors unable to duplicate descriptor for " << name << ". " << fmterror(errno));
489  TEMP_FAILURE_RETRY(close(newfd));
490  return -1;
491  }
492  if(fcntl(fd,F_SETFD,old_descflags)==-1) { // put the saved descriptor flags back
493  ATH_MSG_ERROR("When re-opening file descriptors unable to set descriptor flags for " << name << ". " << fmterror(errno));
494  TEMP_FAILURE_RETRY(close(newfd));
495  return -1;
496  }
497  TEMP_FAILURE_RETRY(close(newfd)); // the temporary descriptor is no longer needed once 'fd' has been set up
498  }
499  return 0;
500 }

◆ reopenFds()

int AthenaMPToolBase::reopenFds ( )
protectedinherited

Definition at line 366 of file AthenaMPToolBase.cxx.

367 {
368  // File descriptors are reopened in two passes:
369  // 1. all open files which have been registered with the FileMgr;
370  // 2. FdsRegistry entries which pass 1 did not already handle.
371  // Returns 0 on success and -1 as soon as one reopenFd() call fails.
372  std::set<int> reopened;
373 
374  // Pass 1. Attributes are requested for open files only, closed ones are of no interest here
375  std::vector<const Io::FileAttr*> openFiles;
376  const unsigned reported = m_fileMgr->getFiles(openFiles);
377  if(reported!=openFiles.size())
378  ATH_MSG_WARNING("getFiles returned " << reported << " while vector size is " << openFiles.size());
379 
380  for(const Io::FileAttr* attr : openFiles) {
381  ATH_MSG_DEBUG("* " << *attr);
382  const std::string& filename = attr->name();
383  const Io::Fd fd = attr->fd();
384 
385  if(fd==-1) {
386  // It is legal to have fd=-1 for remote inputs
387  // On the other hand, these inputs should not remain open after fork. The issue being tracked at ATEAM-434.
388  // So, this hopefully is a temporary patch
389  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " << filename);
390  continue;
391  }
392 
393  if(reopenFd(fd,filename))
394  return -1;
395 
396  reopened.insert(fd);
397  }
398 
399  // Pass 2. Check the FdsRegistry
400  for(const AthenaInterprocess::FdsRegistryEntry& entry : *m_fdsRegistry) {
401  if(reopened.count(entry.fd)!=0) {
402  ATH_MSG_DEBUG("The file from FdsRegistry " << entry.name << " was registered with FileMgr. Skip reopening");
403  continue;
404  }
405 
406  ATH_MSG_WARNING("The file " << entry.name << " has not been registered with the FileMgr!");
407 
408  if(entry.fd==-1) {
409  // Same protection as the one above
410  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << entry.name);
411  continue;
412  }
413 
414  if(reopenFd(entry.fd,entry.name))
415  return -1;
416 
417  reopened.insert(entry.fd);
418  }
419  return 0;
420 }

◆ reportSubprocessStatuses()

void AthenaMPToolBase::reportSubprocessStatuses ( )
overridevirtualinherited

Implements IAthenaMPTool.

Reimplemented in EvtRangeProcessor, SharedEvtQueueConsumer, and SharedHiveEvtQueueConsumer.

Definition at line 124 of file AthenaMPToolBase.cxx.

125 {
126  // Log one INFO line per entry returned by the process group: a non-zero exit code counts as FAILURE.
127  ATH_MSG_INFO("Statuses of sub-processes");
128  for(const AthenaInterprocess::ProcessStatus& status : m_processGroup->getStatuses()) {
129  const char* outcome = status.exitcode ? "FAILURE" : "SUCCESS";
130  ATH_MSG_INFO("*** Process PID=" << status.pid << ". Status " << outcome);
131  }
132 }

◆ setMaxEvt()

virtual void AthenaMPToolBase::setMaxEvt ( int  maxEvt)
inlineoverridevirtualinherited

Implements IAthenaMPTool.

Definition at line 45 of file AthenaMPToolBase.h.

45 {m_maxEvt=maxEvt;} // stores the given value in m_maxEvt

◆ setMPRunStop()

virtual void AthenaMPToolBase::setMPRunStop ( const AthenaInterprocess::IMPRunStop runStop)
inlineoverridevirtualinherited

Implements IAthenaMPTool.

Definition at line 46 of file AthenaMPToolBase.h.

46 {m_mpRunStop=runStop;} // keeps the given pointer in m_mpRunStop

◆ setRandString()

void AthenaMPToolBase::setRandString ( const std::string &  randStr)
overridevirtualinherited

Implements IAthenaMPTool.

Definition at line 209 of file AthenaMPToolBase.cxx.

210 { // Stores a copy of 'randStr' in m_randStr.
211  m_randStr = randStr;
212 }

◆ subProcessLogs()

void EvtRangeScatterer::subProcessLogs ( std::vector< std::string > &  filenames)
overridevirtual

Implements IAthenaMPTool.

Definition at line 100 of file EvtRangeScatterer.cxx.

101 {
102  // Reports exactly one log file, <m_subprocTopDir>/<m_subprocDirPrefix>/AthenaMP.log.
103  // Whatever 'filenames' held before is discarded.
104  const std::filesystem::path runDir = std::filesystem::path(m_subprocTopDir) / std::filesystem::path(m_subprocDirPrefix);
105  filenames.assign(1, runDir.string()+std::string("/AthenaMP.log"));
106 }

◆ sysInitialize()

virtual StatusCode AthCommonDataStore< AthCommonMsg< AlgTool > >::sysInitialize ( )
overridevirtualinherited

Perform system initialization for an algorithm.

We override this to declare all the elements of handle key arrays at the end of initialization. See comments on updateVHKA.

Reimplemented in DerivationFramework::CfAthAlgTool, AthCheckedComponent< AthAlgTool >, AthCheckedComponent<::AthAlgTool >, and asg::AsgMetadataTool.

◆ sysStart()

virtual StatusCode AthCommonDataStore< AthCommonMsg< AlgTool > >::sysStart ( )
overridevirtualinherited

Handle START transition.

We override this in order to make sure that conditions handle keys can cache a pointer to the conditions container.

◆ trimRangeStrings()

void EvtRangeScatterer::trimRangeStrings ( std::string &  str)
private

Definition at line 443 of file EvtRangeScatterer.cxx.

444 {
445  // Removes leading/trailing blanks from 'str' in place, then one layer of
446  // enclosing quotes (u'...' or "..."). A string made only of blanks becomes empty.
447  const size_t first = str.find_first_not_of(' ');
448  if(first==std::string::npos) {
449  str.clear();
450  return;
451  }
452  // Since 'first' exists, find_last_not_of() cannot return npos here. Trimming
453  // unconditionally also covers a single character followed by blanks ("a  "),
454  // which the former "if(i) str.resize(i+1)" guard left untouched.
455  str.erase(str.find_last_not_of(' ')+1);
456  str.erase(0,first);
457 
458  // the string might be enclosed by either
459  // "u\'" and "\'"
460  // or
461  // "\"" and "\""
462  // Get rid of them!
463  if(str.starts_with( "u\'")) {
464  str = str.substr(2);
465  if(str.ends_with( "\'")) {
466  str.resize(str.size()-1);
467  }
468  }
469  else if(str.starts_with( "\"")) {
470  str = str.substr(1);
471  if(str.ends_with( "\"")) {
472  str.resize(str.size()-1);
473  }
474  }
475 }

◆ updateIoReg()

int AthenaMPToolBase::updateIoReg ( const std::string &  rundir)
protectedinherited

Definition at line 338 of file AthenaMPToolBase.cxx.

339 {
340  if (!m_ioMgr.retrieve().isSuccess()) {
341  ATH_MSG_ERROR("Error retrieving IoComponentMgr");
342  return -1;
343  } else {
344  ATH_MSG_DEBUG("Successfully retrieved IoComponentMgr");
345  }
346 
347  // update the IoRegistry for the new workdir - make sure we use absolute path
349  if(!m_ioMgr->io_update_all(abs_rundir.string()).isSuccess()) {
350  ATH_MSG_ERROR("Error updating IoRegistry");
351  return -1;
352  } else {
353  ATH_MSG_DEBUG("Successfully updated IoRegistry");
354  }
355 
356  return 0;
357 }

◆ updateVHKA()

void AthCommonDataStore< AthCommonMsg< AlgTool > >::updateVHKA ( Gaudi::Details::PropertyBase &  )
inlineinherited

Definition at line 308 of file AthCommonDataStore.h.

308  {
309  // Make this component the owner of every key held by the handle-key arrays in m_vhka.
310  // The property argument itself is not used.
311  for (SG::VarHandleKeyArray* array : m_vhka) {
312  for (SG::VarHandleKey* key : array->keys()) {
313  key->setOwner(this);
314  }
315  }
316  }

◆ useFdsRegistry()

void AthenaMPToolBase::useFdsRegistry ( std::shared_ptr< AthenaInterprocess::FdsRegistry registry)
overridevirtualinherited

Implements IAthenaMPTool.

Definition at line 204 of file AthenaMPToolBase.cxx.

205 {
207 }

◆ waitForSignal()

void AthenaMPToolBase::waitForSignal ( )
protectedinherited

Definition at line 429 of file AthenaMPToolBase.cxx.

430 {
431  ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
432  sigset_t mask, oldmask;
433 
435 
436  sigemptyset (&mask);
437  sigaddset (&mask, SIGUSR1);
438 
439  sigprocmask (SIG_BLOCK, &mask, &oldmask);
441  sigsuspend (&oldmask);
442  sigprocmask (SIG_UNBLOCK, &mask, NULL);
443 }

Member Data Documentation

◆ m_appMgr

ServiceHandle<IAppMgrUI> AthenaMPToolBase::m_appMgr
protectedinherited

Definition at line 96 of file AthenaMPToolBase.h.

◆ m_detStore

StoreGateSvc_t AthCommonDataStore< AthCommonMsg< AlgTool > >::m_detStore
privateinherited

Pointer to StoreGate (detector store by default)

Definition at line 393 of file AthCommonDataStore.h.

◆ m_doCaching

bool EvtRangeScatterer::m_doCaching
private

Definition at line 65 of file EvtRangeScatterer.h.

◆ m_eventRangeChannel

StringProperty EvtRangeScatterer::m_eventRangeChannel
private

Definition at line 64 of file EvtRangeScatterer.h.

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthenaMPToolBase::m_evtProcessor
protectedinherited

Definition at line 95 of file AthenaMPToolBase.h.

◆ m_evtSelector

SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector
protectedinherited

Definition at line 99 of file AthenaMPToolBase.h.

◆ m_evtSelName

std::string AthenaMPToolBase::m_evtSelName
protectedinherited

Definition at line 90 of file AthenaMPToolBase.h.

◆ m_evtStore

StoreGateSvc_t AthCommonDataStore< AthCommonMsg< AlgTool > >::m_evtStore
privateinherited

Pointer to StoreGate (event store by default)

Definition at line 390 of file AthCommonDataStore.h.

◆ m_fdsRegistry

std::shared_ptr<AthenaInterprocess::FdsRegistry> AthenaMPToolBase::m_fdsRegistry
protectedinherited

Definition at line 101 of file AthenaMPToolBase.h.

◆ m_fileMgr

ServiceHandle<IFileMgr> AthenaMPToolBase::m_fileMgr
protectedinherited

Definition at line 97 of file AthenaMPToolBase.h.

◆ m_fileMgrLog

std::string AthenaMPToolBase::m_fileMgrLog
protectedinherited

Definition at line 100 of file AthenaMPToolBase.h.

◆ m_ioMgr

ServiceHandle<IIoComponentMgr> AthenaMPToolBase::m_ioMgr
protectedinherited

Definition at line 98 of file AthenaMPToolBase.h.

◆ m_isPileup

Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
protectedinherited

Definition at line 104 of file AthenaMPToolBase.h.

◆ m_maxEvt

int AthenaMPToolBase::m_maxEvt
protectedinherited

Definition at line 87 of file AthenaMPToolBase.h.

◆ m_mpRunStop

const AthenaInterprocess::IMPRunStop* AthenaMPToolBase::m_mpRunStop {nullptr}
protectedinherited

Definition at line 93 of file AthenaMPToolBase.h.

◆ m_nprocs

int AthenaMPToolBase::m_nprocs
protectedinherited

Definition at line 86 of file AthenaMPToolBase.h.

◆ m_pid2RangeID

Pid2RangeID EvtRangeScatterer::m_pid2RangeID
private

Definition at line 66 of file EvtRangeScatterer.h.

◆ m_processGroup

AthenaInterprocess::ProcessGroup* AthenaMPToolBase::m_processGroup
protectedinherited

Definition at line 92 of file AthenaMPToolBase.h.

◆ m_processorChannel

StringProperty EvtRangeScatterer::m_processorChannel
private

Definition at line 63 of file EvtRangeScatterer.h.

◆ m_randStr

std::string AthenaMPToolBase::m_randStr
protectedinherited

Definition at line 102 of file AthenaMPToolBase.h.

◆ m_subprocDirPrefix

std::string AthenaMPToolBase::m_subprocDirPrefix
protectedinherited

Definition at line 89 of file AthenaMPToolBase.h.

◆ m_subprocTopDir

std::string AthenaMPToolBase::m_subprocTopDir
protectedinherited

Definition at line 88 of file AthenaMPToolBase.h.

◆ m_varHandleArraysDeclared

bool AthCommonDataStore< AthCommonMsg< AlgTool > >::m_varHandleArraysDeclared
privateinherited

Definition at line 399 of file AthCommonDataStore.h.

◆ m_vhka

std::vector<SG::VarHandleKeyArray*> AthCommonDataStore< AthCommonMsg< AlgTool > >::m_vhka
privateinherited

Definition at line 398 of file AthCommonDataStore.h.


The documentation for this class was generated from the following files:
python.PyKernel.retrieve
def retrieve(aClass, aKey=None)
Definition: PyKernel.py:110
python.Dso.registry
registry
Definition: Control/AthenaServices/python/Dso.py:159
python.DQPostProcessMod.rundir
def rundir(fname)
Definition: DQPostProcessMod.py:116
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
EvtRangeScatterer::getNewRangeRequest
std::string getNewRangeRequest(yampl::ISocket *socket2Processor, yampl::ISocket *socket2Pilot, int &procReportPending)
Definition: EvtRangeScatterer.cxx:476
athena.path
path
python interpreter configuration
Definition: athena.py:128
AthenaMPToolBase::ESRANGE_BADINPFILE
@ ESRANGE_BADINPFILE
Definition: AthenaMPToolBase.h:65
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
LArBadChanBlobUtils::Channel
Identifier32::value_type Channel
Definition: LArBadChanBlobUtils.h:24
EvtRangeScatterer::pollFailedPidQueue
pid_t pollFailedPidQueue(AthenaInterprocess::SharedQueue *sharedFailedPidQueue, yampl::ISocket *socket2Pilot, int &procReportPending)
Definition: EvtRangeScatterer.cxx:530
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:92
AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
Definition: AthCommonDataStore.h:145
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:102
AthenaMPToolBase::ESRange_Status
ESRange_Status
Definition: AthenaMPToolBase.h:59
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:71
AthenaInterprocess::ProcessGroup::getChildren
const std::vector< Process > & getChildren() const
Definition: ProcessGroup.cxx:197
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
skel.it
it
Definition: skel.GENtoEVGEN.py:407
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:359
AthCommonDataStore< AthCommonMsg< AlgTool > >::m_evtStore
StoreGateSvc_t m_evtStore
Pointer to StoreGate (event store by default)
Definition: AthCommonDataStore.h:390
AthCommonDataStore< AthCommonMsg< AlgTool > >::m_vhka
std::vector< SG::VarHandleKeyArray * > m_vhka
Definition: AthCommonDataStore.h:398
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
EvtRangeScatterer::trimRangeStrings
void trimRangeStrings(std::string &)
Definition: EvtRangeScatterer.cxx:443
athena.exitcode
int exitcode
Definition: athena.py:161
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
read_hist_ntuple.t
t
Definition: read_hist_ntuple.py:5
AthenaMPToolBase::ESRANGE_SEEKFAILED
@ ESRANGE_SEEKFAILED
Definition: AthenaMPToolBase.h:62
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
python.utils.AtlRunQueryLookup.mask
string mask
Definition: AtlRunQueryLookup.py:460
Fd
IIoSvc::Fd Fd
Definition: IoSvc.cxx:22
sigaddset
#define sigaddset(x, y)
Definition: SealSignal.h:84
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:70
AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore
const ServiceHandle< StoreGateSvc > & detStore() const
The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:95
SG::VarHandleKeyArray::setOwner
virtual void setOwner(IDataHandleHolder *o)=0
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
IDTPMcnv.htype
htype
Definition: IDTPMcnv.py:29
sigset_t
int sigset_t
Definition: SealSignal.h:80
EvtRangeScatterer::m_doCaching
bool m_doCaching
Definition: EvtRangeScatterer.h:65
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:99
python.utils.AtlRunQueryDQUtils.p
p
Definition: AtlRunQueryDQUtils.py:210
EvtRangeScatterer::m_eventRangeChannel
StringProperty m_eventRangeChannel
Definition: EvtRangeScatterer.h:64
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
RunTileMonitoring.keyValue
keyValue
Definition: RunTileMonitoring.py:150
AthCommonDataStore
Definition: AthCommonDataStore.h:52
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:96
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
StdJOSetup.msgSvc
msgSvc
Provide convenience handles for various services.
Definition: StdJOSetup.py:36
lumiFormat.i
int i
Definition: lumiFormat.py:85
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
python.LArMinBiasAlgConfig.int
int
Definition: LArMinBiasAlgConfig.py:59
AthenaMPToolBase::reopenFd
int reopenFd(int fd, const std::string &name)
Definition: AthenaMPToolBase.cxx:445
AthenaMPToolBase::AthenaMPToolBase
AthenaMPToolBase()
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
AthenaMPToolBase_d::pauseForDebug
void pauseForDebug(int)
Definition: AthenaMPToolBase.cxx:27
AthenaMPToolBase::ESRANGE_SUCCESS
@ ESRANGE_SUCCESS
Definition: AthenaMPToolBase.h:60
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:132
AthCommonDataStore< AthCommonMsg< AlgTool > >::m_detStore
StoreGateSvc_t m_detStore
Pointer to StoreGate (detector store by default)
Definition: AthCommonDataStore.h:393
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:338
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:56
AthenaMPToolBase::m_mpRunStop
const AthenaInterprocess::IMPRunStop * m_mpRunStop
Definition: AthenaMPToolBase.h:93
SG::VarHandleKeyArray::renounce
virtual void renounce()=0
pool_uuid.guid
guid
Definition: pool_uuid.py:112
SG::HandleClassifier::type
std::conditional< std::is_base_of< SG::VarHandleKeyArray, T >::value, VarHandleKeyArrayType, type2 >::type type
Definition: HandleClassifier.h:54
ReadFromCoolCompare.fd
fd
Definition: ReadFromCoolCompare.py:196
merge_scale_histograms.doc
string doc
Definition: merge_scale_histograms.py:9
AthenaMPToolBase::ESRANGE_PROCFAILED
@ ESRANGE_PROCFAILED
Definition: AthenaMPToolBase.h:63
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
checkRpcDigits.errorStr
string errorStr
Definition: checkRpcDigits.py:182
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:98
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:282
AthenaInterprocess::Process
Definition: Process.h:17
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:67
grepfile.filenames
list filenames
Definition: grepfile.py:34
Trk::open
@ open
Definition: BinningType.h:40
AthenaMPToolBase::m_maxEvt
int m_maxEvt
Definition: AthenaMPToolBase.h:87
EvtRangeScatterer::m_processorChannel
StringProperty m_processorChannel
Definition: EvtRangeScatterer.h:63
a
TList * a
Definition: liststreamerinfos.cxx:10
EvtRangeScatterer::m_pid2RangeID
Pid2RangeID m_pid2RangeID
Definition: EvtRangeScatterer.h:66
h
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaInterprocess::FdsRegistryEntry
Definition: FdsRegistry.h:13
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:24
SG::VarHandleBase::vhKey
SG::VarHandleKey & vhKey()
Return a non-const reference to the HandleKey.
Definition: StoreGate/src/VarHandleBase.cxx:629
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
str
Definition: BTagTrackIpAccessor.cxx:11
python.Bindings.keys
keys
Definition: Control/AthenaPython/python/Bindings.py:798
merge.status
status
Definition: merge.py:17
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:89
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:88
AthenaMPToolBase::ESRANGE_NOTFOUND
@ ESRANGE_NOTFOUND
Definition: AthenaMPToolBase.h:61
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:422
AthenaMPToolBase::m_fileMgr
ServiceHandle< IFileMgr > m_fileMgr
Definition: AthenaMPToolBase.h:97
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:29
AthCommonDataStore::declareGaudiProperty
Gaudi::Details::PropertyBase & declareGaudiProperty(Gaudi::Property< T > &hndl, const SG::VarHandleKeyType &)
specialization for handling Gaudi::Property<SG::VarHandleKey>
Definition: AthCommonDataStore.h:156
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:69
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:366
sigemptyset
#define sigemptyset(x)
Definition: SealSignal.h:82
AthenaMPToolBase::ESRANGE_FILENOTMADE
@ ESRANGE_FILENOTMADE
Definition: AthenaMPToolBase.h:64
AthenaMPToolBase::m_fdsRegistry
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
Definition: AthenaMPToolBase.h:101
fitman.k
k
Definition: fitman.py:528
AthenaMPToolBase_d::sig_done
std::atomic< bool > sig_done
Definition: AthenaMPToolBase.cxx:26