ATLAS Offline Software
Loading...
Searching...
No Matches
AthMpEvtLoopMgr.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "AthMpEvtLoopMgr.h"
6
9#include "GaudiKernel/IIncidentSvc.h"
10#include "GaudiKernel/IConversionSvc.h"
11#include "GaudiKernel/Incident.h"
12#include "GaudiKernel/ServiceHandle.h"
13#include "GaudiKernel/IIoComponentMgr.h"
14#include "GaudiKernel/IIoComponent.h"
15#include "GaudiKernel/ConcurrencyFlags.h"
17
18#include <sys/stat.h>
19#include <sys/wait.h>
20#include <errno.h>
21#include <ctime>
22#include <fcntl.h>
23#include <sstream>
24#include <fstream>
25#include <cstdlib>
26#include <string>
27#include <time.h>
28#include <chrono>
29#include <algorithm>
30#include <functional>
31
32#include <filesystem>
33
35{
36 int getPss(pid_t, unsigned long&, unsigned long&, unsigned long&, unsigned long&, bool verbose=false);
37}
38
39AthMpEvtLoopMgr::AthMpEvtLoopMgr(const std::string& name
40 , ISvcLocator* svcLocator)
41 : base_class(name,svcLocator)
42 , m_masterPid(getpid())
43{
44}
45
47{
48 ATH_MSG_DEBUG("in initialize() ... ");
49
50 Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(m_nWorkers);
51
52 SmartIF<IProperty> prpMgr(serviceLocator());
53 if(!prpMgr.isValid()) {
54 ATH_MSG_ERROR("Failed to get hold of the Property Manager");
55 return StatusCode::FAILURE;
56 }
57
58 {
59 std::string evtSelName = prpMgr->getProperty("EvtSel").toString();
60 m_evtSelector = serviceLocator()->service(std::move(evtSelName));
61 }
62 ATH_CHECK(m_evtSelector.isValid());
63
64 if(m_strategy=="EventService") {
65 // ES with non-zero events before forking makes no sense
66 if(m_nEventsBeforeFork!=0) {
67 ATH_MSG_ERROR("The EventService strategy cannot run with non-zero value for EventsBeforeFork");
68 return StatusCode::FAILURE;
69 }
70
71 // We need to ignore SkipEvents in ES
72 if(updateSkipEvents(0).isFailure()) {
73 ATH_MSG_ERROR("Failed to set skipEvents=0 in Event Service");
74 return StatusCode::FAILURE;
75 }
76 }
77
78 if(m_isPileup) {
79 m_evtProcessor = ServiceHandle<IEventProcessor>("PileUpEventLoopMgr",name());
80 ATH_MSG_INFO("ELM: The job running in pileup mode");
81 }
82 else {
83 ATH_MSG_INFO("ELM: The job running in non-pileup mode");
84 }
85
86 ATH_CHECK(m_evtProcessor.retrieve());
87 if(!m_isPileup) {
88 SmartIF<IProperty> propertyServer(m_evtProcessor.get());
89 if(propertyServer) {
90 if(propertyServer->setProperty("EventPrintoutInterval",m_eventPrintoutInterval).isFailure()) {
91 ATH_MSG_WARNING("Could not set AthenaEventLoopMgr EventPrintoutInterval to " << m_eventPrintoutInterval);
92 }
93 if(propertyServer->setProperty("ExecAtPreFork",m_execAtPreFork).isFailure()) {
94 ATH_MSG_WARNING("Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
95 }
96 }
97 else {
98 ATH_MSG_WARNING("Could not cast AthenaEventLoopMgr to IProperty");
99 }
100 }
101 ATH_CHECK(m_tools.retrieve());
102
103 return StatusCode::SUCCESS;
104}
105
107{
108 return StatusCode::SUCCESS;
109}
110
111StatusCode AthMpEvtLoopMgr::nextEvent(int maxevt)
112{
113 // Perhaps there we should return StatusCode::FAILURE as this method shoud not be called directly
114 return m_evtProcessor->nextEvent(maxevt);
115}
116
118 // return an invalid context - method should not be called
119 return EventContext{};
120}
121
122StatusCode AthMpEvtLoopMgr::executeEvent(EventContext&& ctx)
123{
124 // Perhaps there we should return StatusCode::FAILURE as this method shoud not be called directly
125 return m_evtProcessor->executeEvent(std::move(ctx));
126}
127
128StatusCode AthMpEvtLoopMgr::executeRun(int maxevt)
129{
130 ATH_MSG_DEBUG("in executeRun()");
131
132 // Generate random component of the Shared Memory and Shared Queue names
133 srand(time(NULL));
134 std::ostringstream randStream;
135 randStream << getpid() << '_' << AthenaInterprocess::randString();
136 ATH_MSG_INFO("Using random components for IPC object names: " << randStream.str());
137
138 ServiceHandle<StoreGateSvc> pDetStore("DetectorStore",name());
139 ATH_CHECK(pDetStore.retrieve());
140
141 // Create Shared Event queue if necessary and make it available to the tools
142 if(m_strategy=="SharedQueue"
143 || m_strategy=="RoundRobin") {
144 AthenaInterprocess::SharedQueue* evtQueue = new AthenaInterprocess::SharedQueue("AthenaMPEventQueue_"+randStream.str(),2000,sizeof(long));
145 if(pDetStore->record(evtQueue,"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
146 ATH_MSG_FATAL("Unable to record the pointer to the Shared Event queue into Detector Store");
147 delete evtQueue;
148 return StatusCode::FAILURE;
149 }
150 }
151
152 // For the Event Service: create a queue for connecting EvtRangeProcessor in the master with EvtRangeScatterer subprocess
153 // The TokenProcessor master will be sending pid-s of failed processes to Token Scatterer
154 if(m_strategy=="EventService") {
155 AthenaInterprocess::SharedQueue* failedPidQueue = new AthenaInterprocess::SharedQueue("AthenaMPFailedPidQueue_"+randStream.str(),100,sizeof(pid_t));
156 if(pDetStore->record(failedPidQueue,"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
157 ATH_MSG_FATAL("Unable to record the pointer to the Failed PID queue into Detector Store");
158 delete failedPidQueue;
159 return StatusCode::FAILURE;
160 }
161 }
162
163 // Prepare work directory for sub-processes
164 if(mkdir(m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)!=0) {
165 switch(errno) {
166 case EEXIST:
167 {
168 // Top directory already exists, maybe a leftover from previous AthenaMP job in the same rundir
169 // Rename it with m_workerTopDir+"-bak-rand"
170
171 srand((unsigned)time(0));
172 std::ostringstream randname;
173 randname << rand();
174 std::string backupDir = (m_workerTopDir.value().rfind('/')==(m_workerTopDir.value().size()-1)
175 ? m_workerTopDir.value().substr(0,m_workerTopDir.value().size()-1)
176 : m_workerTopDir.value())+std::string("-bak-")+randname.str();
177
178 ATH_MSG_WARNING("The top directory " << m_workerTopDir << " already exists");
179 ATH_MSG_WARNING("The job will attempt to save it with the name " << backupDir << " and create new top directory from scratch");
180
181 if(rename(m_workerTopDir.value().c_str(),backupDir.c_str())!=0) {
182 char buf[256];
183 strerror_r(errno, buf, sizeof(buf));
184 ATH_MSG_ERROR("Unable to make backup directory! " << buf);
185 return StatusCode::FAILURE;
186 }
187
188 if(mkdir(m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==0) break;
189 }
190 /* FALLTHROUGH */
191 default:
192 {
193 char buf[256];
194 strerror_r(errno, buf, sizeof(buf));
195 ATH_MSG_ERROR("Unable to make top directory " << m_workerTopDir << " for children processes! " << buf);
196 return StatusCode::FAILURE;
197 }
198 }
199 }
200
201 // When forking before 1st event, fire BeforeFork incident in non-pileup jobs
202 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc",name());
203 ATH_CHECK(incSvc.retrieve());
204
206 incSvc->fireIncident(Incident(name(),"BeforeFork"));
207 }
208
209 // Extract process file descriptors
210 std::shared_ptr<AthenaInterprocess::FdsRegistry> registry = extractFds();
211
212 ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
213 itLast = m_tools.end();
214
215 // When using SharedWriter in conjunction with fork-after-N-events
216 // we have to make sure that mother process is a conversion service
217 // client so that events before forking workers are captured...
218
219 auto sharedWriterTool = m_tools["SharedWriterTool"];
220 const bool sharedWriterWithFAFE = (m_nEventsBeforeFork!=0 && sharedWriterTool);
221
222 if(sharedWriterWithFAFE) {
223 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolSharedIOCnvSvc"));
224 ATH_CHECK(m_dataShare.isValid());
225
226 (*sharedWriterTool)->useFdsRegistry(registry);
227 (*sharedWriterTool)->setRandString(randStream.str());
228
229 int nChildren = (*sharedWriterTool)->makePool(maxevt,m_nWorkers,m_workerTopDir);
230 if(nChildren==-1) {
231 ATH_MSG_FATAL("makePool failed for " << (*sharedWriterTool)->name());
232 return StatusCode::FAILURE;
233 }
234 else {
235 m_nChildProcesses+=nChildren;
236 }
237
238 // Execute the SharedWriterTool at this point
239 StatusCode mySc = (*sharedWriterTool)->exec();
240 if(!mySc.isSuccess()) {
241 ATH_MSG_FATAL("Cannot Execute SharedWriter Tool");
242 return StatusCode::FAILURE;
243 }
244
245 // Make the mother process a client
246 if(!m_dataShare->makeClient(m_nWorkers+1).isSuccess()) {
247 ATH_MSG_FATAL("Cannot make mother process a client for Conversion Service");
248 return StatusCode::FAILURE;
249 }
250 }
251
252 //
253 // Try processing requested number of events here
255 // Take into account a corner case: m_nEventsBeforeFork > maxevt
256 int nEventsToProcess = (maxevt>-1 && m_nEventsBeforeFork>maxevt)
257 ? maxevt
258 : m_nEventsBeforeFork.value();
259 StatusCode scEvtProc = m_evtProcessor->nextEvent(nEventsToProcess);
260 if(!scEvtProc.isSuccess()) {
261 if(nEventsToProcess) {
262 ATH_MSG_FATAL("Unable to process first " << nEventsToProcess << " events in the master");
263 }
264 else {
265 ATH_MSG_FATAL("Unable to process first event in the master");
266 }
267 return scEvtProc;
268 }
269 }
270
271 // Finalize I/O (close input files) by IoComponents
272 ServiceHandle<IIoComponentMgr> ioMgr("IoComponentMgr",name());
273 ATH_CHECK(ioMgr.retrieve());
274 ATH_CHECK(ioMgr->io_finalize());
275 ATH_MSG_DEBUG("Successfully finalized I/O before forking");
276
277 // Flush stream buffers
278 fflush(NULL);
279
280 // Make the mother process not client
281 if(sharedWriterWithFAFE && !m_dataShare->makeClient(0).isSuccess()) {
282 ATH_MSG_FATAL("Cannot make mother process not client for Conversion Service");
283 return StatusCode::FAILURE;
284 }
285
286 int maxEvents(maxevt); // This can be modified after restart
287
288 // Re-extract process file descriptors
289 registry = extractFds();
290
291 // Make worker pools
292 it = m_tools.begin();
293 for(; it!=itLast; ++it) {
294 if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
295 (*it)->useFdsRegistry(registry);
296 (*it)->setRandString(randStream.str());
297 (*it)->setMaxEvt(maxevt);
298 (*it)->setMPRunStop(this);
299 if(it==m_tools.begin()) {
300 incSvc->fireIncident(Incident(name(),"PreFork")); // Do it only once
301 }
302 int nChildren = (*it)->makePool(maxEvents,m_nWorkers,m_workerTopDir);
303 if(nChildren==-1) {
304 ATH_MSG_FATAL("makePool failed for " << (*it)->name());
305 return StatusCode::FAILURE;
306 }
307 else {
308 m_nChildProcesses+=nChildren;
309 }
310 }
311
312 if(m_nChildProcesses==0) {
313 ATH_MSG_ERROR("No child processes were created");
314 return StatusCode::FAILURE;
315 }
316
317 // Assign work to child processes
318 for(it=m_tools.begin(); it!=itLast; ++it) {
319 if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
320 if((*it)->exec().isFailure()) {
321 ATH_MSG_FATAL("Unable to submit work to the tool " << (*it)->name());
322 return StatusCode::FAILURE;
323 }
324 }
325
326 StatusCode sc = wait();
327
329 ATH_MSG_INFO("*** *** Memory Usage *** ***");
330 ATH_MSG_INFO("*** MAX PSS " << (*std::max_element(m_samplesPss.cbegin(),m_samplesPss.cend()))/1024 << "MB");
331 ATH_MSG_INFO("*** MAX RSS " << (*std::max_element(m_samplesRss.cbegin(),m_samplesRss.cend()))/1024 << "MB");
332 ATH_MSG_INFO("*** MAX SIZE " << (*std::max_element(m_samplesSize.cbegin(),m_samplesSize.cend()))/1024 << "MB");
333 ATH_MSG_INFO("*** MAX SWAP " << (*std::max_element(m_samplesSwap.cbegin(),m_samplesSwap.cend()))/1024 << "MB");
334 ATH_MSG_INFO("*** *** Memory Usage *** ***");
335 }
336
338 ATH_MSG_INFO("BEGIN collecting sub-process logs");
339 std::vector<std::string> logs;
340 for(it=m_tools.begin(); it!=itLast; ++it) {
341 (*it)->subProcessLogs(logs);
342 for(size_t i=0;i<logs.size();++i) {
343 std::cout << "\n File: " << logs[i] << "\n" << std::endl;
344 std::ifstream log;
345 log.open(logs[i].c_str(),std::ifstream::in);
346 std::string line;
347 while(!log.eof()) {
348 std::getline(log,line);
349 std::cout << line << std::endl;
350 }
351 log.close();
352 }
353 }
354 ATH_MSG_INFO("END collecting sub-process logs");
355 }
356
357 if(sc.isSuccess())
358 return generateOutputReport();
359 else
360 return sc;
361}
362
364{
365 m_scheduledStop = true;
366 return m_evtProcessor->stopRun();
367}
368
369// !!! NB !!!
370//
371// Here we rely on fact that if master process finds that one of
372// its sub-processes finished abnormally (either signal or non-zero exit code)
373// it will stop waiting for other sub-processes and proceed with its finalization.
374// Once master process exits the remaining sub-processes will receive SIGHUP and exit too.
375//
376// We could also change the behavior and broadcast termination signal on all remaining
377// sub-processes once a problematic sub-process has been identified
378//
380{
381 ATH_MSG_INFO("Waiting for sub-processes");
382 ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
383 itLast = m_tools.end();
384 pid_t pid(0);
385 bool all_ok(true);
386
387 auto memMonTime = std::chrono::system_clock::now();
388
389 while(m_nChildProcesses>0) {
390 for(it = m_tools.begin(); it!=itLast; ++it) {
391 if((*it)->wait_once(pid).isFailure()) {
392 all_ok = false;
393 ATH_MSG_ERROR("Failure in waiting or sub-process finished abnormally");
394 break;
395 }
396 else {
397 if(pid>0) m_nChildProcesses -= 1;
398 }
399 }
400 if(!all_ok) break;
401
402 usleep(m_nPollingInterval*1000);
403
405 auto currTime = std::chrono::system_clock::now();
406 if(std::chrono::duration<double,std::ratio<1,1>>(currTime-memMonTime).count()>m_nMemSamplingInterval) {
407 unsigned long size(0);
408 unsigned long rss(0);
409 unsigned long pss(0);
410 unsigned long swap(0);
411
412 if(athenaMP_MemHelper::getPss(getpid(), pss, swap, rss, size, msgLvl(MSG::DEBUG)))
413 ATH_MSG_WARNING("Unable to get memory sample");
414 else {
415 m_samplesRss.push_back(rss);
416 m_samplesPss.push_back(pss);
417 m_samplesSize.push_back(size);
418 m_samplesSwap.push_back(swap);
419 }
420 memMonTime=currTime;
421 }
422 }
423 }
424
425 for(it=m_tools.begin(); it!=itLast; ++it)
426 (*it)->reportSubprocessStatuses();
427
428 if(!all_ok) {
429 for(it=m_tools.begin(); it!=itLast; ++it)
430 (*it)->killChildren();
431 }
432
433 return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
434}
435
437{
438 // Loop over tools, collect their output reports and put them all together into a single file.
439 // If m_nEventsBeforeFork!=0 then take into account the outputs made by the master process too
440
441 std::ofstream ofs;
442 ofs.open(m_outputReportName.value().c_str());
443 if(!ofs) {
444 ATH_MSG_ERROR("Unable to open AthenaMPOutputs for writing!");
445 return StatusCode::FAILURE;
446 }
447 else {
448 std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
449
450 ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
451 itLast = m_tools.end();
452 for(it=m_tools.begin(); it!=itLast; ++it)
453 allptrs.push_back((*it)->generateOutputReport());
454
455 // First collect keys=file_names from all tools
456 std::set<std::string> allkeys;
457 for(size_t i=0; i<allptrs.size(); ++i) {
458 AthenaMP::AllWorkerOutputsIterator it_wos = allptrs[i]->begin(),
459 it_wosLast = allptrs[i]->end();
460 for(;it_wos!=it_wosLast;++it_wos)
461 allkeys.insert(it_wos->first);
462 }
463
464 // Generate XML
465 ofs << "<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
466 ofs << "<athenaFileReport>" << std::endl;
467 std::set<std::string>::const_iterator keys_it = allkeys.begin(),
468 keys_itLast = allkeys.end();
469 for(;keys_it!=keys_itLast;++keys_it) {
470 ofs << " <Files OriginalName=\"" << (*keys_it) << "\">" << std::endl;
471 for(size_t i=0; i<allptrs.size(); ++i) {
472 AthenaMP::AllWorkerOutputsIterator it_wos = (allptrs[i])->find(*keys_it);
473 if(it_wos!=(allptrs[i])->end()) {
474 for(size_t ii=0; ii<it_wos->second.size(); ++ii) {
475 AthenaMP::WorkerOutput& outp = it_wos->second[ii];
476 if(ii==0 && m_nEventsBeforeFork>0) {
477 std::filesystem::path masterFile(std::filesystem::current_path());
478 masterFile /= std::filesystem::path(*keys_it);
479 if(std::filesystem::exists(masterFile) && std::filesystem::is_regular_file(masterFile))
480 ofs << " <File "
481 << "description=\"" << outp.description
482 << "\" mode=\"" << outp.access_mode
483 << "\" name=\"" << masterFile.string()
484 << "\" shared=\"" << (outp.shared?"True":"False")
485 << "\" technology=\"" << outp.technology
486 << "\"/>" << std::endl;
487 }
488 ofs << " <File "
489 << "description=\"" << outp.description
490 << "\" mode=\"" << outp.access_mode
491 << "\" name=\"" << outp.filename
492 << "\" shared=\"" << (outp.shared?"True":"False")
493 << "\" technology=\"" << outp.technology
494 << "\"/>" << std::endl;
495 }
496 }
497 }
498 ofs << " </Files>" << std::endl;
499 }
500 ofs << "</athenaFileReport>" << std::endl;
501 ofs.close();
502 }
503
504 return StatusCode::SUCCESS;
505}
506
507std::shared_ptr<AthenaInterprocess::FdsRegistry> AthMpEvtLoopMgr::extractFds()
508{
509 ATH_MSG_DEBUG("Extracting file descriptors");
510 using namespace std::filesystem;
511 std::shared_ptr<AthenaInterprocess::FdsRegistry> registry(new AthenaInterprocess::FdsRegistry());
512
513 // Extract file descriptors associated with the current process
514 // 1. Store only those regular files in the registry, which
515 // don't contain substrings from the "exclusion pattern" set
516 // 2. Skip also stdout and stderr
517
518 std::vector<std::string> excludePatterns {
519 "/root/etc/plugins/"
520 ,"/root/cint/cint/"
521 ,"/root/include/"
522 ,"/var/tmp/"
523 ,"/var/lock/"
524 ,"/var/lib/"
525 ,"/bin/python/"
526 ,"/include/c++/"
527 ,".confdb2"
528 };
529
530 path fdPath("/proc/self/fd");
531 for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
532 if(is_symlink(fdIt->path())) {
533 path realpath = read_symlink(fdIt->path());
534 int fd = atoi(fdIt->path().filename().string().c_str());
535
536 if (fd==1 || fd==2) // Skip stdout and stderr
537 continue;
538
539 if(exists(realpath)) {
540 if(is_regular_file(realpath)) {
541 // Check against the exclusion criteria
542 bool exclude(false);
543 for(size_t i=0;i<excludePatterns.size(); ++i) {
544 if(realpath.string().find(excludePatterns[i])!=std::string::npos) {
545 exclude = true;
546 break;
547 }
548 }
549 if(exclude) {
550 ATH_MSG_DEBUG(realpath.string() << " Excluded from the registry by the pattern");
551 }
552 else {
553 registry->push_back(AthenaInterprocess::FdsRegistryEntry(fd,realpath.string()));
554 }
555 }
556 else {
557 ATH_MSG_DEBUG(realpath.string() << " is not a regular file"); // TODO: deal with these?
558 }
559 } // File exists
560 }
561 else
562 ATH_MSG_WARNING("UNEXPECTED. " << fdIt->path().string() << " Not a symlink");
563 } // Directory iteration
564
565 ATH_MSG_DEBUG("Fds Reistry created. Contents:");
566 for(size_t ii(0); ii<registry->size(); ++ii)
567 ATH_MSG_DEBUG((*registry)[ii].fd << " " << (*registry)[ii].name);
568
569 return registry;
570}
571
572StatusCode AthMpEvtLoopMgr::updateSkipEvents(int skipEvents)
573{
574 SmartIF<IProperty> propertyServer(m_evtSelector);
575 if(!propertyServer) {
576 ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
577 return StatusCode::FAILURE;
578 }
579
580 IntegerProperty skipEventsProperty("SkipEvents", skipEvents);
581 if(propertyServer->setProperty(skipEventsProperty).isFailure()) {
582 ATH_MSG_ERROR("Unable to update " << skipEventsProperty.name() << " property on the Event Selector");
583 return StatusCode::FAILURE;
584 }
585 ATH_MSG_INFO("Updated the SkipEvents property of the event selector. New value: " << skipEvents);
586
587 return StatusCode::SUCCESS;
588}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
void swap(DataVector< T > &a, DataVector< T > &b)
See DataVector<T, BASE>::swap().
int32_t pid_t
static Double_t sc
virtual StatusCode stopRun() override
std::vector< unsigned long > m_samplesSize
Gaudi::Property< int > m_nPollingInterval
virtual EventContext createEventContext() override
SmartIF< IDataShare > m_dataShare
virtual StatusCode finalize() override
virtual StatusCode executeEvent(EventContext &&ctx) override
StringArrayProperty m_execAtPreFork
virtual StatusCode nextEvent(int maxevt) override
ToolHandleArray< IAthenaMPTool > m_tools
Gaudi::Property< int > m_nWorkers
Gaudi::Property< std::string > m_workerTopDir
virtual StatusCode initialize() override
StatusCode updateSkipEvents(int skipEvents)
Gaudi::Property< bool > m_collectSubprocessLogs
Gaudi::Property< unsigned int > m_eventPrintoutInterval
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds()
Gaudi::Property< int > m_nEventsBeforeFork
Gaudi::Property< bool > m_isPileup
std::vector< unsigned long > m_samplesRss
virtual StatusCode executeRun(int maxevt) override
Gaudi::Property< std::string > m_strategy
AthMpEvtLoopMgr()=delete
ServiceHandle< IEventProcessor > m_evtProcessor
StatusCode generateOutputReport()
Gaudi::Property< std::string > m_outputReportName
std::vector< unsigned long > m_samplesSwap
std::vector< unsigned long > m_samplesPss
Gaudi::Property< int > m_nMemSamplingInterval
SmartIF< IService > m_evtSelector
bool exists(const std::string &filename)
does a file exist
std::ostream * outp
send output to here ...
Definition hcg.cxx:76
std::string find(const std::string &s)
return a remapped string
Definition hcg.cxx:138
int count(std::string s, const std::string &regx)
count how many occurrences of a regx are in a string
Definition hcg.cxx:146
std::set< std::string > exclude
list of directories to be excluded
Definition hcg.cxx:98
bool verbose
Definition hcg.cxx:73
std::vector< FdsRegistryEntry > FdsRegistry
Definition FdsRegistry.h:22
AllWorkerOutputs::iterator AllWorkerOutputsIterator
int getPss(pid_t, unsigned long &, unsigned long &, unsigned long &, unsigned long &, bool verbose=false)