ATLAS Offline Software
SharedEvtQueueProvider.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
7 
8 #include "GaudiKernel/IEvtSelector.h"
9 #include "GaudiKernel/IIncidentSvc.h"
10 #include "GaudiKernel/FileIncident.h"
11 #include "GaudiKernel/IIoComponentMgr.h"
12 #include "GaudiKernel/ISvcLocator.h"
14 
15 #include <boost/interprocess/shared_memory_object.hpp>
16 #include <boost/interprocess/mapped_region.hpp>
17 
18 #include <sys/stat.h>
19 #include <sstream>
20 #include <fstream>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <stdint.h>
24 #include <stdexcept>
25 #include <filesystem>
26 
// SharedEvtQueueProvider constructor (continued): member defaults, interface registration and job-option properties.
28  , const std::string& name
29  , const IInterface* parent)
31  , m_nprocesses(-1)
32  , m_useSharedReader(false)
33  , m_nEventsBeforeFork(0)
34  , m_nChunkSize(1)
35  , m_nChunkStart(0)
36  , m_nPositionInChunk(0)
37  , m_nEvtRequested(-1)
38  , m_nEvtCounted(0)
39  , m_sharedEventQueue(0)
40  , m_evtShare(0)
41 {
 // Make this tool available through the IAthenaMPTool interface.
42  declareInterface<IAthenaMPTool>(this);
43
 // Job options:
 //   UseSharedReader  - when true, bootstrap_func() turns the event selector into an IEventShare server.
 //   EventsBeforeFork - number of events exec_func() steps the selector past before it starts counting.
 //   ChunkSize        - chunk size used for the shared event queue; makePool() rejects values <= 0.
44  declareProperty("UseSharedReader",m_useSharedReader);
45  declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
46  declareProperty("ChunkSize",m_nChunkSize);
47
 // Name of the event counter's run directory, created below the top directory passed to makePool().
48  m_subprocDirPrefix = "evt_counter";
49 }
50 
// SharedEvtQueueProvider destructor body.
52 {
 // Empty: no member is explicitly released here.
 // NOTE(review): m_sharedEventQueue is retrieved from the detector store in makePool(), so it is
 //               presumably not owned by this tool — confirm before adding any cleanup.
53 }
54 
// Validates the configuration and prepares the event-counter sub-process.
//   maxevt : number of events to count, stored as m_nEvtRequested; must be >= -1.
//            exec_func() counts nothing when it is 0 and only stops early when the count reaches it,
//            so -1 presumably means "all events the selector provides".
//   nprocs : number of processes; -1 means "use sysconf(_SC_NPROCESSORS_ONLN)".
//   topdir : top-level directory under which the event counter's run directory is created; must be non-empty.
// Returns -1 on any error (invalid arguments, non-positive ChunkSize, missing shared queue,
// failed bootstrap dispatch) and 1 on success.
55 int SharedEvtQueueProvider::makePool(int maxevt, int nprocs, const std::string& topdir)
56 {
57  ATH_MSG_DEBUG( "In makePool " << getpid() );
58
59  if(maxevt < -1) {
60  ATH_MSG_ERROR( "Invalid number of events requested: " << maxevt );
61  return -1;
62  }
63
64  if(topdir.empty()) {
65  ATH_MSG_ERROR( "Empty name for the top directory!" );
66  return -1;
67  }
68
69  if(m_nChunkSize<=0) {
70  ATH_MSG_ERROR( "Non-positive chunk size requested: " << m_nChunkSize);
71  return -1;
72  }
73
74  m_nEvtRequested = maxevt;
 // -1 selects the number of online CPUs.
75  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
77  m_subprocTopDir = topdir;
78
79  // Retrieve the shared event queue from the detector store (it is not created here)
80  ATH_MSG_DEBUG( "Event queue name " << "AthenaMPEventQueue_" << m_randStr );
81  StatusCode sc = detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr);
82  if(sc.isFailure()) {
83  ATH_MSG_ERROR( "Unable to retrieve the pointer to Shared Event Queue" );
84  return -1;
85  }
86
87  // Create the process group and map_async bootstrap
 // NOTE(review): the statement creating m_processGroup is not visible here; mapAsyncFlag()
 //               presumably relies on it — confirm it runs before the call below.
