ATLAS Offline Software
SharedWriterTool.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "SharedWriterTool.h"
7 
11 #include "GaudiKernel/IEvtSelector.h"
12 #include "GaudiKernel/IConversionSvc.h"
13 #include "GaudiKernel/IIoComponentMgr.h"
14 
16 #include <filesystem>
17 
19  , const std::string& name
20  , const IInterface* parent)
22  , m_rankId(0)
23  , m_sharedRankQueue(nullptr)
24  , m_cnvSvc(0)
25 {
    // Constructor body. The members initialized above start out as: rank ID 0,
    // no rank queue and no conversion service.
    // NOTE(review): the first signature line and the start of the initializer
    // list are not visible in this listing.
    // Directory name that subProcessLogs() and bootstrap_func() append to
    // m_subprocTopDir to form the writer run directory.
26  m_subprocDirPrefix = "shared_writer";
27 }
28 
30 {
    // Destructor body is intentionally empty here; the rank queue is deleted
    // in the block that logs "In finalize".
31 }
32 
34 {
35  ATH_MSG_DEBUG("In initialize");
36 
    // Look up the "AthenaPoolCnvSvc" service and store it in m_cnvSvc. The
    // lookup status is passed through ATH_CHECK (presumably returning early on
    // failure - confirm against AthCheckMacros.h).
    // NOTE(review): the listing skips an embedded line just above this call.
38  ATH_CHECK(serviceLocator()->service("AthenaPoolCnvSvc", m_cnvSvc));
39 
40  return StatusCode::SUCCESS;
41 }
42 
44 {
45  ATH_MSG_DEBUG("In finalize");
46 
    // Release the rank queue allocated with new in makePool(). If makePool()
    // never ran, the pointer still holds the constructor's nullptr and the
    // delete is a no-op. The pointer is not reset afterwards.
47  delete m_sharedRankQueue;
48  return StatusCode::SUCCESS;
49 }
50 
// Prepare the shared writer sub-process: record the process count and the top
// directory, optionally switch PoolSvc to "update" mode, create and seed the
// rank queue, and dispatch the bootstrap function.
//
// maxevt : unused (the parameter name is commented out).
// nprocs : process count; -1 selects sysconf(_SC_NPROCESSORS_ONLN).
// topdir : top directory for the sub-process run directory; must be non-empty.
//
// Returns 1 on success and -1 on failure (empty topdir, m_cnvSvc not castable
// to IProperty, failure to send to the rank queue, or a non-zero result from
// mapAsyncFlag).
51 int SharedWriterTool::makePool(int /*maxevt*/, int nprocs, const std::string& topdir)
52 {
53  ATH_MSG_DEBUG("In makePool " << getpid());
54 
55  if(topdir.empty()) {
56  ATH_MSG_ERROR("Empty name for the top directory!");
57  return -1;
58  }
59 
    // The stored count is always one more than the requested (or online CPU)
    // count. NOTE(review): the reason for the extra +1 is not visible here -
    // confirm against the callers.
60  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs) + 1;
61  m_subprocTopDir = topdir;
62 
    // Query the conversion service for its ParallelCompression property. Only
    // the failed cast aborts; every other problem in this section is logged
    // and execution continues.
63  IProperty* propertyServer = dynamic_cast<IProperty*>(m_cnvSvc);
64  if(propertyServer==0) {
65  ATH_MSG_ERROR("Unable to cast conversion service to IProperty");
66  return -1;
67  }
68  else {
69  std::string propertyName = "ParallelCompression";
70  bool parallelCompression(false);
71  BooleanProperty parallelCompressionProp(propertyName,parallelCompression);
72  if(propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
73  ATH_MSG_INFO("Conversion service does not have ParallelCompression property");
74  }
75  else {
76  IService* poolSvc;
77  if(serviceLocator()->service("PoolSvc", poolSvc).isFailure() || poolSvc==0) {
78  ATH_MSG_ERROR("Error retrieving PoolSvc");
79  }
80  else if(parallelCompressionProp.value()) {
    // Parallel compression is enabled: set the PoolSvc "FileOpen" property to
    // "update". propertyServer is re-pointed at PoolSvc from here on.
81  propertyServer = dynamic_cast<IProperty*>(poolSvc);
82  if (propertyServer==0 || propertyServer->setProperty("FileOpen", "update").isFailure()) {
83  ATH_MSG_ERROR("Could not change PoolSvc FileOpen Property");
84  }
85  }
86  }
87  }
88 
89  // Create rank queue and fill it
    // The queue name is made unique with m_randStr; the queue is constructed
    // with the arguments 1 and sizeof(int), and the single value 0 is sent to
    // it. The object is released in the block that logs "In finalize".
