ATLAS Offline Software
SharedWriterTool.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "SharedWriterTool.h"
7 
11 #include "CxxUtils/xmalloc.h"
12 #include "GaudiKernel/IEvtSelector.h"
13 #include "GaudiKernel/IConversionSvc.h"
14 #include "GaudiKernel/IIoComponentMgr.h"
15 
17 #include <filesystem>
18 
20  , const std::string& name
21  , const IInterface* parent)
23  , m_rankId(0)
24  , m_sharedRankQueue(nullptr)
25 {
  // The rank ID starts at 0 and the rank queue is not created here; makePool() allocates it.
  // Name of the writer's run directory below the sub-process top directory
  // (used by bootstrap_func() and subProcessLogs()).
26  m_subprocDirPrefix = "shared_writer";
27 }
28 
30 {
  // Intentionally empty: m_sharedRankQueue is released in finalize(), not here.
31 }
32 
34 {
35  ATH_MSG_DEBUG("In initialize");
36 
  // The shared-I/O conversion service is mandatory: makePool() queries its properties,
  // bootstrap_func() turns it into a share server and exec_func() disconnects its output.
  // ATH_CHECK makes initialize() fail if the service could not be located.
38  m_cnvSvc = serviceLocator()->service("AthenaPoolSharedIOCnvSvc");
39  ATH_CHECK(m_cnvSvc.isValid());
40 
41  return StatusCode::SUCCESS;
42 }
43 
45 {
46  ATH_MSG_DEBUG("In finalize");
47 
  // Release the rank queue allocated in makePool(). This is safe even if makePool()
  // never ran, because the constructor initializes the pointer to nullptr.
  // NOTE(review): the pointer is not reset to nullptr afterwards, so a second
  // finalize() call would double-delete — confirm finalize() runs only once.
48  delete m_sharedRankQueue;
49  return StatusCode::SUCCESS;
50 }
51 
// Prepare the shared writer sub-process.
//   maxevt : unused.
//   nprocs : number of worker processes; -1 means "use the number of online CPUs".
//   topdir : top run directory for sub-processes; must not be empty.
// Returns -1 on any fatal error, 1 on success (presumably the number of processes
// created, i.e. the single writer — confirm against callers).
52 int SharedWriterTool::makePool(int /*maxevt*/, int nprocs, const std::string& topdir)
53 {
54  ATH_MSG_DEBUG("In makePool " << getpid());
55 
56  if(topdir.empty()) {
57  ATH_MSG_ERROR("Empty name for the top directory!");
58  return -1;
59  }
60 
  // m_nprocs is one more than the worker count (or the online CPU count when nprocs == -1).
  // NOTE(review): the reason for the extra +1 is not visible here — confirm.
61  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs) + 1;
62  m_subprocTopDir = topdir;
63 
  // If the conversion service exposes a ParallelCompression property and it is true,
  // switch PoolSvc's "FileOpen" property to "update". Only a failed IProperty cast is
  // fatal here; every other problem in this section is merely logged.
64  SmartIF<IProperty> propertyServer(m_cnvSvc);
65  if(!propertyServer) {
66  ATH_MSG_ERROR("Unable to cast conversion service to IProperty");
67  return -1;
68  }
69  else {
70  std::string propertyName = "ParallelCompression";
71  bool parallelCompression(false);
72  BooleanProperty parallelCompressionProp(std::move(propertyName),parallelCompression);
73  if(propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
74  ATH_MSG_INFO("Conversion service does not have ParallelCompression property");
75  }
76  else {
77  SmartIF<IProperty> poolSvc(serviceLocator()->service("PoolSvc"));
78  if(!poolSvc) {
79  ATH_MSG_ERROR("Error retrieving PoolSvc");
80  }
81  else if(parallelCompressionProp.value()) {
82  if (poolSvc->setProperty("FileOpen", "update").isFailure()) {
83  ATH_MSG_ERROR("Could not change PoolSvc FileOpen Property");
84  }
85  }
86  }
87  }
88 
89  // Create rank queue and fill it
  // The queue name is made unique with m_randStr. It is seeded with rank 0; the
  // writer presumably reads its rank ID from it in bootstrap_func(). The (1, sizeof(int))
  // arguments presumably mean a single int-sized message — confirm in SharedQueue.h.
  // NOTE(review): calling makePool() twice would leak the previously allocated queue.
