ATLAS Offline Software
Classes | Typedefs | Functions
AthenaInterprocess Namespace Reference

Classes

struct  FdsRegistryEntry
 
class  IdentifiedSharedQueue
 
class  IMessageDecoder
 
class  Process
 
class  ProcessGroup
 
struct  ProcessResult
 
struct  ProcessStatus
 
struct  ScheduledWork
 
class  SharedQueue
 
class  UpdateAfterFork
 

Typedefs

typedef std::vector< FdsRegistryEntryFdsRegistry
 

Functions

std::string randString ()
 
std::string add_pid (const std::string &buf)
 
std::string get_pid (const std::string &buf, pid_t &pid)
 
pid_t ProcessGroup::launchProcess ATLAS_NOT_THREAD_SAFE ()
 
int ProcessGroup::map_async ATLAS_NOT_THREAD_SAFE (const IMessageDecoder *func, const ScheduledWork *args, pid_t pid)
 
int ProcessGroup::wait ATLAS_NOT_THREAD_SAFE (int options)
 

Typedef Documentation

◆ FdsRegistry

Definition at line 22 of file FdsRegistry.h.

Function Documentation

◆ add_pid()

std::string AthenaInterprocess::add_pid ( const std::string &  buf)
inline

Definition at line 31 of file IdentifiedSharedQueue.cxx.

32 {
33  std::ostringstream s;
34  s << (long)getpid() << '\0' << buf;
35  return s.str();
36 }

◆ ATLAS_NOT_THREAD_SAFE() [1/3]

bool ProcessGroup::create AthenaInterprocess::ATLAS_NOT_THREAD_SAFE ( )

Definition at line 49 of file ProcessGroup.cxx.

50 {
51  if(m_pgid==-1)
52  return -1;
53 
54  // Create new process
55  SharedQueue queue = create_queue("child", m_processesCreated);
56  Process p = Process::launch();
57  p.connectIn(queue);
58  p.connectOut(m_inbox);
59  pid_t newpid = p.getProcessID();
60  if(newpid==0) {
61  int status = p.mainloop();
62  exit(status);
63  }
64 
65  if(m_processes.empty())
66  m_pgid = getpgid(newpid);
67  else
68  setpgid(newpid, m_pgid);
69 
70  m_processes.push_back(p);
71  m_processesCreated++;
72  return newpid;
73 }

◆ ATLAS_NOT_THREAD_SAFE() [2/3]

int ProcessGroup::map_async AthenaInterprocess::ATLAS_NOT_THREAD_SAFE ( const IMessageDecoder func,
const ScheduledWork args,
pid_t  pid 
)

Definition at line 75 of file ProcessGroup.cxx.

75  {
76  // If pid=0, map the function-object 'func' onto all current child processes. Does
77  // not wait for the results, but will return success only if the writing to
78  // all the child queues succeeds.
79  //
80  // If pid!=0 then map only onto the process with given pid
81  if(m_processes.empty()) {
82  if(!create())
83  return -1;
84  }
85 
86  if(pid==0) { // Map on all processes in the group
87  for(std::vector<Process>::iterator iproc = m_processes.begin();
88  iproc!=m_processes.end(); ++iproc) {
89 
90  if(!iproc->schedule(func, args)) {
91  // stopping the scheduling on all other processes is debatable ...
92  return -1;
93  }
94  }
95  return 0;
96  }
97  else { // Map only onto the given pid
98  for(std::vector<Process>::iterator iproc=m_processes.begin();
99  iproc!=m_processes.end(); ++iproc) {
100  if(iproc->getProcessID()==pid)
101  return (iproc->schedule(func,args) ? 0 : -1);
102  }
103  // pid not found in the group
104  return -1;
105  }
106 }

◆ ATLAS_NOT_THREAD_SAFE() [3/3]

int ProcessGroup::wait AthenaInterprocess::ATLAS_NOT_THREAD_SAFE ( int  options)

Definition at line 108 of file ProcessGroup.cxx.

109 {
110  // Wait for all child processes and store their status codes in m_statuses
111  if(m_processes.empty())
112  return 0;
113 
114  // Schedule an exit if we are to wait forever
115  if(!(options & WNOHANG) && map_async(0,0))
116  return -1;
117 
118  int result = 0;
119  while(m_processes.size()) {
120  int child_status = 0;
121  pid_t child = waitpid(-m_pgid, &child_status, options);
122  if(child < 0) {
123  result = errno;
124  break;
125  }
126  else if(child==0) {
127  continue;
128  }
129 
130  if(WIFSIGNALED(child_status))
131  std::cout << "SIGNAL! " << child << " (" << child_status << "," << WTERMSIG(child_status) << ")" << std::endl;
132 
133  child_status = WIFSIGNALED(child_status) ? WTERMSIG(child_status) : WEXITSTATUS(child_status);
134  ProcessStatus p = {child, child_status};
135  m_statuses.push_back(p);
136 
137  // remove the process from m_processes
138  for(std::vector<Process>::iterator iproc=m_processes.begin();
139  iproc!=m_processes.end(); ++iproc) {
140  if(iproc->getProcessID()==child) {
141  m_processes.erase(iproc);
142  break;
143  }
144  }
145  }
146  return result;
147 }

◆ get_pid()

std::string AthenaInterprocess::get_pid ( const std::string &  buf,
pid_t pid 
)
inline

Definition at line 48 of file IdentifiedSharedQueue.cxx.

49 {
50  std::istringstream s( buf );
51  long id = -1;
52  s >> id;
53  pid = (pid_t)id;
54  std::string::size_type pos = s.tellg();
55  if ( (pos+1) <= buf.size() )
56  return buf.substr( pos+1 );
57  return "";
58 }

◆ randString()

std::string AthenaInterprocess::randString ( )
inline

Definition at line 13 of file Control/AthenaInterprocess/AthenaInterprocess/Utilities.h.

14  {
15  uuid_t uuid;
16  char out[SSIZE];
17  uuid_generate(uuid);
18  uuid_unparse(uuid, out);
19  return std::string(out);
20  }
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
python.SystemOfUnits.s
int s
Definition: SystemOfUnits.py:131
get_generator_info.result
result
Definition: get_generator_info.py:21
python.PerfMonSerializer.p
def p
Definition: PerfMonSerializer.py:743
python.AthDsoLogger.out
out
Definition: AthDsoLogger.py:71
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
python.AtlRunQueryLib.options
options
Definition: AtlRunQueryLib.py:379
beamspotman.queue
string queue
Definition: beamspotman.py:347
calibdata.exit
exit
Definition: calibdata.py:236
SSIZE
#define SSIZE
Definition: Control/AthenaInterprocess/AthenaInterprocess/Utilities.h:10
id
SG::auxid_t id
Definition: Control/AthContainers/Root/debug.cxx:191
python.LumiBlobConversion.pos
pos
Definition: LumiBlobConversion.py:18
merge.status
status
Definition: merge.py:17
python.CaloScaleNoiseConfig.args
args
Definition: CaloScaleNoiseConfig.py:80
python.AnalysisApp.launch
def launch(flags, ca)
Definition: AnalysisApp.py:115