15 #include "GaudiKernel/IEvtSelector.h" 
   16 #include "GaudiKernel/IIoComponentMgr.h" 
   17 #include "GaudiKernel/IFileMgr.h" 
   18 #include "GaudiKernel/IChronoStatSvc.h" 
   19 #include "GaudiKernel/ISvcLocator.h" 
   20 #include "GaudiKernel/IIncidentSvc.h" 
   21 #include "GaudiKernel/IConversionSvc.h" 
   35                            , 
const std::string& 
name 
   36                            , 
const IInterface* 
parent)
 
   38   , m_chronoStatSvc(
"ChronoStatSvc", 
name)
 
   39   , m_masterPid(getpid())
 
   59       ATH_MSG_ERROR(
"Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
 
   60       return StatusCode::FAILURE;
 
   75         return StatusCode::FAILURE;
 
   83       m_dataShare = SmartIF<IDataShare>(serviceLocator()->service(
"AthenaPoolSharedIOCnvSvc"));
 
   86         return StatusCode::FAILURE;
 
   93   return StatusCode::SUCCESS;
 
  106       srand((
unsigned)
time(0));
 
  107       std::ostringstream randname;
 
  109       std::string ordersFileBak = 
m_eventOrdersFile+std::string(
"-bak-")+randname.str();
 
  120       std::ostringstream workerIndex;
 
  124       std::string ordersFileWorker(worker_rundir.string()+std::string(
"/")+
m_eventOrdersFile);
 
  125       ATH_MSG_INFO(
"Processing " << ordersFileWorker << 
" ...");
 
  126       std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
 
  128       while(fs_worker.good()) {
 
  129     std::getline(fs_worker,
line);
 
  142   return StatusCode::SUCCESS;
 
  145 int SharedEvtQueueConsumer::makePool(
int, 
int nprocs, 
const std::string& topdir)
 
  189     return StatusCode::FAILURE;
 
  192   return StatusCode::SUCCESS;
 
  204       if(presult && (
unsigned)(presult->output.size)>
sizeof(
int)) 
 
  205     decodeProcessResult(presult,
false);
 
  206       if(presult) free(presult->output.data);
 
  214     if(presult && (
unsigned)(presult->output.size)>
sizeof(
int)) 
 
  215       res = decodeProcessResult(presult,
true);
 
  216     if(presult) free(presult->output.data);
 
  218     if(
res) 
return StatusCode::FAILURE;
 
  231            << 
". Status " << ((
statuses[
i].exitcode)?
"FAILURE":
"SUCCESS") 
 
  232            << 
". Number of events processed: ";
 
  237              << 
", Event Loop Time: " << 
it->second.second << 
"sec." 
  246     std::ostringstream workerIndex;
 
  250     filenames.push_back(worker_rundir.string()+std::string(
"/AthenaMP.log"));
 
  260   *(
int*)(outwork->
data) = 1; 
 
  265   std::map<IService*,int> bkgEvtSelectors;
 
  268     for(IService* ptrSvc : serviceLocator()->getServices()) {
 
  269       IEvtSelector* evtsel = 
dynamic_cast<IEvtSelector*
>(ptrSvc);
 
  277             ATH_MSG_ERROR(
"Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->
name());
 
  282           bkgEvtSelectors.emplace(ptrSvc,0);
 
  295   SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service(
"IncidentSvc"));
 
  300   p_incidentSvc->fireIncident(Incident(
name(),
"PostFork"));
 
  309   std::ostringstream workindex;
 
  317   if(
mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
 
  318     ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() << 
". " << 
fmterror(errno));
 
  327     ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
 
  334   ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
 
  338   if(std::filesystem::is_regular_file(
"SimParams.db"))
 
  339     COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
 
  340   if(std::filesystem::is_regular_file(
"DigitParams.db"))
 
  341     COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
 
  342   if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
 
  343     COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
 
  353   ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
 
  363     if (!propertyServer || propertyServer->setProperty(
"MakeStreamingToolClient", 
m_rankId + 1).isFailure()) {
 
  364       ATH_MSG_ERROR(
"Could not change AthenaPoolSharedIOCnvSvc MakeClient Property");
 
  368       ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
 
  378     ATH_CHECK( propertyServer.isValid(), outwork);
 
  381     if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
 
  382       ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
 
  390     ATH_CHECK( evtSelSvc.isValid(), outwork);
 
  400       ATH_MSG_ERROR(
"Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
 
  411     for(
auto [evtsel,curEvt] : bkgEvtSelectors) {
 
  412       if(evtsel->start().isSuccess()) {
 
  414       SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
 
  415       if(evtselseek->seek(*
m_evtContext,curEvt).isFailure()) {
 
  416         ATH_MSG_ERROR(
"Failed to seek to " << curEvt << 
" in the BKG Event Selector " << evtsel->name());
 
  422     ATH_MSG_ERROR(
"Failed to restart BKG Event Selector " << evtsel->name());
 
  437     if(
line.empty())
continue;
 
  441     int rank = std::stoi(
line,&
idx);
 
  443       msg(
MSG::INFO) << 
"This worker will proces the following events #";
 
  446         int evtnum = std::stoi(
line,&
idx);
 
  466   if(chdir(worker_rundir.string().c_str())==-1) {
 
  467     ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
 
  475   *(
int*)(outwork->
data) = 0;
 
  481   ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
 
  485   long intmask = 
pow(0x100,
sizeof(
int))-1; 
 
  487   int nEventsProcessed(0);
 
  488   long evtnumAndChunk(0);
 
  490   unsigned evtCounter(0);
 
  491   int evtnum(0), chunkSize(1);
 
  501   System::ProcessTime time_start = System::getProcessTime();
 
  505     bool firstOrder(
true);
 
  517       evtnum = *predefinedEvt;
 
  519       fs << (firstOrder?
":":
",") << evtnum;
 
  522       ATH_MSG_INFO(
"Read event number from the orders file: " << evtnum);
 
  531       if(evtnumAndChunk<=0) {
 
  532         evtnumAndChunk *= -1;
 
  533         ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
 
  536       ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
 
  537       chunkSize = evtnumAndChunk >> (
sizeof(
int)*8);
 
  538       evtnum = evtnumAndChunk & intmask;
 
  539       ATH_MSG_INFO(
"Received from the queue: event num=" << evtnum << 
" chunk size=" << chunkSize);
 
  542       for(
int i(0);
i<chunkSize;++
i) {
 
  543         fs << (firstOrder?
":":
",") << evtnum+
i;
 
  582       nEventsProcessed += chunkSize;
 
  588       ATH_MSG_ERROR(
"Unable to process the chunk (" << evtnum << 
"," << evtnum+chunkSize-1 << 
")");
 
  601   System::ProcessTime time_delta = System::getProcessTime() - time_start;
 
  602   TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
 
  628   *(
int*)(
outdata) = (all_ok?0:1); 
 
  630   memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
 
  631   memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEventsProcessed,
sizeof(
int));
 
  632   memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsedTime,
sizeof(elapsedTime));
 
  634   outwork->
size = outsize;
 
  644   ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
 
  653     if(
m_appMgr->finalize().isFailure()) {
 
  654       std::cerr << 
"Unable to finalize AppMgr" << std::endl;
 
  664   *(
int*)(
outdata) = (all_ok?0:1); 
 
  666   memcpy((
char*)
outdata+
sizeof(
int),&func,
sizeof(func));
 
  668   memcpy((
char*)
outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
 
  670   memcpy((
char*)
outdata+2*
sizeof(
int)+
sizeof(func),&elapsed,
sizeof(elapsed));
 
  673   outwork->
size = outsize;
 
  680   if(!presult) 
return 0;
 
  686   memcpy(&func,(
char*)
output.data+
sizeof(
int),
sizeof(func));
 
  691     memcpy(&nevt,(
char*)
output.data+
sizeof(
int)+
sizeof(func),
sizeof(
int));
 
  692     memcpy(&elapsed,(
char*)
output.data+2*+
sizeof(
int)+
sizeof(func),
sizeof(
TimeValType));
 
  693     m_eventStat[presult->
pid]=std::pair<int,TimeValType>(nevt,elapsed);
 
  694     ATH_MSG_DEBUG(
"PID=" << presult->
pid << 
" processed " << nevt << 
" events in " << elapsed << 
"sec.");
 
  699       ATH_MSG_DEBUG(
"Added PID=" << presult->
pid << 
" to the finalization queue");
 
  736       ATH_MSG_ERROR(
"Finalized PID=" << presult->
pid << 
" while PID=" << 
pid << 
" was expected");