9 #include "GaudiKernel/IEvtSelector.h"
10 #include "GaudiKernel/IIoComponentMgr.h"
11 #include "GaudiKernel/IFileMgr.h"
12 #include "GaudiKernel/IMessageSvc.h"
33 ,
const std::string&
name
34 ,
const IInterface*
parent)
38 , m_subprocDirPrefix(
"")
41 , m_evtProcessor(
"AthenaEventLoopMgr",
name)
42 , m_appMgr(
"ApplicationMgr",
name)
43 , m_fileMgr(
"FileMgr",
name)
44 , m_ioMgr(
"IoComponentMgr",
name)
48 declareInterface<IAthenaMPTool>(
this);
70 SmartIF<IProperty> prpMgr(serviceLocator());
71 if(prpMgr.isValid()) {
81 ATH_MSG_ERROR(
"IProperty interface not found in ApplicationMgr");
82 return StatusCode::FAILURE;
87 SmartIF<IProperty> prpMgr1(
m_fileMgr.get());
88 if(prpMgr1.isValid()) {
89 m_fileMgrLog = prpMgr1->getProperty(
"LogFile").toString();
93 return StatusCode::FAILURE;
96 return StatusCode::SUCCESS;
101 return StatusCode::SUCCESS;
110 return StatusCode::SUCCESS;
119 return StatusCode::FAILURE;
136 ATH_MSG_WARNING(
name() <<
" cannot make output report because FileMgr has not been configured to write log file!");
143 std::ostringstream workindex;
156 std::ifstream inpStream(
logFile.string().c_str());
157 std::set<std::string> reportedFiles;
158 while(!inpStream.eof()) {
159 std::getline(inpStream,
line);
160 if(
line.find(
"WRITE")!=std::string::npos) {
163 std::vector<std::string>
entries;
164 while(startpos<
line.size()) {
165 while(
line[startpos]==
' ')
168 size_t endpos =
line.find(
' ',startpos);
169 if(endpos==std::string::npos) endpos =
line.size();
170 entries.push_back(
line.substr(startpos,endpos-startpos));
177 if(reportedFiles.find(
basename.string())==reportedFiles.end())
178 reportedFiles.insert(
basename.string());
183 if(
it1==jobOutputs->end()) {
189 newOutput.
filename = absolutename.string();
193 newOutput.
shared = (
line.find(
"SHARED")!=std::string::npos);
195 (*jobOutputs)[
basename.string()].push_back(newOutput);
216 kill(child.getProcessID(),SIGKILL);
222 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork;
259 *(
int*)(outwork->
data) = 1;
266 int AthenaMPToolBase::mapAsyncFlag(Func_Flag
flag,
pid_t pid)
275 ATH_MSG_ERROR(
"Unable to map the flag on all subprocesses in the group");
284 int dup2result1(0), dup2result2(0);
286 int newout =
open(std::string(
rundir+
"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
291 dup2result1 = dup2(newout, STDOUT_FILENO);
292 dup2result2 = dup2(newout, STDERR_FILENO);
293 TEMP_FAILURE_RETRY(close(newout));
294 if(dup2result1==-1) {
298 if(dup2result2==-1) {
304 SmartIF<IProperty> propertyServer(
msgSvc());
305 if(propertyServer==0) {
310 std::string propertyName(
"Format");
311 std::string oldFormat(
"");
312 StringProperty formatProp(propertyName,oldFormat);
313 StatusCode sc = propertyServer->getProperty(&formatProp);
318 oldFormat = formatProp.value();
319 if(oldFormat.find(
"%t")==std::string::npos) {
321 std::string newFormat(
"%t " + oldFormat);
322 StringProperty newFormatProp(propertyName,newFormat);
323 if(propertyServer->setProperty(newFormatProp).isFailure()) {
324 ATH_MSG_ERROR(
"Unable to set new Format property on the Message Service");
329 ATH_MSG_DEBUG(
"MsgSvc format already contains timestamps. Nothing to be done");
339 if (!
m_ioMgr.retrieve().isSuccess()) {
348 if(!
m_ioMgr->io_update_all(abs_rundir.string()).isSuccess()) {
361 strerror_r(errnum, buf,
sizeof(buf));
362 return std::string(buf);
373 std::vector<const Io::FileAttr*> filemgrFiles;
374 std::vector<const Io::FileAttr*>::const_iterator itFile;
375 unsigned filenum =
m_fileMgr->getFiles(filemgrFiles);
376 if(filenum!=filemgrFiles.size())
377 ATH_MSG_WARNING(
"getFiles returned " << filenum <<
" while vector size is " << filemgrFiles.size());
379 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
381 const std::string&
filename = (**itFile).name();
388 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " <<
filename);
400 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
401 ATH_MSG_DEBUG(
"The file from FdsRegistry " << regEntry.name <<
" was registered with FileMgr. Skip reopening");
404 ATH_MSG_WARNING(
"The file " << regEntry.name <<
" has not been registered with the FileMgr!");
406 if(regEntry.fd==-1) {
408 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
412 if(
reopenFd(regEntry.fd,regEntry.name))
415 fdLog.insert(regEntry.fd);
423 if(std::filesystem::is_regular_file(
"PoolFileCatalog.xml.AthenaMP-saved"))
424 COPY_FILE_HACK(
"PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+
"/PoolFileCatalog.xml");
430 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
438 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
440 sigsuspend (&oldmask);
441 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
447 int old_openflags = fcntl(
fd,F_GETFL,0);
448 switch(old_openflags & O_ACCMODE) {
463 int old_descflags = fcntl(
fd,F_GETFD,0);
464 off_t oldpos = lseek(
fd,0,SEEK_CUR);
480 if(lseek(newfd,oldpos,SEEK_SET)==-1){
482 TEMP_FAILURE_RETRY(close(newfd));
485 TEMP_FAILURE_RETRY(close(
fd));
486 if(dup2(newfd,
fd)==-1) {
487 ATH_MSG_ERROR(
"When re-opening file descriptors unable to duplicate descriptor for " <<
name <<
". " <<
fmterror(errno));
488 TEMP_FAILURE_RETRY(close(newfd));
491 if(fcntl(
fd,F_SETFD,old_descflags)==-1) {
492 ATH_MSG_ERROR(
"When re-opening file descriptors unable to set descriptor flags for " <<
name <<
". " <<
fmterror(errno));
493 TEMP_FAILURE_RETRY(close(newfd));
496 TEMP_FAILURE_RETRY(close(newfd));