14 #include "GaudiKernel/IEvtSelector.h"
15 #include "GaudiKernel/IIoComponentMgr.h"
16 #include "GaudiKernel/IFileMgr.h"
17 #include "GaudiKernel/IChronoStatSvc.h"
18 #include "GaudiKernel/ISvcLocator.h"
19 #include "GaudiKernel/IIncidentSvc.h"
20 #include "GaudiKernel/IConversionSvc.h"
43 ,
const std::string&
name
44 ,
const IInterface*
parent)
46 , m_chronoStatSvc(
"ChronoStatSvc",
name)
72 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service(
"AthenaPoolSharedIOCnvSvc"));
75 return StatusCode::FAILURE;
79 return StatusCode::SUCCESS;
92 return StatusCode::SUCCESS;
98 SharedHiveEvtQueueConsumer::makePool(
int,
int nprocs,
const std::string& topdir)
119 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Event Queue");
150 return StatusCode::FAILURE;
153 return StatusCode::SUCCESS;
159 SharedHiveEvtQueueConsumer::wait_once(
pid_t&
pid)
168 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
169 decodeProcessResult(presult,
false);
170 if(presult) free(presult->output.data);
177 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
178 res = decodeProcessResult(presult,
true);
179 if(presult) free(presult->output.data);
181 if(
res)
return StatusCode::FAILURE;
197 <<
". Status " << ((
statuses[
i].exitcode)?
"FAILURE":
"SUCCESS")
198 <<
". Number of events processed: ";
213 std::ostringstream workerIndex;
217 filenames.push_back(worker_rundir.string()+std::string(
"/AthenaMP.log"));
223 std::unique_ptr<AthenaInterprocess::ScheduledWork>
227 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
235 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
237 sigsuspend (&oldmask);
238 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
243 *(
int*)(outwork->
data) = 1;
252 SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service(
"IncidentSvc"));
253 if (!p_incidentSvc) {
257 p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
266 std::ostringstream workindex;
274 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
275 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
283 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
289 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
293 if(std::filesystem::is_regular_file(
"SimParams.db"))
294 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
295 if(std::filesystem::is_regular_file(
"DigitParams.db"))
296 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
297 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
298 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
308 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
314 if (!propertyServer || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
315 ATH_MSG_ERROR(
"Could not change AthenaPoolSharedIOCnvSvc MakeClient Property");
318 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
327 ATH_CHECK( evtSelSvc.isValid(), outwork );
328 ATH_CHECK( evtSelSvc->start(), outwork );
331 if(chdir(worker_rundir.string().c_str())==-1) {
332 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
340 *(
int*)(outwork->
data) = 0;
348 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
360 if(propertyServer==0) {
365 std::string propertyName(
"SkipEvents");
366 IntegerProperty skipEventsProp(std::move(propertyName),
skipEvents);
367 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
368 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
377 ATH_MSG_FATAL(
"Failed to acquire IHybridProcessorHelper interface");
379 return std::unique_ptr<AthenaInterprocess::ScheduledWork>();
386 long intmask =
pow(0x100,
sizeof(
int))-1;
387 long evtnumAndChunk(0);
389 int evtnum(0), chunkSize(1);
399 bool loop_ended = (evtnumAndChunk<0);
401 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
402 chunkSize = evtnumAndChunk >> (
sizeof(
int)*8);
403 evtnum = evtnumAndChunk & intmask;
404 ATH_MSG_INFO(
"Received from the queue: event num=" << evtnum <<
" chunk size=" << chunkSize);
408 bool no_more_events =
false;
420 sc = StatusCode::FAILURE;
426 if (
sc.isFailure()) {
427 ATH_MSG_ERROR(
"Terminating event processing loop due to errors");
438 if(evtnumAndChunk<0) {
439 no_more_events =
true;
440 evtnumAndChunk *= -1;
441 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
444 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
445 chunkSize = evtnumAndChunk >> (
sizeof(
int)*8);
446 evtnum = evtnumAndChunk & intmask;
447 ATH_MSG_INFO(
"Received from the queue: event num=" << evtnum <<
" chunk size=" << chunkSize);
451 if(!no_more_events) {
466 sc = StatusCode::FAILURE;
473 sc = StatusCode::SUCCESS;
498 *(
int*)(
outdata) = (all_ok?0:1);
500 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
501 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&createdEvts,
sizeof(
int));
504 outwork->
size = outsize;
514 std::unique_ptr<AthenaInterprocess::ScheduledWork>
517 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
525 if(
m_appMgr->finalize().isFailure()) {
526 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
536 *(
int*)(
outdata) = (all_ok?0:1);
538 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
540 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
543 outwork->
size = outsize;
553 if(!presult)
return 0;
559 memcpy(&func,(
char*)
output.data+
sizeof(
int),
sizeof(func));
563 memcpy(&nevt,(
char*)
output.data+
sizeof(
int)+
sizeof(func),
sizeof(
int));
565 ATH_MSG_DEBUG(
"PID=" << presult->
pid <<
" processed " << nevt <<
" events");
570 ATH_MSG_DEBUG(
"Added PID=" << presult->
pid <<
" to the finalization queue");
603 ATH_MSG_ERROR(
"Finalized PID=" << presult->
pid <<
" while PID=" <<
pid <<
" was expected");
620 ISvcManager* pISM(
dynamic_cast<ISvcManager*
>(serviceLocator().
get()));
626 <<
" from SvcManager");
634 return StatusCode::FAILURE;
637 m_schedulerSvc = serviceLocator()->service(
"AvalancheSchedulerSvc");
665 return StatusCode::SUCCESS;