9 #include "GaudiKernel/IEvtSelector.h"
10 #include "GaudiKernel/IIoComponentMgr.h"
11 #include "GaudiKernel/IFileMgr.h"
12 #include "GaudiKernel/IMessageSvc.h"
33 ,
const std::string&
name
34 ,
const IInterface*
parent)
39 , m_subprocDirPrefix(
"")
42 , m_evtProcessor(
"AthenaEventLoopMgr",
name)
43 , m_appMgr(
"ApplicationMgr",
name)
44 , m_fileMgr(
"FileMgr",
name)
45 , m_ioMgr(
"IoComponentMgr",
name)
49 declareInterface<IAthenaMPTool>(
this);
71 SmartIF<IProperty> prpMgr(serviceLocator());
72 if(prpMgr.isValid()) {
82 ATH_MSG_ERROR(
"IProperty interface not found in ApplicationMgr");
83 return StatusCode::FAILURE;
88 SmartIF<IProperty> prpMgr1(
m_fileMgr.get());
89 if(prpMgr1.isValid()) {
90 m_fileMgrLog = prpMgr1->getProperty(
"LogFile").toString();
94 return StatusCode::FAILURE;
97 return StatusCode::SUCCESS;
102 return StatusCode::SUCCESS;
111 return StatusCode::SUCCESS;
120 return StatusCode::FAILURE;
137 ATH_MSG_WARNING(
name() <<
" cannot make output report because FileMgr has not been configured to write log file!");
144 std::ostringstream workindex;
157 std::ifstream inpStream(
logFile.string().c_str());
158 std::set<std::string> reportedFiles;
159 while(!inpStream.eof()) {
160 std::getline(inpStream,
line);
161 if(
line.find(
"WRITE")!=std::string::npos) {
164 std::vector<std::string>
entries;
165 while(startpos<
line.size()) {
166 while(
line[startpos]==
' ')
169 size_t endpos =
line.find(
' ',startpos);
170 if(endpos==std::string::npos) endpos =
line.size();
171 entries.push_back(
line.substr(startpos,endpos-startpos));
178 if(reportedFiles.find(
basename.string())==reportedFiles.end())
179 reportedFiles.insert(
basename.string());
184 if(
it1==jobOutputs->end()) {
190 newOutput.
filename = absolutename.string();
194 newOutput.
shared = (
line.find(
"SHARED")!=std::string::npos);
196 (*jobOutputs)[
basename.string()].push_back(newOutput);
217 kill(child.getProcessID(),SIGKILL);
223 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork;
260 *(
int*)(outwork->
data) = 1;
267 int AthenaMPToolBase::mapAsyncFlag(Func_Flag
flag,
pid_t pid)
276 ATH_MSG_ERROR(
"Unable to map the flag on all subprocesses in the group");
285 int dup2result1(0), dup2result2(0);
287 int newout =
open(std::string(
rundir+
"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
292 dup2result1 = dup2(newout, STDOUT_FILENO);
293 dup2result2 = dup2(newout, STDERR_FILENO);
294 TEMP_FAILURE_RETRY(close(newout));
295 if(dup2result1==-1) {
299 if(dup2result2==-1) {
305 SmartIF<IProperty> propertyServer(
msgSvc());
306 if(propertyServer==0) {
311 std::string propertyName(
"Format");
312 std::string oldFormat(
"");
313 StringProperty formatProp(propertyName,oldFormat);
314 StatusCode sc = propertyServer->getProperty(&formatProp);
319 oldFormat = formatProp.value();
320 if(oldFormat.find(
"%t")==std::string::npos) {
322 std::string newFormat(
"%t " + oldFormat);
323 StringProperty newFormatProp(propertyName,newFormat);
324 if(propertyServer->setProperty(newFormatProp).isFailure()) {
325 ATH_MSG_ERROR(
"Unable to set new Format property on the Message Service");
330 ATH_MSG_DEBUG(
"MsgSvc format already contains timestamps. Nothing to be done");
340 if (!
m_ioMgr.retrieve().isSuccess()) {
349 if(!
m_ioMgr->io_update_all(abs_rundir.string()).isSuccess()) {
362 strerror_r(errnum, buf,
sizeof(buf));
363 return std::string(buf);
374 std::vector<const Io::FileAttr*> filemgrFiles;
375 std::vector<const Io::FileAttr*>::const_iterator itFile;
376 unsigned filenum =
m_fileMgr->getFiles(filemgrFiles);
377 if(filenum!=filemgrFiles.size())
378 ATH_MSG_WARNING(
"getFiles returned " << filenum <<
" while vector size is " << filemgrFiles.size());
380 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
382 const std::string&
filename = (**itFile).name();
389 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " <<
filename);
401 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
402 ATH_MSG_DEBUG(
"The file from FdsRegistry " << regEntry.name <<
" was registered with FileMgr. Skip reopening");
405 ATH_MSG_WARNING(
"The file " << regEntry.name <<
" has not been registered with the FileMgr!");
407 if(regEntry.fd==-1) {
409 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
413 if(
reopenFd(regEntry.fd,regEntry.name))
416 fdLog.insert(regEntry.fd);
424 if(std::filesystem::is_regular_file(
"PoolFileCatalog.xml.AthenaMP-saved"))
425 COPY_FILE_HACK(
"PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+
"/PoolFileCatalog.xml");
431 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
439 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
441 sigsuspend (&oldmask);
442 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
448 int old_openflags = fcntl(
fd,F_GETFL,0);
449 switch(old_openflags & O_ACCMODE) {
464 int old_descflags = fcntl(
fd,F_GETFD,0);
465 off_t oldpos = lseek(
fd,0,SEEK_CUR);
481 if(lseek(newfd,oldpos,SEEK_SET)==-1){
483 TEMP_FAILURE_RETRY(close(newfd));
486 TEMP_FAILURE_RETRY(close(
fd));
487 if(dup2(newfd,
fd)==-1) {
488 ATH_MSG_ERROR(
"When re-opening file descriptors unable to duplicate descriptor for " <<
name <<
". " <<
fmterror(errno));
489 TEMP_FAILURE_RETRY(close(newfd));
492 if(fcntl(
fd,F_SETFD,old_descflags)==-1) {
493 ATH_MSG_ERROR(
"When re-opening file descriptors unable to set descriptor flags for " <<
name <<
". " <<
fmterror(errno));
494 TEMP_FAILURE_RETRY(close(newfd));
497 TEMP_FAILURE_RETRY(close(newfd));