ATLAS Offline Software
ProcessGroup.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
3 */
4 
8 
9 #include <boost/interprocess/ipc/message_queue.hpp>
10 
11 #include <sys/wait.h>
12 #include <errno.h>
13 #include <stdlib.h>
14 #include <unistd.h>
15 
16 #include <sstream>
17 #include <iostream>
18 
19 using namespace boost::interprocess;
20 
21 
22 //- helper -------------------------------------------------------------------
23 static inline AthenaInterprocess::SharedQueue create_queue( const std::string& owner, int count )
24 {
25  std::ostringstream s;
26  s << "athenamp_" << owner << '_' << getpid() << '_' << count << '_' << AthenaInterprocess::randString() << std::ends;
28  return queue;
29 }
30 
31 namespace AthenaInterprocess {
32 
33 //- construction/destruction -------------------------------------------------
34 ProcessGroup::ProcessGroup(int nprocs)
35  : m_nprocs(nprocs)
36  , m_pgid(-1)
37  , m_processesCreated(0)
38 {
39  if ( m_nprocs < 0 ) {
40  // determine number of cores from system (available ones only)
41  m_nprocs = sysconf( _SC_NPROCESSORS_ONLN );
42  }
43 }
44 
46 {
47 }
48 
49 pid_t ProcessGroup::launchProcess ATLAS_NOT_THREAD_SAFE ()
50 {
51  if(m_pgid==-1)
52  return -1;
53 
54  // Create new process
55  SharedQueue queue = create_queue("child", m_processesCreated);
57  p.connectIn(queue);
58  p.connectOut(m_inbox);
59  pid_t newpid = p.getProcessID();
60  if(newpid==0) {
61  int status = p.mainloop();
62  exit(status);
63  }
64 
65  if(m_processes.empty())
66  m_pgid = getpgid(newpid);
67  else
68  setpgid(newpid, m_pgid);
69 
70  m_processes.push_back(p);
71  m_processesCreated++;
72  return newpid;
73 }
74 
75 int ProcessGroup::map_async ATLAS_NOT_THREAD_SAFE (const IMessageDecoder* func, const ScheduledWork* args, pid_t pid) {
76  // If pid=0, map the function-object 'func' onto all current child processes. Does
77  // not wait for the results, but will return success only if the writing to
78  // all the child queues succeeds.
79  //
80  // If pid!=0 then map only onto the process with given pid
81  if(m_processes.empty()) {
82  if(!create())
83  return -1;
84  }
85 
86  if(pid==0) { // Map on all processes in the group
87  for(std::vector<Process>::iterator iproc = m_processes.begin();
88  iproc!=m_processes.end(); ++iproc) {
89 
90  if(!iproc->schedule(func, args)) {
91  // stopping the scheduling on all other processes is debatable ...
92  return -1;
93  }
94  }
95  return 0;
96  }
97  else { // Map only onto the given pid
98  for(std::vector<Process>::iterator iproc=m_processes.begin();
99  iproc!=m_processes.end(); ++iproc) {
100  if(iproc->getProcessID()==pid)
101  return (iproc->schedule(func,args) ? 0 : -1);
102  }
103  // pid not found in the group
104  return -1;
105  }
106 }
107 
108 int ProcessGroup::wait ATLAS_NOT_THREAD_SAFE (int options)
109 {
110  // Wait for all child processes and store their status codes in m_statuses
111  if(m_processes.empty())
112  return 0;
113 
114  // Schedule an exit if we are to wait forever
115  if(!(options & WNOHANG) && map_async(0,0))
116  return -1;
117 
118  int result = 0;
119  while(m_processes.size()) {
120  int child_status = 0;
121  pid_t child = waitpid(-m_pgid, &child_status, options);
122  if(child < 0) {
123  result = errno;
124  break;
125  }
126  else if(child==0) {
127  continue;
128  }
129 
130  if(WIFSIGNALED(child_status))
131  std::cout << "SIGNAL! " << child << " (" << child_status << "," << WTERMSIG(child_status) << ")" << std::endl;
132 
133  child_status = WIFSIGNALED(child_status) ? WTERMSIG(child_status) : WEXITSTATUS(child_status);
134  ProcessStatus p = {child, child_status};
135  m_statuses.push_back(p);
136 
137  // remove the process from m_processes
138  for(std::vector<Process>::iterator iproc=m_processes.begin();
139  iproc!=m_processes.end(); ++iproc) {
140  if(iproc->getProcessID()==child) {
141  m_processes.erase(iproc);
142  break;
143  }
144  }
145  }
146  return result;
147 }
148 
150 {
151  flag = true;
152  if(m_processes.empty()) return 0;
153  int child_status = 0;
154  pid_t child = waitpid(-m_pgid, &child_status,WNOHANG);
155  if(child>0) {
156  // Check if the process has failed
157  flag = !(WIFSIGNALED(child_status) || WEXITSTATUS(child_status));
158  // save the status
159  child_status = WIFSIGNALED(child_status) ? WTERMSIG(child_status) : WEXITSTATUS(child_status);
160  ProcessStatus p = {child, child_status};
161  m_statuses.push_back(p);
162  // remove the process from m_processes
164  iproc!=m_processes.end(); ++iproc) {
165  if(iproc->getProcessID()==child) {
166  m_processes.erase(iproc);
167  break;
168  }
169  }
170  }
171  else if(child<0) {
172  flag = false;
173  }
174  return child;
175 }
176 
178 {
179  ProcessResult* result(0);
180  pid_t pid = -1;
181  std::string buf = m_inbox.try_receive( pid );
182  if(pid>0) {
183  result = new ProcessResult;
184  result->pid = pid;
185  result->output.data = malloc(buf.size());
186  memcpy(result->output.data,buf.data(),buf.size());
187  result->output.size = buf.size();
188  }
189  return result;
190 }
191 
193 {
194  return m_pgid;
195 }
196 
197 const std::vector<Process>& ProcessGroup::getChildren() const
198 {
199 // Give access to the processes; this is temporarily needed for the python
200 // interface, but it is better broken up into the actual needs).
201  return m_processes;
202 }
203 
204 const std::vector<ProcessStatus>& ProcessGroup::getStatuses() const
205 {
206  return m_statuses;
207 }
208 
209 bool ProcessGroup::create ATLAS_NOT_THREAD_SAFE ()
210 {
211 // TODO: this code leaves the queues from the previous children visible to all
212 // their subsequent siblings. This can be helped by creating the queue first
213 // in the children, but the current code at least requires no synchronization,
214 // and has a minimum chance of leaving resources on failure.
215 
216  if ( m_nprocs <= 0 )
217  return false;
218 
219 // a single queue for posting back onto the mother
220  if ( ! m_inbox ) {
221  std::ostringstream s;
222  s << "athenamp_mother_" << getpid() << '_' << AthenaInterprocess::randString() << std::ends;
223  m_inbox = AthenaInterprocess::IdentifiedSharedQueue( s.str() );
224  }
225 
226 // create process group leader
227  SharedQueue queue = create_queue( "child", 0 );
228  Process leader = Process::launch();
229  leader.connectIn( queue );
230  leader.connectOut( m_inbox );
231  pid_t lpid = leader.getProcessID();
232  if ( lpid == 0 ) {
233  //m_tool->setRankId(0);
234  int status = leader.mainloop();
235  exit( status );
236  } else if ( lpid == -1 )
237  return false;
238 
239  setpgid( lpid, 0 );
240  m_processes.push_back( leader );
241  m_pgid = getpgid( lpid );
242 
243 // create rest of the group
244  for ( int i = 1; i < m_nprocs; ++i ) {
245  SharedQueue queue = create_queue( "child", i );
246  Process p = Process::launch();
247  p.connectIn( queue );
248  p.connectOut( m_inbox );
249  if ( p.getProcessID() == 0 ) {
250  int status = p.mainloop();
251  exit( status );
252  }
253  setpgid( p.getProcessID(), m_pgid );
254  m_processes.push_back( p );
255  }
256  m_processesCreated = m_nprocs;
257  return true;
258 }
259 
260 } // namespace AthenaInterprocess
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthenaInterprocess
Definition: FdsRegistry.h:11
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
python.SystemOfUnits.s
int s
Definition: SystemOfUnits.py:131
AthenaInterprocess::ProcessGroup::pullOneResult
ProcessResult * pullOneResult()
Definition: ProcessGroup.cxx:177
get_generator_info.result
result
Definition: get_generator_info.py:21
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
AthenaInterprocess::ProcessGroup::getChildren
const std::vector< Process > & getChildren() const
Definition: ProcessGroup.cxx:197
ProcessGroup.h
XMLtoHeader.count
count
Definition: XMLtoHeader.py:85
AthenaInterprocess::IdentifiedSharedQueue::try_receive
virtual std::string try_receive()
Definition: IdentifiedSharedQueue.cxx:61
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaInterprocess::ProcessGroup::m_statuses
std::vector< ProcessStatus > m_statuses
Definition: ProcessGroup.h:48
python.utils.AtlRunQueryDQUtils.p
p
Definition: AtlRunQueryDQUtils.py:210
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaInterprocess::ProcessGroup::m_processes
std::vector< Process > m_processes
Definition: ProcessGroup.h:47
AthenaInterprocess::Process::launch
static Process launch()
Definition: Process.cxx:20
AthenaInterprocess::ProcessGroup::getGroupID
pid_t getGroupID() const
Definition: ProcessGroup.cxx:192
AthenaInterprocess::ATLAS_NOT_THREAD_SAFE
pid_t ProcessGroup::launchProcess ATLAS_NOT_THREAD_SAFE()
Definition: ProcessGroup.cxx:49
lumiFormat.i
int i
Definition: lumiFormat.py:85
master.flag
bool flag
Definition: master.py:29
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
Utilities.h
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:132
python.AtlRunQueryLib.options
options
Definition: AtlRunQueryLib.py:379
beamspotman.queue
string queue
Definition: beamspotman.py:347
AthenaInterprocess::ProcessGroup::m_inbox
IdentifiedSharedQueue m_inbox
Definition: ProcessGroup.h:49
AthenaInterprocess::randString
std::string randString()
Definition: Control/AthenaInterprocess/AthenaInterprocess/Utilities.h:13
calibdata.exit
exit
Definition: calibdata.py:236
AthenaInterprocess::ProcessGroup::m_pgid
pid_t m_pgid
Definition: ProcessGroup.h:51
AthenaInterprocess::IMessageDecoder
Definition: IMessageDecoder.h:18
AthenaInterprocess::Process
Definition: Process.h:17
AthenaInterprocess::ProcessGroup::~ProcessGroup
virtual ~ProcessGroup()
Definition: ProcessGroup.cxx:45
AthenaInterprocess::IdentifiedSharedQueue
Definition: IdentifiedSharedQueue.h:14
SharedQueue.h
AthenaInterprocess::ProcessGroup::m_nprocs
int m_nprocs
Definition: ProcessGroup.h:50
merge.status
status
Definition: merge.py:17
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
boost::interprocess
Definition: AthenaSharedMemoryTool.h:24
AthenaInterprocess::ProcessGroup::wait_once
pid_t wait_once(bool &flag)
Definition: ProcessGroup.cxx:149
AthenaInterprocess::ProcessStatus
Definition: ProcessGroup.h:17
python.CaloScaleNoiseConfig.args
args
Definition: CaloScaleNoiseConfig.py:80