89  ATH_MSG_INFO( "Event Counter process created" );
 // A non-zero return from mapAsyncFlag() is treated as failure.
90  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
91  return -1;
92  ATH_MSG_INFO( "Event Counter bootstraped" );
93
94  return 1;
95 }
96 
// Dispatches work to the event counter: first the FUNC_EXEC flag, then an exit request via map_async(0,0).
// Returns StatusCode::FAILURE if either dispatch reports an error, StatusCode::SUCCESS otherwise.
98 {
99  ATH_MSG_DEBUG( "In exec " << getpid() );
100
101  // Map exec flag
102  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC)) {
103  ATH_MSG_ERROR( "Unable to send work to the event counter" );
104  return StatusCode::FAILURE;
105  }
106
107  // Map exit flag
108  if(m_processGroup->map_async(0,0)){
109  ATH_MSG_ERROR( "Unable to set exit to the event counter" );
110  return StatusCode::FAILURE;
111  }
112  return StatusCode::SUCCESS;
113 }
114 
115 void SharedEvtQueueProvider::subProcessLogs(std::vector<std::string>& filenames)
116 {
117  filenames.clear();
118  std::filesystem::path counter_rundir(m_subprocTopDir);
119  counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
120  filenames.push_back(counter_rundir.string()+std::string("/AthenaMP.log"));
121 }
122 
// Output report of the event counter (an AthenaMP::AllWorkerOutputs_ptr).
124 {
 // Returns the report object declared just above (that declaration is not visible here).
 // NOTE(review): nothing is added to it in the visible code, so the event counter presumably
 //               reports no outputs of its own — confirm.
126  return jobOutputs;
127 }
128 
129 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::bootstrap_func()
130 {
131  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
132  outwork->data = malloc(sizeof(int));
133  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
134  outwork->size = sizeof(int);
135 
136  // ...
137  // (possible) TODO: extend outwork with some error message, which will be eventually
138  // reported in the master proces
139  // ...
140 
141  // ________________________ mkdir ________________________
142  std::filesystem::path counter_rundir(m_subprocTopDir);
143  counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
144 
145  if(mkdir(counter_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
146  ATH_MSG_ERROR( "Unable to make event counter run directory: " << counter_rundir.string() << ". " << fmterror(errno) );
147  return outwork;
148  }
149 
150  // ________________________ Redirect logs ________________________
151  if(redirectLog(counter_rundir.string()))
152  return outwork;
153 
154  ATH_MSG_INFO( "Logs redirected in the AthenaMP event event counter PID=" << getpid() );
155 
156  // ________________________ Update Io Registry ____________________________
157  if(updateIoReg(counter_rundir.string()))
158  return outwork;
159 
160  ATH_MSG_INFO( "Io registry updated in the AthenaMP event event counter PID=" << getpid() );
161 
162  // _______________________ Handle saved PFC (if any) ______________________
163  std::filesystem::path abs_counter_rundir = std::filesystem::absolute(counter_rundir);
164  if(handleSavedPfc(abs_counter_rundir))
165  return outwork;
166 
167  // ________________________ reopen descriptors ____________________________
168  if(reopenFds())
169  return outwork;
170 
171  ATH_MSG_INFO( "File descriptors re-opened in the AthenaMP event event counter PID=" << getpid() );
172 
173  // _______________________ event counting ________________________________
174  // Use incident service for registering a EndInputFile handler
175  IIncidentSvc* incsvc(0);
176  StatusCode sc = serviceLocator()->service("IncidentSvc",incsvc);
177  if(sc.isFailure() || incsvc==0) {
178  ATH_MSG_ERROR( "Error retrieving IncidentSvc" );
179  return outwork;
180  }
181 
182  incsvc->addListener(this,"EndInputFile");
183  ATH_MSG_DEBUG( "Added self as listener to EndInputFile" );
184 
185  // _______________________ event sharing ________________________________
186  // Use EventSelector as SharedReader (if configured) and enable output streaming
187  if (m_useSharedReader) {
188  m_evtShare = dynamic_cast<IEventShare*>(m_evtSelector);
189  if(!m_evtShare) {
190  ATH_MSG_ERROR( "Failed to dyncast event selector to IEventShare" );
191  return outwork;
192  } else {
193  if(!m_evtShare->makeServer(m_nprocs+1).isSuccess()) {
194  ATH_MSG_ERROR("Failed to make the event selector a share server");
195  return outwork;
196  } else {
197  ATH_MSG_DEBUG("Successfully made the event selector a share server");
198  }
199  }
200  }
201 
202  // ________________________ I/O reinit ________________________
203  if(!m_ioMgr->io_reinitialize().isSuccess()) {
204  ATH_MSG_ERROR( "Failed to reinitialize I/O" );
205  return outwork;
206  } else {
207  ATH_MSG_DEBUG( "Successfully reinitialized I/O" );
208  }
209 
210  // ________________________ Event selector restart ________________________
211  if(m_evtSelector) {
212  IService* evtSelSvc = dynamic_cast<IService*>(m_evtSelector);
213  if(!evtSelSvc) {
214  ATH_MSG_ERROR( "Failed to dyncast event selector to IService" );
215  return outwork;
216  }
217  if(!evtSelSvc->start().isSuccess()) {
218  ATH_MSG_ERROR( "Failed to restart the event selector" );
219  return outwork;
220  } else {
221  ATH_MSG_DEBUG( "Successfully restarted the event selector" );
222  }
223  }
224  // ________________________ chdir ________________________
225  if(chdir(counter_rundir.string().c_str())==-1) {
226  ATH_MSG_ERROR( "Failed to chdir to " << counter_rundir.string() );
227  return outwork;
228  }
229 
230  // Declare success and return
231  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
232  return outwork;
233 }
234 
// Main work of the event-counter sub-process.
// The steps run in this order:
//   1. If an event selector is configured: read its SkipEvents property, create a selector
//      context and step past EventsBeforeFork events.
//   2. Count the remaining events, stopping early once m_nEvtCounted reaches m_nEvtRequested.
//   3. Push -m_nEvtCounted onto the shared queue once per process (m_nprocesses times),
//      retrying every 1 ms while the queue is full.
//   4. Call readEvent(0) on the IEventShare server, then stop and finalize the application manager.
// Returns a ScheduledWork whose payload is a single int error code: 0 if all steps succeeded, 1 otherwise.
235 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::exec_func()
236 {
237  ATH_MSG_INFO("Exec function in the AthenaMP Event Counter PID=" << getpid());
238
239  bool all_ok(true);
240
 // NOTE(review): skipEvents is read below but not used anywhere in the visible code — confirm it is still needed.
241  int skipEvents(0);
242  IEvtSelector::Context* evtContext(nullptr);
243
244  // Get SkipEvents property of the event selector
245  if(m_evtSelector) {
246  IProperty* propertyServer = dynamic_cast<IProperty*>(m_evtSelector);
247  if(propertyServer==0) {
248  ATH_MSG_ERROR( "Unable to cast event selector to IProperty" );
249  all_ok=false;
250  }
251  else {
252  std::string propertyName("SkipEvents");
253  IntegerProperty skipEventsProp(propertyName,skipEvents);
 // A missing SkipEvents property is only reported at INFO level and does not fail the job.
254  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
255  ATH_MSG_INFO( "Event Selector does not have SkipEvents property" );
256  }
257  else {
258  skipEvents = skipEventsProp.value();
259  }
260  }
261
262  if(all_ok) {
 // NOTE(review): no releaseContext() call for evtContext is visible in this function — confirm it is released elsewhere.
263  StatusCode sc = m_evtSelector->createContext(evtContext);
264  if(sc.isFailure()) {
265  ATH_MSG_ERROR("Failed to create the event selector context");
266  all_ok=false;
267  }
268  else {
269  // advance to nEventsBeforeFork
270  for(int i(0); i<m_nEventsBeforeFork;++i) {
271  if(!m_evtSelector->next(*evtContext).isSuccess()) {
272  ATH_MSG_ERROR("Unexpected error: EventsBeforeFork>EventsInInputFiles");
273  all_ok=false;
274  break;
275  }
276  }
277  }
278  }
279  }
280
281  if(all_ok) {
282  if(m_nEvtRequested!=0) { // Take into account corner case with evtMax=0
286  ATH_MSG_VERBOSE("Starting to go through events. Chunk start = " << m_nChunkStart+1);
287
288  // Loop through all remaining events
 // NOTE(review): without an event selector the condition is always true, so the loop only ends
 //               when m_nEvtCounted == m_nEvtRequested; with m_nEvtRequested == -1 that never happens — confirm.
289  while(!m_evtSelector || m_evtSelector->next(*evtContext).isSuccess()) {
291  m_nEvtCounted++;
292  ATH_MSG_VERBOSE("Events Counted " << m_nEvtCounted << ", Position in Chunk " << m_nPositionInChunk);
297  if(m_nEvtCounted==m_nEvtRequested) break;
298  }
299  }
300  }
301
302  // We are done. Add -m_nEvtCounted m_nprocesses-times to the queue
 // The negative value presumably tells each consumer that no more chunks will follow — confirm against the reader side.
303  long newValueForQueue = (long)(-m_nEvtCounted);
304  for(int i=0;i<m_nprocesses;++i) {
 // Busy-wait (1 ms sleep between attempts) until the queue accepts the value.
305  while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
306  usleep(1000);
307  }
308  }
309
310  ATH_MSG_INFO("Done counting events and populating shared queue. Total number of events to be processed: " << std::max(m_nEvtCounted - m_nEventsBeforeFork,0)
311  << ", Event Chunk size in the queue is " << m_nChunkSize);
312
 // NOTE(review): the condition opening this block (closed by the brace below) is not visible here;
 //               m_evtShare is only set in bootstrap_func() when UseSharedReader is true, so it is presumably guarded on that.
