ATLAS Offline Software
Loading...
Searching...
No Matches
SharedEvtQueueProvider Class Reference final

#include <SharedEvtQueueProvider.h>

Inheritance diagram for SharedEvtQueueProvider:
Collaboration diagram for SharedEvtQueueProvider:

Public Member Functions

 SharedEvtQueueProvider (const std::string &type, const std::string &name, const IInterface *parent)
virtual ~SharedEvtQueueProvider () override
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
virtual void subProcessLogs (std::vector< std::string > &) override
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
virtual void handle (const Incident &inc) override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func () override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func () override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func () override

Private Member Functions

 SharedEvtQueueProvider ()
 SharedEvtQueueProvider (const SharedEvtQueueProvider &)
SharedEvtQueueProvider & operator= (const SharedEvtQueueProvider &)
void addEventsToQueue ()

Private Attributes

Gaudi::Property< bool > m_useSharedReader {this, "UseSharedReader", false, "Use shared reader"}
Gaudi::Property< int > m_nEventsBeforeFork {this, "EventsBeforeFork", 0, "Number of events before forking"}
Gaudi::Property< int > m_nChunkSize {this, "ChunkSize", 1}
int m_nprocesses {-1}
 We use this data member for adding negative numbers at the end of the event queue.
int m_nChunkStart {0}
 The beginning of the current chunk.
int m_nPositionInChunk {0}
 Position within the current chunk.
int m_nEvtRequested {-1}
 Max event received from AppMgr.
int m_nEvtCounted {0}
 The number of events this tool has counted itself in the input files.
AthenaInterprocess::SharedQueue * m_sharedEventQueue {nullptr}
SmartIF< IEventShare > m_evtShare

Detailed Description

Definition at line 14 of file SharedEvtQueueProvider.h.

Constructor & Destructor Documentation

◆ SharedEvtQueueProvider() [1/3]

SharedEvtQueueProvider::SharedEvtQueueProvider ( const std::string & type,
const std::string & name,
const IInterface * parent )

Definition at line 28 of file SharedEvtQueueProvider.cxx.

31 : base_class(type,name,parent)
32{
33 m_subprocDirPrefix = "evt_counter";
34}

◆ ~SharedEvtQueueProvider()

SharedEvtQueueProvider::~SharedEvtQueueProvider ( )
override virtual

Definition at line 36 of file SharedEvtQueueProvider.cxx.

37{
38}

◆ SharedEvtQueueProvider() [2/3]

SharedEvtQueueProvider::SharedEvtQueueProvider ( )
private

◆ SharedEvtQueueProvider() [3/3]

SharedEvtQueueProvider::SharedEvtQueueProvider ( const SharedEvtQueueProvider & )
private

Member Function Documentation

◆ addEventsToQueue()

void SharedEvtQueueProvider::addEventsToQueue ( )
private

Definition at line 357 of file SharedEvtQueueProvider.cxx.

// Sends the current chunk to the shared event queue as one packed long:
// the chunk size (m_nPositionInChunk - m_nChunkStart) is shifted left by
// sizeof(int)*8 bits and the chunk start (m_nChunkStart) is OR-ed into the low bits.
358{
359 ATH_MSG_DEBUG("in addEventsToQueue");
// NOTE(review): the OR assumes m_nChunkStart is non-negative; a negative int would
// sign-extend into the upper (chunk size) bits when promoted to long - confirm.
360 long newValueForQueue = ((long)(m_nPositionInChunk-m_nChunkStart)<<(sizeof(int)*8))|m_nChunkStart;
// Keep retrying until try_send_basic reports success, pausing 100 microseconds between attempts.
361 while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
362 usleep(100);
363 }
364 ATH_MSG_INFO("Sent to the queue 0x" << std::hex << newValueForQueue << std::dec
365 << " which corresponds to Chunks start " << m_nChunkStart
366 << " and chunk size " << m_nPositionInChunk-m_nChunkStart);
367}
#define ATH_MSG_INFO(x)
#define ATH_MSG_DEBUG(x)
AthenaInterprocess::SharedQueue * m_sharedEventQueue
int m_nChunkStart
The beginning of the current chunk.
int m_nPositionInChunk
Position within the current chunk.

◆ ATLAS_NOT_THREAD_SAFE() [1/2]

virtual StatusCode exec SharedEvtQueueProvider::ATLAS_NOT_THREAD_SAFE ( )
override virtual

◆ ATLAS_NOT_THREAD_SAFE() [2/2]

virtual int makePool SharedEvtQueueProvider::ATLAS_NOT_THREAD_SAFE ( int maxevt,
int nprocs,
const std::string & topdir )
override virtual

◆ bootstrap_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueProvider::bootstrap_func ( )
override virtual

