13 #include "GaudiKernel/IEvtSelector.h"
14 #include "GaudiKernel/IIoComponentMgr.h"
15 #include "GaudiKernel/IFileMgr.h"
16 #include "GaudiKernel/IChronoStatSvc.h"
17 #include "GaudiKernel/ISvcLocator.h"
18 #include "GaudiKernel/IIncidentSvc.h"
19 #include "GaudiKernel/IConversionSvc.h"
42 ,
const std::string&
name
43 ,
const IInterface*
parent)
46 , m_chronoStatSvc(
"ChronoStatSvc",
name)
49 , m_sharedEventQueue(0)
50 , m_sharedRankQueue(0)
52 declareInterface<IAthenaMPTool>(
this);
76 return StatusCode::FAILURE;
82 IConversionSvc* cnvSvc = 0;
83 sc = serviceLocator()->service(
"AthenaPoolCnvSvc",cnvSvc);
87 ATH_MSG_ERROR(
"Error retrieving AthenaPoolCnvSvc " << cnvSvc);
88 return StatusCode::FAILURE;
92 return StatusCode::SUCCESS;
106 return StatusCode::SUCCESS;
112 SharedHiveEvtQueueConsumer::makePool(
int,
int nprocs,
const std::string& topdir)
133 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Event Queue");
164 return StatusCode::FAILURE;
167 return StatusCode::SUCCESS;
173 SharedHiveEvtQueueConsumer::wait_once(
pid_t&
pid)
182 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
183 decodeProcessResult(presult,
false);
184 if(presult) free(presult->output.data);
191 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
192 res = decodeProcessResult(presult,
true);
193 if(presult) free(presult->output.data);
195 if(
res)
return StatusCode::FAILURE;
211 <<
". Status " << ((
statuses[
i].exitcode)?
"FAILURE":
"SUCCESS")
212 <<
". Number of events processed: ";
227 std::ostringstream workerIndex;
231 filenames.push_back(worker_rundir.string()+std::string(
"/AthenaMP.log"));
237 std::unique_ptr<AthenaInterprocess::ScheduledWork>
241 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
249 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
251 sigsuspend (&oldmask);
252 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
257 *(
int*)(outwork->
data) = 1;
266 IIncidentSvc* p_incidentSvc(0);
267 if(!serviceLocator()->service(
"IncidentSvc", p_incidentSvc).isSuccess()) {
271 p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
280 std::ostringstream workindex;
288 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
289 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
297 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
303 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
307 if(std::filesystem::is_regular_file(
"SimParams.db"))
308 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
309 if(std::filesystem::is_regular_file(
"DigitParams.db"))
310 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
311 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
312 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
322 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
327 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_dataShare);
328 if (propertyServer==0 || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
329 ATH_MSG_ERROR(
"Could not change AthenaPoolCnvSvc MakeClient Property");
332 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
337 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
345 IService* evtSelSvc =
dynamic_cast<IService*
>(
m_evtSelector);
347 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
350 if(!evtSelSvc->start().isSuccess()) {
358 if(chdir(worker_rundir.string().c_str())==-1) {
359 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
367 *(
int*)(outwork->
data) = 0;
375 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
386 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_evtSelector);
387 if(propertyServer==0) {
392 std::string propertyName(
"SkipEvents");
393 IntegerProperty skipEventsProp(propertyName,
skipEvents);
394 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
395 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
404 ATH_MSG_FATAL(
"Failed to acquire IHybridProcessorHelper interface");
406 return std::unique_ptr<AthenaInterprocess::ScheduledWork>();
413 long intmask =
pow(0x100,
sizeof(
int))-1;
414 long evtnumAndChunk(0);
426 bool loop_ended = (evtnumAndChunk<0);
428 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
430 evtnum = evtnumAndChunk & intmask;
435 bool no_more_events =
false;
447 sc = StatusCode::FAILURE;
453 if (
sc.isFailure()) {
454 ATH_MSG_ERROR(
"Terminating event processing loop due to errors");
465 if(evtnumAndChunk<0) {
466 no_more_events =
true;
467 evtnumAndChunk *= -1;
468 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
471 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
473 evtnum = evtnumAndChunk & intmask;
478 if(!no_more_events) {
493 sc = StatusCode::FAILURE;
500 sc = StatusCode::SUCCESS;
525 *(
int*)(
outdata) = (all_ok?0:1);
527 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
528 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&createdEvts,
sizeof(
int));
531 outwork->
size = outsize;
541 std::unique_ptr<AthenaInterprocess::ScheduledWork>
544 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
552 if(
m_appMgr->finalize().isFailure()) {
553 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
563 *(
int*)(
outdata) = (all_ok?0:1);
565 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
567 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
570 outwork->
size = outsize;
580 if(!presult)
return 0;
586 memcpy(&func,(
char*)
output.data+
sizeof(
int),
sizeof(func));
590 memcpy(&nevt,(
char*)
output.data+
sizeof(
int)+
sizeof(func),
sizeof(
int));
592 ATH_MSG_DEBUG(
"PID=" << presult->
pid <<
" processed " << nevt <<
" events");
597 ATH_MSG_DEBUG(
"Added PID=" << presult->
pid <<
" to the finalization queue");
630 ATH_MSG_ERROR(
"Finalized PID=" << presult->
pid <<
" while PID=" <<
pid <<
" was expected");
647 ISvcManager* pISM(
dynamic_cast<ISvcManager*
>(serviceLocator().
get()));
653 <<
" from SvcManager");
661 return StatusCode::FAILURE;
664 m_schedulerSvc = serviceLocator()->service(
"AvalancheSchedulerSvc");
692 return StatusCode::SUCCESS;