14 #include "GaudiKernel/IEvtSelector.h"
15 #include "GaudiKernel/IIoComponentMgr.h"
16 #include "GaudiKernel/IFileMgr.h"
17 #include "GaudiKernel/IChronoStatSvc.h"
18 #include "GaudiKernel/ISvcLocator.h"
19 #include "GaudiKernel/IIncidentSvc.h"
20 #include "GaudiKernel/IConversionSvc.h"
34 ,
const std::string&
name
35 ,
const IInterface*
parent)
37 , m_useSharedReader(false)
38 , m_useSharedWriter(false)
39 , m_isRoundRobin(false)
40 , m_nEventsBeforeFork(0)
44 , m_chronoStatSvc(
"ChronoStatSvc",
name)
46 , m_evtSelSeek(nullptr)
47 , m_evtContext(nullptr)
49 , m_dataShare(nullptr)
50 , m_sharedEventQueue(nullptr)
51 , m_sharedRankQueue(nullptr)
52 , m_readEventOrders(false)
53 , m_eventOrdersFile(
"athenamp_eventorders.txt")
54 , m_masterPid(getpid())
56 declareInterface<IAthenaMPTool>(
this);
84 ATH_MSG_ERROR(
"Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
85 return StatusCode::FAILURE;
100 return StatusCode::FAILURE;
107 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service(
"AthenaPoolCnvSvc"));
111 return StatusCode::FAILURE;
118 return StatusCode::SUCCESS;
131 srand((
unsigned)
time(0));
132 std::ostringstream randname;
134 std::string ordersFileBak =
m_eventOrdersFile+std::string(
"-bak-")+randname.str();
139 std::filesystem::rename(ordersFile,ordersFileBakpath);
145 std::ostringstream workerIndex;
149 std::string ordersFileWorker(worker_rundir.string()+std::string(
"/")+
m_eventOrdersFile);
150 ATH_MSG_INFO(
"Processing " << ordersFileWorker <<
" ...");
151 std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
153 while(fs_worker.good()) {
154 std::getline(fs_worker,
line);
168 return StatusCode::SUCCESS;
171 int SharedEvtQueueConsumer::makePool(
int,
int nprocs,
const std::string& topdir)
192 ATH_MSG_ERROR(
"Unable to retrieve the pointer to Shared Event Queue");
220 return StatusCode::FAILURE;
223 return StatusCode::SUCCESS;
235 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
236 decodeProcessResult(presult,
false);
237 if(presult) free(presult->output.data);
245 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
246 res = decodeProcessResult(presult,
true);
247 if(presult) free(presult->output.data);
249 if(
res)
return StatusCode::FAILURE;
262 <<
". Status " << ((
statuses[
i].exitcode)?
"FAILURE":
"SUCCESS")
263 <<
". Number of events processed: ";
267 msg(MSG::INFO) <<
it->second.first
268 <<
", Event Loop Time: " <<
it->second.second <<
"sec."
277 std::ostringstream workerIndex;
281 filenames.push_back(worker_rundir.string()+std::string(
"/AthenaMP.log"));
291 *(
int*)(outwork->
data) = 1;
296 std::map<IService*,int> bkgEvtSelectors;
299 for(IService* ptrSvc : serviceLocator()->getServices()) {
300 IEvtSelector* evtsel =
dynamic_cast<IEvtSelector*
>(ptrSvc);
308 ATH_MSG_ERROR(
"Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->
name());
313 bkgEvtSelectors.emplace(ptrSvc,0);
326 SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service(
"IncidentSvc"));
331 p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
340 std::ostringstream workindex;
348 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
349 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
358 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
365 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
369 if(std::filesystem::is_regular_file(
"SimParams.db"))
370 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
371 if(std::filesystem::is_regular_file(
"DigitParams.db"))
372 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
373 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
374 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
384 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
390 ATH_MSG_ERROR(
"Failed to make the event selector a share client");
394 ATH_MSG_DEBUG(
"Successfully made the event selector a share client");
400 if (!propertyServer || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
401 ATH_MSG_ERROR(
"Could not change AthenaPoolCnvSvc MakeClient Property");
405 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
410 if(!
m_ioMgr->io_reinitialize().isSuccess()) {
421 if(!propertyServer) {
426 std::string propertyName(
"SkipEvents");
428 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
429 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
440 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService");
443 if(!evtSelSvc->start().isSuccess()) {
458 ATH_MSG_ERROR(
"Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
469 for(
auto [evtsel,curEvt] : bkgEvtSelectors) {
470 if(evtsel->start().isSuccess()) {
472 SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
473 if(evtselseek->seek(*
m_evtContext,curEvt).isFailure()) {
474 ATH_MSG_ERROR(
"Failed to seek to " << curEvt <<
" in the BKG Event Selector " << evtsel->name());
480 ATH_MSG_ERROR(
"Failed to restart BKG Event Selector " << evtsel->name());
495 if(
line.empty())
continue;
499 int rank = std::stoi(
line,&
idx);
501 msg(MSG::INFO) <<
"This worker will proces the following events #";
504 int evtnum = std::stoi(
line,&
idx);
506 msg(MSG::INFO) <<
" " << evtnum;
524 if(chdir(worker_rundir.string().c_str())==-1) {
525 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
533 *(
int*)(outwork->
data) = 0;
539 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
543 long intmask =
pow(0x100,
sizeof(
int))-1;
545 int nEventsProcessed(0);
546 long evtnumAndChunk(0);
548 unsigned evtCounter(0);
562 while(evtnumAndChunk>0) {
567 evtnumAndChunk *= -1;
570 System::ProcessTime time_start = System::getProcessTime();
574 bool firstOrder(
true);
586 evtnum = *predefinedEvt;
588 fs << (firstOrder?
":":
",") << evtnum;
591 ATH_MSG_INFO(
"Read event number from the orders file: " << evtnum);
600 if(evtnumAndChunk<=0) {
601 evtnumAndChunk *= -1;
602 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
605 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
607 evtnum = evtnumAndChunk & intmask;
612 fs << (firstOrder?
":":
",") << evtnum+
i;
666 System::ProcessTime time_delta = System::getProcessTime() - time_start;
667 TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
693 *(
int*)(
outdata) = (all_ok?0:1);
695 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
696 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEventsProcessed,
sizeof(
int));
697 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsedTime,
sizeof(elapsedTime));
699 outwork->
size = outsize;
709 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
718 if(
m_appMgr->finalize().isFailure()) {
719 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
729 *(
int*)(
outdata) = (all_ok?0:1);
731 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
733 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
735 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsed,
sizeof(elapsed));
738 outwork->
size = outsize;
745 if(!presult)
return 0;
751 memcpy(&func,(
char*)
output.data+
sizeof(
int),
sizeof(func));
756 memcpy(&nevt,(
char*)
output.data+
sizeof(
int)+
sizeof(func),
sizeof(
int));
757 memcpy(&elapsed,(
char*)
output.data+2*+
sizeof(
int)+
sizeof(func),
sizeof(
TimeValType));
758 m_eventStat[presult->
pid]=std::pair<int,TimeValType>(nevt,elapsed);
759 ATH_MSG_DEBUG(
"PID=" << presult->
pid <<
" processed " << nevt <<
" events in " << elapsed <<
"sec.");
764 ATH_MSG_DEBUG(
"Added PID=" << presult->
pid <<
" to the finalization queue");
801 ATH_MSG_ERROR(
"Finalized PID=" << presult->
pid <<
" while PID=" <<
pid <<
" was expected");