13 #include "GaudiKernel/IEvtSelector.h"
14 #include "GaudiKernel/IIoComponentMgr.h"
15 #include "GaudiKernel/IFileMgr.h"
16 #include "GaudiKernel/IChronoStatSvc.h"
17 #include "GaudiKernel/ISvcLocator.h"
18 #include "GaudiKernel/IIncidentSvc.h"
19 #include "GaudiKernel/IConversionSvc.h"
42 ,
const std::string&
name
43 ,
const IInterface*
parent)
46 , m_chronoStatSvc(
"ChronoStatSvc",
name)
49 , m_sharedEventQueue(0)
50 , m_sharedRankQueue(0)
52 declareInterface<IAthenaMPTool>(
this);
79 SmartIF<IConversionSvc> cnvSvc(serviceLocator()->service(
"AthenaPoolCnvSvc"));
83 ATH_MSG_ERROR(
"Error retrieving AthenaPoolCnvSvc " << cnvSvc);
84 return StatusCode::FAILURE;
88 return StatusCode::SUCCESS;
102 return StatusCode::SUCCESS;
108 SharedHiveEvtQueueConsumer::makePool(
int,
int nprocs,
const std::string& topdir)
129 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Event Queue");
160 return StatusCode::FAILURE;
163 return StatusCode::SUCCESS;
169 SharedHiveEvtQueueConsumer::wait_once(
pid_t&
pid)
178 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
179 decodeProcessResult(presult,
false);
180 if(presult) free(presult->output.data);
187 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
188 res = decodeProcessResult(presult,
true);
189 if(presult) free(presult->output.data);
191 if(
res)
return StatusCode::FAILURE;
207 <<
". Status " << ((
statuses[
i].exitcode)?
"FAILURE":
"SUCCESS")
208 <<
". Number of events processed: ";
223 std::ostringstream workerIndex;
227 filenames.push_back(worker_rundir.string()+std::string(
"/AthenaMP.log"));
233 std::unique_ptr<AthenaInterprocess::ScheduledWork>
237 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
245 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
247 sigsuspend (&oldmask);
248 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
253 *(
int*)(outwork->
data) = 1;
262 SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service(
"IncidentSvc"));
263 if (!p_incidentSvc) {
267 p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
276 std::ostringstream workindex;
284 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
285 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
293 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
299 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
303 if(std::filesystem::is_regular_file(
"SimParams.db"))
304 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
305 if(std::filesystem::is_regular_file(
"DigitParams.db"))
306 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
307 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
308 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
318 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
324 if (!propertyServer || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
325 ATH_MSG_ERROR(
"Could not change AthenaPoolCnvSvc MakeClient Property");
328 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
333 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
343 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
346 if(!evtSelSvc->start().isSuccess()) {
354 if(chdir(worker_rundir.string().c_str())==-1) {
355 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
363 *(
int*)(outwork->
data) = 0;
371 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
383 if(propertyServer==0) {
388 std::string propertyName(
"SkipEvents");
389 IntegerProperty skipEventsProp(propertyName,
skipEvents);
390 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
391 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
400 ATH_MSG_FATAL(
"Failed to acquire IHybridProcessorHelper interface");
402 return std::unique_ptr<AthenaInterprocess::ScheduledWork>();
409 long intmask =
pow(0x100,
sizeof(
int))-1;
410 long evtnumAndChunk(0);
422 bool loop_ended = (evtnumAndChunk<0);
424 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
426 evtnum = evtnumAndChunk & intmask;
431 bool no_more_events =
false;
443 sc = StatusCode::FAILURE;
449 if (
sc.isFailure()) {
450 ATH_MSG_ERROR(
"Terminating event processing loop due to errors");
461 if(evtnumAndChunk<0) {
462 no_more_events =
true;
463 evtnumAndChunk *= -1;
464 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
467 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
469 evtnum = evtnumAndChunk & intmask;
474 if(!no_more_events) {
489 sc = StatusCode::FAILURE;
496 sc = StatusCode::SUCCESS;
521 *(
int*)(
outdata) = (all_ok?0:1);
523 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
524 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&createdEvts,
sizeof(
int));
527 outwork->
size = outsize;
537 std::unique_ptr<AthenaInterprocess::ScheduledWork>
540 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
548 if(
m_appMgr->finalize().isFailure()) {
549 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
559 *(
int*)(
outdata) = (all_ok?0:1);
561 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
563 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
566 outwork->
size = outsize;
576 if(!presult)
return 0;
582 memcpy(&func,(
char*)
output.data+
sizeof(
int),
sizeof(func));
586 memcpy(&nevt,(
char*)
output.data+
sizeof(
int)+
sizeof(func),
sizeof(
int));
588 ATH_MSG_DEBUG(
"PID=" << presult->
pid <<
" processed " << nevt <<
" events");
593 ATH_MSG_DEBUG(
"Added PID=" << presult->
pid <<
" to the finalization queue");
626 ATH_MSG_ERROR(
"Finalized PID=" << presult->
pid <<
" while PID=" <<
pid <<
" was expected");
643 ISvcManager* pISM(
dynamic_cast<ISvcManager*
>(serviceLocator().
get()));
649 <<
" from SvcManager");
657 return StatusCode::FAILURE;
660 m_schedulerSvc = serviceLocator()->service(
"AvalancheSchedulerSvc");
688 return StatusCode::SUCCESS;