15 #include "GaudiKernel/IEvtSelector.h"
16 #include "GaudiKernel/IIoComponentMgr.h"
17 #include "GaudiKernel/IFileMgr.h"
18 #include "GaudiKernel/IChronoStatSvc.h"
19 #include "GaudiKernel/ISvcLocator.h"
20 #include "GaudiKernel/IIncidentSvc.h"
21 #include "GaudiKernel/IConversionSvc.h"
35 ,
const std::string&
name
36 ,
const IInterface*
parent)
38 , m_chronoStatSvc(
"ChronoStatSvc",
name)
39 , m_masterPid(getpid())
59 ATH_MSG_ERROR(
"Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
60 return StatusCode::FAILURE;
75 return StatusCode::FAILURE;
83 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service(
"AthenaPoolSharedIOCnvSvc"));
86 return StatusCode::FAILURE;
93 return StatusCode::SUCCESS;
106 srand((
unsigned)
time(0));
107 std::ostringstream randname;
109 std::string ordersFileBak =
m_eventOrdersFile+std::string(
"-bak-")+randname.str();
120 std::ostringstream workerIndex;
124 std::string ordersFileWorker(worker_rundir.string()+std::string(
"/")+
m_eventOrdersFile);
125 ATH_MSG_INFO(
"Processing " << ordersFileWorker <<
" ...");
126 std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
128 while(fs_worker.good()) {
129 std::getline(fs_worker,
line);
142 return StatusCode::SUCCESS;
145 int SharedEvtQueueConsumer::makePool(
int,
int nprocs,
const std::string& topdir)
189 return StatusCode::FAILURE;
192 return StatusCode::SUCCESS;
204 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
205 decodeProcessResult(presult,
false);
206 if(presult) free(presult->output.data);
214 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
215 res = decodeProcessResult(presult,
true);
216 if(presult) free(presult->output.data);
218 if(
res)
return StatusCode::FAILURE;
231 <<
". Status " << ((
statuses[
i].exitcode)?
"FAILURE":
"SUCCESS")
232 <<
". Number of events processed: ";
237 <<
", Event Loop Time: " <<
it->second.second <<
"sec."
246 std::ostringstream workerIndex;
250 filenames.push_back(worker_rundir.string()+std::string(
"/AthenaMP.log"));
260 *(
int*)(outwork->
data) = 1;
265 std::map<IService*,int> bkgEvtSelectors;
268 for(IService* ptrSvc : serviceLocator()->getServices()) {
269 IEvtSelector* evtsel =
dynamic_cast<IEvtSelector*
>(ptrSvc);
277 ATH_MSG_ERROR(
"Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->
name());
282 bkgEvtSelectors.emplace(ptrSvc,0);
295 SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service(
"IncidentSvc"));
300 p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
309 std::ostringstream workindex;
317 if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
318 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
327 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
334 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
338 if(std::filesystem::is_regular_file(
"SimParams.db"))
339 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
340 if(std::filesystem::is_regular_file(
"DigitParams.db"))
341 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
342 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
343 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
353 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
363 if (!propertyServer || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
364 ATH_MSG_ERROR(
"Could not change AthenaPoolSharedIOCnvSvc MakeClient Property");
368 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
378 ATH_CHECK( propertyServer.isValid(), outwork);
381 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
382 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
390 ATH_CHECK( evtSelSvc.isValid(), outwork);
400 ATH_MSG_ERROR(
"Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
411 for(
auto [evtsel,curEvt] : bkgEvtSelectors) {
412 if(evtsel->start().isSuccess()) {
414 SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
415 if(evtselseek->seek(*
m_evtContext,curEvt).isFailure()) {
416 ATH_MSG_ERROR(
"Failed to seek to " << curEvt <<
" in the BKG Event Selector " << evtsel->name());
422 ATH_MSG_ERROR(
"Failed to restart BKG Event Selector " << evtsel->name());
437 if(
line.empty())
continue;
441 int rank = std::stoi(
line,&
idx);
443 msg(
MSG::INFO) <<
"This worker will proces the following events #";
446 int evtnum = std::stoi(
line,&
idx);
466 if(chdir(worker_rundir.string().c_str())==-1) {
467 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
475 *(
int*)(outwork->
data) = 0;
481 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
485 long intmask =
pow(0x100,
sizeof(
int))-1;
487 int nEventsProcessed(0);
488 long evtnumAndChunk(0);
490 unsigned evtCounter(0);
491 int evtnum(0), chunkSize(1);
501 System::ProcessTime time_start = System::getProcessTime();
505 bool firstOrder(
true);
517 evtnum = *predefinedEvt;
519 fs << (firstOrder?
":":
",") << evtnum;
522 ATH_MSG_INFO(
"Read event number from the orders file: " << evtnum);
531 if(evtnumAndChunk<=0) {
532 evtnumAndChunk *= -1;
533 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
536 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
537 chunkSize = evtnumAndChunk >> (
sizeof(
int)*8);
538 evtnum = evtnumAndChunk & intmask;
539 ATH_MSG_INFO(
"Received from the queue: event num=" << evtnum <<
" chunk size=" << chunkSize);
542 for(
int i(0);
i<chunkSize;++
i) {
543 fs << (firstOrder?
":":
",") << evtnum+
i;
582 nEventsProcessed += chunkSize;
588 ATH_MSG_ERROR(
"Unable to process the chunk (" << evtnum <<
"," << evtnum+chunkSize-1 <<
")");
601 System::ProcessTime time_delta = System::getProcessTime() - time_start;
602 TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
628 *(
int*)(
outdata) = (all_ok?0:1);
630 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
631 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEventsProcessed,
sizeof(
int));
632 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsedTime,
sizeof(elapsedTime));
634 outwork->
size = outsize;
644 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
653 if(
m_appMgr->finalize().isFailure()) {
654 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
664 *(
int*)(
outdata) = (all_ok?0:1);
666 memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
668 memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
670 memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsed,
sizeof(elapsed));
673 outwork->
size = outsize;
680 if(!presult)
return 0;
686 memcpy(&func,(
char*)
output.data+
sizeof(
int),
sizeof(func));
691 memcpy(&nevt,(
char*)
output.data+
sizeof(
int)+
sizeof(func),
sizeof(
int));
692 memcpy(&elapsed,(
char*)
output.data+2*+
sizeof(
int)+
sizeof(func),
sizeof(
TimeValType));
693 m_eventStat[presult->
pid]=std::pair<int,TimeValType>(nevt,elapsed);
694 ATH_MSG_DEBUG(
"PID=" << presult->
pid <<
" processed " << nevt <<
" events in " << elapsed <<
"sec.");
699 ATH_MSG_DEBUG(
"Added PID=" << presult->
pid <<
" to the finalization queue");
736 ATH_MSG_ERROR(
"Finalized PID=" << presult->
pid <<
" while PID=" <<
pid <<
" was expected");