314  if(m_evtShare->readEvent(0).isFailure()) {
315  ATH_MSG_ERROR("Failed to read " << m_nEvtRequested << " events");
316  all_ok=false;
317  } else {
318  ATH_MSG_DEBUG("readEvent succeeded");
319  }
320  }
321
322  if(m_appMgr->stop().isFailure()) {
323  ATH_MSG_ERROR("Unable to stop AppMgr");
324  all_ok=false;
325  }
326  else {
 // std::cerr is used here instead of ATH_MSG_ERROR, presumably because messaging may no longer be usable after finalize — confirm.
327  if(m_appMgr->finalize().isFailure()) {
328  std::cerr << "Unable to finalize AppMgr" << std::endl;
329  all_ok=false;
330  }
331  }
332  }
333
334
335  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
336  outwork->data = malloc(sizeof(int));
337  *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
338  outwork->size = sizeof(int);
339
340  // ...
341  // (possible) TODO: extend outwork with some error message, which will be eventually
342  // reported in the master process
343  // ...
344  return outwork;
345 }
346 
347 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::fin_func()
348 {
349  // Dummy
350  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
351  outwork->data = malloc(sizeof(int));
352  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
353  outwork->size = sizeof(int);
354  return outwork;
355 }
356 
// Incident handler; bootstrap_func() registers this tool with the IncidentSvc for "EndInputFile".
// Incidents that are not FileIncidents log a warning and are otherwise ignored.
357 void SharedEvtQueueProvider::handle(const Incident& inc)
358 {
359  ATH_MSG_DEBUG( "Handling incident" );
360
361  const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
362  if(fileInc == 0) {
363  ATH_MSG_WARNING( "Failed to dyn-cast the incident" );
364  return;
365  }
366
367  if(fileInc->type()=="EndInputFile") {
368  ATH_MSG_DEBUG( "End Input File reached!" );
369
 // NOTE(review): the statements that act on the end of an input file are not visible here.
373  }
374  }
375 }
376 
// Pushes one chunk descriptor onto the shared event queue, retrying every 100 microseconds until the send succeeds.
// Packed value layout:
//   high bits : chunk size (m_nPositionInChunk - m_nChunkStart), shifted left by the bit width of int;
//   low bits  : chunk start (m_nChunkStart).
// NOTE(review): this assumes long is wider than int (otherwise the shift by sizeof(int)*8 is undefined).
//               It also assumes both fields are non-negative: a negative m_nChunkStart would sign-extend over
//               the high bits, and a negative size makes the left shift undefined before C++20 — confirm.
378 {
379  ATH_MSG_DEBUG("in addEventsToQueue");
380  long newValueForQueue = ((long)(m_nPositionInChunk-m_nChunkStart)<<(sizeof(int)*8))|m_nChunkStart;
381  while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
382  usleep(100);
383  }
384  ATH_MSG_INFO("Sent to the queue 0x" << std::hex << newValueForQueue << std::dec
385  << " which corresponds to Chunks start " << m_nChunkStart
386  << " and chunk size " << m_nPositionInChunk-m_nChunkStart);
387 }
388 
SharedEvtQueueProvider::SharedEvtQueueProvider
SharedEvtQueueProvider()
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:126
max
#define max(a, b)
Definition: cfImp.cxx:41
AthenaMPToolBase::m_evtSelector
IEvtSelector * m_evtSelector
Definition: AthenaMPToolBase.h:94
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
IEventShare.h
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:88
AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
Definition: AthCommonDataStore.h:145
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
SharedEvtQueueProvider::m_useSharedReader
bool m_useSharedReader
Definition: SharedEvtQueueProvider.h:47
SharedEvtQueueProvider::m_nEvtRequested
int m_nEvtRequested
Definition: SharedEvtQueueProvider.h:54
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:83
SharedEvtQueueProvider::m_nChunkStart
int m_nChunkStart
Definition: SharedEvtQueueProvider.h:50
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:32
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:362
SharedEvtQueueProvider::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: SharedEvtQueueProvider.h:48
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
ProcessGroup.h
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
SharedEvtQueueProvider::m_nChunkSize
int m_nChunkSize
Definition: SharedEvtQueueProvider.h:49
SharedEvtQueueProvider::addEventsToQueue
void addEventsToQueue()
Definition: SharedEvtQueueProvider.cxx:377
SharedEvtQueueProvider::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedEvtQueueProvider.h:57
SharedEvtQueueProvider::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: SharedEvtQueueProvider.cxx:123
SharedEvtQueueProvider::m_nprocesses
int m_nprocesses
Definition: SharedEvtQueueProvider.h:45
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:67
AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore
const ServiceHandle< StoreGateSvc > & detStore() const
The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:95
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:56
SharedEvtQueueProvider::handle
virtual void handle(const Incident &inc) override
Definition: SharedEvtQueueProvider.cxx:357
SharedEvtQueueProvider::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedEvtQueueProvider.cxx:115
SharedEvtQueueProvider::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedEvtQueueProvider.cxx:235
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
IEventShare
Abstract interface for sharing within an event stream.
Definition: IEventShare.h:28
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:91
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
lumiFormat.i
int i
Definition: lumiFormat.py:92
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
SharedEvtQueueProvider::m_nEvtCounted
int m_nEvtCounted
Definition: SharedEvtQueueProvider.h:55
test_pyathena.parent
parent
Definition: test_pyathena.py:15
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:129
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:341
IEventShare::makeServer
virtual StatusCode makeServer(int num)=0
Make this a server.
SharedEvtQueueProvider::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedEvtQueueProvider.cxx:347
AthenaInterprocess::SharedQueue::try_send_basic
bool try_send_basic(T)
Definition: SharedQueue.h:88
SharedEvtQueueProvider::m_evtShare
IEventShare * m_evtShare
Definition: SharedEvtQueueProvider.h:58
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
SharedEvtQueueProvider::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedEvtQueueProvider.cxx:129
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:93
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:278
SharedEvtQueueProvider.h
SharedEvtQueueProvider::~SharedEvtQueueProvider
virtual ~SharedEvtQueueProvider() override
Definition: SharedEvtQueueProvider.cxx:51
grepfile.filenames
list filenames
Definition: grepfile.py:34
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:28
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:84
IEventShare::readEvent
virtual StatusCode readEvent(int maxevt)=0
Read the next maxevt events.
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:425
SharedEvtQueueProvider::m_nPositionInChunk
int m_nPositionInChunk
Definition: SharedEvtQueueProvider.h:51
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:25
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:66
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:369