ATLAS Offline Software
Public Member Functions | Private Member Functions | Private Attributes | List of all members
SharedEvtQueueProvider Class Reference (final)

#include <SharedEvtQueueProvider.h>

Inheritance diagram for SharedEvtQueueProvider:
Collaboration diagram for SharedEvtQueueProvider:

Public Member Functions

 SharedEvtQueueProvider (const std::string &type, const std::string &name, const IInterface *parent)
 
virtual ~SharedEvtQueueProvider () override
 
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
 
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
 
virtual void subProcessLogs (std::vector< std::string > &) override
 
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
 
virtual void handle (const Incident &inc) override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func () override
 

Private Member Functions

 SharedEvtQueueProvider ()
 
 SharedEvtQueueProvider (const SharedEvtQueueProvider &)
 
SharedEvtQueueProvider & operator= (const SharedEvtQueueProvider &)
 
void addEventsToQueue ()
 

Private Attributes

Gaudi::Property< bool > m_useSharedReader {this, "UseSharedReader", false, "Use shared reader"}
 
Gaudi::Property< int > m_nEventsBeforeFork {this, "EventsBeforeFork", 0, "Number of events before forking"}
 
Gaudi::Property< int > m_nChunkSize {this, "ChunkSize", 1}
 
int m_nprocesses {-1}
 We use this data member for adding negative numbers at the end of the event queue. More...
 
int m_nChunkStart {0}
 The beginning of the current chunk. More...
 
int m_nPositionInChunk {0}
 Position within the current chunk. More...
 
int m_nEvtRequested {-1}
 Max event received from AppMgr. More...
 
int m_nEvtCounted {0}
 The number of events this tool has counted itself in the input files. More...
 
AthenaInterprocess::SharedQueue * m_sharedEventQueue {nullptr}
 
SmartIF< IEventShare > m_evtShare
 

Detailed Description

Definition at line 14 of file SharedEvtQueueProvider.h.

Constructor & Destructor Documentation

◆ SharedEvtQueueProvider() [1/3]

SharedEvtQueueProvider::SharedEvtQueueProvider ( const std::string &  type,
const std::string &  name,
const IInterface *  parent 
)

Definition at line 28 of file SharedEvtQueueProvider.cxx.

31  : base_class(type,name,parent)
32 {
33  m_subprocDirPrefix = "evt_counter";
34 }

◆ ~SharedEvtQueueProvider()

SharedEvtQueueProvider::~SharedEvtQueueProvider ( )
override virtual

Definition at line 36 of file SharedEvtQueueProvider.cxx.

37 {
38 }

◆ SharedEvtQueueProvider() [2/3]

SharedEvtQueueProvider::SharedEvtQueueProvider ( )
private

◆ SharedEvtQueueProvider() [3/3]

SharedEvtQueueProvider::SharedEvtQueueProvider ( const SharedEvtQueueProvider & )
private

Member Function Documentation

◆ addEventsToQueue()

void SharedEvtQueueProvider::addEventsToQueue ( )
private

Definition at line 357 of file SharedEvtQueueProvider.cxx.

358 {
359  ATH_MSG_DEBUG("in addEventsToQueue");
360  long newValueForQueue = ((long)(m_nPositionInChunk-m_nChunkStart)<<(sizeof(int)*8))|m_nChunkStart;
361  while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
362  usleep(100);
363  }
364  ATH_MSG_INFO("Sent to the queue 0x" << std::hex << newValueForQueue << std::dec
365  << " which corresponds to Chunks start " << m_nChunkStart
366  << " and chunk size " << m_nPositionInChunk-m_nChunkStart);
367 }

◆ ATLAS_NOT_THREAD_SAFE() [1/2]

virtual StatusCode exec SharedEvtQueueProvider::ATLAS_NOT_THREAD_SAFE ( )
override virtual

◆ ATLAS_NOT_THREAD_SAFE() [2/2]

virtual int makePool SharedEvtQueueProvider::ATLAS_NOT_THREAD_SAFE ( int  maxevt,
int  nprocs,
const std::string &  topdir 
)
override virtual

◆ bootstrap_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueProvider::bootstrap_func ( )
override virtual

Definition at line 110 of file SharedEvtQueueProvider.cxx.