Definition at line 110 of file SharedEvtQueueProvider.cxx.

// Bootstrap step of the event counter sub-process.
// Builds a ScheduledWork result holding a single int error code (0 success, 1 failure),
// then runs the setup stages in order: create the run directory, redirect logs, update
// the Io registry, handle a saved PFC, re-open file descriptors, register for
// "EndInputFile" incidents, optionally turn the event selector into a share server,
// reinitialize I/O, restart the event selector and chdir into the run directory.
// Any failing stage returns immediately with the error code still set to 1.
111{
112 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
113 outwork->data = CxxUtils::xmalloc(sizeof(int));
// Start out as "failure" so every early return below reports an error without extra bookkeeping.
114 *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
115 outwork->size = sizeof(int);
116
117 // ...
118 // (possible) TODO: extend outwork with some error message, which will be eventually
119 // reported in the master process
120 // ...
121
122 // ________________________ mkdir ________________________
// Run directory is <m_subprocTopDir>/<m_subprocDirPrefix>.
123 std::filesystem::path counter_rundir(m_subprocTopDir);
124 counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
125
// Requested mode is rwxr-xr-x; any mkdir failure (including an already existing directory) aborts the bootstrap.
126 if(mkdir(counter_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
127 ATH_MSG_ERROR( "Unable to make event counter run directory: " << counter_rundir.string() << ". " << fmterror(errno) );
128 return outwork;
129 }
130
131 // ________________________ Redirect logs ________________________
// For redirectLog, updateIoReg, handleSavedPfc and reopenFds a non-zero return is treated as failure.
132 if(redirectLog(counter_rundir.string()))
133 return outwork;
134
// NOTE(review): the three INFO messages below contain the word "event" twice in a row - confirm whether intended.
135 ATH_MSG_INFO( "Logs redirected in the AthenaMP event event counter PID=" << getpid() );
136
137 // ________________________ Update Io Registry ____________________________
138 if(updateIoReg(counter_rundir.string()))
139 return outwork;
140
141 ATH_MSG_INFO( "Io registry updated in the AthenaMP event event counter PID=" << getpid() );
142
143 // _______________________ Handle saved PFC (if any) ______________________
144 std::filesystem::path abs_counter_rundir = std::filesystem::absolute(counter_rundir);
145 if(handleSavedPfc(abs_counter_rundir))
146 return outwork;
147
148 // ________________________ reopen descriptors ____________________________
149 if(reopenFds())
150 return outwork;
151
152 ATH_MSG_INFO( "File descriptors re-opened in the AthenaMP event event counter PID=" << getpid() );
153
154 // _______________________ event counting ________________________________
155 // Use incident service for registering a EndInputFile handler
156 SmartIF<IIncidentSvc> incsvc(serviceLocator()->service("IncidentSvc"));
157 if(!incsvc) {
158 ATH_MSG_ERROR( "Error retrieving IncidentSvc" );
159 return outwork;
160 }
161
162 incsvc->addListener(this,"EndInputFile");
163 ATH_MSG_DEBUG( "Added self as listener to EndInputFile" );
164
165 // _______________________ event sharing ________________________________
166 // Use EventSelector as SharedReader (if configured) and enable output streaming
167 if (m_useSharedReader) {
168 m_evtShare = SmartIF<IEventShare>(m_evtSelector);
169 if(!m_evtShare) {
170 ATH_MSG_ERROR( "Failed to dyncast event selector to IEventShare" );
171 return outwork;
172 } else {
// NOTE(review): the server is created with m_nprocs+1; the reason for the extra slot is not visible here - confirm.
173 if(!m_evtShare->makeServer(m_nprocs+1).isSuccess()) {
174 ATH_MSG_ERROR("Failed to make the event selector a share server");
175 return outwork;
176 } else {
177 ATH_MSG_DEBUG("Successfully made the event selector a share server");
178 }
179 }
180 }
181
182 // ________________________ I/O reinit ________________________
183 if(!m_ioMgr->io_reinitialize().isSuccess()) {
184 ATH_MSG_ERROR( "Failed to reinitialize I/O" );
185 return outwork;
186 } else {
187 ATH_MSG_DEBUG( "Successfully reinitialized I/O" );
188 }
189
190 // ________________________ Event selector restart ________________________
// Skipped entirely when no event selector is set.
191 if(m_evtSelector) {
192 SmartIF<IService> evtSelSvc(m_evtSelector);
193 if(!evtSelSvc) {
194 ATH_MSG_ERROR( "Failed to dyncast event selector to IService" );
195 return outwork;
196 }
197 if(!evtSelSvc->start().isSuccess()) {
198 ATH_MSG_ERROR( "Failed to restart the event selector" );
199 return outwork;
200 } else {
201 ATH_MSG_DEBUG( "Successfully restarted the event selector" );
202 }
203 }
204 // ________________________ chdir ________________________
205 if(chdir(counter_rundir.string().c_str())==-1) {
206 ATH_MSG_ERROR( "Failed to chdir to " << counter_rundir.string() );
207 return outwork;
208 }
209
210 // Declare success and return
211 *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
212 return outwork;
213}
#define ATH_MSG_ERROR(x)
SmartIF< IEventShare > m_evtShare
Gaudi::Property< bool > m_useSharedReader
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
mkdir(path, recursive=True)

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueProvider::exec_func ( )
override virtual

Definition at line 215 of file SharedEvtQueueProvider.cxx.

216{
217 ATH_MSG_INFO("Exec function in the AthenaMP Event Counter PID=" << getpid());
218
219 bool all_ok(true);
220
221 int skipEvents(0);
222 IEvtSelector::Context* evtContext(nullptr);
223
224 // Get SkipEvents property of the event selector
225 if(m_evtSelector) {
226 SmartIF<IProperty> propertyServer(m_evtSelector);
227 if(propertyServer==0) {
228 ATH_MSG_ERROR( "Unable to cast event selector to IProperty" );
229 all_ok=false;
230 }
231 else {
232 std::string propertyName("SkipEvents");
233 IntegerProperty skipEventsProp(std::move(propertyName),skipEvents);
234 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
235 ATH_MSG_INFO( "Event Selector does not have SkipEvents property" );
236 }
237 else {
238 skipEvents = skipEventsProp.value();
239 }
240 }
241
242 if(all_ok) {
243 StatusCode sc = m_evtSelector->createContext(evtContext);
244 if(sc.isFailure()) {
245 ATH_MSG_ERROR("Failed to create the event selector context");
246 all_ok=false;
247 }
248 else {
249 // advance to nEventsBeforeFork
250 for(int i(0); i<m_nEventsBeforeFork;++i) {
251 if(!m_evtSelector->next(*evtContext).isSuccess()) {
252 ATH_MSG_ERROR("Unexpected error: EventsBeforeFork>EventsInInputFiles");
253 all_ok=false;
254 break;
255 }
256 }
257 }
258 }
259 }
260
261 if(all_ok) {
262 if(m_nEvtRequested!=0) { // Take into account corner case with evtMax=0
266 ATH_MSG_VERBOSE("Starting to go through events. Chunk start = " << m_nChunkStart+1);
267
268 // Loop through all remaining events
269 while(!m_evtSelector || m_evtSelector->next(*evtContext).isSuccess()) {
272 ATH_MSG_VERBOSE("Events Counted " << m_nEvtCounted << ", Position in Chunk " << m_nPositionInChunk);
278 }
279 }
280 }
281
282 // We are done. Add -m_nEvtCounted m_nprocesses-times to the queue
283 long newValueForQueue = (long)(-m_nEvtCounted);
284 for(int i=0;i<m_nprocesses;++i) {
285 while(!m_sharedEventQueue->try_send_basic<long>(newValueForQueue)) {
286 usleep(1000);
287 }
288 }
289
290 ATH_MSG_INFO("Done counting events and populating shared queue. Total number of events to be processed: " << std::max(m_nEvtCounted - m_nEventsBeforeFork,0)
291 << ", Event Chunk size in the queue is " << m_nChunkSize);
292
294 if(m_evtShare->readEvent(0).isFailure()) {
295 ATH_MSG_ERROR("Failed to read " << m_nEvtRequested << " events");
296 all_ok=false;
297 } else {
298 ATH_MSG_DEBUG("readEvent succeeded");
299 }
300 }
301
302 if(m_appMgr->stop().isFailure()) {
303 ATH_MSG_ERROR("Unable to stop AppMgr");
304 all_ok=false;
305 }
306 else {
307 if(m_appMgr->finalize().isFailure()) {
308 std::cerr << "Unable to finalize AppMgr" << std::endl;
309 all_ok=false;
310 }
311 }
312 }
313
314
315 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
316 outwork->data = CxxUtils::xmalloc(sizeof(int));
317 *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
318 outwork->size = sizeof(int);
319
320 // ...
321 // (possible) TODO: extend outwork with some error message, which will be eventually
322 // reported in the master proces
323 // ...
324 return outwork;
325}
#define ATH_MSG_VERBOSE(x)
static Double_t sc
int m_nEvtRequested
Max event received from AppMgr.
int m_nprocesses
We use this data member for adding negative numbers at the end of the event queue.
Gaudi::Property< int > m_nEventsBeforeFork
int m_nEvtCounted
The number of events this tool has counted itself in the input files.
Gaudi::Property< int > m_nChunkSize
::StatusCode StatusCode
StatusCode definition for legacy code.

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueProvider::fin_func ( )
override virtual

