15#include "GaudiKernel/IEvtSelector.h"
16#include "GaudiKernel/IIoComponentMgr.h"
17#include "GaudiKernel/IFileMgr.h"
18#include "GaudiKernel/IChronoStatSvc.h"
19#include "GaudiKernel/ISvcLocator.h"
20#include "GaudiKernel/IIncidentSvc.h"
21#include "GaudiKernel/IConversionSvc.h"
35 ,
const std::string& name
36 ,
const IInterface* parent)
59 ATH_MSG_ERROR(
"Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
60 return StatusCode::FAILURE;
75 return StatusCode::FAILURE;
83 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service(
"AthenaPoolSharedIOCnvSvc"));
86 return StatusCode::FAILURE;
93 return StatusCode::SUCCESS;
105 if(std::filesystem::exists(ordersFile)) {
106 srand((
unsigned)time(0));
107 std::ostringstream randname;
109 std::string ordersFileBak =
m_eventOrdersFile+std::string(
"-bak-")+randname.str();
113 std::filesystem::path ordersFileBakpath(ordersFileBak);
114 std::filesystem::rename(ordersFile,ordersFileBakpath);
120 std::ostringstream workerIndex;
124 std::string ordersFileWorker(worker_rundir.string()+std::string(
"/")+
m_eventOrdersFile);
125 ATH_MSG_INFO(
"Processing " << ordersFileWorker <<
" ...");
126 std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
128 while(fs_worker.good()) {
129 std::getline(fs_worker,line);
130 fs << line << std::endl;
142 return StatusCode::SUCCESS;
145int SharedEvtQueueConsumer::makePool(
int,
int nprocs,
const std::string& topdir)
149 if(nprocs==0 || nprocs<-1) {
150 ATH_MSG_ERROR(
"Invalid value for the nprocs parameter: " << nprocs);
184StatusCode SharedEvtQueueConsumer::exec()
189 return StatusCode::FAILURE;
192 return StatusCode::SUCCESS;
195StatusCode SharedEvtQueueConsumer::wait_once(
pid_t& pid)
198 AthenaInterprocess::ProcessResult* presult(0);
204 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
205 decodeProcessResult(presult,
false);
206 if(presult) free(presult->output.data);
214 if(presult && (
unsigned)(presult->output.size)>
sizeof(
int))
215 res = decodeProcessResult(presult,
true);
216 if(presult) free(presult->output.data);
218 if(
res)
return StatusCode::FAILURE;
226 const std::vector<AthenaInterprocess::ProcessStatus>& statuses =
m_processGroup->getStatuses();
227 for(
size_t i=0; i<statuses.size(); ++i) {
230 msg(MSG::INFO) <<
"*** Process PID=" << statuses[i].pid
231 <<
". Status " << ((statuses[i].exitcode)?
"FAILURE":
"SUCCESS")
232 <<
". Number of events processed: ";
236 msg(MSG::INFO) << it->second.first
237 <<
", Event Loop Time: " << it->second.second <<
"sec."
246 std::ostringstream workerIndex;
250 filenames.push_back(worker_rundir.string()+std::string(
"/AthenaMP.log"));
260 *(
int*)(outwork->data) = 1;
261 outwork->size =
sizeof(int);
265 std::map<IService*,int> bkgEvtSelectors;
268 for(IService* ptrSvc : serviceLocator()->getServices()) {
269 IEvtSelector* evtsel =
dynamic_cast<IEvtSelector*
>(ptrSvc);
277 ATH_MSG_ERROR(
"Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->name());
282 bkgEvtSelectors.emplace(ptrSvc,0);
295 SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service(
"IncidentSvc"));
300 p_incidentSvc->fireIncident(Incident(name(),
"PostFork"));
309 std::ostringstream workindex;
317 if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
318 ATH_MSG_ERROR(
"Unable to make worker run directory: " << worker_rundir.string() <<
". " <<
fmterror(errno));
327 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event worker PID=" << getpid());
334 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event worker PID=" << getpid());
337 std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
338 if(std::filesystem::is_regular_file(
"SimParams.db"))
339 COPY_FILE_HACK(
"SimParams.db", abs_worker_rundir.string()+
"/SimParams.db");
340 if(std::filesystem::is_regular_file(
"DigitParams.db"))
341 COPY_FILE_HACK(
"DigitParams.db", abs_worker_rundir.string()+
"/DigitParams.db");
342 if(std::filesystem::is_regular_file(
"PDGTABLE.MeV"))
343 COPY_FILE_HACK(
"PDGTABLE.MeV", abs_worker_rundir.string()+
"/PDGTABLE.MeV");
353 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
363 if (!propertyServer || propertyServer->setProperty(
"MakeStreamingToolClient",
m_rankId + 1).isFailure()) {
364 ATH_MSG_ERROR(
"Could not change AthenaPoolSharedIOCnvSvc MakeClient Property");
368 ATH_MSG_DEBUG(
"Successfully made the conversion service a share client");
378 ATH_CHECK( propertyServer.isValid(), outwork);
381 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
382 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property");
390 ATH_CHECK( evtSelSvc.isValid(), outwork);
400 ATH_MSG_ERROR(
"Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
411 for(
auto [evtsel,curEvt] : bkgEvtSelectors) {
412 if(evtsel->start().isSuccess()) {
414 SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
415 if(evtselseek->seek(*
m_evtContext,curEvt).isFailure()) {
416 ATH_MSG_ERROR(
"Failed to seek to " << curEvt <<
" in the BKG Event Selector " << evtsel->name());
422 ATH_MSG_ERROR(
"Failed to restart BKG Event Selector " << evtsel->name());
436 std::getline(
fs,line);
437 if(line.empty())
continue;
441 int rank = std::stoi(line,&idx);
443 msg(MSG::INFO) <<
"This worker will proces the following events #";
444 while(idx<line.size()-1) {
445 line = line.substr(idx+1);
446 int evtnum = std::stoi(line,&idx);
448 msg(MSG::INFO) <<
" " << evtnum;
466 if(chdir(worker_rundir.string().c_str())==-1) {
467 ATH_MSG_ERROR(
"Failed to chdir to " << worker_rundir.string());
475 *(
int*)(outwork->data) = 0;
481 ATH_MSG_INFO(
"Exec function in the AthenaMP worker PID=" << getpid());
485 long intmask =
pow(0x100,
sizeof(
int))-1;
487 int nEventsProcessed(0);
488 long evtnumAndChunk(0);
490 unsigned evtCounter(0);
491 int evtnum(0), chunkSize(1);
496 if(std::filesystem::exists(ordersFile)) {
501 System::ProcessTime time_start = System::getProcessTime();
505 bool firstOrder(
true);
517 evtnum = *predefinedEvt;
519 fs << (firstOrder?
":":
",") << evtnum;
522 ATH_MSG_INFO(
"Read event number from the orders file: " << evtnum);
531 if(evtnumAndChunk<=0) {
532 evtnumAndChunk *= -1;
533 ATH_MSG_DEBUG(
"No more events are expected. The total number of events for this job = " << evtnumAndChunk);
536 ATH_MSG_DEBUG(
"Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
537 chunkSize = evtnumAndChunk >> (
sizeof(int)*8);
538 evtnum = evtnumAndChunk & intmask;
539 ATH_MSG_INFO(
"Received from the queue: event num=" << evtnum <<
" chunk size=" << chunkSize);
542 for(
int i(0);i<chunkSize;++i) {
543 fs << (firstOrder?
":":
",") << evtnum+i;
582 nEventsProcessed += chunkSize;
588 ATH_MSG_ERROR(
"Unable to process the chunk (" << evtnum <<
"," << evtnum+chunkSize-1 <<
")");
601 System::ProcessTime time_delta = System::getProcessTime() - time_start;
602 TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
628 *(
int*)(outdata) = (all_ok?0:1);
630 memcpy((
char*)outdata+
sizeof(
int),&func,
sizeof(func));
631 memcpy((
char*)outdata+
sizeof(
int)+
sizeof(func),&nEventsProcessed,
sizeof(
int));
632 memcpy((
char*)outdata+2*
sizeof(
int)+
sizeof(func),&elapsedTime,
sizeof(elapsedTime));
633 outwork->data = outdata;
634 outwork->size = outsize;
644 ATH_MSG_INFO(
"Fin function in the AthenaMP worker PID=" << getpid());
653 if(
m_appMgr->finalize().isFailure()) {
654 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
664 *(
int*)(outdata) = (all_ok?0:1);
666 memcpy((
char*)outdata+
sizeof(
int),&func,
sizeof(func));
668 memcpy((
char*)outdata+
sizeof(
int)+
sizeof(func),&nEvt,
sizeof(
int));
670 memcpy((
char*)outdata+2*
sizeof(
int)+
sizeof(func),&elapsed,
sizeof(elapsed));
672 outwork->data = outdata;
673 outwork->size = outsize;
680 if(!presult)
return 0;
682 ATH_MSG_DEBUG(
"Decoding the output of PID=" << presult->
pid <<
" with the size=" << output.size);
686 memcpy(&func,(
char*)output.data+
sizeof(
int),
sizeof(func));
691 memcpy(&nevt,(
char*)output.data+
sizeof(
int)+
sizeof(func),
sizeof(
int));
692 memcpy(&elapsed,(
char*)output.data+2*+
sizeof(
int)+
sizeof(func),
sizeof(
TimeValType));
693 m_eventStat[presult->
pid]=std::pair<int,TimeValType>(nevt,elapsed);
694 ATH_MSG_DEBUG(
"PID=" << presult->
pid <<
" processed " << nevt <<
" events in " << elapsed <<
"sec.");
699 ATH_MSG_DEBUG(
"Added PID=" << presult->
pid <<
" to the finalization queue");
718 if(pid==presult->
pid) {
736 ATH_MSG_ERROR(
"Finalized PID=" << presult->
pid <<
" while PID=" << pid <<
" was expected");
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
Abstract interface for seeking within an event stream.
Extension to IEvtSelector to allow for seeking.
std::pair< std::vector< unsigned int >, bool > res
constexpr int pow(int base, int exp) noexcept
Abstract interface for seeking for an event selector.
virtual int curEvent(const IEvtSelector::Context &c) const =0
return the current event number.
virtual StatusCode finalize() override
virtual StatusCode initialize() override
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
SharedEvtQueueConsumer(const std::string &type, const std::string &name, const IInterface *parent)
SmartIF< IEventShare > m_evtShare
std::queue< pid_t > m_finQueue
Gaudi::Property< std::string > m_eventOrdersFile
SmartIF< IEventSeek > m_evtSeek
std::vector< int > m_eventOrders
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Gaudi::Property< int > m_nEventsBeforeFork
Gaudi::Property< bool > m_readEventOrders
Gaudi::Property< bool > m_useSharedWriter
Gaudi::Property< bool > m_useSharedReader
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
virtual void reportSubprocessStatuses() override
SmartIF< IDataShare > m_dataShare
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Gaudi::Property< bool > m_isRoundRobin
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Gaudi::Property< bool > m_debug
IEvtSelector::Context * m_evtContext
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
virtual ~SharedEvtQueueConsumer() override
System::ProcessTime::TimeValueType TimeValType
virtual void subProcessLogs(std::vector< std::string > &) override
#define COPY_FILE_HACK(_src, _dest)
void * xmalloc(size_t size)
Trapping version of malloc.
::StatusCode StatusCode
StatusCode definition for legacy code.
retrieve(aClass, aKey=None)
Trapping version of malloc.