ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaMPToolBase.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "AthenaMPToolBase.h"
8#include "CxxUtils/xmalloc.h"
9
10#include "GaudiKernel/IEvtSelector.h"
11#include "GaudiKernel/IIoComponentMgr.h"
12#include "GaudiKernel/IFileMgr.h"
13#include "GaudiKernel/IMessageSvc.h"
14
15#include <sys/stat.h>
16#include <sstream>
17#include <fstream>
18#include <unistd.h>
19#include <stdio.h>
20#include <stdint.h>
21#include <signal.h>
22
23#include <iterator>
24#include <filesystem>
25
27 std::atomic<bool> sig_done = false;
28 void pauseForDebug(int /*sig*/) {
29 sig_done = true;
30 }
31}
32
34 , const std::string& name
35 , const IInterface* parent)
36 : base_class(type,name,parent)
37 , m_evtProcessor("AthenaEventLoopMgr",name)
38 , m_appMgr("ApplicationMgr",name)
39 , m_fileMgr("FileMgr",name)
40 , m_ioMgr("IoComponentMgr",name)
41{
42}
43
47
49{
50 ATH_MSG_DEBUG("In initialize");
51
52 if(m_isPileup) {
53 m_evtProcessor = ServiceHandle<IEventProcessor>("PileUpEventLoopMgr",name());
54 ATH_MSG_INFO("The job running in pileup mode");
55 }
56 else {
57 ATH_MSG_INFO("The job running in non-pileup mode");
58 }
59
60 ATH_CHECK(m_evtProcessor.retrieve());
61 ATH_CHECK(m_appMgr.retrieve());
62
63 SmartIF<IProperty> prpMgr(serviceLocator());
64 if(prpMgr.isValid()) {
65 // Get event selector name. Retrieve EventSelector and EventSeek
66 m_evtSelName = prpMgr->getProperty("EvtSel").toString();
67 // If the job runs with no event selector, then the name will be empty
68 if(!m_evtSelName.empty()) {
69 m_evtSelector = serviceLocator()->service(m_evtSelName);
70 ATH_CHECK(m_evtSelector.isValid());
71 }
72 }
73 else {
74 ATH_MSG_ERROR("IProperty interface not found in ApplicationMgr");
75 return StatusCode::FAILURE;
76 }
77
78 ATH_CHECK(m_fileMgr.retrieve());
79
80 SmartIF<IProperty> prpMgr1(m_fileMgr.get());
81 ATH_CHECK(prpMgr1.isValid());
82 m_fileMgrLog = prpMgr1->getProperty("LogFile").toString();
83
84 return StatusCode::SUCCESS;
85}
86
88{
89 return StatusCode::SUCCESS;
90}
91
92StatusCode AthenaMPToolBase::wait_once(pid_t& pid)
93{
94 bool flag(true);
95 pid = m_processGroup->wait_once(flag);
96
97 if(flag) { // Either none of the processes changed its status or one of the processes finished successfully
98 return StatusCode::SUCCESS;
99 }
100 else {
101 if(pid<0) { // Wait failed on the group
102 ATH_MSG_ERROR("Wait failed on the Process Group!");
103 }
104 else {
105 ATH_MSG_WARNING("Abnormal termination of the process PID=" << pid);
106 }
107 return StatusCode::FAILURE;
108 }
109}
110
112{
113 ATH_MSG_INFO("Statuses of sub-processes");
114 const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
115 for(size_t i=0; i<statuses.size(); ++i)
116 ATH_MSG_INFO("*** Process PID=" << statuses[i].pid << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS"));
117}
118
120{
122
123 if(m_fileMgrLog.empty()) {
124 ATH_MSG_WARNING(name() << " cannot make output report because FileMgr has not been configured to write log file!");
125 }
126 else {
127 // Collect output files made by the workers
128 std::string line;
129 for(int i=0;i<m_nprocs;++i) {
130 // Get the name of FileMgr log
131 std::ostringstream workindex;
132 workindex<<i;
133 std::filesystem::path logFilePath(m_subprocTopDir);
134 logFilePath /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
135 std::filesystem::path logFile(logFilePath);
136 logFile /= std::filesystem::path(m_fileMgrLog);
137 if(!(std::filesystem::exists(logFile)&&std::filesystem::is_regular_file(logFile))) {
138 ATH_MSG_WARNING(logFile.string() << " either does not exist or is not a regular file. Skipping");
139 continue;
140 }
141
142 ATH_MSG_DEBUG("FileMgr log file (" << i << ") " << logFile);
143
144 std::ifstream inpStream(logFile.string().c_str());
145 std::set<std::string> reportedFiles; // Don't report the same file twice
146 while(!inpStream.eof()) {
147 std::getline(inpStream,line);
148 if(line.find("WRITE")!=std::string::npos) {
149 // Parse the entry
150 size_t startpos(0);
151 std::vector<std::string> entries;
152 while(startpos<line.size()) {
153 while(line[startpos]==' ')
154 startpos++;
155
156 size_t endpos = line.find(' ',startpos);
157 if(endpos==std::string::npos) endpos = line.size();
158 entries.push_back(line.substr(startpos,endpos-startpos));
159 startpos=endpos+1;
160 }
161
162 // entries[0] is the filename
163 std::filesystem::path filenamePath(entries[0]);
164 std::filesystem::path basename = filenamePath.filename();
165 if(reportedFiles.find(basename.string())==reportedFiles.end())
166 reportedFiles.insert(basename.string());
167 else
168 continue;
169 std::filesystem::path absolutename = basename.is_absolute() ? basename : std::filesystem::absolute(std::filesystem::path(logFilePath)/=basename);
170 AthenaMP::AllWorkerOutputsIterator it1 = jobOutputs->find(basename.string());
171 if(it1==jobOutputs->end()) {
172 (*jobOutputs)[basename.string()] = AthenaMP::SingleWorkerOutputs();
173 (*jobOutputs)[basename.string()].reserve(m_nprocs);
174 }
175
176 AthenaMP::WorkerOutput newOutput;
177 newOutput.filename = absolutename.string();
178 newOutput.technology = entries[1];
179 newOutput.description = entries[2];
180 newOutput.access_mode = entries[3];
181 newOutput.shared = (line.find("SHARED")!=std::string::npos);
182
183 (*jobOutputs)[basename.string()].emplace_back(std::move(newOutput));
184 }
185 }
186 }
187 }
188 return jobOutputs;
189}
190
191void AthenaMPToolBase::useFdsRegistry(std::shared_ptr<AthenaInterprocess::FdsRegistry> registry)
192{
193 m_fdsRegistry = std::move(registry);
194}
195
196void AthenaMPToolBase::setRandString(const std::string& randStr)
197{
198 m_randStr = randStr;
199}
200
202{
203 for(const AthenaInterprocess::Process& child : m_processGroup->getChildren()) {
204 kill(child.getProcessID(),SIGKILL);
205 }
206}
207
208std::unique_ptr<AthenaInterprocess::ScheduledWork> AthenaMPToolBase::operator()(const AthenaInterprocess::ScheduledWork& param)
209{
210 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork;
211 bool all_ok(true);
212
213 if(param.size==sizeof(Func_Flag)) {
214 Func_Flag flag;
215 memcpy(&flag,param.data,param.size);
216 switch(flag) {
217 case FUNC_BOOTSTRAP:
218 {
219 outwork = bootstrap_func();
220 break;
221 }
222 case FUNC_EXEC:
223 {
224 outwork = exec_func();
225 break;
226 }
227 case FUNC_FIN:
228 {
229 outwork = fin_func();
230 break;
231 }
232 default:
233 {
234 ATH_MSG_ERROR("Unexpected value for the function flag");
235 all_ok = false;
236 }
237 }
238 }
239 else {
240 ATH_MSG_ERROR("Unexpected parameter size");
241 all_ok = false;
242 }
243
244 if(!all_ok) {
245 outwork = std::unique_ptr<AthenaInterprocess::ScheduledWork>(new AthenaInterprocess::ScheduledWork);
246 outwork->data = CxxUtils::xmalloc(sizeof(int));
247 *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
248 outwork->size = sizeof(int);
249 }
250
251 return outwork;
252}
253
254int AthenaMPToolBase::mapAsyncFlag(Func_Flag flag, pid_t pid)
255{
256 AthenaInterprocess::ScheduledWork params;
257 params.data = (void*)(&flag);
258 params.size = sizeof(flag);
259 if(m_processGroup->map_async(this,&params,pid)){
260 if(pid)
261 ATH_MSG_ERROR("Unable to map the flag on subprocess pid=" << pid);
262 else
263 ATH_MSG_ERROR("Unable to map the flag on all subprocesses in the group");
264 return -1;
265 }
266 return 0;
267}
268
269int AthenaMPToolBase::redirectLog(const std::string& rundir, bool addTimeStamp)
270{
271 // Redirect both stdout and stderr to the same file AthenaMP.log
272 int dup2result1(0), dup2result2(0);
273
274 int newout = open(std::string(rundir+"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
275 if(newout==-1) {
276 ATH_MSG_ERROR("Unable to open log file in the run directory. " << fmterror(errno));
277 return -1;
278 }
279 dup2result1 = dup2(newout, STDOUT_FILENO);
280 dup2result2 = dup2(newout, STDERR_FILENO);
281 TEMP_FAILURE_RETRY(close(newout));
282 if(dup2result1==-1) {
283 ATH_MSG_ERROR("Unable to redirect standard output. " << fmterror(errno));
284 return -1;
285 }
286 if(dup2result2==-1) {
287 ATH_MSG_ERROR("Unable to redirect standard error. " << fmterror(errno));
288 return -1;
289 }
290
291 if(addTimeStamp) {
292 SmartIF<IProperty> propertyServer(msgSvc());
293 if(propertyServer==0) {
294 ATH_MSG_ERROR("Unable to cast message svc to IProperty");
295 return -1;
296 }
297
298 std::string propertyName("Format");
299 std::string oldFormat("");
300 StringProperty formatProp(propertyName,oldFormat);
301 StatusCode sc = propertyServer->getProperty(&formatProp);
302 if(sc.isFailure()) {
303 ATH_MSG_WARNING("Message Service does not have Format property");
304 }
305 else {
306 oldFormat = formatProp.value();
307 if(oldFormat.find("%t")==std::string::npos) {
308 // Add time stamps
309 std::string newFormat("%t " + oldFormat);
310 StringProperty newFormatProp(std::move(propertyName),newFormat);
311 ATH_CHECK(propertyServer->setProperty(newFormatProp), -1);
312 }
313 else {
314 ATH_MSG_DEBUG("MsgSvc format already contains timestamps. Nothing to be done");
315 }
316 }
317 }
318
319 return 0;
320}
321
322int AthenaMPToolBase::updateIoReg(const std::string& rundir)
323{
324 ATH_CHECK(m_ioMgr.retrieve(), -1);
325
326 // update the IoRegistry for the new workdir - make sure we use absolute path
327 std::filesystem::path abs_rundir = std::filesystem::absolute(rundir);
328 ATH_CHECK(m_ioMgr->io_update_all(abs_rundir.string()), -1);
329
330 return 0;
331}
332
333std::string AthenaMPToolBase::fmterror(int errnum)
334{
335 char buf[256];
336 strerror_r(errnum, buf, sizeof(buf));
337 return std::string(buf);
338}
339
341{
342 // Reopen file descriptors.
343 // First go over all open files, which have been registered with the FileMgr
344 // Then also check the FdsRegistry, in case it contains some files not registered with the FileMgr
345 std::set<int> fdLog;
346
347 // Query the FileMgr contents
348 std::vector<const Io::FileAttr*> filemgrFiles;
349 std::vector<const Io::FileAttr*>::const_iterator itFile;
350 unsigned filenum = m_fileMgr->getFiles(filemgrFiles); // Get attributes for open files only. We don't care about closed ones at this point
351 if(filenum!=filemgrFiles.size())
352 ATH_MSG_WARNING("getFiles returned " << filenum << " while vector size is " << filemgrFiles.size());
353
354 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
355 ATH_MSG_DEBUG("* " << **itFile);
356 const std::string& filename = (**itFile).name();
357 Io::Fd fd = (**itFile).fd();
358
359 if(fd==-1) {
360 // It is legal to have fd=-1 for remote inputs
361 // On the other hand, these inputs should not remain open after fork. The issue being tracked at ATEAM-434.
362 // So, this hopefully is a temporary patch
363 ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " << filename);
364 continue;
365 }
366
367 if(reopenFd(fd,filename))
368 return -1;
369
370 fdLog.insert(fd);
371 }
372
373 // Check the FdsRegistry
375 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
376 ATH_MSG_DEBUG("The file from FdsRegistry " << regEntry.name << " was registered with FileMgr. Skip reopening");
377 }
378 else {
379 ATH_MSG_WARNING("The file " << regEntry.name << " has not been registered with the FileMgr!");
380
381 if(regEntry.fd==-1) {
382 // Same protection as the one above
383 ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
384 continue;
385 }
386
387 if(reopenFd(regEntry.fd,regEntry.name))
388 return -1;
389
390 fdLog.insert(regEntry.fd);
391 }
392 }
393 return 0;
394}
395
396int AthenaMPToolBase::handleSavedPfc(const std::filesystem::path& dest_path)
397{
398 if(std::filesystem::is_regular_file("PoolFileCatalog.xml.AthenaMP-saved"))
399 COPY_FILE_HACK("PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+"/PoolFileCatalog.xml");
400 return 0;
401}
402
404{
405 ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
406 sigset_t mask, oldmask;
407
408 signal(SIGUSR1, AthenaMPToolBase_d::pauseForDebug);
409
410 sigemptyset (&mask);
411 sigaddset (&mask, SIGUSR1);
412
413 sigprocmask (SIG_BLOCK, &mask, &oldmask);
415 sigsuspend (&oldmask);
416 sigprocmask (SIG_UNBLOCK, &mask, NULL);
417}
418
419int AthenaMPToolBase::reopenFd(int fd, const std::string& name)
420{
421 ATH_MSG_DEBUG("Attempting to reopen descriptor for " << name);
422 int old_openflags = fcntl(fd,F_GETFL,0);
423 switch(old_openflags & O_ACCMODE) {
424 case O_RDONLY: {
425 ATH_MSG_DEBUG("The File Access Mode is RDONLY");
426 break;
427 }
428 case O_WRONLY: {
429 ATH_MSG_DEBUG("The File Access Mode is WRONLY");
430 break;
431 }
432 case O_RDWR: {
433 ATH_MSG_DEBUG("The File Access Mode is RDWR");
434 break;
435 }
436 }
437
438 int old_descflags = fcntl(fd,F_GETFD,0);
439 off_t oldpos = lseek(fd,0,SEEK_CUR);
440 if(oldpos==-1) {
441 if(errno==ESPIPE) {
442 ATH_MSG_WARNING("Dealing with PIPE. Skipping ... (FIXME!)");
443 }
444 else {
445 ATH_MSG_ERROR("When re-opening file descriptors lseek failed on " << name << ". " << fmterror(errno));
446 return -1;
447 }
448 }
449 else {
450 Io::Fd newfd = open(name.c_str(),old_openflags);
451 if(newfd==-1) {
452 ATH_MSG_ERROR("When re-opening file descriptors unable to open " << name << " for reading. " << fmterror(errno));
453 return -1;
454 }
455 if(lseek(newfd,oldpos,SEEK_SET)==-1){
456 ATH_MSG_ERROR("When re-opening file descriptors lseek failed on the newly opened " << name << ". " << fmterror(errno));
457 TEMP_FAILURE_RETRY(close(newfd));
458 return -1;
459 }
460 TEMP_FAILURE_RETRY(close(fd));
461 if(dup2(newfd,fd)==-1) {
462 ATH_MSG_ERROR("When re-opening file descriptors unable to duplicate descriptor for " << name << ". " << fmterror(errno));
463 TEMP_FAILURE_RETRY(close(newfd));
464 return -1;
465 }
466 if(fcntl(fd,F_SETFD,old_descflags)==-1) {
467 ATH_MSG_ERROR("When re-opening file descriptors unable to set descriptor flags for " << name << ". " << fmterror(errno));
468 TEMP_FAILURE_RETRY(close(newfd));
469 return -1;
470 }
471 TEMP_FAILURE_RETRY(close(newfd));
472 }
473 return 0;
474}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
int32_t pid_t
static Double_t sc
#define sigemptyset(x)
Definition SealSignal.h:82
#define sigaddset(x, y)
Definition SealSignal.h:84
int sigset_t
Definition SealSignal.h:80
virtual void killChildren() override
virtual ~AthenaMPToolBase() override
std::string m_subprocTopDir
Top run directory for subprocesses.
virtual void useFdsRegistry(std::shared_ptr< AthenaInterprocess::FdsRegistry >) override
int handleSavedPfc(const std::filesystem::path &dest_path)
int updateIoReg(const std::string &rundir)
std::string m_evtSelName
Name of the event selector.
SmartIF< IEvtSelector > m_evtSelector
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func()=0
virtual StatusCode initialize() override
Gaudi::Property< bool > m_isPileup
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
std::string m_fileMgrLog
ServiceHandle< IAppMgrUI > m_appMgr
int m_nprocs
Number of workers spawned by the master process.
ServiceHandle< IIoComponentMgr > m_ioMgr
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
virtual void reportSubprocessStatuses() override
int reopenFd(int fd, const std::string &name)
AthenaInterprocess::ProcessGroup * m_processGroup
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > virtual operator()ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func()=0
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
ServiceHandle< IEventProcessor > m_evtProcessor
ServiceHandle< IFileMgr > m_fileMgr
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func()=0
std::string m_subprocDirPrefix
For ex. "worker__".
virtual StatusCode finalize() override
virtual void setRandString(const std::string &randStr) override
std::string fmterror(int errnum)
#define COPY_FILE_HACK(_src, _dest)
double entries
Definition listroot.cxx:49
std::atomic< bool > sig_done
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
AllWorkerOutputs::iterator AllWorkerOutputsIterator
std::vector< WorkerOutput > SingleWorkerOutputs
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
bool flag
Definition master.py:29
std::string basename(std::string name)
Definition utils.cxx:207
Trapping version of malloc.