ATLAS Offline Software
Loading...
Searching...
No Matches
SharedHiveEvtQueueConsumer.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
9
13#include "CxxUtils/xmalloc.h"
14#include "GaudiKernel/IEvtSelector.h"
15#include "GaudiKernel/IIoComponentMgr.h"
16#include "GaudiKernel/IFileMgr.h"
17#include "GaudiKernel/IChronoStatSvc.h"
18#include "GaudiKernel/ISvcLocator.h"
19#include "GaudiKernel/IIncidentSvc.h"
20#include "GaudiKernel/IConversionSvc.h"
21
22#include <sys/stat.h>
23#include <sstream>
24#include <fstream>
25#include <unistd.h>
26#include <stdio.h>
27#include <stdint.h>
28#include <stdexcept>
29#include <cmath> // For pow
30
31#include <signal.h>
32
34 std::atomic<bool> sig_done = false;
35 void pauseForDebug(int /*sig*/) {
36 // std::cout << "Continuing after receiving signal "
37 // << sig << std::endl;
38 sig_done = true;
39 }
40}
41
43 , const std::string& name
44 , const IInterface* parent)
45 : AthenaMPToolBase(type,name,parent)
46 , m_chronoStatSvc("ChronoStatSvc", name)
47{
48 m_subprocDirPrefix = "worker_";
49}
50
51/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
52
56
57/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
58
60{
61 ATH_MSG_DEBUG("In initialize");
62
64
65 m_evtSelSeek = serviceLocator()->service(m_evtSelName);
66 ATH_CHECK( m_evtSelSeek.isValid() );
67 ATH_CHECK( evtSelector()->createContext (m_evtContext) );
68
69 ATH_CHECK(m_chronoStatSvc.retrieve());
70
72 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolSharedIOCnvSvc"));
73 if(!m_dataShare) {
74 ATH_MSG_ERROR("Error retrieving AthenaPoolSharedIOCnvSvc");
75 return StatusCode::FAILURE;
76 }
77 }
78
79 return StatusCode::SUCCESS;
80}
81
82/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
83
84StatusCode
86{
87 if (m_evtContext) {
88 ATH_CHECK( evtSelector()->releaseContext (m_evtContext) );
89 m_evtContext = nullptr;
90 }
91
92 return StatusCode::SUCCESS;
93}
94
95/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
96
97int
98SharedHiveEvtQueueConsumer::makePool(int, int nprocs, const std::string& topdir)
99{
100 ATH_MSG_DEBUG("In makePool " << getpid());
101
102 if(nprocs==0 || nprocs<-1) {
103 ATH_MSG_ERROR("Invalid value for the nprocs parameter: ");
104 return -1;
105 }
106
107 if(topdir.empty()) {
108 ATH_MSG_ERROR("Empty name for the top directory!");
109 return -1;
110 }
111
112 m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
113 m_subprocTopDir = topdir;
114
115 // Get the shared event queue
116 ATH_MSG_DEBUG("Event queue name " << "AthenaMPEventQueue_" << m_randStr);
117 StatusCode sc = detStore()->retrieve(m_sharedEventQueue,"AthenaMPEventQueue_"+m_randStr);
118 if(sc.isFailure()) {
119 ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Event Queue");
120 return -1;
121 }
122
123
124 // Create rank queue and fill it
125 m_sharedRankQueue = std::make_unique<AthenaInterprocess::SharedQueue>("SharedHiveEvtQueueConsumer_RankQueue_"+m_randStr,m_nprocs,sizeof(int));
126 for(int i=0; i<m_nprocs; ++i)
127 if(!m_sharedRankQueue->send_basic<int>(i)) {
128 ATH_MSG_ERROR("Unable to send int to the ranks queue!");
129 return -1;
130 }
131
132 // Create the process group and map_async bootstrap
133 m_processGroup = new AthenaInterprocess::ProcessGroup(m_nprocs);
134 ATH_MSG_INFO("Created Pool of " << m_nprocs << " worker processes");
135 if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
136 return -1;
137 ATH_MSG_INFO("Workers bootstrapped");
138
139 return m_nprocs;
140}
141
142/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
143
144StatusCode
145SharedHiveEvtQueueConsumer::exec()
146{
147 ATH_MSG_DEBUG("In exec " << getpid());
148
149 if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
150 return StatusCode::FAILURE;
151 ATH_MSG_INFO("Workers started processing events");
152
153 return StatusCode::SUCCESS;
154}
155
156/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
157
158StatusCode
159SharedHiveEvtQueueConsumer::wait_once(pid_t& pid)
160{
161 StatusCode sc = AthenaMPToolBase::wait_once(pid);
162 AthenaInterprocess::ProcessResult* presult(0);
163 if(sc.isFailure()) {
164 // We are to stop waiting. Pull all available ProcessResults from the queue
165 // Don't serialize finalizations
166 do {
167 presult = m_processGroup->pullOneResult();
168 if(presult && (unsigned)(presult->output.size)>sizeof(int))
169 decodeProcessResult(presult,false);
170 if(presult) free(presult->output.data);
171 delete presult;
172 } while(presult);
173 } else {
174 // Pull one result and decode it if necessary
175 presult = m_processGroup->pullOneResult();
176 int res(0);
177 if(presult && (unsigned)(presult->output.size)>sizeof(int))
178 res = decodeProcessResult(presult,true);
179 if(presult) free(presult->output.data);
180 delete presult;
181 if(res) return StatusCode::FAILURE;
182 }
183 return sc;
184}
185
186/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
187
188void
190{
191 ATH_MSG_INFO("Statuses of event processors");
192 const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
193 for(size_t i=0; i<statuses.size(); ++i) {
194 // Get the number of events processed by this worker
195 std::map<pid_t,int>::const_iterator it = m_nProcessedEvents.find(statuses[i].pid);
196 msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
197 << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
198 << ". Number of events processed: ";
199 if(it==m_nProcessedEvents.end())
200 msg(MSG::INFO) << "N/A" << endmsg;
201 else
202 msg(MSG::INFO) << it->second << endmsg;
203 }
204}
205
206/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
207
208void
209SharedHiveEvtQueueConsumer::subProcessLogs(std::vector<std::string>& filenames)
210{
211 filenames.clear();
212 for(int i=0; i<m_nprocs; ++i) {
213 std::ostringstream workerIndex;
214 workerIndex << i;
215 std::filesystem::path worker_rundir(m_subprocTopDir);
216 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
217 filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
218 }
219}
220
221/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
222
223std::unique_ptr<AthenaInterprocess::ScheduledWork>
225{
226 if (m_debug) {
227 ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
228 sigset_t mask, oldmask;
229
231
232 sigemptyset (&mask);
233 sigaddset (&mask, SIGUSR1);
234
235 sigprocmask (SIG_BLOCK, &mask, &oldmask);
237 sigsuspend (&oldmask);
238 sigprocmask (SIG_UNBLOCK, &mask, NULL);
239 }
240
241 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
242 outwork->data = CxxUtils::xmalloc(sizeof(int));
243 *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
244 outwork->size = sizeof(int);
245
246 // ...
247 // (possible) TODO: extend outwork with some error message, which will be eventually
248 // reported in the master process
249 // ...
250
251 // ________________________ Get IncidentSvc and fire PostFork ________________________
252 SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service("IncidentSvc"));
253 if (!p_incidentSvc) {
254 ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
255 return outwork;
256 }
257 p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
258
259
260 // ________________________ Get RankID ________________________
261 //
262 if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
263 ATH_MSG_ERROR("Unable to get rank ID!");
264 return outwork;
265 }
266 std::ostringstream workindex;
267 workindex<<m_rankId;
268
269 // ________________________ Worker dir: mkdir ________________________
270 std::filesystem::path worker_rundir(m_subprocTopDir);
271 worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
272 // TODO: this "worker_" can be made configurable too
273
274 if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
275 ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
276 return outwork;
277 }
278
279 // ________________________ Redirect logs ________________________
280 if(redirectLog(worker_rundir.string()))
281 return outwork;
282
283 ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
284
285 // ________________________ Update Io Registry ____________________________
286 if(updateIoReg(worker_rundir.string()))
287 return outwork;
288
289 ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
290
291 // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
292 std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
293 if(std::filesystem::is_regular_file("SimParams.db"))
294 COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
295 if(std::filesystem::is_regular_file("DigitParams.db"))
296 COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
297 if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
298 COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
299
300 // _______________________ Handle saved PFC (if any) ______________________
301 if(handleSavedPfc(abs_worker_rundir))
302 return outwork;
303
304 // ________________________ reopen descriptors ____________________________
305 if(reopenFds())
306 return outwork;
307
308 ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
309
310 // ________________________ Make Shared Writer Client ________________________
311
313 SmartIF<IProperty> propertyServer(m_dataShare);
314 if (!propertyServer || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
315 ATH_MSG_ERROR("Could not change AthenaPoolSharedIOCnvSvc MakeClient Property");
316 return outwork;
317 } else {
318 ATH_MSG_DEBUG("Successfully made the conversion service a share client");
319 }
320 }
321
322 // ________________________ I/O reinit ________________________
323 ATH_CHECK( m_ioMgr->io_reinitialize(), outwork );
324
325 // ________________________ Event selector restart ________________________
326 SmartIF<IService> evtSelSvc(m_evtSelector);
327 ATH_CHECK( evtSelSvc.isValid(), outwork );
328 ATH_CHECK( evtSelSvc->start(), outwork );
329
330 // ________________________ Worker dir: chdir ________________________
331 if(chdir(worker_rundir.string().c_str())==-1) {
332 ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
333 return outwork;
334 }
335
336 // ___________________ Fire UpdateAfterFork incident _________________
337 p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
338
339 // Declare success and return
340 *(int*)(outwork->data) = 0;
341 return outwork;
342}
343
344/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
345
346std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedHiveEvtQueueConsumer::exec_func()
347{
348 ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
349
350 bool all_ok(true);
351
352 if (!initHive().isSuccess()) {
353 ATH_MSG_FATAL("unable to initialize Hive");
354 all_ok = false;
355 }
356
357 // Get the value of SkipEvent
358 int skipEvents(0);
359 SmartIF<IProperty> propertyServer(m_evtSelector);
360 if(propertyServer==0) {
361 ATH_MSG_ERROR("Unable to cast event selector to IProperty");
362 all_ok = false;
363 }
364 else {
365 std::string propertyName("SkipEvents");
366 IntegerProperty skipEventsProp(std::move(propertyName),skipEvents);
367 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
368 ATH_MSG_INFO("Event Selector does not have SkipEvents property");
369 }
370 else {
371 skipEvents = skipEventsProp.value();
372 }
373 }
374
375 IHybridProcessorHelper* hybridHelper = dynamic_cast<IHybridProcessorHelper*>(m_evtProcessor.get());
376 if(!hybridHelper) {
377 ATH_MSG_FATAL("Failed to acquire IHybridProcessorHelper interface");
378 all_ok = false;
379 return std::unique_ptr<AthenaInterprocess::ScheduledWork>();
380 }
381 // Reset the application return code.
382 hybridHelper->resetAppReturnCode();
383
384 int finishedEvts =0;
385 int createdEvts =0;
386 long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
387 long evtnumAndChunk(0);
388// unsigned evtCounter(0);
389 int evtnum(0), chunkSize(1);
390
391 ATH_MSG_INFO("Starting loop on events");
392
393 StatusCode sc(StatusCode::SUCCESS);
394
395 while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
396 ATH_MSG_DEBUG("Event queue is empty");
397 usleep(1000);
398 }
399 bool loop_ended = (evtnumAndChunk<0);
400 if(!loop_ended) {
401 ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
402 chunkSize = evtnumAndChunk >> (sizeof(int)*8);
403 evtnum = evtnumAndChunk & intmask;
404 ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
405 hybridHelper->setCurrentEventNum(++evtnum);
406 }
407
408 bool no_more_events = false;
409
410 while(!loop_ended) {
411 ATH_MSG_DEBUG(" -> createdEvts: " << createdEvts);
412
413 if(!hybridHelper->terminateLoop() // No scheduled loop termination
414 && !no_more_events // We are not yet done getting events
415 && m_schedulerSvc->freeSlots()>0) { // There are still free slots in the scheduler
416 ATH_MSG_DEBUG("createdEvts: " << createdEvts << ", freeslots: " << m_schedulerSvc->freeSlots());
417
418 auto ctx = m_evtProcessor->createEventContext();
419 if(!ctx.valid()) {
420 sc = StatusCode::FAILURE;
421 }
422 else {
423 sc = m_evtProcessor->executeEvent(std::move(ctx));
424 }
425
426 if (sc.isFailure()) {
427 ATH_MSG_ERROR("Terminating event processing loop due to errors");
428 loop_ended = true;
429 }
430 else {
431 ++createdEvts;
432 if(--chunkSize==0) {
433 // Fetch next chunk
434 while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
435 ATH_MSG_DEBUG("Event queue is empty");
436 usleep(1000);
437 }
438 if(evtnumAndChunk<0) {
439 no_more_events = true;
440 evtnumAndChunk *= -1;
441 ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
442 }
443 else {
444 ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
445 chunkSize = evtnumAndChunk >> (sizeof(int)*8);
446 evtnum = evtnumAndChunk & intmask;
447 ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
448 }
449 }
450 // Advance to the next event
451 if(!no_more_events) {
452 hybridHelper->setCurrentEventNum(++evtnum);
453 }
454 }
455 }
456 else {
457 // all the events were created but not all finished or the slots were
458 // all busy: the scheduler should finish its job
459 ATH_MSG_DEBUG("Draining the scheduler");
460
461 // Pull out of the scheduler the finished events
462 int ir = hybridHelper->drainScheduler(finishedEvts,true);
463 if(ir < 0) {
464 // some sort of error draining scheduler;
465 loop_ended = true;
466 sc = StatusCode::FAILURE;
467 }
468 else if(ir == 0) {
469 // no more events in scheduler
470 if(no_more_events) {
471 // We are done
472 loop_ended = true;
473 sc = StatusCode::SUCCESS;
474 }
475 }
476 else {
477 // keep going!
478 }
479 }
480 } // end main loop on finished events
481
482 if(all_ok) {
483 if(m_evtProcessor->executeRun(0).isFailure()) {
484 ATH_MSG_ERROR("Could not finalize the Run");
485 all_ok=false;
486 } else {
487 if(m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+skipEvents).isFailure()) {
488 ATH_MSG_DEBUG("Seek past maxevt to " << evtnumAndChunk+skipEvents << " returned failure. As expected...");
489 }
490 }
491 }
492
493 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
494
495 // Return value: "ERRCODE|Func_Flag|NEvt"
496 int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
497 void* outdata = CxxUtils::xmalloc(outsize);
498 *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
500 memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
501 memcpy((char*)outdata+sizeof(int)+sizeof(func),&createdEvts,sizeof(int));
502
503 outwork->data = outdata;
504 outwork->size = outsize;
505 // ...
506 // (possible) TODO: extend outwork with some error message, which will be eventually
507 // reported in the master proces
508 // ...
509 return outwork;
510}
511
512/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
513
514std::unique_ptr<AthenaInterprocess::ScheduledWork>
516{
517 ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
518
519 bool all_ok(true);
520
521 if(m_appMgr->stop().isFailure()) {
522 ATH_MSG_ERROR("Unable to stop AppMgr");
523 all_ok=false;
524 } else {
525 if(m_appMgr->finalize().isFailure()) {
526 std::cerr << "Unable to finalize AppMgr" << std::endl;
527 all_ok=false;
528 }
529 }
530
531 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
532
533 // Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
534 int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
535 void* outdata = CxxUtils::xmalloc(outsize);
536 *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
538 memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
539 int nEvt = -1;
540 memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
541
542 outwork->data = outdata;
543 outwork->size = outsize;
544
545 return outwork;
546}
547
548/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
549
550int
551SharedHiveEvtQueueConsumer::decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize)
552{
553 if(!presult) return 0;
554 const AthenaInterprocess::ScheduledWork& output = presult->output;
555 ATH_MSG_DEBUG("Decoding the output of PID=" << presult->pid << " with the size=" << output.size);
556 if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)) return 0;
557
559 memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
561 // Store the number of processed events
562 int nevt(0);
563 memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
564 m_nProcessedEvents[presult->pid]=nevt;
565 ATH_MSG_DEBUG("PID=" << presult->pid << " processed " << nevt << " events");
566
567 if(doFinalize) {
568 // Add PID to the finalization queue
569 m_finQueue.push(presult->pid);
570 ATH_MSG_DEBUG("Added PID=" << presult->pid << " to the finalization queue");
571
572 // If this is the only element in the queue then start its finalization
573 // Otherwise it has to wait its turn until all previous processes have been finalized
574 if(m_finQueue.size()==1) {
575 if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,presult->pid)
576 || m_processGroup->map_async(0,0,presult->pid)) {
577 ATH_MSG_ERROR("Problem scheduling finalization on PID=" << presult->pid);
578 return 1;
579 } else {
580 ATH_MSG_DEBUG("Scheduled finalization of PID=" << presult->pid);
581 }
582 }
583 }
584 } else if(doFinalize && func==AthenaMPToolBase::FUNC_FIN) {
585 ATH_MSG_DEBUG("Finished finalization of PID=" << presult->pid);
586 pid_t pid = m_finQueue.front();
587 if(pid==presult->pid) {
588 // pid received as expected. Remove it from the queue
589 m_finQueue.pop();
590 ATH_MSG_DEBUG("PID=" << presult->pid << " removed from the queue");
591 // Schedule finalization of the next processe in the queue
592 if(m_finQueue.size()) {
593 if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())
594 || m_processGroup->map_async(0,0,m_finQueue.front())) {
595 ATH_MSG_ERROR("Problem scheduling finalization on PID=" << m_finQueue.front());
596 return 1;
597 } else {
598 ATH_MSG_DEBUG("Scheduled finalization of PID=" << m_finQueue.front());
599 }
600 }
601 } else {
602 // Error: unexpected pid received from presult
603 ATH_MSG_ERROR("Finalized PID=" << presult->pid << " while PID=" << pid << " was expected");
604 return 1;
605 }
606 }
607
608 return 0;
609}
610
611/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
612
613StatusCode
615
616 if (m_evtProcessor.release().isFailure()) {
617 ATH_MSG_INFO("could not release old EventProcessor ");
618 }
619
620 ISvcManager* pISM(dynamic_cast<ISvcManager*>(serviceLocator().get()));
621 if (pISM == 0) {
622 ATH_MSG_ERROR("initHive: Could not get SvcManager");
623 } else {
624 if (pISM->removeService(m_evtProcessor.name()).isFailure()) {
625 ATH_MSG_ERROR("initHive: could not remove " << m_evtProcessor.name()
626 << " from SvcManager");
627 }
628 }
629
630 m_evtProcessor = ServiceHandle<IEventProcessor>("AthenaMtesEventLoopMgr",name());
631
632 if (m_evtProcessor.retrieve().isFailure()) {
633 ATH_MSG_ERROR("could not setup " << m_evtProcessor.typeAndName());
634 return StatusCode::FAILURE;
635 }
636
637 m_schedulerSvc = serviceLocator()->service("AvalancheSchedulerSvc");
638
639 // m_whiteboard = serviceLocator()->service(m_whiteboardName);
640 // if( !m_whiteboard.isValid() ) {
641 // ATH_MSG_FATAL( "Error retrieving " << m_whiteboardName
642 // << " interface IHiveWhiteBoard." );
643 // return StatusCode::FAILURE;
644 // }
645
646 // m_schedulerSvc = serviceLocator()->service(m_schedulerName);
647 // if ( !m_schedulerSvc.isValid()){
648 // ATH_MSG_FATAL( "Error retrieving SchedulerSvc interface ISchedulerSvc." );
649 // return StatusCode::FAILURE;
650 // }
651 // // Setup algorithm resource pool
652 // m_algResourcePool = serviceLocator()->service("AlgResourcePool");
653 // if( !m_algResourcePool.isValid() ) {
654 // ATH_MSG_FATAL ("Error retrieving AlgResourcePool");
655 // return StatusCode::FAILURE;
656 // }
657
658 // sc = m_eventStore.retrieve();
659 // if( !sc.isSuccess() ) {
660 // ATH_MSG_FATAL("Error retrieving pointer to StoreGateSvc");
661 // return sc;
662 // }
663
664
665 return StatusCode::SUCCESS;
666
667}
668
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_DEBUG(x)
int32_t pid_t
Extension to IEvtSelector to allow for seeking.
std::pair< std::vector< unsigned int >, bool > res
static Double_t sc
#define sigemptyset(x)
Definition SealSignal.h:82
#define sigaddset(x, y)
Definition SealSignal.h:84
int sigset_t
Definition SealSignal.h:80
constexpr int pow(int base, int exp) noexcept
std::string m_subprocTopDir
Top run directory for subprocesses.
int handleSavedPfc(const std::filesystem::path &dest_path)
int updateIoReg(const std::string &rundir)
std::string m_evtSelName
Name of the event selector.
SmartIF< IEvtSelector > m_evtSelector
AthenaMPToolBase(const std::string &type, const std::string &name, const IInterface *parent)
IEvtSelector * evtSelector()
virtual StatusCode initialize() override
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
ServiceHandle< IAppMgrUI > m_appMgr
int m_nprocs
Number of workers spawned by the master process.
ServiceHandle< IIoComponentMgr > m_ioMgr
AthenaInterprocess::ProcessGroup * m_processGroup
ServiceHandle< IEventProcessor > m_evtProcessor
std::string m_subprocDirPrefix
For example "worker_".
std::string fmterror(int errnum)
Helper interface for implementing hybrid MP+MT.
virtual void resetAppReturnCode()=0
virtual bool terminateLoop()=0
virtual void setCurrentEventNum(int num)=0
virtual int drainScheduler(int &finishedEvts, bool report)=0
virtual void reportSubprocessStatuses() override
virtual StatusCode initialize() override
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
virtual void subProcessLogs(std::vector< std::string > &) override
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Gaudi::Property< bool > m_useSharedWriter
SmartIF< IEvtSelectorSeek > m_evtSelSeek
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
std::map< pid_t, int > m_nProcessedEvents
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
virtual StatusCode finalize() override
SharedHiveEvtQueueConsumer(const std::string &type, const std::string &name, const IInterface *parent)
#define COPY_FILE_HACK(_src, _dest)
int ir
counter of the current depth
Definition fastadd.cxx:49
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition hcg.cxx:130
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
::StatusCode StatusCode
StatusCode definition for legacy code.
MsgStream & msg
Definition testRead.cxx:32
Trapping version of malloc.