ATLAS Offline Software
Loading...
Searching...
No Matches
SharedWriterTool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "SharedWriterTool.h"
7
11#include "CxxUtils/xmalloc.h"
12#include "GaudiKernel/IEvtSelector.h"
13#include "GaudiKernel/IConversionSvc.h"
14#include "GaudiKernel/IIoComponentMgr.h"
15
17#include <filesystem>
18
20 , const std::string& name
21 , const IInterface* parent)
22 : AthenaMPToolBase(type,name,parent)
23 , m_rankId(0)
24 , m_sharedRankQueue(nullptr)
25{
26 m_subprocDirPrefix = "shared_writer";
27}
28
32
34{
35 ATH_MSG_DEBUG("In initialize");
36
38 m_cnvSvc = serviceLocator()->service("AthenaPoolSharedIOCnvSvc");
39 ATH_CHECK(m_cnvSvc.isValid());
40
41 return StatusCode::SUCCESS;
42}
43
45{
46 ATH_MSG_DEBUG("In finalize");
47
48 delete m_sharedRankQueue;
49 return StatusCode::SUCCESS;
50}
51
52int SharedWriterTool::makePool(int /*maxevt*/, int nprocs, const std::string& topdir)
53{
54 ATH_MSG_DEBUG("In makePool " << getpid());
55
56 if(topdir.empty()) {
57 ATH_MSG_ERROR("Empty name for the top directory!");
58 return -1;
59 }
60
61 m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs) + 1;
62 m_subprocTopDir = topdir;
63
64 // Create rank queue and fill it
65 m_sharedRankQueue = new AthenaInterprocess::SharedQueue("SharedWriterTool_RankQueue_"+m_randStr,1,sizeof(int));
66 if(!m_sharedRankQueue->send_basic<int>(0)) {
67 ATH_MSG_ERROR("Unable to send int to the ranks queue!");
68 return -1;
69 }
70
71 // Create the process group and map_async bootstrap
72 m_processGroup = new AthenaInterprocess::ProcessGroup(1);
73 ATH_MSG_INFO("Created shared writer process");
74 if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
75 return -1;
76 ATH_MSG_INFO("Shared writer process bootstrapped");
77 return 1;
78}
79
80StatusCode SharedWriterTool::exec()
81{
82 ATH_MSG_DEBUG("In exec " << getpid());
83
84 if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
85 return StatusCode::FAILURE;
86 ATH_MSG_INFO("Shared writer started write events");
87
88 // Set exit flag on writer
89 if(m_processGroup->map_async(0,0)){
90 ATH_MSG_ERROR("Unable to set exit to the writer");
91 return StatusCode::FAILURE;
92 }
93 return StatusCode::SUCCESS;
94}
95
96void SharedWriterTool::subProcessLogs(std::vector<std::string>& filenames)
97{
98 filenames.clear();
99 std::filesystem::path writer_rundir(m_subprocTopDir);
100 writer_rundir/= std::filesystem::path(m_subprocDirPrefix);
101 filenames.push_back(writer_rundir.string()+std::string("/AthenaMP.log"));
102}
103
109
110std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::bootstrap_func()
111{
112 // It's possible to debug SharedWriter just like any other AthenaMP worker.
113 // The following procedure provides a minimal explanation on how this can be achieved:
114 //
115 // Terminal #1:
116 // * Run athena w/ debugging enabled, e.g. athena.py --debugWorker --stdcmalloc --nprocs=8 [...]
117 // * In this mode, workers will be stopped after fork(), waiting for SIGUSR1 to be resumed
118 // * Find the PID of the worker to be debugged (printed by the job in stdout)
119 //
120 // Terminal #2:
121 // * Attach gdb to the relevant worker, i.e. gdb python PID
122 // * Once the symbols are loaded, one can perform any gdb action such as setting breakpoints etc.
123 // * Once ready, send SIGUSR1 to the worker to resume work, i.e. signal SIGUSR1 (in gdb)
124 //
125 // Terminal #3:
126 // * Send SIGUSR1 to the remaining workers (easiest to use htop)
127 //
128 // However, note that sometimes Shared I/O infrastructure struggles with timing problems,
129 // such as server/client(s) starting/stopping too early/later. Debugging can change this
130 // behavior so please keep this in mind.
132
133 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
134 outwork->data = CxxUtils::xmalloc(sizeof(int));
135 *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
136 outwork->size = sizeof(int);
137
138 // ...
139 // (possible) TODO: extend outwork with some error message, which will be eventually
140 // reported in the master proces
141 // ...
142
143 // ________________________ Get RankID ________________________
144 //
145 if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
146 ATH_MSG_ERROR("Unable to get rank ID!");
147 return outwork;
148 }
149 // Writer dir: mkdir
150 std::filesystem::path writer_rundir(m_subprocTopDir);
151 writer_rundir /= std::filesystem::path(m_subprocDirPrefix);
152
153 if(mkdir(writer_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
154 ATH_MSG_ERROR("Unable to make writer run directory: " << writer_rundir.string() << ". " << fmterror(errno));
155 return outwork;
156 }
157
158 // __________ Redirect logs unless we want to attach debugger ____________
159 if(!m_debug) {
160 if(redirectLog(writer_rundir.string()))
161 return outwork;
162
163 ATH_MSG_INFO("Logs redirected in the AthenaMP Shared Writer PID=" << getpid());
164 }
165
166 // Update Io Registry
167 if(updateIoReg(writer_rundir.string()))
168 return outwork;
169
170 ATH_MSG_INFO("Io registry updated in the AthenaMP Shared Writer PID=" << getpid());
171
172 // _______________________ Handle saved PFC (if any) ______________________
173 std::filesystem::path abs_writer_rundir = std::filesystem::absolute(writer_rundir);
174 if(handleSavedPfc(abs_writer_rundir))
175 return outwork;
176
177 // Reopen file descriptors
178 if(reopenFds())
179 return outwork;
180
181 ATH_MSG_INFO("File descriptors re-opened in the AthenaMP Shared Writer PID=" << getpid());
182
183 // Try to initialize AthenaRootSharedWriterSvc early on
184 SmartIF<IAthenaSharedWriterSvc> sharedWriterSvc(serviceLocator()->service("AthenaRootSharedWriterSvc"));
185 if(!sharedWriterSvc) {
186 ATH_MSG_WARNING("Error retrieving AthenaRootSharedWriterSvc from SharedWriterTool::bootstrap_func()");
187 }
188
189 // Use IDataShare to make ConversionSvc a Share Server
190 SmartIF<IDataShare> cnvSvc(m_cnvSvc);
191 if (!cnvSvc || !cnvSvc->makeServer(-m_nprocs - 1 - 1024 * m_rankId).isSuccess()) {
192 ATH_MSG_ERROR("Failed to make the conversion service a share server");
193 return outwork;
194 }
195 else {
196 ATH_MSG_DEBUG("Successfully made the conversion service a share server");
197 }
198
199 // ________________________ I/O reinit ________________________
200 if(!m_ioMgr->io_reinitialize().isSuccess()) {
201 ATH_MSG_ERROR("Failed to reinitialize I/O");
202 return outwork;
203 } else {
204 ATH_MSG_DEBUG("Successfully reinitialized I/O");
205 }
206
207 // Writer dir: chdir
208 if(chdir(writer_rundir.string().c_str())==-1) {
209 ATH_MSG_ERROR("Failed to chdir to " << writer_rundir.string());
210 return outwork;
211 }
212
213 // Declare success and return
214 *(int*)(outwork->data) = 0;
215 return outwork;
216}
217
218std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::exec_func()
219{
220 ATH_MSG_INFO("Exec function in the AthenaMP Shared Writer PID=" << getpid());
221 bool all_ok=true;
222
223 SmartIF<IAthenaSharedWriterSvc> sharedWriterSvc(serviceLocator()->service("AthenaRootSharedWriterSvc"));
224 if(!sharedWriterSvc) {
225 ATH_MSG_ERROR("Error retrieving AthenaRootSharedWriterSvc");
226 all_ok=false;
227 } else if(!sharedWriterSvc->share(m_nprocs, m_nMotherProcess.value()).isSuccess()) {
228 ATH_MSG_ERROR("Exec function could not share data");
229 all_ok=false;
230 }
231 AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_cnvSvc.get());
232 if (cnvSvc == 0 || !cnvSvc->disconnectOutput("").isSuccess()) {
233 ATH_MSG_ERROR("Exec function could not disconnectOutput");
234 all_ok=false;
235 }
236
237 if(m_appMgr->stop().isFailure()) {
238 ATH_MSG_ERROR("Unable to stop AppMgr");
239 all_ok=false;
240 }
241 else {
242 if(m_appMgr->finalize().isFailure()) {
243 std::cerr << "Unable to finalize AppMgr" << std::endl;
244 all_ok=false;
245 }
246 }
247
248 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
249 outwork->data = CxxUtils::xmalloc(sizeof(int));
250 *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
251 outwork->size = sizeof(int);
252
253 // ...
254 // (possible) TODO: extend outwork with some error message, which will be eventually
255 // reported in the master proces
256 // ...
257 return outwork;
258}
259
260std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::fin_func()
261{
262 // Dummy
263 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
264 outwork->data = CxxUtils::xmalloc(sizeof(int));
265 *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
266 outwork->size = sizeof(int);
267 return outwork;
268}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
Base class for all conversion services.
Definition AthCnvSvc.h:66
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
std::string m_subprocTopDir
Top run directory for subprocesses.
int handleSavedPfc(const std::filesystem::path &dest_path)
int updateIoReg(const std::string &rundir)
AthenaMPToolBase(const std::string &type, const std::string &name, const IInterface *parent)
virtual StatusCode initialize() override
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
ServiceHandle< IAppMgrUI > m_appMgr
int m_nprocs
Number of workers spawned by the master process.
ServiceHandle< IIoComponentMgr > m_ioMgr
AthenaInterprocess::ProcessGroup * m_processGroup
std::string m_subprocDirPrefix
For ex. "worker__".
std::string fmterror(int errnum)
Gaudi::Property< bool > m_nMotherProcess
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
SmartIF< IConversionSvc > m_cnvSvc
Gaudi::Property< bool > m_debug
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
AthenaInterprocess::SharedQueue * m_sharedRankQueue
virtual void subProcessLogs(std::vector< std::string > &) override
virtual ~SharedWriterTool() override
virtual StatusCode finalize() override
virtual StatusCode initialize() override
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
::StatusCode StatusCode
StatusCode definition for legacy code.
Trapping version of malloc.