14 #include "GaudiKernel/IEvtSelector.h"
15 #include "GaudiKernel/IIoComponentMgr.h"
16 #include "GaudiKernel/IFileMgr.h"
17 #include "GaudiKernel/IChronoStatSvc.h"
18 #include "GaudiKernel/ISvcLocator.h"
19 #include "GaudiKernel/IIncidentSvc.h"
20 #include "GaudiKernel/IConversionSvc.h"
34 ,
const std::string&
name
35 ,
const IInterface*
parent)
37 , m_useSharedReader(false)
38 , m_useSharedWriter(false)
39 , m_isRoundRobin(false)
40 , m_nEventsBeforeFork(0)
44 , m_chronoStatSvc(
"ChronoStatSvc",
name)
46 , m_evtSelSeek(nullptr)
47 , m_evtContext(nullptr)
49 , m_dataShare(nullptr)
50 , m_sharedEventQueue(nullptr)
51 , m_sharedRankQueue(nullptr)
52 , m_readEventOrders(false)
53 , m_eventOrdersFile(
"athenamp_eventorders.txt")
54 , m_masterPid(getpid())
56 declareInterface<IAthenaMPTool>(
this);
84 ATH_MSG_ERROR(
"Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
85 return StatusCode::FAILURE;
99 return StatusCode::FAILURE;
105 IConversionSvc* cnvSvc{
nullptr};
106 sc = serviceLocator()->service(
"AthenaPoolCnvSvc",cnvSvc);
110 ATH_MSG_ERROR(
"Error retrieving AthenaPoolCnvSvc " << cnvSvc);
111 return StatusCode::FAILURE;
118 return StatusCode::SUCCESS;
131 srand((
unsigned)
time(0));
132 std::ostringstream randname;
134 std::string ordersFileBak =
m_eventOrdersFile+std::string(
"-bak-")+randname.str();
139 std::filesystem::rename(ordersFile,ordersFileBakpath);
145 std::ostringstream workerIndex;
149 std::string ordersFileWorker(worker_rundir.string()+std::string(
"/")+
m_eventOrdersFile);
150 ATH_MSG_INFO(
"Processing " << ordersFileWorker <<
" ...");
151 std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
153 while(fs_worker.good()) {
154 std::getline(fs_worker,
line);
168 return StatusCode::SUCCESS;
171 int SharedEvtQueueConsumer::makePool(
int,
int nprocs,
const std::string& topdir)
192 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Event Queue");
220 return StatusCode::FAILURE;
223 return StatusCode::SUCCESS;
235 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
236 decodeProcessResult(presult,
false);
237 if(presult) free(presult->output.data);
245 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
246 res = decodeProcessResult(presult,
true);
247 if(presult) free(presult->output.data);
249 if(
res)
return StatusCode::FAILURE;
262 <<
". Status " << ((
statuses[
i].exitcode)?
"FAILURE":
"SUCCESS")
263 <<
". Number of events processed: ";
267 msg(MSG::INFO) <<
it->second.first
268 <<
", Event Loop Time: " <<
it->second.second <<
"sec."
277 std::ostringstream workerIndex;
281 filenames.push_back(worker_rundir.string()+std::string(
"/AthenaMP.log"));
291 *(
int*)(outwork->
data) = 1;
296 std::map<IService*,int> bkgEvtSelectors;
299 for(IService* ptrSvc : serviceLocator()->getServices()) {
300 IEvtSelector* evtsel =
dynamic_cast<IEvtSelector*
>(ptrSvc);
308 ATH_MSG_ERROR(
"Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->
name());
313 bkgEvtSelectors.emplace(ptrSvc,0);
326 IIncidentSvc* p_incidentSvc(0);
327 if(!serviceLocator()->service(
"IncidentSvc", p_incidentSvc).isSuccess()) {
331 p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
340 std::ostringstream workindex;
348 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
349 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
358 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
365 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
369 if(std::filesystem::is_regular_file(
"SimParams.db"))
370 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
371 if(std::filesystem::is_regular_file(
"DigitParams.db"))
372 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
373 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
374 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
384 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
390 ATH_MSG_ERROR(
"Failed to make the event selector a share client");
394 ATH_MSG_DEBUG(
"Successfully made the event selector a share client");
399 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_dataShare);
400 if (propertyServer==0 || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
401 ATH_MSG_ERROR(
"Could not change AthenaPoolCnvSvc MakeClient Property");
405 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
410 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
420 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_evtSelector);
421 if(!propertyServer) {
426 std::string propertyName(
"SkipEvents");
428 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
429 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
438 IService* evtSelSvc =
dynamic_cast<IService*
>(
m_evtSelector);
440 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
443 if(!evtSelSvc->start().isSuccess()) {
457 ATH_MSG_ERROR(
"Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
468 for(
auto [evtsel,curEvt] : bkgEvtSelectors) {
469 if(evtsel->start().isSuccess()) {
473 ATH_MSG_ERROR(
"Failed to seek to " << curEvt <<
" in the BKG Event Selector " << evtsel->name());
479 ATH_MSG_ERROR(
"Failed to restart BKG Event Selector " << evtsel->name());
494 if(
line.empty())
continue;
498 int rank = std::stoi(
line,&
idx);
500 msg(MSG::INFO) <<
"This worker will proces the following events #";
503 int evtnum = std::stoi(
line,&
idx);
505 msg(MSG::INFO) <<
" " << evtnum;
523 if(chdir(worker_rundir.string().c_str())==-1) {
524 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
532 *(
int*)(outwork->
data) = 0;
538 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
542 long intmask =
pow(0x100,
sizeof(
int))-1;
544 int nEventsProcessed(0);
545 long evtnumAndChunk(0);
547 unsigned evtCounter(0);
561 while(evtnumAndChunk>0) {
566 evtnumAndChunk *= -1;
569 System::ProcessTime time_start = System::getProcessTime();
573 bool firstOrder(
true);
585 evtnum = *predefinedEvt;
587 fs << (firstOrder?
":":
",") << evtnum;
590 ATH_MSG_INFO(
"Read event number from the orders file: " << evtnum);
599 if(evtnumAndChunk<=0) {
600 evtnumAndChunk *= -1;
601 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
604 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
606 evtnum = evtnumAndChunk & intmask;
611 fs << (firstOrder?
":":
",") << evtnum+
i;
665 System::ProcessTime time_delta = System::getProcessTime() - time_start;
666 TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
692 *(
int*)(
outdata) = (all_ok?0:1);
694 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
695 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEventsProcessed,
sizeof(
int));
696 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsedTime,
sizeof(elapsedTime));
698 outwork->
size = outsize;
708 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
717 if(
m_appMgr->finalize().isFailure()) {
718 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
728 *(
int*)(
outdata) = (all_ok?0:1);
730 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
732 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
734 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsed,
sizeof(elapsed));
737 outwork->
size = outsize;
744 if(!presult)
return 0;
750 memcpy(&func,(
char*)
output.data+
sizeof(
int),
sizeof(func));
755 memcpy(&nevt,(
char*)
output.data+
sizeof(
int)+
sizeof(func),
sizeof(
int));
756 memcpy(&elapsed,(
char*)
output.data+2*+
sizeof(
int)+
sizeof(func),
sizeof(
TimeValType));
757 m_eventStat[presult->
pid]=std::pair<int,TimeValType>(nevt,elapsed);
758 ATH_MSG_DEBUG(
"PID=" << presult->
pid <<
" processed " << nevt <<
" events in " << elapsed <<
"sec.");
763 ATH_MSG_DEBUG(
"Added PID=" << presult->
pid <<
" to the finalization queue");
800 ATH_MSG_ERROR(
"Finalized PID=" << presult->
pid <<
" while PID=" <<
pid <<
" was expected");