ATLAS Offline Software
Loading...
Searching...
No Matches
SharedWriterTool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "SharedWriterTool.h"
7
11#include "CxxUtils/xmalloc.h"
12#include "GaudiKernel/IEvtSelector.h"
13#include "GaudiKernel/IConversionSvc.h"
14#include "GaudiKernel/IIoComponentMgr.h"
15
17#include <filesystem>
18
20 , const std::string& name
21 , const IInterface* parent)
22 : AthenaMPToolBase(type,name,parent)
23 , m_rankId(0)
24 , m_sharedRankQueue(nullptr)
25{
26 m_subprocDirPrefix = "shared_writer";
27}
28
32
34{
35 ATH_MSG_DEBUG("In initialize");
36
38 m_cnvSvc = serviceLocator()->service("AthenaPoolSharedIOCnvSvc");
39 ATH_CHECK(m_cnvSvc.isValid());
40
41 return StatusCode::SUCCESS;
42}
43
45{
46 ATH_MSG_DEBUG("In finalize");
47
48 delete m_sharedRankQueue;
49 return StatusCode::SUCCESS;
50}
51
52int SharedWriterTool::makePool(int /*maxevt*/, int nprocs, const std::string& topdir)
53{
54 ATH_MSG_DEBUG("In makePool " << getpid());
55
56 if(topdir.empty()) {
57 ATH_MSG_ERROR("Empty name for the top directory!");
58 return -1;
59 }
60
61 m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs) + 1;
62 m_subprocTopDir = topdir;
63
64 SmartIF<IProperty> propertyServer(m_cnvSvc);
65 if(!propertyServer) {
66 ATH_MSG_ERROR("Unable to cast conversion service to IProperty");
67 return -1;
68 }
69 else {
70 std::string propertyName = "ParallelCompression";
71 bool parallelCompression(false);
72 BooleanProperty parallelCompressionProp(std::move(propertyName),parallelCompression);
73 if(propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
74 ATH_MSG_INFO("Conversion service does not have ParallelCompression property");
75 }
76 else {
77 SmartIF<IProperty> poolSvc(serviceLocator()->service("PoolSvc"));
78 if(!poolSvc) {
79 ATH_MSG_ERROR("Error retrieving PoolSvc");
80 }
81 else if(parallelCompressionProp.value()) {
82 if (poolSvc->setProperty("FileOpen", "update").isFailure()) {
83 ATH_MSG_ERROR("Could not change PoolSvc FileOpen Property");
84 }
85 }
86 }
87 }
88
89 // Create rank queue and fill it
90 m_sharedRankQueue = new AthenaInterprocess::SharedQueue("SharedWriterTool_RankQueue_"+m_randStr,1,sizeof(int));
91 if(!m_sharedRankQueue->send_basic<int>(0)) {
92 ATH_MSG_ERROR("Unable to send int to the ranks queue!");
93 return -1;
94 }
95
96 // Create the process group and map_async bootstrap
97 m_processGroup = new AthenaInterprocess::ProcessGroup(1);
98 ATH_MSG_INFO("Created shared writer process");
99 if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
100 return -1;
101 ATH_MSG_INFO("Shared writer process bootstrapped");
102 return 1;
103}
104
105StatusCode SharedWriterTool::exec()
106{
107 ATH_MSG_DEBUG("In exec " << getpid());
108
109 if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
110 return StatusCode::FAILURE;
111 ATH_MSG_INFO("Shared writer started write events");
112
113 // Set exit flag on writer
114 if(m_processGroup->map_async(0,0)){
115 ATH_MSG_ERROR("Unable to set exit to the writer");
116 return StatusCode::FAILURE;
117 }
118 return StatusCode::SUCCESS;
119}
120
121void SharedWriterTool::subProcessLogs(std::vector<std::string>& filenames)
122{
123 filenames.clear();
124 std::filesystem::path writer_rundir(m_subprocTopDir);
125 writer_rundir/= std::filesystem::path(m_subprocDirPrefix);
126 filenames.push_back(writer_rundir.string()+std::string("/AthenaMP.log"));
127}
128
134
135std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::bootstrap_func()
136{
137 // It's possible to debug SharedWriter just like any other AthenaMP worker.
138 // The following procedure provides a minimal explanation on how this can be achieved:
139 //
140 // Terminal #1:
141 // * Run athena w/ debugging enabled, e.g. athena.py --debugWorker --stdcmalloc --nprocs=8 [...]
142 // * In this mode, workers will be stopped after fork(), waiting for SIGUSR1 to be resumed
143 // * Find the PID of the worker to be debugged (printed by the job in stdout)
144 //
145 // Terminal #2:
146 // * Attach gdb to the relevant worker, i.e. gdb python PID
147 // * Once the symbols are loaded, one can perform any gdb action such as setting breakpoints etc.
148 // * Once ready, send SIGUSR1 to the worker to resume work, i.e. signal SIGUSR1 (in gdb)
149 //
150 // Terminal #3:
151 // * Send SIGUSR1 to the remaining workers (easiest to use htop)
152 //
153 // However, note that sometimes Shared I/O infrastructure struggles with timing problems,
154 // such as server/client(s) starting/stopping too early/later. Debugging can change this
155 // behavior so please keep this in mind.
157
158 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
159 outwork->data = CxxUtils::xmalloc(sizeof(int));
160 *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
161 outwork->size = sizeof(int);
162
163 // ...
164 // (possible) TODO: extend outwork with some error message, which will be eventually
165 // reported in the master proces
166 // ...
167
168 // ________________________ Get RankID ________________________
169 //
170 if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
171 ATH_MSG_ERROR("Unable to get rank ID!");
172 return outwork;
173 }
174 // Writer dir: mkdir
175 std::filesystem::path writer_rundir(m_subprocTopDir);
176 writer_rundir /= std::filesystem::path(m_subprocDirPrefix);
177
178 if(mkdir(writer_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
179 ATH_MSG_ERROR("Unable to make writer run directory: " << writer_rundir.string() << ". " << fmterror(errno));
180 return outwork;
181 }
182
183 // __________ Redirect logs unless we want to attach debugger ____________
184 if(!m_debug) {
185 if(redirectLog(writer_rundir.string()))
186 return outwork;
187
188 ATH_MSG_INFO("Logs redirected in the AthenaMP Shared Writer PID=" << getpid());
189 }
190
191 // Update Io Registry
192 if(updateIoReg(writer_rundir.string()))
193 return outwork;
194
195 ATH_MSG_INFO("Io registry updated in the AthenaMP Shared Writer PID=" << getpid());
196
197 // _______________________ Handle saved PFC (if any) ______________________
198 std::filesystem::path abs_writer_rundir = std::filesystem::absolute(writer_rundir);
199 if(handleSavedPfc(abs_writer_rundir))
200 return outwork;
201
202 // Reopen file descriptors
203 if(reopenFds())
204 return outwork;
205
206 ATH_MSG_INFO("File descriptors re-opened in the AthenaMP Shared Writer PID=" << getpid());
207
208 // Try to initialize AthenaRootSharedWriterSvc early on
209 SmartIF<IAthenaSharedWriterSvc> sharedWriterSvc(serviceLocator()->service("AthenaRootSharedWriterSvc"));
210 if(!sharedWriterSvc) {
211 ATH_MSG_WARNING("Error retrieving AthenaRootSharedWriterSvc from SharedWriterTool::bootstrap_func()");
212 }
213
214 // Use IDataShare to make ConversionSvc a Share Server
215 SmartIF<IDataShare> cnvSvc(m_cnvSvc);
216 if (!cnvSvc || !cnvSvc->makeServer(-m_nprocs - 1 - 1024 * m_rankId).isSuccess()) {
217 ATH_MSG_ERROR("Failed to make the conversion service a share server");
218 return outwork;
219 }
220 else {
221 ATH_MSG_DEBUG("Successfully made the conversion service a share server");
222 }
223
224 // ________________________ I/O reinit ________________________
225 if(!m_ioMgr->io_reinitialize().isSuccess()) {
226 ATH_MSG_ERROR("Failed to reinitialize I/O");
227 return outwork;
228 } else {
229 ATH_MSG_DEBUG("Successfully reinitialized I/O");
230 }
231
232 // Writer dir: chdir
233 if(chdir(writer_rundir.string().c_str())==-1) {
234 ATH_MSG_ERROR("Failed to chdir to " << writer_rundir.string());
235 return outwork;
236 }
237
238 // Declare success and return
239 *(int*)(outwork->data) = 0;
240 return outwork;
241}
242
243std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::exec_func()
244{
245 ATH_MSG_INFO("Exec function in the AthenaMP Shared Writer PID=" << getpid());
246 bool all_ok=true;
247
248 SmartIF<IAthenaSharedWriterSvc> sharedWriterSvc(serviceLocator()->service("AthenaRootSharedWriterSvc"));
249 if(!sharedWriterSvc) {
250 ATH_MSG_ERROR("Error retrieving AthenaRootSharedWriterSvc");
251 all_ok=false;
252 } else if(!sharedWriterSvc->share(m_nprocs, m_nMotherProcess.value()).isSuccess()) {
253 ATH_MSG_ERROR("Exec function could not share data");
254 all_ok=false;
255 }
256 AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_cnvSvc.get());
257 if (cnvSvc == 0 || !cnvSvc->disconnectOutput("").isSuccess()) {
258 ATH_MSG_ERROR("Exec function could not disconnectOutput");
259 all_ok=false;
260 }
261
262 if(m_appMgr->stop().isFailure()) {
263 ATH_MSG_ERROR("Unable to stop AppMgr");
264 all_ok=false;
265 }
266 else {
267 if(m_appMgr->finalize().isFailure()) {
268 std::cerr << "Unable to finalize AppMgr" << std::endl;
269 all_ok=false;
270 }
271 }
272
273 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
274 outwork->data = CxxUtils::xmalloc(sizeof(int));
275 *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
276 outwork->size = sizeof(int);
277
278 // ...
279 // (possible) TODO: extend outwork with some error message, which will be eventually
280 // reported in the master proces
281 // ...
282 return outwork;
283}
284
285std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::fin_func()
286{
287 // Dummy
288 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
289 outwork->data = CxxUtils::xmalloc(sizeof(int));
290 *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
291 outwork->size = sizeof(int);
292 return outwork;
293}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
Base class for all conversion services.
Definition AthCnvSvc.h:66
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
std::string m_subprocTopDir
Top run directory for subprocesses.
int handleSavedPfc(const std::filesystem::path &dest_path)
int updateIoReg(const std::string &rundir)
AthenaMPToolBase(const std::string &type, const std::string &name, const IInterface *parent)
virtual StatusCode initialize() override
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
ServiceHandle< IAppMgrUI > m_appMgr
int m_nprocs
Number of workers spawned by the master process.
ServiceHandle< IIoComponentMgr > m_ioMgr
AthenaInterprocess::ProcessGroup * m_processGroup
std::string m_subprocDirPrefix
For ex. "worker__".
std::string fmterror(int errnum)
Gaudi::Property< bool > m_nMotherProcess
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
SmartIF< IConversionSvc > m_cnvSvc
Gaudi::Property< bool > m_debug
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
AthenaInterprocess::SharedQueue * m_sharedRankQueue
virtual void subProcessLogs(std::vector< std::string > &) override
virtual ~SharedWriterTool() override
virtual StatusCode finalize() override
virtual StatusCode initialize() override
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
Trapping version of malloc.