90  m_sharedRankQueue = new AthenaInterprocess::SharedQueue("SharedWriterTool_RankQueue_"+m_randStr,1,sizeof(int));
91  if(!m_sharedRankQueue->send_basic<int>(0)) {
92  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
93  return -1;
94  }
95 
96  // Create the process group and map_async bootstrap
  // mapAsyncFlag() returning non-zero is treated as failure.
98  ATH_MSG_INFO("Created shared writer process");
99  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
100  return -1;
101  ATH_MSG_INFO("Shared writer process bootstrapped");
102  return 1;
103 }
104 
106 {
  // Drive the writer process: dispatch the FUNC_EXEC step (presumably exec_func()),
  // then send it the exit request. Returns FAILURE if either dispatch fails.
107  ATH_MSG_DEBUG("In exec " << getpid());
108 
109  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
110  return StatusCode::FAILURE;
111  ATH_MSG_INFO("Shared writer started write events");
112 
113  // Set exit flag on writer
  // NOTE(review): map_async(0,0) presumably tells the process group to exit once the
  // queued work is done; a non-zero return is treated as failure — confirm in ProcessGroup.
114  if(m_processGroup->map_async(0,0)){
115  ATH_MSG_ERROR("Unable to set exit to the writer");
116  return StatusCode::FAILURE;
117  }
118  return StatusCode::SUCCESS;
119 }
120 
121 void SharedWriterTool::subProcessLogs(std::vector<std::string>& filenames)
122 {
123  filenames.clear();
124  std::filesystem::path writer_rundir(m_subprocTopDir);
125  writer_rundir/= std::filesystem::path(m_subprocDirPrefix);
126  filenames.push_back(writer_rundir.string()+std::string("/AthenaMP.log"));
127 }
128 
130 {
  // The report is returned by value. AthenaMP::AllWorkerOutputs_ptr is a
  // std::unique_ptr, so ownership passes to the caller.
132  return jobOutputs;
133 }
134 
// Bootstrap step executed inside the shared writer process, in this order:
// - obtain the rank ID;
// - create the writer run directory;
// - redirect logs (unless debugging);
// - update the I/O registry and handle a saved PFC;
// - reopen file descriptors;
// - turn the conversion service into a share server;
// - reinitialize I/O;
// - chdir into the run directory.
// Returns a ScheduledWork whose payload is a single int error code (0 success, 1 failure).
135 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::bootstrap_func()
136 {
137  // It's possible to debug SharedWriter just like any other AthenaMP worker.
138  // The following procedure provides a minimal explanation on how this can be achieved:
139  //
140  // Terminal #1:
141  // * Run athena w/ debugging enabled, e.g. athena.py --debugWorker --stdcmalloc --nprocs=8 [...]
142  // * In this mode, workers will be stopped after fork(), waiting for SIGUSR1 to be resumed
143  // * Find the PID of the worker to be debugged (printed by the job in stdout)
144  //
145  // Terminal #2:
146  // * Attach gdb to the relevant worker, i.e. gdb python PID
147  // * Once the symbols are loaded, one can perform any gdb action such as setting breakpoints etc.
148  // * Once ready, send SIGUSR1 to the worker to resume work, i.e. signal SIGUSR1 (in gdb)
149  //
150  // Terminal #3:
151  // * Send SIGUSR1 to the remaining workers (easiest to use htop)
152  //
153  // However, note that sometimes Shared I/O infrastructure struggles with timing problems,
154  // such as server/client(s) starting/stopping too early/late. Debugging can change this
155  // behavior so please keep this in mind.
156  if(m_debug) waitForSignal();
157 
  // The error code starts out as "failure" and is flipped to 0 only at the very end,
  // so every early return below reports failure.
158  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
159  outwork->data = CxxUtils::xmalloc(sizeof(int));
160  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
161  outwork->size = sizeof(int);
162 
163  // ...
164  // (possible) TODO: extend outwork with some error message, which will be eventually
165  // reported in the master process
166  // ...
167 
168  // ________________________ Get RankID ________________________
169  //
171  ATH_MSG_ERROR("Unable to get rank ID!");
172  return outwork;
173  }
174  // Writer dir: mkdir
  // The directory is <m_subprocTopDir>/<m_subprocDirPrefix>. mkdir fails (and the
  // bootstrap aborts) if it cannot be created, including when it already exists.