Definition at line 327 of file SharedEvtQueueProvider.cxx.

// Finalization step: does no work of its own and always reports success.
// Returns a ScheduledWork whose payload is a single int error code set to 0.
328{
329 // Dummy
330 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
// Payload buffer comes from CxxUtils::xmalloc and is sized for exactly one int.
331 outwork->data = CxxUtils::xmalloc(sizeof(int));
332 *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
333 outwork->size = sizeof(int);
334 return outwork;
335}

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr SharedEvtQueueProvider::generateOutputReport ( )
override virtual

Definition at line 104 of file SharedEvtQueueProvider.cxx.

105{
107 return jobOutputs;
108}
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr

◆ handle()

void SharedEvtQueueProvider::handle ( const Incident & inc)
override virtual

Definition at line 337 of file SharedEvtQueueProvider.cxx.

338{
339 ATH_MSG_DEBUG( "Handling incident" );
340
341 const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
342 if(fileInc == 0) {
343 ATH_MSG_WARNING( "Failed to dyn-cast the incident" );
344 return;
345 }
346
347 if(fileInc->type()=="EndInputFile") {
348 ATH_MSG_DEBUG( "End Input File reached!" );
349
353 }
354 }
355}
#define ATH_MSG_WARNING(x)

◆ operator=()