90  m_sharedRankQueue = new AthenaInterprocess::SharedQueue("SharedWriterTool_RankQueue_"+m_randStr,1,sizeof(int));
91  if(!m_sharedRankQueue->send_basic<int>(0)) {
92  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
93  return -1;
94  }
95 
96  // Create the process group and map_async bootstrap
    // NOTE(review): the listing jumps from embedded line 96 to 98, so the
    // statement that actually creates the process group is not visible here.
98  ATH_MSG_INFO("Created shared writer process");
    // A non-zero result from mapAsyncFlag is treated as failure.
99  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
100  return -1;
101  ATH_MSG_INFO("Shared writer process bootstraped");
102  return 1;
103 }
104 
106 {
    // NOTE(review): the signature line of this function is not visible in this
    // listing; the body returns StatusCode values.
107  ATH_MSG_DEBUG("In exec " << getpid());
108 
    // Dispatch the exec function; a non-zero result from mapAsyncFlag is
    // reported as FAILURE.
109  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
110  return StatusCode::FAILURE;
111  ATH_MSG_INFO("Shared writer started write events");
112 
113  // Set exit flag on writer
    // map_async is called with (0,0) for this purpose; a non-zero result is
    // reported as FAILURE.
114  if(m_processGroup->map_async(0,0)){
115  ATH_MSG_ERROR("Unable to set exit to the writer");
116  return StatusCode::FAILURE;
117  }
118  return StatusCode::SUCCESS;
119 }
120 
121 void SharedWriterTool::subProcessLogs(std::vector<std::string>& filenames)
122 {
123  filenames.clear();
124  std::filesystem::path writer_rundir(m_subprocTopDir);
125  writer_rundir/= std::filesystem::path(m_subprocDirPrefix);
126  filenames.push_back(writer_rundir.string()+std::string("/AthenaMP.log"));
127 }
128 
130 {
    // Returns jobOutputs as-is.
    // NOTE(review): the declaration of jobOutputs (embedded line 131) and the
    // signature line are not visible in this listing.
132  return jobOutputs;
133 }
134 
// Bootstrap routine of the shared writer sub-process.
//
// Steps, in order: optionally wait for a signal (debug mode), obtain the rank
// ID, create the writer run directory, redirect logs (unless debugging),
// update the Io registry, handle a saved PFC, reopen file descriptors, try to
// retrieve AthenaRootSharedWriterSvc, turn the conversion service into a share
// server, reinitialize I/O, and chdir into the run directory.
//
// Returns a ScheduledWork whose data is a malloc'ed int: it is preset to 1
// (failure) and only set to 0 after every step above has succeeded. Any
// failing step returns the work item early with the value still 1. The one
// exception is the AthenaRootSharedWriterSvc retrieval, which only warns.
135 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::bootstrap_func()
136 {
137  // It's possible to debug SharedWriter just like any other AthenaMP worker.
138  // The following procedure provides a minimal explanation on how this can be achieved:
139  //
140  // Terminal #1:
141  // * Run athena w/ debugging enabled, e.g. athena.py --debugWorker --stdcmalloc --nprocs=8 [...]
142  // * In this mode, workers will be stopped after fork(), waiting for SIGUSR1 to be resumed
143  // * Find the PID of the worker to be debugged (printed by the job in stdout)
144  //
145  // Terminal #2:
146  // * Attach gdb to the relevant worker, i.e. gdb python PID
147  // * Once the symbols are loaded, one can perform any gdb action such as setting breakpoints etc.
148  // * Once ready, send SIGUSR1 to the worker to resume work, i.e. signal SIGUSR1 (in gdb)
149  //
150  // Terminal #3:
151  // * Send SIGUSR1 to the remaining workers (easiest to use htop)
152  //
153  // However, note that sometimes Shared I/O infrastructure struggles with timing problems,
154  // such as server/client(s) starting/stopping too early/late. Debugging can change this
155  // behavior so please keep this in mind.
156  if(m_debug) waitForSignal();
157 
    // The result starts out as "failure"; every early return below hands back
    // this work item unchanged.
158  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
159  outwork->data = malloc(sizeof(int));
160  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
161  outwork->size = sizeof(int);
162 
163  // ...
164  // (possible) TODO: extend outwork with some error message, which will be eventually
165  // reported in the master process
166  // ...
167 
168  // ________________________ Get RankID ________________________
169  //
    // NOTE(review): the condition that opens this error block (embedded line
    // 170) is not visible in this listing.
171  ATH_MSG_ERROR("Unable to get rank ID!");
172  return outwork;
173  }
174  // Writer dir: mkdir
175  std::filesystem::path writer_rundir(m_subprocTopDir);
176  writer_rundir /= std::filesystem::path(m_subprocDirPrefix);
177 
    // Directory mode: rwx for the owner, r-x for group and others. Any mkdir
    // error is fatal for the bootstrap.