111 {
112  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
113  outwork->data = CxxUtils::xmalloc(sizeof(int));
114  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
115  outwork->size = sizeof(int);
116 
117  // ...
118  // (possible) TODO: extend outwork with some error message, which will be eventually
119  // reported in the master proces
120  // ...
121 
122  // ________________________ mkdir ________________________
123  std::filesystem::path counter_rundir(m_subprocTopDir);
124  counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
125 
126  if(mkdir(counter_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
127  ATH_MSG_ERROR( "Unable to make event counter run directory: " << counter_rundir.string() << ". " << fmterror(errno) );
128  return outwork;
129  }
130 
131  // ________________________ Redirect logs ________________________
132  if(redirectLog(counter_rundir.string()))
133  return outwork;
134 
135  ATH_MSG_INFO( "Logs redirected in the AthenaMP event event counter PID=" << getpid() );
136 
137  // ________________________ Update Io Registry ____________________________
138  if(updateIoReg(counter_rundir.string()))
139  return outwork;
140 
141  ATH_MSG_INFO( "Io registry updated in the AthenaMP event event counter PID=" << getpid() );
142 
143  // _______________________ Handle saved PFC (if any) ______________________
144  std::filesystem::path abs_counter_rundir = std::filesystem::absolute(counter_rundir);
145  if(handleSavedPfc(abs_counter_rundir))
146  return outwork;
147 
148  // ________________________ reopen descriptors ____________________________
149  if(reopenFds())
150  return outwork;
151 
152  ATH_MSG_INFO( "File descriptors re-opened in the AthenaMP event event counter PID=" << getpid() );
153 
154  // _______________________ event counting ________________________________
155  // Use incident service for registering a EndInputFile handler
156  SmartIF<IIncidentSvc> incsvc(serviceLocator()->service("IncidentSvc"));
157  if(!incsvc) {
158  ATH_MSG_ERROR( "Error retrieving IncidentSvc" );
159  return outwork;
160  }
161 
162  incsvc->addListener(this,"EndInputFile");
163  ATH_MSG_DEBUG( "Added self as listener to EndInputFile" );
164 
165  // _______________________ event sharing ________________________________
166  // Use EventSelector as SharedReader (if configured) and enable output streaming
167  if (m_useSharedReader) {
168  m_evtShare = SmartIF<IEventShare>(m_evtSelector);
169  if(!m_evtShare) {
170  ATH_MSG_ERROR( "Failed to dyncast event selector to IEventShare" );
171  return outwork;
172  } else {
173  if(!m_evtShare->makeServer(m_nprocs+1).isSuccess()) {
174  ATH_MSG_ERROR("Failed to make the event selector a share server");
175  return outwork;
176  } else {
177  ATH_MSG_DEBUG("Successfully made the event selector a share server");
178  }
179  }
180  }
181 
182  // ________________________ I/O reinit ________________________
183  if(!m_ioMgr->io_reinitialize().isSuccess()) {
184  ATH_MSG_ERROR( "Failed to reinitialize I/O" );
185  return outwork;
186  } else {
187  ATH_MSG_DEBUG( "Successfully reinitialized I/O" );
188  }
189 
190  // ________________________ Event selector restart ________________________
191  if(m_evtSelector) {
192  SmartIF<IService> evtSelSvc(m_evtSelector);
193  if(!evtSelSvc) {
194  ATH_MSG_ERROR( "Failed to dyncast event selector to IService" );
195  return outwork;
196  }
197  if(!evtSelSvc->start().isSuccess()) {
198  ATH_MSG_ERROR( "Failed to restart the event selector" );
199  return outwork;
200  } else {
201  ATH_MSG_DEBUG( "Successfully restarted the event selector" );
202  }
203  }
204  // ________________________ chdir ________________________
205  if(chdir(counter_rundir.string().c_str())==-1) {
206  ATH_MSG_ERROR( "Failed to chdir to " << counter_rundir.string() );
207  return outwork;
208  }
209 
210  // Declare success and return
211  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
212  return outwork;
213 }

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueProvider::exec_func ( )
override virtual

Definition at line 215 of file SharedEvtQueueProvider.cxx.

216 {
217  ATH_MSG_INFO("Exec function in the AthenaMP Event Counter PID=" << getpid());
218 
219  bool all_ok(true);
220 
221  int skipEvents(0);
222  IEvtSelector::Context* evtContext(nullptr);
223 
224  // Get SkipEvents property of the event selector
225  if(m_evtSelector) {
226  SmartIF<IProperty> propertyServer(m_evtSelector);
227  if(propertyServer==0) {
228  ATH_MSG_ERROR( "Unable to cast event selector to IProperty" );
229  all_ok=false;
230  }
231  else {
232  std::string propertyName("SkipEvents");
233  IntegerProperty skipEventsProp(std::move(propertyName),skipEvents);
234  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
235  ATH_MSG_INFO( "Event Selector does not have SkipEvents property" );
236  }
237  else {
238  skipEvents = skipEventsProp.value();
239  }
240  }
241 
242  if(all_ok) {
243  StatusCode sc = m_evtSelector->createContext(evtContext);
244  if(sc.isFailure()) {
245  ATH_MSG_ERROR("Failed to create the event selector context");
246  all_ok=false;
247  }
248  else {
249  // advance to nEventsBeforeFork
250  for(int i(0); i<m_nEventsBeforeFork;++i) {
251  if(!m_evtSelector->next(*evtContext).isSuccess()) {
252  ATH_MSG_ERROR("Unexpected error: EventsBeforeFork>EventsInInputFiles");
253  all_ok=false;
254  break;
255  }
256  }
257  }
258  }
259  }
260 
261  if(all_ok) {
262  if(m_nEvtRequested!=0) { // Take into account corner case with evtMax=0
266  ATH_MSG_VERBOSE("Starting to go through events. Chunk start = " << m_nChunkStart+1);
267 
268  // Loop through all remaining events
269  while(!m_evtSelector || m_evtSelector->next(*evtContext).isSuccess()) {
271  m_nEvtCounted++;
272  ATH_MSG_VERBOSE("Events Counted " << m_nEvtCounted << ", Position in Chunk " << m_nPositionInChunk);
277  if(m_nEvtCounted==m_nEvtRequested) break;
278  }
279  }
280  }
281 
282  // We are done. Add -m_nEvtCounted m_nprocesses-times to the queue
283  long newValueForQueue = (long)(-m_nEvtCounted);
284  for(int i=0;i<m_nprocesses;++i) {
285  while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
286  usleep(1000);
287  }
288  }
289 
290  ATH_MSG_INFO("Done counting events and populating shared queue. Total number of events to be processed: " << std::max(m_nEvtCounted - m_nEventsBeforeFork,0)
291  << ", Event Chunk size in the queue is " << m_nChunkSize);
292 
294  if(m_evtShare->readEvent(0).isFailure()) {
295  ATH_MSG_ERROR("Failed to read " << m_nEvtRequested << " events");
296  all_ok=false;
297  } else {
298  ATH_MSG_DEBUG("readEvent succeeded");
299  }
300  }
301 
302  if(m_appMgr->stop().isFailure()) {
303  ATH_MSG_ERROR("Unable to stop AppMgr");
304  all_ok=false;
305  }
306  else {
307  if(m_appMgr->finalize().isFailure()) {
308  std::cerr << "Unable to finalize AppMgr" << std::endl;
309  all_ok=false;
310  }
311  }
312  }
313 
314 
315  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
316  outwork->data = CxxUtils::xmalloc(sizeof(int));
317  *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
318  outwork->size = sizeof(int);
319 
320  // ...
321  // (possible) TODO: extend outwork with some error message, which will be eventually
322  // reported in the master proces
323  // ...
324  return outwork;
325 }

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueProvider::fin_func ( )
override virtual

