ATLAS Offline Software
SharedWriterTool.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "SharedWriterTool.h"
7 
11 #include "GaudiKernel/IEvtSelector.h"
12 #include "GaudiKernel/IConversionSvc.h"
13 #include "GaudiKernel/IIoComponentMgr.h"
14 
16 #include <filesystem>
17 
19  , const std::string& name
20  , const IInterface* parent)
22  , m_rankId(0)
23  , m_sharedRankQueue(nullptr)
24 {
25  m_subprocDirPrefix = "shared_writer";
26 }
27 
29 {
30 }
31 
33 {
34  ATH_MSG_DEBUG("In initialize");
35 
37  m_cnvSvc = serviceLocator()->service("AthenaPoolCnvSvc");
38  ATH_CHECK(m_cnvSvc.isValid());
39 
40  return StatusCode::SUCCESS;
41 }
42 
44 {
45  ATH_MSG_DEBUG("In finalize");
46 
47  delete m_sharedRankQueue;
48  return StatusCode::SUCCESS;
49 }
50 
// Prepare the shared writer: record the process count and top directory,
// optionally adjust PoolSvc, create and seed the rank queue, and run the
// bootstrap step through mapAsyncFlag(FUNC_BOOTSTRAP).
//
// Parameters:
//   maxevt  - unused (the name is commented out in the signature).
//   nprocs  - requested process count; -1 selects sysconf(_SC_NPROCESSORS_ONLN).
//             One is added to the chosen value before it is stored in m_nprocs.
//   topdir  - stored in m_subprocTopDir; must not be empty.
//
// Returns 1 on success. Returns -1 when topdir is empty, when m_cnvSvc cannot
// be used as IProperty, when the initial value cannot be sent to the rank
// queue, or when mapAsyncFlag(FUNC_BOOTSTRAP) returns non-zero.
51 int SharedWriterTool::makePool(int /*maxevt*/, int nprocs, const std::string& topdir)
52 {
53  ATH_MSG_DEBUG("In makePool " << getpid());
54 
55  if(topdir.empty()) {
56  ATH_MSG_ERROR("Empty name for the top directory!");
57  return -1;
58  }
59 
60  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs) + 1;
61  m_subprocTopDir = topdir;
62 
// If the conversion service has a ParallelCompression property and its value
// is true, PoolSvc's "FileOpen" property is set to "update". Apart from the
// failed IProperty cast, problems in this section are only logged; they do
// not make makePool return early.
63  SmartIF<IProperty> propertyServer(m_cnvSvc);
64  if(!propertyServer) {
65  ATH_MSG_ERROR("Unable to cast conversion service to IProperty");
66  return -1;
67  }
68  else {
69  std::string propertyName = "ParallelCompression";
70  bool parallelCompression(false);
71  BooleanProperty parallelCompressionProp(propertyName,parallelCompression);
72  if(propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
73  ATH_MSG_INFO("Conversion service does not have ParallelCompression property");
74  }
75  else {
76  SmartIF<IProperty> poolSvc(serviceLocator()->service("PoolSvc"));
77  if(!poolSvc) {
78  ATH_MSG_ERROR("Error retrieving PoolSvc");
79  }
80  else if(parallelCompressionProp.value()) {
81  if (poolSvc->setProperty("FileOpen", "update").isFailure()) {
82  ATH_MSG_ERROR("Could not change PoolSvc FileOpen Property");
83  }
84  }
85  }
86  }
87 
88  // Create rank queue and fill it
// The queue name is made unique with m_randStr and a single int (0) is sent.
// NOTE(review): presumably this value is read back as the rank ID in
// bootstrap_func; the receiving statement is not visible in this listing.
89  m_sharedRankQueue = new AthenaInterprocess::SharedQueue("SharedWriterTool_RankQueue_"+m_randStr,1,sizeof(int));
90  if(!m_sharedRankQueue->send_basic<int>(0)) {
91  ATH_MSG_ERROR("Unable to send int to the ranks queue!");
92  return -1;
93  }
94 
95  // Create the process group and map_async bootstrap
// NOTE(review): the listing's own numbering jumps from 95 to 97 here, so the
// statement that creates the process group is not visible in this extract.
97  ATH_MSG_INFO("Created shared writer process");
// A non-zero result from mapAsyncFlag is treated as failure.
98  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
99  return -1;
100  ATH_MSG_INFO("Shared writer process bootstraped");
101  return 1;
102 }
103 
105 {
106  ATH_MSG_DEBUG("In exec " << getpid());
107 
108  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
109  return StatusCode::FAILURE;
110  ATH_MSG_INFO("Shared writer started write events");
111 
112  // Set exit flag on writer
113  if(m_processGroup->map_async(0,0)){
114  ATH_MSG_ERROR("Unable to set exit to the writer");
115  return StatusCode::FAILURE;
116  }
117  return StatusCode::SUCCESS;
118 }
119 
120 void SharedWriterTool::subProcessLogs(std::vector<std::string>& filenames)
121 {
122  filenames.clear();
123  std::filesystem::path writer_rundir(m_subprocTopDir);
124  writer_rundir/= std::filesystem::path(m_subprocDirPrefix);
125  filenames.push_back(writer_rundir.string()+std::string("/AthenaMP.log"));
126 }
127 
129 {
131  return jobOutputs;
132 }
133 
// Bootstrap step for the AthenaMP Shared Writer.
//
// The returned ScheduledWork carries one malloc'ed int used as a status word
// (0 = success, 1 = failure). It is initialised to 1 and set to 0 only after
// every step below has succeeded, so each early `return outwork` reports
// failure to whoever decodes the result.
//
// Visible steps, in order: optional wait for a signal (m_debug), rank ID
// retrieval, creation of the writer run directory, log redirection (skipped
// when m_debug is set), Io registry update, saved-PFC handling, re-opening of
// file descriptors, early retrieval of AthenaRootSharedWriterSvc (failure is
// only a warning), turning the conversion service into a share server,
// I/O re-initialisation, and finally chdir into the writer run directory.
134 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::bootstrap_func()
135 {
136  // It's possible to debug SharedWriter just like any other AthenaMP worker.
137  // The following procedure provides a minimal explanation on how this can be achieved:
138  //
139  // Terminal #1:
140  // * Run athena w/ debugging enabled, e.g. athena.py --debugWorker --stdcmalloc --nprocs=8 [...]
141  // * In this mode, workers will be stopped after fork(), waiting for SIGUSR1 to be resumed
142  // * Find the PID of the worker to be debugged (printed by the job in stdout)
143  //
144  // Terminal #2:
145  // * Attach gdb to the relevant worker, i.e. gdb python PID
146  // * Once the symbols are loaded, one can perform any gdb action such as setting breakpoints etc.
147  // * Once ready, send SIGUSR1 to the worker to resume work, i.e. signal SIGUSR1 (in gdb)
148  //
149  // Terminal #3:
150  // * Send SIGUSR1 to the remaining workers (easiest to use htop)
151  //
152  // However, note that sometimes Shared I/O infrastructure struggles with timing problems,
153  // such as server/client(s) starting/stopping too early/later. Debugging can change this
154  // behavior so please keep this in mind.
155  if(m_debug) waitForSignal();
156 
// Allocate the status word and pessimistically mark it as "failure".
157  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
158  outwork->data = malloc(sizeof(int));
159  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
160  outwork->size = sizeof(int);
161 
162  // ...
163  // (possible) TODO: extend outwork with some error message, which will be eventually
164  // reported in the master process
165  // ...
166 
167  // ________________________ Get RankID ________________________
168  //
// NOTE(review): the listing's own numbering jumps from 168 to 170, so the
// statement opening this error branch is not visible in this extract;
// presumably it receives the rank ID from m_sharedRankQueue - confirm
// against the real file.
170  ATH_MSG_ERROR("Unable to get rank ID!");
171  return outwork;
172  }
173  // Writer dir: mkdir
// Run directory is <m_subprocTopDir>/<m_subprocDirPrefix>.
174  std::filesystem::path writer_rundir(m_subprocTopDir);
175  writer_rundir /= std::filesystem::path(m_subprocDirPrefix);
176 
// Mode bits: rwx for the owner, r-x for group and others.
177  if(mkdir(writer_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
178  ATH_MSG_ERROR("Unable to make writer run directory: " << writer_rundir.string() << ". " << fmterror(errno));
179  return outwork;
180  }
181 
182  // __________ Redirect logs unless we want to attach debugger ____________
// A non-zero return from redirectLog aborts the bootstrap.
183  if(!m_debug) {
184  if(redirectLog(writer_rundir.string()))
185  return outwork;
186 
187  ATH_MSG_INFO("Logs redirected in the AthenaMP Shared Writer PID=" << getpid());
188  }
189 
190  // Update Io Registry
191  if(updateIoReg(writer_rundir.string()))
192  return outwork;
193 
194  ATH_MSG_INFO("Io registry updated in the AthenaMP Shared Writer PID=" << getpid());
195 
196  // _______________________ Handle saved PFC (if any) ______________________
// handleSavedPfc is given the absolute form of the writer run directory.
197  std::filesystem::path abs_writer_rundir = std::filesystem::absolute(writer_rundir);
198  if(handleSavedPfc(abs_writer_rundir))
199  return outwork;
200 
201  // Reopen file descriptors
202  if(reopenFds())
203  return outwork;
204 
205  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP Shared Writer PID=" << getpid());
206 
207  // Try to initialize AthenaRootSharedWriterSvc early on
// Failure to retrieve the service here only produces a warning; the
// bootstrap continues.
208  SmartIF<IAthenaSharedWriterSvc> sharedWriterSvc(serviceLocator()->service("AthenaRootSharedWriterSvc"));
209  if(!sharedWriterSvc) {
210  ATH_MSG_WARNING("Error retrieving AthenaRootSharedWriterSvc from SharedWriterTool::bootstrap_func()");
211  }
212 
213  // Use IDataShare to make ConversionSvc a Share Server
// The argument passed to makeServer is always negative and encodes both
// m_nprocs and m_rankId. NOTE(review): the meaning of this encoding is
// defined by IDataShare::makeServer, which is not visible here.
214  SmartIF<IDataShare> cnvSvc(m_cnvSvc);
215  if (!cnvSvc || !cnvSvc->makeServer(-m_nprocs - 1 - 1024 * m_rankId).isSuccess()) {
216  ATH_MSG_ERROR("Failed to make the conversion service a share server");
217  return outwork;
218  }
219  else {
220  ATH_MSG_DEBUG("Successfully made the conversion service a share server");
221  }
222 
223  // ________________________ I/O reinit ________________________
224  if(!m_ioMgr->io_reinitialize().isSuccess()) {
225  ATH_MSG_ERROR("Failed to reinitialize I/O");
226  return outwork;
227  } else {
228  ATH_MSG_DEBUG("Successfully reinitialized I/O");
229  }
230 
231  // Writer dir: chdir
232  if(chdir(writer_rundir.string().c_str())==-1) {
233  ATH_MSG_ERROR("Failed to chdir to " << writer_rundir.string());
234  return outwork;
235  }
236 
237  // Declare success and return
238  *(int*)(outwork->data) = 0;
239  return outwork;
240 }
241 
242 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::exec_func()
243 {
244  ATH_MSG_INFO("Exec function in the AthenaMP Shared Writer PID=" << getpid());
245  bool all_ok=true;
246 
247  SmartIF<IAthenaSharedWriterSvc> sharedWriterSvc(serviceLocator()->service("AthenaRootSharedWriterSvc"));
248  if(!sharedWriterSvc) {
249  ATH_MSG_ERROR("Error retrieving AthenaRootSharedWriterSvc");
250  all_ok=false;
251  } else if(!sharedWriterSvc->share(m_nprocs, m_nMotherProcess.value()).isSuccess()) {
252  ATH_MSG_ERROR("Exec function could not share data");
253  all_ok=false;
254  }
255  AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_cnvSvc.get());
256  if (cnvSvc == 0 || !cnvSvc->disconnectOutput("").isSuccess()) {
257  ATH_MSG_ERROR("Exec function could not disconnectOutput");
258  all_ok=false;
259  }
260 
261  if(m_appMgr->stop().isFailure()) {
262  ATH_MSG_ERROR("Unable to stop AppMgr");
263  all_ok=false;
264  }
265  else {
266  if(m_appMgr->finalize().isFailure()) {
267  std::cerr << "Unable to finalize AppMgr" << std::endl;
268  all_ok=false;
269  }
270  }
271 
272  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
273  outwork->data = malloc(sizeof(int));
274  *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
275  outwork->size = sizeof(int);
276 
277  // ...
278  // (possible) TODO: extend outwork with some error message, which will be eventually
279  // reported in the master proces
280  // ...
281  return outwork;
282 }
283 
284 std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedWriterTool::fin_func()
285 {
286  // Dummy
287  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
288  outwork->data = malloc(sizeof(int));
289  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
290  outwork->size = sizeof(int);
291  return outwork;
292 }
SharedWriterTool::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedWriterTool.cxx:284
SharedWriterTool::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedWriterTool.cxx:242
SharedWriterTool::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: SharedWriterTool.h:51
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:428
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
IEventShare.h
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:88
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
IDataShare.h
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:83
SharedWriterTool::finalize
virtual StatusCode finalize() override
Definition: SharedWriterTool.cxx:43
SharedWriterTool::SharedWriterTool
SharedWriterTool()
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:358
AthCnvSvc.h
SharedWriterTool::~SharedWriterTool
virtual ~SharedWriterTool() override
Definition: SharedWriterTool.cxx:28
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
SharedWriterTool::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: SharedWriterTool.cxx:128
ProcessGroup.h
SharedWriterTool::m_rankId
int m_rankId
Definition: SharedWriterTool.h:49
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:67
IAthenaSharedWriterSvc.h
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:91
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
SharedWriterTool::initialize
virtual StatusCode initialize() override
Definition: SharedWriterTool.cxx:32
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
SharedWriterTool.h
test_pyathena.parent
parent
Definition: test_pyathena.py:15
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:132
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:337
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:55
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
SharedWriterTool::m_nMotherProcess
Gaudi::Property< bool > m_nMotherProcess
Definition: SharedWriterTool.h:42
SharedWriterTool::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedWriterTool.cxx:134
SharedWriterTool::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedWriterTool.h:45
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:93
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:281
LArConditionsTestConfig.poolSvc
poolSvc
Definition: LArConditionsTestConfig.py:79
grepfile.filenames
list filenames
Definition: grepfile.py:34
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:28
AthCnvSvc::disconnectOutput
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
Definition: AthCnvSvc.cxx:408
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:84
SharedWriterTool::m_cnvSvc
SmartIF< IConversionSvc > m_cnvSvc
Definition: SharedWriterTool.h:52
AthenaInterprocess::SharedQueue::send_basic
bool send_basic(T)
Definition: SharedQueue.h:93
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:421
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:25
SharedWriterTool::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedWriterTool.cxx:120
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:66
AthCnvSvc
Definition: AthCnvSvc.h:66
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:365