9 #include "GaudiKernel/IEvtSelector.h"
10 #include "GaudiKernel/IIoComponentMgr.h"
11 #include "GaudiKernel/IFileMgr.h"
12 #include "GaudiKernel/IMessageSvc.h"
33 ,
const std::string&
name
34 ,
const IInterface*
parent)
38 , m_subprocDirPrefix(
"")
41 , m_evtProcessor(
"AthenaEventLoopMgr",
name)
42 , m_appMgr(
"ApplicationMgr",
name)
43 , m_fileMgr(
"FileMgr",
name)
44 , m_ioMgr(
"IoComponentMgr",
name)
48 declareInterface<IAthenaMPTool>(
this);
70 SmartIF<IProperty> prpMgr(serviceLocator());
71 if(prpMgr.isValid()) {
78 ATH_MSG_ERROR(
"IProperty interface not found in ApplicationMgr");
79 return StatusCode::FAILURE;
84 SmartIF<IProperty> prpMgr1(
m_fileMgr.get());
85 if(prpMgr1.isValid()) {
86 m_fileMgrLog = prpMgr1->getProperty(
"LogFile").toString();
90 return StatusCode::FAILURE;
93 return StatusCode::SUCCESS;
98 return StatusCode::SUCCESS;
107 return StatusCode::SUCCESS;
116 return StatusCode::FAILURE;
133 ATH_MSG_WARNING(
name() <<
" cannot make output report because FileMgr has not been configured to write log file!");
140 std::ostringstream workindex;
153 std::ifstream inpStream(
logFile.string().c_str());
154 std::set<std::string> reportedFiles;
155 while(!inpStream.eof()) {
156 std::getline(inpStream,
line);
157 if(
line.find(
"WRITE")!=std::string::npos) {
160 std::vector<std::string>
entries;
161 while(startpos<
line.size()) {
162 while(
line[startpos]==
' ')
165 size_t endpos =
line.find(
' ',startpos);
166 if(endpos==std::string::npos) endpos =
line.size();
167 entries.push_back(
line.substr(startpos,endpos-startpos));
174 if(reportedFiles.find(
basename.string())==reportedFiles.end())
175 reportedFiles.insert(
basename.string());
180 if(
it1==jobOutputs->end()) {
186 newOutput.
filename = absolutename.string();
190 newOutput.
shared = (
line.find(
"SHARED")!=std::string::npos);
192 (*jobOutputs)[
basename.string()].push_back(newOutput);
213 kill(child.getProcessID(),SIGKILL);
219 std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork;
256 *(
int*)(outwork->
data) = 1;
263 int AthenaMPToolBase::mapAsyncFlag(Func_Flag
flag,
pid_t pid)
272 ATH_MSG_ERROR(
"Unable to map the flag on all subprocesses in the group");
281 int dup2result1(0), dup2result2(0);
283 int newout =
open(std::string(
rundir+
"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
288 dup2result1 = dup2(newout, STDOUT_FILENO);
289 dup2result2 = dup2(newout, STDERR_FILENO);
290 TEMP_FAILURE_RETRY(close(newout));
291 if(dup2result1==-1) {
295 if(dup2result2==-1) {
301 IMessageSvc* messageSvc(0);
302 StatusCode sc = serviceLocator()->service(
"MessageSvc",messageSvc);
308 IProperty* propertyServer =
dynamic_cast<IProperty*
>(messageSvc);
309 if(propertyServer==0) {
314 std::string propertyName(
"Format");
315 std::string oldFormat(
"");
316 StringProperty formatProp(propertyName,oldFormat);
317 sc = propertyServer->getProperty(&formatProp);
322 oldFormat = formatProp.value();
323 if(oldFormat.find(
"%t")==std::string::npos) {
325 std::string newFormat(
"%t " + oldFormat);
326 StringProperty newFormatProp(propertyName,newFormat);
327 if(propertyServer->setProperty(newFormatProp).isFailure()) {
328 ATH_MSG_ERROR(
"Unable to set new Format property on the Message Service");
333 ATH_MSG_DEBUG(
"MsgSvc format already contains timestamps. Nothing to be done");
343 if (!
m_ioMgr.retrieve().isSuccess()) {
352 if(!
m_ioMgr->io_update_all(abs_rundir.string()).isSuccess()) {
365 strerror_r(errnum, buf,
sizeof(buf));
366 return std::string(buf);
377 std::vector<const Io::FileAttr*> filemgrFiles;
378 std::vector<const Io::FileAttr*>::const_iterator itFile;
379 unsigned filenum =
m_fileMgr->getFiles(filemgrFiles);
380 if(filenum!=filemgrFiles.size())
381 ATH_MSG_WARNING(
"getFiles returned " << filenum <<
" while vector size is " << filemgrFiles.size());
383 for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
385 const std::string&
filename = (**itFile).name();
392 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " <<
filename);
404 if(fdLog.find(regEntry.fd)!=fdLog.end()) {
405 ATH_MSG_DEBUG(
"The file from FdsRegistry " << regEntry.name <<
" was registered with FileMgr. Skip reopening");
408 ATH_MSG_WARNING(
"The file " << regEntry.name <<
" has not been registered with the FileMgr!");
410 if(regEntry.fd==-1) {
412 ATH_MSG_WARNING(
"FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
416 if(
reopenFd(regEntry.fd,regEntry.name))
419 fdLog.insert(regEntry.fd);
427 if(std::filesystem::is_regular_file(
"PoolFileCatalog.xml.AthenaMP-saved"))
428 COPY_FILE_HACK(
"PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+
"/PoolFileCatalog.xml");
434 ATH_MSG_INFO(
"Bootstrap worker PID " << getpid() <<
" - waiting for SIGUSR1");
442 sigprocmask (SIG_BLOCK, &
mask, &oldmask);
444 sigsuspend (&oldmask);
445 sigprocmask (SIG_UNBLOCK, &
mask, NULL);
451 int old_openflags = fcntl(
fd,F_GETFL,0);
452 switch(old_openflags & O_ACCMODE) {
467 int old_descflags = fcntl(
fd,F_GETFD,0);
468 off_t oldpos = lseek(
fd,0,SEEK_CUR);
484 if(lseek(newfd,oldpos,SEEK_SET)==-1){
486 TEMP_FAILURE_RETRY(close(newfd));
489 TEMP_FAILURE_RETRY(close(
fd));
490 if(dup2(newfd,
fd)==-1) {
491 ATH_MSG_ERROR(
"When re-opening file descriptors unable to duplicate descriptor for " <<
name <<
". " <<
fmterror(errno));
492 TEMP_FAILURE_RETRY(close(newfd));
495 if(fcntl(
fd,F_SETFD,old_descflags)==-1) {
496 ATH_MSG_ERROR(
"When re-opening file descriptors unable to set descriptor flags for " <<
name <<
". " <<
fmterror(errno));
497 TEMP_FAILURE_RETRY(close(newfd));
500 TEMP_FAILURE_RETRY(close(newfd));