Definition at line 327 of file SharedEvtQueueProvider.cxx.

328 {
329  // Dummy
330  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
331  outwork->data = CxxUtils::xmalloc(sizeof(int));
332  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
333  outwork->size = sizeof(int);
334  return outwork;
335 }

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr SharedEvtQueueProvider::generateOutputReport ( )
override virtual

Definition at line 104 of file SharedEvtQueueProvider.cxx.

105 {
107  return jobOutputs;
108 }

◆ handle()

void SharedEvtQueueProvider::handle ( const Incident &  inc)
override virtual

Definition at line 337 of file SharedEvtQueueProvider.cxx.

338 {
339  ATH_MSG_DEBUG( "Handling incident" );
340 
341  const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
342  if(fileInc == 0) {
343  ATH_MSG_WARNING( "Failed to dyn-cast the incident" );
344  return;
345  }
346 
347  if(fileInc->type()=="EndInputFile") {
348  ATH_MSG_DEBUG( "End Input File reached!" );
349 
353  }
354  }
355 }

◆ operator=()

SharedEvtQueueProvider& SharedEvtQueueProvider::operator= ( const SharedEvtQueueProvider & )
private

◆ subProcessLogs()

void SharedEvtQueueProvider::subProcessLogs ( std::vector< std::string > &  filenames)
override virtual

Definition at line 96 of file SharedEvtQueueProvider.cxx.

97 {
98  filenames.clear();
99  std::filesystem::path counter_rundir(m_subprocTopDir);
100  counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
101  filenames.push_back(counter_rundir.string()+std::string("/AthenaMP.log"));
102 }

Member Data Documentation

◆ m_evtShare

SmartIF<IEventShare> SharedEvtQueueProvider::m_evtShare
private