SharedEvtQueueProvider & SharedEvtQueueProvider::operator= ( const SharedEvtQueueProvider & )
private

◆ subProcessLogs()

void SharedEvtQueueProvider::subProcessLogs ( std::vector< std::string > & filenames)
override virtual

Definition at line 96 of file SharedEvtQueueProvider.cxx.

// Reports the log file of the event counter sub-process.
// The output vector is emptied first, so on return it holds exactly one entry:
// <m_subprocTopDir>/<m_subprocDirPrefix>/AthenaMP.log
97{
98 filenames.clear();
99 std::filesystem::path counter_rundir(m_subprocTopDir);
100 counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
101 filenames.push_back(counter_rundir.string()+std::string("/AthenaMP.log"));
102}
list filenames
Definition grepfile.py:34

Member Data Documentation

◆ m_evtShare

SmartIF<IEventShare> SharedEvtQueueProvider::m_evtShare
private

Definition at line 60 of file SharedEvtQueueProvider.h.

◆ m_nChunkSize

Gaudi::Property<int> SharedEvtQueueProvider::m_nChunkSize {this, "ChunkSize", 1}
private

Definition at line 45 of file SharedEvtQueueProvider.h.

45{this, "ChunkSize", 1};

◆ m_nChunkStart

int SharedEvtQueueProvider::m_nChunkStart {0}
private

The beginning of the current chunk.

Definition at line 53 of file SharedEvtQueueProvider.h.

53{0};

◆ m_nEventsBeforeFork

Gaudi::Property<int> SharedEvtQueueProvider::m_nEventsBeforeFork {this, "EventsBeforeFork", 0, "Number of events before forking"}
private

Definition at line 44 of file SharedEvtQueueProvider.h.

44{this, "EventsBeforeFork", 0, "Number of events before forking"};

◆ m_nEvtCounted

int SharedEvtQueueProvider::m_nEvtCounted {0}
private

The number of events this tool has counted itself in the input files.

Definition at line 57 of file SharedEvtQueueProvider.h.

57{0};

◆ m_nEvtRequested

int SharedEvtQueueProvider::m_nEvtRequested {-1}
private

Max event received from AppMgr.

Definition at line 56 of file SharedEvtQueueProvider.h.

56{-1};

◆ m_nPositionInChunk

int SharedEvtQueueProvider::m_nPositionInChunk {0}
private

Position within the current chunk.

Definition at line 54 of file SharedEvtQueueProvider.h.

54{0};

◆ m_nprocesses

int SharedEvtQueueProvider::m_nprocesses {-1}
private

We use this data member for adding negative numbers at the end of the event queue.

We cannot use m_nprocs for this purpose in order to avoid generating Output File Reports by Shared Queue Providers.

Definition at line 52 of file SharedEvtQueueProvider.h.

52{-1};

◆ m_sharedEventQueue

AthenaInterprocess::SharedQueue* SharedEvtQueueProvider::m_sharedEventQueue {nullptr}
private

Definition at line 59 of file SharedEvtQueueProvider.h.

59{nullptr};

◆ m_useSharedReader

Gaudi::Property<bool> SharedEvtQueueProvider::m_useSharedReader {this, "UseSharedReader", false, "Use shared reader"}
private

Definition at line 43 of file SharedEvtQueueProvider.h.

43{this, "UseSharedReader", false, "Use shared reader"};

The documentation for this class was generated from the following files: