8#include "GaudiKernel/IEvtSelector.h"
9#include "GaudiKernel/IIncidentSvc.h"
10#include "GaudiKernel/FileIncident.h"
11#include "GaudiKernel/IIoComponentMgr.h"
12#include "GaudiKernel/ISvcLocator.h"
16#include <boost/interprocess/shared_memory_object.hpp>
17#include <boost/interprocess/mapped_region.hpp>
29 ,
const std::string& name
30 ,
const IInterface* parent)
31 : base_class(
type,name,parent)
33 m_subprocDirPrefix =
"evt_counter";
40int SharedEvtQueueProvider::makePool(
int maxevt,
int nprocs,
const std::string& topdir)
45 ATH_MSG_ERROR(
"Invalid number of events requested: " << maxevt );
60 m_nprocs = (
nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):
nprocs);
62 m_subprocTopDir = topdir;
65 ATH_MSG_DEBUG(
"Event queue name " <<
"AthenaMPEventQueue_" << m_randStr );
69 m_processGroup =
new AthenaInterprocess::ProcessGroup(1);
78StatusCode SharedEvtQueueProvider::exec()
85 return StatusCode::FAILURE;
89 if(m_processGroup->map_async(0,0)){
91 return StatusCode::FAILURE;
93 return StatusCode::SUCCESS;
99 std::filesystem::path counter_rundir(m_subprocTopDir);
100 counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
101 filenames.push_back(counter_rundir.string()+std::string(
"/AthenaMP.log"));
114 *(
int*)(outwork->data) = 1;
115 outwork->size =
sizeof(int);
123 std::filesystem::path counter_rundir(m_subprocTopDir);
124 counter_rundir /= std::filesystem::path(m_subprocDirPrefix);
126 if(mkdir(counter_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
127 ATH_MSG_ERROR(
"Unable to make event counter run directory: " << counter_rundir.string() <<
". " << fmterror(errno) );
132 if(redirectLog(counter_rundir.string()))
135 ATH_MSG_INFO(
"Logs redirected in the AthenaMP event event counter PID=" << getpid() );
138 if(updateIoReg(counter_rundir.string()))
141 ATH_MSG_INFO(
"Io registry updated in the AthenaMP event event counter PID=" << getpid() );
144 std::filesystem::path abs_counter_rundir = std::filesystem::absolute(counter_rundir);
145 if(handleSavedPfc(abs_counter_rundir))
152 ATH_MSG_INFO(
"File descriptors re-opened in the AthenaMP event event counter PID=" << getpid() );
156 SmartIF<IIncidentSvc> incsvc(serviceLocator()->service(
"IncidentSvc"));
162 incsvc->addListener(
this,
"EndInputFile");
168 m_evtShare = SmartIF<IEventShare>(m_evtSelector);
170 ATH_MSG_ERROR(
"Failed to dyncast event selector to IEventShare" );
173 if(!
m_evtShare->makeServer(m_nprocs+1).isSuccess()) {
174 ATH_MSG_ERROR(
"Failed to make the event selector a share server");
177 ATH_MSG_DEBUG(
"Successfully made the event selector a share server");
183 if(!m_ioMgr->io_reinitialize().isSuccess()) {
192 SmartIF<IService> evtSelSvc(m_evtSelector);
194 ATH_MSG_ERROR(
"Failed to dyncast event selector to IService" );
197 if(!evtSelSvc->start().isSuccess()) {
201 ATH_MSG_DEBUG(
"Successfully restarted the event selector" );
205 if(chdir(counter_rundir.string().c_str())==-1) {
206 ATH_MSG_ERROR(
"Failed to chdir to " << counter_rundir.string() );
211 *(
int*)(outwork->data) = 0;
217 ATH_MSG_INFO(
"Exec function in the AthenaMP Event Counter PID=" << getpid());
222 IEvtSelector::Context* evtContext(
nullptr);
226 SmartIF<IProperty> propertyServer(m_evtSelector);
227 if(propertyServer==0) {
228 ATH_MSG_ERROR(
"Unable to cast event selector to IProperty" );
232 std::string propertyName(
"SkipEvents");
233 IntegerProperty skipEventsProp(std::move(propertyName),skipEvents);
234 if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
235 ATH_MSG_INFO(
"Event Selector does not have SkipEvents property" );
238 skipEvents = skipEventsProp.value();
243 StatusCode
sc = m_evtSelector->createContext(evtContext);
245 ATH_MSG_ERROR(
"Failed to create the event selector context");
251 if(!m_evtSelector->next(*evtContext).isSuccess()) {
252 ATH_MSG_ERROR(
"Unexpected error: EventsBeforeFork>EventsInInputFiles");
269 while(!m_evtSelector || m_evtSelector->next(*evtContext).isSuccess()) {
291 <<
", Event Chunk size in the queue is " <<
m_nChunkSize);
302 if(m_appMgr->stop().isFailure()) {
307 if(m_appMgr->finalize().isFailure()) {
308 std::cerr <<
"Unable to finalize AppMgr" << std::endl;
317 *(
int*)(outwork->data) = (all_ok?0:1);
318 outwork->size =
sizeof(int);
332 *(
int*)(outwork->data) = 0;
333 outwork->size =
sizeof(int);
341 const FileIncident* fileInc =
dynamic_cast<const FileIncident*
>(&inc);
347 if(fileInc->type()==
"EndInputFile") {
364 ATH_MSG_INFO(
"Sent to the queue 0x" << std::hex << newValueForQueue << std::dec
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
int m_nEvtRequested
Max event received from AppMgr.
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
int m_nprocesses
We use this data member for adding negative numbers at the end of the event queue.
AthenaInterprocess::SharedQueue * m_sharedEventQueue
virtual ~SharedEvtQueueProvider() override
Gaudi::Property< int > m_nEventsBeforeFork
SmartIF< IEventShare > m_evtShare
int m_nEvtCounted
The number of events this tool has counted itself in the input files.
Gaudi::Property< bool > m_useSharedReader
Gaudi::Property< int > m_nChunkSize
virtual void handle(const Incident &inc) override
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
int m_nChunkStart
The beginning of the current chunk.
int m_nPositionInChunk
Position within the current chunk.
virtual void subProcessLogs(std::vector< std::string > &) override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
void * xmalloc(size_t size)
Trapping version of malloc.
retrieve(aClass, aKey=None)
Trapping version of malloc.