178  if(mkdir(writer_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
179  ATH_MSG_ERROR("Unable to make writer run directory: " << writer_rundir.string() << ". " << fmterror(errno));
180  return outwork;
181  }
182 
183  // __________ Redirect logs unless we want to attach debugger ____________
184  if(!m_debug) {
    // A non-zero result from redirectLog aborts the bootstrap.
185  if(redirectLog(writer_rundir.string()))
186  return outwork;
187 
188  ATH_MSG_INFO("Logs redirected in the AthenaMP Shared Writer PID=" << getpid());
189  }
190 
191  // Update Io Registry
192  if(updateIoReg(writer_rundir.string()))
193  return outwork;
194 
195  ATH_MSG_INFO("Io registry updated in the AthenaMP Shared Writer PID=" << getpid());
196 
197  // _______________________ Handle saved PFC (if any) ______________________
    // handleSavedPfc receives the absolute form of the run directory.
198  std::filesystem::path abs_writer_rundir = std::filesystem::absolute(writer_rundir);
199  if(handleSavedPfc(abs_writer_rundir))
200  return outwork;
201 
202  // Reopen file descriptors
203  if(reopenFds())
204  return outwork;
205 
206  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP Shared Writer PID=" << getpid());
207 
208  // Try to initialize AthenaRootSharedWriterSvc early on
    // Best effort only: a failed retrieval is logged as a warning and the
    // bootstrap continues. The pointer is not used further in this function.
209  IAthenaSharedWriterSvc* sharedWriterSvc;
210  StatusCode sc = serviceLocator()->service("AthenaRootSharedWriterSvc", sharedWriterSvc);
211  if(sc.isFailure() || sharedWriterSvc == nullptr) {
212  ATH_MSG_WARNING("Error retrieving AthenaRootSharedWriterSvc from SharedWriterTool::bootstrap_func()");
213  }
214 
215  // Use IDataShare to make ConversionSvc a Share Server
    // NOTE(review): the meaning of the encoded argument
    // (-m_nprocs - 1 - 1024 * m_rankId) is defined by the makeServer
    // implementation - confirm there.
216  IDataShare* cnvSvc = dynamic_cast<IDataShare*>(m_cnvSvc);
217  if (cnvSvc == 0 || !cnvSvc->makeServer(-m_nprocs - 1 - 1024 * m_rankId).isSuccess()) {
218  ATH_MSG_ERROR("Failed to make the conversion service a share server");
219  return outwork;
220  }
221  else {
222  ATH_MSG_DEBUG("Successfully made the conversion service a share server");
223  }
224 
225  // ________________________ I/O reinit ________________________
226  if(!m_ioMgr->io_reinitialize().isSuccess()) {
227  ATH_MSG_ERROR("Failed to reinitialize I/O");
228  return outwork;
229  } else {
230  ATH_MSG_DEBUG("Successfully reinitialized I/O");
231  }
232 
233  // Writer dir: chdir
234  if(chdir(writer_rundir.string().c_str())==-1) {
235  ATH_MSG_ERROR("Failed to chdir to " << writer_rundir.string());
236  return outwork;
237  }
238 
239  // Declare success and return
240  *(int*)(outwork->data) = 0;
241  return outwork;
242 }
243 
244 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::exec_func()
245 {
246  ATH_MSG_INFO("Exec function in the AthenaMP Shared Writer PID=" << getpid());
247  bool all_ok=true;
248 
249  IAthenaSharedWriterSvc* sharedWriterSvc;
250  StatusCode sc = serviceLocator()->service("AthenaRootSharedWriterSvc", sharedWriterSvc);
251  if(sc.isFailure() || sharedWriterSvc==0) {
252  ATH_MSG_ERROR("Error retrieving AthenaRootSharedWriterSvc");
253  all_ok=false;
254  } else if(!sharedWriterSvc->share(m_nprocs, m_nMotherProcess.value()).isSuccess()) {
255  ATH_MSG_ERROR("Exec function could not share data");
256  all_ok=false;
257  }
258  AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_cnvSvc);
259  if (cnvSvc == 0 || !cnvSvc->disconnectOutput("").isSuccess()) {
260  ATH_MSG_ERROR("Exec function could not disconnectOutput");
261  all_ok=false;
262  }
263 
264  if(m_appMgr->stop().isFailure()) {
265  ATH_MSG_ERROR("Unable to stop AppMgr");
266  all_ok=false;
267  }
268  else {
269  if(m_appMgr->finalize().isFailure()) {
270  std::cerr << "Unable to finalize AppMgr" << std::endl;
271  all_ok=false;
272  }
273  }
274 
275  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
276  outwork->data = malloc(sizeof(int));
277  *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
278  outwork->size = sizeof(int);
279 
280  // ...
281  // (possible) TODO: extend outwork with some error message, which will be eventually
282  // reported in the master proces
283  // ...
284  return outwork;
285 }
286 
287 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::fin_func()
288 {
289  // Dummy
290  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
291  outwork->data = malloc(sizeof(int));
292  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
293  outwork->size = sizeof(int);
294  return outwork;
295 }
SharedWriterTool::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedWriterTool.cxx:287
SharedWriterTool::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedWriterTool.cxx:244
SharedWriterTool::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: SharedWriterTool.h:51
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:432
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:126
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
IEventShare.h
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:88
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
IDataShare.h
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:83
SharedWriterTool::finalize
virtual StatusCode finalize() override
Definition: SharedWriterTool.cxx:43
SharedWriterTool::SharedWriterTool
SharedWriterTool()
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:32
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:362
AthCnvSvc.h
SharedWriterTool::~SharedWriterTool
virtual ~SharedWriterTool() override
Definition: SharedWriterTool.cxx:29
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
SharedWriterTool::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: SharedWriterTool.cxx:129
ProcessGroup.h
SharedWriterTool::m_rankId
int m_rankId
Definition: SharedWriterTool.h:49
SharedWriterTool::m_cnvSvc
IConversionSvc * m_cnvSvc
Definition: SharedWriterTool.h:52
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:67
IAthenaSharedWriterSvc.h
IDataShare
Abstract interface for sharing data.
Definition: IDataShare.h:28
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:91
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
SharedWriterTool::initialize
virtual StatusCode initialize() override
Definition: SharedWriterTool.cxx:33
IAthenaSharedWriterSvc
Definition: IAthenaSharedWriterSvc.h:12
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
SharedWriterTool.h
test_pyathena.parent
parent
Definition: test_pyathena.py:15
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:129
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:341
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:55
IDataShare::makeServer
virtual StatusCode makeServer(int num)=0
Make this a server.
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
SharedWriterTool::m_nMotherProcess
Gaudi::Property< bool > m_nMotherProcess
Definition: SharedWriterTool.h:42
SharedWriterTool::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedWriterTool.cxx:135
SharedWriterTool::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedWriterTool.h:45
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:93
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:278
LArConditionsTestConfig.poolSvc
poolSvc
Definition: LArConditionsTestConfig.py:79
IAthenaSharedWriterSvc::share
virtual StatusCode share(int numClients=0, bool motherClient=false)=0
grepfile.filenames
list filenames
Definition: grepfile.py:34
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:28
AthCnvSvc::disconnectOutput
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
Definition: AthCnvSvc.cxx:437
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:84
AthenaInterprocess::SharedQueue::send_basic
bool send_basic(T)
Definition: SharedQueue.h:93
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:425
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:25
SharedWriterTool::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedWriterTool.cxx:121
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:66
AthCnvSvc
Definition: AthCnvSvc.h:67
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:369