Definition at line 60 of file SharedEvtQueueProvider.h.

◆ m_nChunkSize

Gaudi::Property<int> SharedEvtQueueProvider::m_nChunkSize {this, "ChunkSize", 1}
private

Definition at line 45 of file SharedEvtQueueProvider.h.

◆ m_nChunkStart

int SharedEvtQueueProvider::m_nChunkStart {0}
private

The beginning of the current chunk.

Definition at line 53 of file SharedEvtQueueProvider.h.

◆ m_nEventsBeforeFork

Gaudi::Property<int> SharedEvtQueueProvider::m_nEventsBeforeFork {this, "EventsBeforeFork", 0, "Number of events before forking"}
private

Definition at line 44 of file SharedEvtQueueProvider.h.

◆ m_nEvtCounted

int SharedEvtQueueProvider::m_nEvtCounted {0}
private

The number of events this tool has counted itself in the input files.

Definition at line 57 of file SharedEvtQueueProvider.h.

◆ m_nEvtRequested

int SharedEvtQueueProvider::m_nEvtRequested {-1}
private

Max event received from AppMgr.

Definition at line 56 of file SharedEvtQueueProvider.h.

◆ m_nPositionInChunk

int SharedEvtQueueProvider::m_nPositionInChunk {0}
private

Position within the current chunk.

Definition at line 54 of file SharedEvtQueueProvider.h.

◆ m_nprocesses

int SharedEvtQueueProvider::m_nprocesses {-1}
private

We use this data member for adding negative numbers at the end of the event queue.

We cannot use m_nprocs for this purpose in order to avoid generating Output File Reports by Shared Queue Providers.

Definition at line 52 of file SharedEvtQueueProvider.h.

◆ m_sharedEventQueue

AthenaInterprocess::SharedQueue* SharedEvtQueueProvider::m_sharedEventQueue {nullptr}
private

Definition at line 59 of file SharedEvtQueueProvider.h.

◆ m_useSharedReader

Gaudi::Property<bool> SharedEvtQueueProvider::m_useSharedReader {this, "UseSharedReader", false, "Use shared reader"}
private

Definition at line 43 of file SharedEvtQueueProvider.h.


The documentation for this class was generated from the following files:
SharedEvtQueueProvider::m_nChunkSize
Gaudi::Property< int > m_nChunkSize
Definition: SharedEvtQueueProvider.h:45
athena.path
path
python interpreter configuration
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:15
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
max
constexpr double max()
Definition: ap_fixedTest.cxx:33
SharedEvtQueueProvider::m_nEvtRequested
int m_nEvtRequested
Max event received from AppMgr.
Definition: SharedEvtQueueProvider.h:56
SharedEvtQueueProvider::m_nChunkStart
int m_nChunkStart
The beginning of the current chunk.
Definition: SharedEvtQueueProvider.h:53
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
SharedEvtQueueProvider::m_nEventsBeforeFork
Gaudi::Property< int > m_nEventsBeforeFork
Definition: SharedEvtQueueProvider.h:44
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
SharedEvtQueueProvider::addEventsToQueue
void addEventsToQueue()
Definition: SharedEvtQueueProvider.cxx:357
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
SharedEvtQueueProvider::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedEvtQueueProvider.h:59
SharedEvtQueueProvider::m_nprocesses
int m_nprocesses
We use this data member for adding negative numbers at the end of the event queue.
Definition: SharedEvtQueueProvider.h:52
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:55
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
lumiFormat.i
int i
Definition: lumiFormat.py:85
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
SharedEvtQueueProvider::m_nEvtCounted
int m_nEvtCounted
The number of events this tool has counted itself in the input files.
Definition: SharedEvtQueueProvider.h:57
test_pyathena.parent
parent
Definition: test_pyathena.py:15
AthenaInterprocess::SharedQueue::try_send_basic
bool try_send_basic(T)
Definition: SharedQueue.h:88
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
SharedEvtQueueProvider::m_evtShare
SmartIF< IEventShare > m_evtShare
Definition: SharedEvtQueueProvider.h:60
grepfile.filenames
list filenames
Definition: grepfile.py:34
SharedEvtQueueProvider::m_useSharedReader
Gaudi::Property< bool > m_useSharedReader
Definition: SharedEvtQueueProvider.h:43
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
CxxUtils::xmalloc
void * xmalloc(size_t size)
Trapping version of malloc.
Definition: xmalloc.cxx:31
SharedEvtQueueProvider::m_nPositionInChunk
int m_nPositionInChunk
Position within the current chunk.
Definition: SharedEvtQueueProvider.h:54
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:29