ATLAS Offline Software
Loading...
Searching...
No Matches
SharedEvtQueueProvider.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
7
8#include "GaudiKernel/IEvtSelector.h"
9#include "GaudiKernel/IIncidentSvc.h"
10#include "GaudiKernel/FileIncident.h"
11#include "GaudiKernel/IIoComponentMgr.h"
12#include "GaudiKernel/ISvcLocator.h"
14#include "CxxUtils/xmalloc.h"
15
16#include <boost/interprocess/shared_memory_object.hpp>
17#include <boost/interprocess/mapped_region.hpp>
18
19#include <sys/stat.h>
20#include <sstream>
21#include <fstream>
22#include <unistd.h>
23#include <stdio.h>
24#include <stdint.h>
25#include <stdexcept>
26#include <filesystem>
27
29 , const std::string& name
30 , const IInterface* parent)
31 : base_class(type,name,parent)
32{
// Name of the event counter's run directory; subProcessLogs() and
// bootstrap_func() append it to m_subprocTopDir to build the full path.
33 m_subprocDirPrefix = "evt_counter";
34}
35
39
40int SharedEvtQueueProvider::makePool(int maxevt, int nprocs, const std::string& topdir)
41{
42 ATH_MSG_DEBUG( "In makePool " << getpid() );
43
44 if(maxevt < -1) {
45 ATH_MSG_ERROR( "Invalid number of events requested: " << maxevt );
46 return -1;
47 }
48
49 if(topdir.empty()) {
50 ATH_MSG_ERROR( "Empty name for the top directory!" );
51 return -1;
52 }
53
54 if(m_nChunkSize<=0) {
55 ATH_MSG_ERROR( "Non-positive chunk size requested: " << m_nChunkSize);
56 return -1;
57 }
58
59 m_nEvtRequested = maxevt;
60 m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
61 m_nprocesses = m_nprocs;
62 m_subprocTopDir = topdir;
63
64 // Create event queue
65 ATH_MSG_DEBUG( "Event queue name " << "AthenaMPEventQueue_" << m_randStr );
66 ATH_CHECK( detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr), -1);
67
68 // Create the process group and map_async bootstrap
69 m_processGroup = new AthenaInterprocess::ProcessGroup(1);
70 ATH_MSG_INFO( "Event Counter process created" );
71 if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
72 return -1;
73 ATH_MSG_INFO( "Event Counter bootstrapped" );
74
75 return 1;
76}
77
78StatusCode SharedEvtQueueProvider::exec()
79{
80 ATH_MSG_DEBUG( "In exec " << getpid() );
81
82 // Map exec flag
83 if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC)) {
84 ATH_MSG_ERROR( "Unable to send work to the event counter" );
85 return StatusCode::FAILURE;
86 }
87
88 // Map exit flag
89 if(m_processGroup->map_async(0,0)){
90 ATH_MSG_ERROR( "Unable to set exit to the event counter" );
91 return StatusCode::FAILURE;
92 }
93 return StatusCode::SUCCESS;
94}
95
96void SharedEvtQueueProvider::subProcessLogs(std::vector<std::string>& filenames)
97{
98 filenames.clear();
99 std::filesystem::path counter_rundir(m_subprocTopDir);
100 counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
101 filenames.push_back(counter_rundir.string()+std::string("/AthenaMP.log"));
102}
103
109
110std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::bootstrap_func()
111{
112 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
113 outwork->data = CxxUtils::xmalloc(sizeof(int));
114 *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
115 outwork->size = sizeof(int);
116
117 // ...
118 // (possible) TODO: extend outwork with some error message, which will be eventually
119 // reported in the master proces
120 // ...
121
122 // ________________________ mkdir ________________________
123 std::filesystem::path counter_rundir(m_subprocTopDir);
124 counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
125
126 if(mkdir(counter_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
127 ATH_MSG_ERROR( "Unable to make event counter run directory: " << counter_rundir.string() << ". " << fmterror(errno) );
128 return outwork;
129 }
130
131 // ________________________ Redirect logs ________________________
132 if(redirectLog(counter_rundir.string()))
133 return outwork;
134
135 ATH_MSG_INFO( "Logs redirected in the AthenaMP event event counter PID=" << getpid() );
136
137 // ________________________ Update Io Registry ____________________________
138 if(updateIoReg(counter_rundir.string()))
139 return outwork;
140
141 ATH_MSG_INFO( "Io registry updated in the AthenaMP event event counter PID=" << getpid() );
142
143 // _______________________ Handle saved PFC (if any) ______________________
144 std::filesystem::path abs_counter_rundir = std::filesystem::absolute(counter_rundir);
145 if(handleSavedPfc(abs_counter_rundir))
146 return outwork;
147
148 // ________________________ reopen descriptors ____________________________
149 if(reopenFds())
150 return outwork;
151
152 ATH_MSG_INFO( "File descriptors re-opened in the AthenaMP event event counter PID=" << getpid() );
153
154 // _______________________ event counting ________________________________
155 // Use incident service for registering a EndInputFile handler
156 SmartIF<IIncidentSvc> incsvc(serviceLocator()->service("IncidentSvc"));
157 if(!incsvc) {
158 ATH_MSG_ERROR( "Error retrieving IncidentSvc" );
159 return outwork;
160 }
161
162 incsvc->addListener(this,"EndInputFile");
163 ATH_MSG_DEBUG( "Added self as listener to EndInputFile" );
164
165 // _______________________ event sharing ________________________________
166 // Use EventSelector as SharedReader (if configured) and enable output streaming
167 if (m_useSharedReader) {
168 m_evtShare = SmartIF<IEventShare>(m_evtSelector);
169 if(!m_evtShare) {
170 ATH_MSG_ERROR( "Failed to dyncast event selector to IEventShare" );
171 return outwork;
172 } else {
173 if(!m_evtShare->makeServer(m_nprocs+1).isSuccess()) {
174 ATH_MSG_ERROR("Failed to make the event selector a share server");
175 return outwork;
176 } else {
177 ATH_MSG_DEBUG("Successfully made the event selector a share server");
178 }
179 }
180 }
181
182 // ________________________ I/O reinit ________________________
183 if(!m_ioMgr->io_reinitialize().isSuccess()) {
184 ATH_MSG_ERROR( "Failed to reinitialize I/O" );
185 return outwork;
186 } else {
187 ATH_MSG_DEBUG( "Successfully reinitialized I/O" );
188 }
189
190 // ________________________ Event selector restart ________________________
191 if(m_evtSelector) {
192 SmartIF<IService> evtSelSvc(m_evtSelector);
193 if(!evtSelSvc) {
194 ATH_MSG_ERROR( "Failed to dyncast event selector to IService" );
195 return outwork;
196 }
197 if(!evtSelSvc->start().isSuccess()) {
198 ATH_MSG_ERROR( "Failed to restart the event selector" );
199 return outwork;
200 } else {
201 ATH_MSG_DEBUG( "Successfully restarted the event selector" );
202 }
203 }
204 // ________________________ chdir ________________________
205 if(chdir(counter_rundir.string().c_str())==-1) {
206 ATH_MSG_ERROR( "Failed to chdir to " << counter_rundir.string() );
207 return outwork;
208 }
209
210 // Declare success and return
211 *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
212 return outwork;
213}
214
// Exec function of the event counter.
//
// Visible flow:
//  1. If an event selector is set, read its "SkipEvents" property (a missing
//     property is not an error), create a selector context and advance it by
//     m_nEventsBeforeFork events.
//  2. Unless m_nEvtRequested is 0, iterate over the remaining events.
//  3. Push -m_nEvtCounted onto the shared queue m_nprocesses times.
//  4. Stop and finalize the application manager.
//
// Returns a ScheduledWork whose payload is a single int: 0 if all_ok stayed
// true, 1 otherwise.
//
// NOTE(review): the fused line numbers jump in several places (262->266,
// 269->272, 272->278, 292->294), i.e. some original lines are not part of
// this view. In particular the use of skipEvents, most of the event loop body
// and the condition guarding the readEvent() call are not visible here.
215std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::exec_func()
216{
217 ATH_MSG_INFO("Exec function in the AthenaMP Event Counter PID=" << getpid());
218
// Cleared by any failing step below; decides the error code that is returned
219 bool all_ok(true);
220
221 int skipEvents(0);
222 IEvtSelector::Context* evtContext(nullptr);
223
224 // Get SkipEvents property of the event selector
225 if(m_evtSelector) {
226 SmartIF<IProperty> propertyServer(m_evtSelector);
227 if(propertyServer==0) {
228 ATH_MSG_ERROR( "Unable to cast event selector to IProperty" );
229 all_ok=false;
230 }
231 else {
232 std::string propertyName("SkipEvents");
233 IntegerProperty skipEventsProp(std::move(propertyName),skipEvents);
// A selector without SkipEvents is tolerated: skipEvents simply stays 0
234 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
235 ATH_MSG_INFO( "Event Selector does not have SkipEvents property" );
236 }
237 else {
238 skipEvents = skipEventsProp.value();
239 }
240 }
241
242 if(all_ok) {
243 StatusCode sc = m_evtSelector->createContext(evtContext);
244 if(sc.isFailure()) {
245 ATH_MSG_ERROR("Failed to create the event selector context");
246 all_ok=false;
247 }
248 else {
249 // advance to nEventsBeforeFork
250 for(int i(0); i<m_nEventsBeforeFork;++i) {
// Running out of events before m_nEventsBeforeFork is reached is an error
251 if(!m_evtSelector->next(*evtContext).isSuccess()) {
252 ATH_MSG_ERROR("Unexpected error: EventsBeforeFork>EventsInInputFiles");
253 all_ok=false;
254 break;
255 }
256 }
257 }
258 }
259 }
260
261 if(all_ok) {
262 if(m_nEvtRequested!=0) { // Take into account corner case with evtMax=0
266 ATH_MSG_VERBOSE("Starting to go through events. Chunk start = " << m_nChunkStart+1);
267
268 // Loop through all remaining events
// Without an event selector the condition is always true, so the loop can
// only end from inside its body (body mostly not visible in this view)
269 while(!m_evtSelector || m_evtSelector->next(*evtContext).isSuccess()) {
272 ATH_MSG_VERBOSE("Events Counted " << m_nEvtCounted << ", Position in Chunk " << m_nPositionInChunk);
278 }
279 }
280 }
281
282 // We are done. Add -m_nEvtCounted m_nprocesses-times to the queue
283 long newValueForQueue = (long)(-m_nEvtCounted);
284 for(int i=0;i<m_nprocesses;++i) {
// Retry until the send succeeds, sleeping 1000 microseconds between attempts
285 while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
286 usleep(1000);
287 }
288 }
289
290 ATH_MSG_INFO("Done counting events and populating shared queue. Total number of events to be processed: " << std::max(m_nEvtCounted - m_nEventsBeforeFork,0)
291 << ", Event Chunk size in the queue is " << m_nChunkSize);
292
// NOTE(review): the statement opening this scope (original line 293) is not
// visible; presumably it checks that m_evtShare is set - confirm
294 if(m_evtShare->readEvent(0).isFailure()) {
295 ATH_MSG_ERROR("Failed to read " << m_nEvtRequested << " events");
296 all_ok=false;
297 } else {
298 ATH_MSG_DEBUG("readEvent succeeded");
299 }
300 }
301
// finalize() is attempted only if stop() succeeded
302 if(m_appMgr->stop().isFailure()) {
303 ATH_MSG_ERROR("Unable to stop AppMgr");
304 all_ok=false;
305 }
306 else {
307 if(m_appMgr->finalize().isFailure()) {
// This failure is written to std::cerr rather than through ATH_MSG_ERROR
308 std::cerr << "Unable to finalize AppMgr" << std::endl;
309 all_ok=false;
310 }
311 }
312 }
313
314
315 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
316 outwork->data = CxxUtils::xmalloc(sizeof(int));
317 *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
318 outwork->size = sizeof(int);
319
320 // ...
321 // (possible) TODO: extend outwork with some error message, which will be eventually
322 // reported in the master process
323 // ...
324 return outwork;
325}
326
327std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueProvider::fin_func()
328{
329 // Dummy
330 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
331 outwork->data = CxxUtils::xmalloc(sizeof(int));
332 *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
333 outwork->size = sizeof(int);
334 return outwork;
335}
336
// Incident handler. bootstrap_func() registers this object as a listener for
// "EndInputFile" incidents. Incidents that are not FileIncidents are ignored
// with a warning.
//
// NOTE(review): the fused line numbers jump from 349 to 353, i.e. the actual
// reaction to EndInputFile is not part of this view.
337void SharedEvtQueueProvider::handle(const Incident& inc)
338{
339 ATH_MSG_DEBUG( "Handling incident" );
340
// Only file incidents are handled; anything else is reported and dropped
341 const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
342 if(fileInc == 0) {
343 ATH_MSG_WARNING( "Failed to dyn-cast the incident" );
344 return;
345 }
346
347 if(fileInc->type()=="EndInputFile") {
348 ATH_MSG_DEBUG( "End Input File reached!" );
349
353 }
354 }
355}
356
358{
// Publish the current chunk on the shared event queue as one long value.
359 ATH_MSG_DEBUG("in addEventsToQueue");
// Encoding: the chunk size (m_nPositionInChunk-m_nChunkStart) is shifted left
// by the bit width of an int, and the chunk start is OR-ed into the low bits.
// NOTE(review): assumes long is wider than int and m_nChunkStart is
// non-negative, otherwise the OR would also touch the upper bits - confirm.
360 long newValueForQueue = ((long)(m_nPositionInChunk-m_nChunkStart)<<(sizeof(int)*8))|m_nChunkStart;
// Retry until the send succeeds, sleeping 100 microseconds between attempts
361 while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
362 usleep(100);
363 }
364 ATH_MSG_INFO("Sent to the queue 0x" << std::hex << newValueForQueue << std::dec
365 << " which corresponds to Chunks start " << m_nChunkStart
366 << " and chunk size " << m_nPositionInChunk-m_nChunkStart);
367}
368
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
static Double_t sc
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
int m_nEvtRequested
Max event received from AppMgr.
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
int m_nprocesses
We use this data member for adding negative numbers at the end of the event queue.
AthenaInterprocess::SharedQueue * m_sharedEventQueue
virtual ~SharedEvtQueueProvider() override
Gaudi::Property< int > m_nEventsBeforeFork
SmartIF< IEventShare > m_evtShare
int m_nEvtCounted
The number of events this tool has counted itself in the input files.
Gaudi::Property< bool > m_useSharedReader
Gaudi::Property< int > m_nChunkSize
virtual void handle(const Incident &inc) override
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
int m_nChunkStart
The beginning of the current chunk.
int m_nPositionInChunk
Position within the current chunk.
virtual void subProcessLogs(std::vector< std::string > &) override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
retrieve(aClass, aKey=None)
Definition PyKernel.py:110
Trapping version of malloc.