ATLAS Offline Software
SharedEvtQueueProvider.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
7 
8 #include "GaudiKernel/IEvtSelector.h"
9 #include "GaudiKernel/IIncidentSvc.h"
10 #include "GaudiKernel/FileIncident.h"
11 #include "GaudiKernel/IIoComponentMgr.h"
12 #include "GaudiKernel/ISvcLocator.h"
14 
15 #include <boost/interprocess/shared_memory_object.hpp>
16 #include <boost/interprocess/mapped_region.hpp>
17 
18 #include <sys/stat.h>
19 #include <sstream>
20 #include <fstream>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <stdint.h>
24 #include <stdexcept>
25 #include <filesystem>
26 
28  , const std::string& name
29  , const IInterface* parent)
31  , m_nprocesses(-1)
32  , m_useSharedReader(false)
33  , m_nEventsBeforeFork(0)
34  , m_nChunkSize(1)
35  , m_nChunkStart(0)
36  , m_nPositionInChunk(0)
37  , m_nEvtRequested(-1)
38  , m_nEvtCounted(0)
39  , m_sharedEventQueue(0)
40  , m_evtShare(0)
41 {
42  declareInterface<IAthenaMPTool>(this);
43 
44  declareProperty("UseSharedReader",m_useSharedReader);
45  declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
46  declareProperty("ChunkSize",m_nChunkSize);
47 
48  m_subprocDirPrefix = "evt_counter";
49 }
50 
52 {
53 }
54 
55 int SharedEvtQueueProvider::makePool(int maxevt, int nprocs, const std::string& topdir)
56 {
57  ATH_MSG_DEBUG( "In makePool " << getpid() );
58 
59  if(maxevt < -1) {
60  ATH_MSG_ERROR( "Invalid number of events requested: " << maxevt );
61  return -1;
62  }
63 
64  if(topdir.empty()) {
65  ATH_MSG_ERROR( "Empty name for the top directory!" );
66  return -1;
67  }
68 
69  if(m_nChunkSize<=0) {
70  ATH_MSG_ERROR( "Non-positive chunk size requested: " << m_nChunkSize);
71  return -1;
72  }
73 
74  m_nEvtRequested = maxevt;
75  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
77  m_subprocTopDir = topdir;
78 
79  // Create event queue
80  ATH_MSG_DEBUG( "Event queue name " << "AthenaMPEventQueue_" << m_randStr );
81  StatusCode sc = detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr);
82  if(sc.isFailure()) {
83  ATH_MSG_ERROR( "Unable to retrieve the pointer to Shared Event Queue" );
84  return -1;
85  }
86 
87  // Create the process group and map_async bootstrap
89  ATH_MSG_INFO( "Event Counter process created" );
90  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
91  return -1;
92  ATH_MSG_INFO( "Event Counter bootstraped" );
93 
94  return 1;
95 }
96 
98 {
99  ATH_MSG_DEBUG( "In exec " << getpid() );
100 
101  // Map exec flag
102  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC)) {
103  ATH_MSG_ERROR( "Unable to send work to the event counter" );
104  return StatusCode::FAILURE;
105  }
106 
107  // Map exit flag
108  if(m_processGroup->map_async(0,0)){
109  ATH_MSG_ERROR( "Unable to set exit to the event counter" );
110  return StatusCode::FAILURE;
111  }
112  return StatusCode::SUCCESS;
113 }
114 
115 void SharedEvtQueueProvider::subProcessLogs(std::vector<std::string>& filenames)
116 {
117  filenames.clear();
118  std::filesystem::path counter_rundir(m_subprocTopDir);
119  counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
120  filenames.push_back(counter_rundir.string()+std::string("/AthenaMP.log"));
121 }
122 
124 {
126  return jobOutputs;
127 }
128 
129 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::bootstrap_func()
130 {
131  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
132  outwork->data = malloc(sizeof(int));
133  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
134  outwork->size = sizeof(int);
135 
136  // ...
137  // (possible) TODO: extend outwork with some error message, which will be eventually
138  // reported in the master proces
139  // ...
140 
141  // ________________________ mkdir ________________________
142  std::filesystem::path counter_rundir(m_subprocTopDir);
143  counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
144 
145  if(mkdir(counter_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
146  ATH_MSG_ERROR( "Unable to make event counter run directory: " << counter_rundir.string() << ". " << fmterror(errno) );
147  return outwork;
148  }
149 
150  // ________________________ Redirect logs ________________________
151  if(redirectLog(counter_rundir.string()))
152  return outwork;
153 
154  ATH_MSG_INFO( "Logs redirected in the AthenaMP event event counter PID=" << getpid() );
155 
156  // ________________________ Update Io Registry ____________________________
157  if(updateIoReg(counter_rundir.string()))
158  return outwork;
159 
160  ATH_MSG_INFO( "Io registry updated in the AthenaMP event event counter PID=" << getpid() );
161 
162  // _______________________ Handle saved PFC (if any) ______________________
163  std::filesystem::path abs_counter_rundir = std::filesystem::absolute(counter_rundir);
164  if(handleSavedPfc(abs_counter_rundir))
165  return outwork;
166 
167  // ________________________ reopen descriptors ____________________________
168  if(reopenFds())
169  return outwork;
170 
171  ATH_MSG_INFO( "File descriptors re-opened in the AthenaMP event event counter PID=" << getpid() );
172 
173  // _______________________ event counting ________________________________
174  // Use incident service for registering a EndInputFile handler
175  SmartIF<IIncidentSvc> incsvc(serviceLocator()->service("IncidentSvc"));
176  if(!incsvc) {
177  ATH_MSG_ERROR( "Error retrieving IncidentSvc" );
178  return outwork;
179  }
180 
181  incsvc->addListener(this,"EndInputFile");
182  ATH_MSG_DEBUG( "Added self as listener to EndInputFile" );
183 
184  // _______________________ event sharing ________________________________
185  // Use EventSelector as SharedReader (if configured) and enable output streaming
186  if (m_useSharedReader) {
187  m_evtShare = SmartIF<IEventShare>(m_evtSelector);
188  if(!m_evtShare) {
189  ATH_MSG_ERROR( "Failed to dyncast event selector to IEventShare" );
190  return outwork;
191  } else {
192  if(!m_evtShare->makeServer(m_nprocs+1).isSuccess()) {
193  ATH_MSG_ERROR("Failed to make the event selector a share server");
194  return outwork;
195  } else {
196  ATH_MSG_DEBUG("Successfully made the event selector a share server");
197  }
198  }
199  }
200 
201  // ________________________ I/O reinit ________________________
202  if(!m_ioMgr->io_reinitialize().isSuccess()) {
203  ATH_MSG_ERROR( "Failed to reinitialize I/O" );
204  return outwork;
205  } else {
206  ATH_MSG_DEBUG( "Successfully reinitialized I/O" );
207  }
208 
209  // ________________________ Event selector restart ________________________
210  if(m_evtSelector) {
211  SmartIF<IService> evtSelSvc(m_evtSelector);
212  if(!evtSelSvc) {
213  ATH_MSG_ERROR( "Failed to dyncast event selector to IService" );
214  return outwork;
215  }
216  if(!evtSelSvc->start().isSuccess()) {
217  ATH_MSG_ERROR( "Failed to restart the event selector" );
218  return outwork;
219  } else {
220  ATH_MSG_DEBUG( "Successfully restarted the event selector" );
221  }
222  }
223  // ________________________ chdir ________________________
224  if(chdir(counter_rundir.string().c_str())==-1) {
225  ATH_MSG_ERROR( "Failed to chdir to " << counter_rundir.string() );
226  return outwork;
227  }
228 
229  // Declare success and return
230  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
231  return outwork;
232 }
233 
234 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::exec_func()
235 {
236  ATH_MSG_INFO("Exec function in the AthenaMP Event Counter PID=" << getpid());
237 
238  bool all_ok(true);
239 
240  int skipEvents(0);
241  IEvtSelector::Context* evtContext(nullptr);
242 
243  // Get SkipEvents property of the event selector
244  if(m_evtSelector) {
245  SmartIF<IProperty> propertyServer(m_evtSelector);
246  if(propertyServer==0) {
247  ATH_MSG_ERROR( "Unable to cast event selector to IProperty" );
248  all_ok=false;
249  }
250  else {
251  std::string propertyName("SkipEvents");
252  IntegerProperty skipEventsProp(propertyName,skipEvents);
253  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
254  ATH_MSG_INFO( "Event Selector does not have SkipEvents property" );
255  }
256  else {
257  skipEvents = skipEventsProp.value();
258  }
259  }
260 
261  if(all_ok) {
262  StatusCode sc = m_evtSelector->createContext(evtContext);
263  if(sc.isFailure()) {
264  ATH_MSG_ERROR("Failed to create the event selector context");
265  all_ok=false;
266  }
267  else {
268  // advance to nEventsBeforeFork
269  for(int i(0); i<m_nEventsBeforeFork;++i) {
270  if(!m_evtSelector->next(*evtContext).isSuccess()) {
271  ATH_MSG_ERROR("Unexpected error: EventsBeforeFork>EventsInInputFiles");
272  all_ok=false;
273  break;
274  }
275  }
276  }
277  }
278  }
279 
280  if(all_ok) {
281  if(m_nEvtRequested!=0) { // Take into account corner case with evtMax=0
285  ATH_MSG_VERBOSE("Starting to go through events. Chunk start = " << m_nChunkStart+1);
286 
287  // Loop through all remaining events
288  while(!m_evtSelector || m_evtSelector->next(*evtContext).isSuccess()) {
290  m_nEvtCounted++;
291  ATH_MSG_VERBOSE("Events Counted " << m_nEvtCounted << ", Position in Chunk " << m_nPositionInChunk);
296  if(m_nEvtCounted==m_nEvtRequested) break;
297  }
298  }
299  }
300 
301  // We are done. Add -m_nEvtCounted m_nprocesses-times to the queue
302  long newValueForQueue = (long)(-m_nEvtCounted);
303  for(int i=0;i<m_nprocesses;++i) {
304  while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
305  usleep(1000);
306  }
307  }
308 
309  ATH_MSG_INFO("Done counting events and populating shared queue. Total number of events to be processed: " << std::max(m_nEvtCounted - m_nEventsBeforeFork,0)
310  << ", Event Chunk size in the queue is " << m_nChunkSize);
311 
313  if(m_evtShare->readEvent(0).isFailure()) {
314  ATH_MSG_ERROR("Failed to read " << m_nEvtRequested << " events");
315  all_ok=false;
316  } else {
317  ATH_MSG_DEBUG("readEvent succeeded");
318  }
319  }
320 
321  if(m_appMgr->stop().isFailure()) {
322  ATH_MSG_ERROR("Unable to stop AppMgr");
323  all_ok=false;
324  }
325  else {
326  if(m_appMgr->finalize().isFailure()) {
327  std::cerr << "Unable to finalize AppMgr" << std::endl;
328  all_ok=false;
329  }
330  }
331  }
332 
333 
334  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
335  outwork->data = malloc(sizeof(int));
336  *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
337  outwork->size = sizeof(int);
338 
339  // ...
340  // (possible) TODO: extend outwork with some error message, which will be eventually
341  // reported in the master proces
342  // ...
343  return outwork;
344 }
345 
346 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::fin_func()
347 {
348  // Dummy
349  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
350  outwork->data = malloc(sizeof(int));
351  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
352  outwork->size = sizeof(int);
353  return outwork;
354 }
355 
356 void SharedEvtQueueProvider::handle(const Incident& inc)
357 {
358  ATH_MSG_DEBUG( "Handling incident" );
359 
360  const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
361  if(fileInc == 0) {
362  ATH_MSG_WARNING( "Failed to dyn-cast the incident" );
363  return;
364  }
365 
366  if(fileInc->type()=="EndInputFile") {
367  ATH_MSG_DEBUG( "End Input File reached!" );
368 
372  }
373  }
374 }
375 
377 {
378  ATH_MSG_DEBUG("in addEventsToQueue");
379  long newValueForQueue = ((long)(m_nPositionInChunk-m_nChunkStart)<<(sizeof(int)*8))|m_nChunkStart;
380  while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
381  usleep(100);
382  }
383  ATH_MSG_INFO("Sent to the queue 0x" << std::hex << newValueForQueue << std::dec
384  << " which corresponds to Chunks start " << m_nChunkStart
385  << " and chunk size " << m_nPositionInChunk-m_nChunkStart);
386 }
387 
SharedEvtQueueProvider::SharedEvtQueueProvider
SharedEvtQueueProvider()
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
max
#define max(a, b)
Definition: cfImp.cxx:41
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
IEventShare.h
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:88
AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
Definition: AthCommonDataStore.h:145
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
SharedEvtQueueProvider::m_useSharedReader
bool m_useSharedReader
Definition: SharedEvtQueueProvider.h:47
SharedEvtQueueProvider::m_nEvtRequested
int m_nEvtRequested
Definition: SharedEvtQueueProvider.h:54
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:83
SharedEvtQueueProvider::m_nChunkStart
int m_nChunkStart
Definition: SharedEvtQueueProvider.h:50
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:358
SharedEvtQueueProvider::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: SharedEvtQueueProvider.h:48
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
ProcessGroup.h
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
SharedEvtQueueProvider::m_nChunkSize
int m_nChunkSize
Definition: SharedEvtQueueProvider.h:49
SharedEvtQueueProvider::addEventsToQueue
void addEventsToQueue()
Definition: SharedEvtQueueProvider.cxx:376
SharedEvtQueueProvider::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedEvtQueueProvider.h:57
SharedEvtQueueProvider::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: SharedEvtQueueProvider.cxx:123
SharedEvtQueueProvider::m_nprocesses
int m_nprocesses
Definition: SharedEvtQueueProvider.h:45
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:67
AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore
const ServiceHandle< StoreGateSvc > & detStore() const
The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:95
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:56
SharedEvtQueueProvider::handle
virtual void handle(const Incident &inc) override
Definition: SharedEvtQueueProvider.cxx:356
SharedEvtQueueProvider::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedEvtQueueProvider.cxx:115
SharedEvtQueueProvider::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedEvtQueueProvider.cxx:234
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:94
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:91
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
lumiFormat.i
int i
Definition: lumiFormat.py:85
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
SharedEvtQueueProvider::m_nEvtCounted
int m_nEvtCounted
Definition: SharedEvtQueueProvider.h:55
test_pyathena.parent
parent
Definition: test_pyathena.py:15
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:132
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:337
SharedEvtQueueProvider::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedEvtQueueProvider.cxx:346
AthenaInterprocess::SharedQueue::try_send_basic
bool try_send_basic(T)
Definition: SharedQueue.h:88
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
SharedEvtQueueProvider::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedEvtQueueProvider.cxx:129
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:93
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:281
SharedEvtQueueProvider.h
SharedEvtQueueProvider::~SharedEvtQueueProvider
virtual ~SharedEvtQueueProvider() override
Definition: SharedEvtQueueProvider.cxx:51
SharedEvtQueueProvider::m_evtShare
SmartIF< IEventShare > m_evtShare
Definition: SharedEvtQueueProvider.h:58
grepfile.filenames
list filenames
Definition: grepfile.py:34
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:28
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:84
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:421
SharedEvtQueueProvider::m_nPositionInChunk
int m_nPositionInChunk
Definition: SharedEvtQueueProvider.h:51
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:25
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:66
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:365