175  std::filesystem::path writer_rundir(m_subprocTopDir);
176  writer_rundir /= std::filesystem::path(m_subprocDirPrefix);
177 
178  if(mkdir(writer_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
179  ATH_MSG_ERROR("Unable to make writer run directory: " << writer_rundir.string() << ". " << fmterror(errno));
180  return outwork;
181  }
182 
183  // __________ Redirect logs unless we want to attach debugger ____________
184  if(!m_debug) {
185  if(redirectLog(writer_rundir.string()))
186  return outwork;
187 
188  ATH_MSG_INFO("Logs redirected in the AthenaMP Shared Writer PID=" << getpid());
189  }
190 
191  // Update Io Registry
192  if(updateIoReg(writer_rundir.string()))
193  return outwork;
194 
195  ATH_MSG_INFO("Io registry updated in the AthenaMP Shared Writer PID=" << getpid());
196 
197  // _______________________ Handle saved PFC (if any) ______________________
198  std::filesystem::path abs_writer_rundir = std::filesystem::absolute(writer_rundir);
199  if(handleSavedPfc(abs_writer_rundir))
200  return outwork;
201 
202  // Reopen file descriptors
203  if(reopenFds())
204  return outwork;
205 
206  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP Shared Writer PID=" << getpid());
207 
208  // Try to initialize AthenaRootSharedWriterSvc early on
  // Only a warning here: exec_func() retrieves the service again and treats its
  // absence as an error.
209  SmartIF<IAthenaSharedWriterSvc> sharedWriterSvc(serviceLocator()->service("AthenaRootSharedWriterSvc"));
210  if(!sharedWriterSvc) {
211  ATH_MSG_WARNING("Error retrieving AthenaRootSharedWriterSvc from SharedWriterTool::bootstrap_func()");
212  }
213 
214  // Use IDataShare to make ConversionSvc a Share Server
  // NOTE(review): the negative server ID is built from m_nprocs and m_rankId; its
  // encoding is defined by IDataShare::makeServer — confirm there before changing it.
215  SmartIF<IDataShare> cnvSvc(m_cnvSvc);
216  if (!cnvSvc || !cnvSvc->makeServer(-m_nprocs - 1 - 1024 * m_rankId).isSuccess()) {
217  ATH_MSG_ERROR("Failed to make the conversion service a share server");
218  return outwork;
219  }
220  else {
221  ATH_MSG_DEBUG("Successfully made the conversion service a share server");
222  }
223 
224  // ________________________ I/O reinit ________________________
225  if(!m_ioMgr->io_reinitialize().isSuccess()) {
226  ATH_MSG_ERROR("Failed to reinitialize I/O");
227  return outwork;
228  } else {
229  ATH_MSG_DEBUG("Successfully reinitialized I/O");
230  }
231 
232  // Writer dir: chdir
233  if(chdir(writer_rundir.string().c_str())==-1) {
234  ATH_MSG_ERROR("Failed to chdir to " << writer_rundir.string());
235  return outwork;
236  }
237 
238  // Declare success and return
239  *(int*)(outwork->data) = 0;
240  return outwork;
241 }
242 
243 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::exec_func()
244 {
245  ATH_MSG_INFO("Exec function in the AthenaMP Shared Writer PID=" << getpid());
246  bool all_ok=true;
247 
248  SmartIF<IAthenaSharedWriterSvc> sharedWriterSvc(serviceLocator()->service("AthenaRootSharedWriterSvc"));
249  if(!sharedWriterSvc) {
250  ATH_MSG_ERROR("Error retrieving AthenaRootSharedWriterSvc");
251  all_ok=false;
252  } else if(!sharedWriterSvc->share(m_nprocs, m_nMotherProcess.value()).isSuccess()) {
253  ATH_MSG_ERROR("Exec function could not share data");
254  all_ok=false;
255  }
256  AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_cnvSvc.get());
257  if (cnvSvc == 0 || !cnvSvc->disconnectOutput("").isSuccess()) {
258  ATH_MSG_ERROR("Exec function could not disconnectOutput");
259  all_ok=false;
260  }
261 
262  if(m_appMgr->stop().isFailure()) {
263  ATH_MSG_ERROR("Unable to stop AppMgr");
264  all_ok=false;
265  }
266  else {
267  if(m_appMgr->finalize().isFailure()) {
268  std::cerr << "Unable to finalize AppMgr" << std::endl;
269  all_ok=false;
270  }
271  }
272 
273  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
274  outwork->data = CxxUtils::xmalloc(sizeof(int));
275  *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
276  outwork->size = sizeof(int);
277 
278  // ...
279  // (possible) TODO: extend outwork with some error message, which will be eventually
280  // reported in the master proces
281  // ...
282  return outwork;
283 }
284 
285 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::fin_func()
286 {
287  // Dummy
288  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
289  outwork->data = CxxUtils::xmalloc(sizeof(int));
290  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
291  outwork->size = sizeof(int);
292  return outwork;
293 }
SharedWriterTool::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedWriterTool.cxx:285
SharedWriterTool::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedWriterTool.cxx:243
SharedWriterTool::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: SharedWriterTool.h:51
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:403
athena.path
path
python interpreter configuration ---------------------------------------
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:15
IEventShare.h
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:91
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
IDataShare.h
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:101
AthenaMPToolBase::m_nprocs
int m_nprocs
Number of workers spawned by the master process.
Definition: AthenaMPToolBase.h:85
SharedWriterTool::finalize
virtual StatusCode finalize() override
Definition: SharedWriterTool.cxx:44
SharedWriterTool::SharedWriterTool
SharedWriterTool()
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:333
AthCnvSvc.h
SharedWriterTool::~SharedWriterTool
virtual ~SharedWriterTool() override
Definition: SharedWriterTool.cxx:29
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
SharedWriterTool::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: SharedWriterTool.cxx:129
ProcessGroup.h
SharedWriterTool::m_rankId
int m_rankId
Definition: SharedWriterTool.h:49
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:69
IAthenaSharedWriterSvc.h
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:95
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
SharedWriterTool::initialize
virtual StatusCode initialize() override
Definition: SharedWriterTool.cxx:33
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
SharedWriterTool.h
test_pyathena.parent
parent
Definition: test_pyathena.py:15
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:322
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:48
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
SharedWriterTool::m_nMotherProcess
Gaudi::Property< bool > m_nMotherProcess
Definition: SharedWriterTool.h:42
SharedWriterTool::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedWriterTool.cxx:135
SharedWriterTool::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedWriterTool.h:45
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:269
LArConditionsTestConfig.poolSvc
poolSvc
Definition: LArConditionsTestConfig.py:79
grepfile.filenames
list filenames
Definition: grepfile.py:34
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
AthCnvSvc::disconnectOutput
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
Definition: AthCnvSvc.cxx:408
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
For ex. "worker__".
Definition: AthenaMPToolBase.h:88
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Top run directory for subprocesses.
Definition: AthenaMPToolBase.h:87
CxxUtils::xmalloc
void * xmalloc(size_t size)
Trapping version of malloc.
Definition: xmalloc.cxx:31
SharedWriterTool::m_cnvSvc
SmartIF< IConversionSvc > m_cnvSvc
Definition: SharedWriterTool.h:52
AthenaInterprocess::SharedQueue::send_basic
bool send_basic(T)
Definition: SharedQueue.h:93
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:396
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:29
SharedWriterTool::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedWriterTool.cxx:121
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:68
AthCnvSvc
Definition: AthCnvSvc.h:66
xmalloc.h
Trapping version of malloc.
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:340