ATLAS Offline Software
SharedEvtQueueProvider.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
7 
8 #include "GaudiKernel/IEvtSelector.h"
9 #include "GaudiKernel/IIncidentSvc.h"
10 #include "GaudiKernel/FileIncident.h"
11 #include "GaudiKernel/IIoComponentMgr.h"
12 #include "GaudiKernel/ISvcLocator.h"
14 #include "CxxUtils/xmalloc.h"
15 
16 #include <boost/interprocess/shared_memory_object.hpp>
17 #include <boost/interprocess/mapped_region.hpp>
18 
19 #include <sys/stat.h>
20 #include <sstream>
21 #include <fstream>
22 #include <unistd.h>
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <stdexcept>
26 #include <filesystem>
27 
29  , const std::string& name
30  , const IInterface* parent)
31  : base_class(type,name,parent)
32 {
33  m_subprocDirPrefix = "evt_counter";
34 }
35 
37 {
38 }
39 
40 int SharedEvtQueueProvider::makePool(int maxevt, int nprocs, const std::string& topdir)
41 {
42  ATH_MSG_DEBUG( "In makePool " << getpid() );
43 
44  if(maxevt < -1) {
45  ATH_MSG_ERROR( "Invalid number of events requested: " << maxevt );
46  return -1;
47  }
48 
49  if(topdir.empty()) {
50  ATH_MSG_ERROR( "Empty name for the top directory!" );
51  return -1;
52  }
53 
54  if(m_nChunkSize<=0) {
55  ATH_MSG_ERROR( "Non-positive chunk size requested: " << m_nChunkSize);
56  return -1;
57  }
58 
59  m_nEvtRequested = maxevt;
60  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
61  m_nprocesses = m_nprocs;
62  m_subprocTopDir = topdir;
63 
64  // Create event queue
65  ATH_MSG_DEBUG( "Event queue name " << "AthenaMPEventQueue_" << m_randStr );
66  ATH_CHECK( detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr), -1);
67 
68  // Create the process group and map_async bootstrap
69  m_processGroup = new AthenaInterprocess::ProcessGroup(1);
70  ATH_MSG_INFO( "Event Counter process created" );
71  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
72  return -1;
73  ATH_MSG_INFO( "Event Counter bootstrapped" );
74 
75  return 1;
76 }
77 
79 {
80  ATH_MSG_DEBUG( "In exec " << getpid() );
81 
82  // Map exec flag
83  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC)) {
84  ATH_MSG_ERROR( "Unable to send work to the event counter" );
85  return StatusCode::FAILURE;
86  }
87 
88  // Map exit flag
89  if(m_processGroup->map_async(0,0)){
90  ATH_MSG_ERROR( "Unable to set exit to the event counter" );
91  return StatusCode::FAILURE;
92  }
93  return StatusCode::SUCCESS;
94 }
95 
96 void SharedEvtQueueProvider::subProcessLogs(std::vector<std::string>& filenames)
97 {
98  filenames.clear();
99  std::filesystem::path counter_rundir(m_subprocTopDir);
100  counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
101  filenames.push_back(counter_rundir.string()+std::string("/AthenaMP.log"));
102 }
103 
105 {
107  return jobOutputs;
108 }
109 
110 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::bootstrap_func()
111 {
112  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
113  outwork->data = CxxUtils::xmalloc(sizeof(int));
114  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
115  outwork->size = sizeof(int);
116 
117  // ...
118  // (possible) TODO: extend outwork with some error message, which will be eventually
119  // reported in the master proces
120  // ...
121 
122  // ________________________ mkdir ________________________
123  std::filesystem::path counter_rundir(m_subprocTopDir);
124  counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
125 
126  if(mkdir(counter_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
127  ATH_MSG_ERROR( "Unable to make event counter run directory: " << counter_rundir.string() << ". " << fmterror(errno) );
128  return outwork;
129  }
130 
131  // ________________________ Redirect logs ________________________
132  if(redirectLog(counter_rundir.string()))
133  return outwork;
134 
135  ATH_MSG_INFO( "Logs redirected in the AthenaMP event event counter PID=" << getpid() );
136 
137  // ________________________ Update Io Registry ____________________________
138  if(updateIoReg(counter_rundir.string()))
139  return outwork;
140 
141  ATH_MSG_INFO( "Io registry updated in the AthenaMP event event counter PID=" << getpid() );
142 
143  // _______________________ Handle saved PFC (if any) ______________________
144  std::filesystem::path abs_counter_rundir = std::filesystem::absolute(counter_rundir);
145  if(handleSavedPfc(abs_counter_rundir))
146  return outwork;
147 
148  // ________________________ reopen descriptors ____________________________
149  if(reopenFds())
150  return outwork;
151 
152  ATH_MSG_INFO( "File descriptors re-opened in the AthenaMP event event counter PID=" << getpid() );
153 
154  // _______________________ event counting ________________________________
155  // Use incident service for registering a EndInputFile handler
156  SmartIF<IIncidentSvc> incsvc(serviceLocator()->service("IncidentSvc"));
157  if(!incsvc) {
158  ATH_MSG_ERROR( "Error retrieving IncidentSvc" );
159  return outwork;
160  }
161 
162  incsvc->addListener(this,"EndInputFile");
163  ATH_MSG_DEBUG( "Added self as listener to EndInputFile" );
164 
165  // _______________________ event sharing ________________________________
166  // Use EventSelector as SharedReader (if configured) and enable output streaming
167  if (m_useSharedReader) {
168  m_evtShare = SmartIF<IEventShare>(m_evtSelector);
169  if(!m_evtShare) {
170  ATH_MSG_ERROR( "Failed to dyncast event selector to IEventShare" );
171  return outwork;
172  } else {
173  if(!m_evtShare->makeServer(m_nprocs+1).isSuccess()) {
174  ATH_MSG_ERROR("Failed to make the event selector a share server");
175  return outwork;
176  } else {
177  ATH_MSG_DEBUG("Successfully made the event selector a share server");
178  }
179  }
180  }
181 
182  // ________________________ I/O reinit ________________________
183  if(!m_ioMgr->io_reinitialize().isSuccess()) {
184  ATH_MSG_ERROR( "Failed to reinitialize I/O" );
185  return outwork;
186  } else {
187  ATH_MSG_DEBUG( "Successfully reinitialized I/O" );
188  }
189 
190  // ________________________ Event selector restart ________________________
191  if(m_evtSelector) {
192  SmartIF<IService> evtSelSvc(m_evtSelector);
193  if(!evtSelSvc) {
194  ATH_MSG_ERROR( "Failed to dyncast event selector to IService" );
195  return outwork;
196  }
197  if(!evtSelSvc->start().isSuccess()) {
198  ATH_MSG_ERROR( "Failed to restart the event selector" );
199  return outwork;
200  } else {
201  ATH_MSG_DEBUG( "Successfully restarted the event selector" );
202  }
203  }
204  // ________________________ chdir ________________________
205  if(chdir(counter_rundir.string().c_str())==-1) {
206  ATH_MSG_ERROR( "Failed to chdir to " << counter_rundir.string() );
207  return outwork;
208  }
209 
210  // Declare success and return
211  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
212  return outwork;
213 }
214 
215 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::exec_func()
216 {
217  ATH_MSG_INFO("Exec function in the AthenaMP Event Counter PID=" << getpid());
218 
219  bool all_ok(true);
220 
221  int skipEvents(0);
222  IEvtSelector::Context* evtContext(nullptr);
223 
224  // Get SkipEvents property of the event selector
225  if(m_evtSelector) {
226  SmartIF<IProperty> propertyServer(m_evtSelector);
227  if(propertyServer==0) {
228  ATH_MSG_ERROR( "Unable to cast event selector to IProperty" );
229  all_ok=false;
230  }
231  else {
232  std::string propertyName("SkipEvents");
233  IntegerProperty skipEventsProp(std::move(propertyName),skipEvents);
234  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
235  ATH_MSG_INFO( "Event Selector does not have SkipEvents property" );
236  }
237  else {
238  skipEvents = skipEventsProp.value();
239  }
240  }
241 
242  if(all_ok) {
243  StatusCode sc = m_evtSelector->createContext(evtContext);
244  if(sc.isFailure()) {
245  ATH_MSG_ERROR("Failed to create the event selector context");
246  all_ok=false;
247  }
248  else {
249  // advance to nEventsBeforeFork
250  for(int i(0); i<m_nEventsBeforeFork;++i) {
251  if(!m_evtSelector->next(*evtContext).isSuccess()) {
252  ATH_MSG_ERROR("Unexpected error: EventsBeforeFork>EventsInInputFiles");
253  all_ok=false;
254  break;
255  }
256  }
257  }
258  }
259  }
260 
261  if(all_ok) {
262  if(m_nEvtRequested!=0) { // Take into account corner case with evtMax=0
266  ATH_MSG_VERBOSE("Starting to go through events. Chunk start = " << m_nChunkStart+1);
267 
268  // Loop through all remaining events
269  while(!m_evtSelector || m_evtSelector->next(*evtContext).isSuccess()) {
271  m_nEvtCounted++;
272  ATH_MSG_VERBOSE("Events Counted " << m_nEvtCounted << ", Position in Chunk " << m_nPositionInChunk);
277  if(m_nEvtCounted==m_nEvtRequested) break;
278  }
279  }
280  }
281 
282  // We are done. Add -m_nEvtCounted m_nprocesses-times to the queue
283  long newValueForQueue = (long)(-m_nEvtCounted);
284  for(int i=0;i<m_nprocesses;++i) {
285  while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
286  usleep(1000);
287  }
288  }
289 
290  ATH_MSG_INFO("Done counting events and populating shared queue. Total number of events to be processed: " << std::max(m_nEvtCounted - m_nEventsBeforeFork,0)
291  << ", Event Chunk size in the queue is " << m_nChunkSize);
292 
294  if(m_evtShare->readEvent(0).isFailure()) {
295  ATH_MSG_ERROR("Failed to read " << m_nEvtRequested << " events");
296  all_ok=false;
297  } else {
298  ATH_MSG_DEBUG("readEvent succeeded");
299  }
300  }
301 
302  if(m_appMgr->stop().isFailure()) {
303  ATH_MSG_ERROR("Unable to stop AppMgr");
304  all_ok=false;
305  }
306  else {
307  if(m_appMgr->finalize().isFailure()) {
308  std::cerr << "Unable to finalize AppMgr" << std::endl;
309  all_ok=false;
310  }
311  }
312  }
313 
314 
315  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
316  outwork->data = CxxUtils::xmalloc(sizeof(int));
317  *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
318  outwork->size = sizeof(int);
319 
320  // ...
321 // (possible) TODO: extend outwork with some error message, which will be eventually
322 // reported in the master process
323  // ...
324  return outwork;
325 }
326 
327 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::fin_func()
328 {
329  // Dummy
330  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
331  outwork->data = CxxUtils::xmalloc(sizeof(int));
332  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
333  outwork->size = sizeof(int);
334  return outwork;
335 }
336 
337 void SharedEvtQueueProvider::handle(const Incident& inc)
338 {
339  ATH_MSG_DEBUG( "Handling incident" );
340 
341  const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
342  if(fileInc == 0) {
343  ATH_MSG_WARNING( "Failed to dyn-cast the incident" );
344  return;
345  }
346 
347  if(fileInc->type()=="EndInputFile") {
348  ATH_MSG_DEBUG( "End Input File reached!" );
349 
353  }
354  }
355 }
356 
358 {
359  ATH_MSG_DEBUG("in addEventsToQueue");
360  long newValueForQueue = ((long)(m_nPositionInChunk-m_nChunkStart)<<(sizeof(int)*8))|m_nChunkStart;
361  while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
362  usleep(100);
363  }
364  ATH_MSG_INFO("Sent to the queue 0x" << std::hex << newValueForQueue << std::dec
365  << " which corresponds to Chunks start " << m_nChunkStart
366  << " and chunk size " << m_nPositionInChunk-m_nChunkStart);
367 }
368 
python.PyKernel.retrieve
def retrieve(aClass, aKey=None)
Definition: PyKernel.py:110
SharedEvtQueueProvider::m_nChunkSize
Gaudi::Property< int > m_nChunkSize
Definition: SharedEvtQueueProvider.h:45
SharedEvtQueueProvider::SharedEvtQueueProvider
SharedEvtQueueProvider()
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:15
IEventShare.h
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
max
constexpr double max()
Definition: ap_fixedTest.cxx:33
SharedEvtQueueProvider::m_nEvtRequested
int m_nEvtRequested
Max event received from AppMgr.
Definition: SharedEvtQueueProvider.h:56
SharedEvtQueueProvider::m_nChunkStart
int m_nChunkStart
The beginning of the current chunk.
Definition: SharedEvtQueueProvider.h:53
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
SharedEvtQueueProvider::m_nEventsBeforeFork
Gaudi::Property< int > m_nEventsBeforeFork
Definition: SharedEvtQueueProvider.h:44
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
ProcessGroup.h
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
SharedEvtQueueProvider::addEventsToQueue
void addEventsToQueue()
Definition: SharedEvtQueueProvider.cxx:357
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
SharedEvtQueueProvider::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedEvtQueueProvider.h:59
SharedEvtQueueProvider::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: SharedEvtQueueProvider.cxx:104
SharedEvtQueueProvider::m_nprocesses
int m_nprocesses
We use this data member for adding negative numbers at the end of the event queue.
Definition: SharedEvtQueueProvider.h:52
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:69
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:55
SharedEvtQueueProvider::handle
virtual void handle(const Incident &inc) override
Definition: SharedEvtQueueProvider.cxx:337
SharedEvtQueueProvider::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedEvtQueueProvider.cxx:96
SharedEvtQueueProvider::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedEvtQueueProvider.cxx:215
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
lumiFormat.i
int i
Definition: lumiFormat.py:85
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
SharedEvtQueueProvider::m_nEvtCounted
int m_nEvtCounted
The number of events this tool has counted itself in the input files.
Definition: SharedEvtQueueProvider.h:57
test_pyathena.parent
parent
Definition: test_pyathena.py:15
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
python.PyKernel.detStore
detStore
Definition: PyKernel.py:41
SharedEvtQueueProvider::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedEvtQueueProvider.cxx:327
AthenaInterprocess::SharedQueue::try_send_basic
bool try_send_basic(T)
Definition: SharedQueue.h:88
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
SharedEvtQueueProvider::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedEvtQueueProvider.cxx:110
SharedEvtQueueProvider.h
SharedEvtQueueProvider::~SharedEvtQueueProvider
virtual ~SharedEvtQueueProvider() override
Definition: SharedEvtQueueProvider.cxx:36
SharedEvtQueueProvider::m_evtShare
SmartIF< IEventShare > m_evtShare
Definition: SharedEvtQueueProvider.h:60
grepfile.filenames
list filenames
Definition: grepfile.py:34
SharedEvtQueueProvider::m_useSharedReader
Gaudi::Property< bool > m_useSharedReader
Definition: SharedEvtQueueProvider.h:43
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
CxxUtils::xmalloc
void * xmalloc(size_t size)
Trapping version of malloc.
Definition: xmalloc.cxx:31
SharedEvtQueueProvider::m_nPositionInChunk
int m_nPositionInChunk
Position within the current chunk.
Definition: SharedEvtQueueProvider.h:54
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:29
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:68
xmalloc.h
Trapping version of malloc.