ATLAS Offline Software
Loading...
Searching...
No Matches
ProcessGroup.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
3*/
4
8
9#include <boost/interprocess/ipc/message_queue.hpp>
10
11#include <sys/wait.h>
12#include <errno.h>
13#include <stdlib.h>
14#include <unistd.h>
15
16#include <sstream>
17#include <iostream>
18
19using namespace boost::interprocess;
20
21
22//- helper -------------------------------------------------------------------
// Helper: builds a queue name of the form
//   "athenamp_<owner>_<pid of this process>_<count>_<random string>"
// and returns the SharedQueue created for that name.
// NOTE(review): the statement that actually constructs `queue` from `s`
// (original line 27) is absent from this listing; presumably it uses s.str()
// -- confirm against the real source.
// NOTE(review): `std::ends` writes a '\0' into the stream, so s.str() ends with
// an embedded NUL; harmless if only c_str() is used downstream, but confirm.
23static inline AthenaInterprocess::SharedQueue create_queue( const std::string& owner, int count )
24{
25 std::ostringstream s;
26 s << "athenamp_" << owner << '_' << getpid() << '_' << count << '_' << AthenaInterprocess::randString() << std::ends;
28 return queue;
29}
30
31namespace AthenaInterprocess {
32
33//- construction/destruction -------------------------------------------------
// Constructor (its signature, original line 34, is absent from this listing).
// Stores the requested number of worker processes and marks the group as not
// yet created (m_pgid == -1); the children themselves are only forked later by
// create(). A negative count means "one worker per online CPU core".
// NOTE(review): original line 37 (presumably a further member initializer) is
// also absent here. sysconf() returns -1 on failure, in which case create()
// refuses to start because it requires m_nprocs > 0.
35 : m_nprocs(nprocs)
36 , m_pgid(-1)
38{
39 if ( m_nprocs < 0 ) {
40 // determine number of cores from system (available ones only)
41 m_nprocs = sysconf( _SC_NPROCESSORS_ONLN );
42 }
43}
44
48
// Fork one additional worker into an already-created group.
// Returns -1 if the group has not been created yet (m_pgid == -1); otherwise
// returns the new child's pid in the mother. In the child, the worker runs
// mainloop() and the process exits with its status, so this never returns there.
// The new child gets its own input queue ("child", m_processesCreated) and posts
// results to the shared mother inbox m_inbox.
49pid_t ProcessGroup::launchProcess ATLAS_NOT_THREAD_SAFE ()
50{
51 if(m_pgid==-1)
52 return -1;
53
54 // Create new process
55 SharedQueue queue = create_queue("child", m_processesCreated);
// NOTE(review): original line 56 is absent from this listing; presumably
// `Process p = Process::launch();` as in create() -- confirm.
57 p.connectIn(queue);
58 p.connectOut(m_inbox);
59 pid_t newpid = p.getProcessID();
// NOTE(review): a failed launch (newpid == -1) is not detected; it would fall
// through to setpgid()/push_back below with a bogus pid.
60 if(newpid==0) {
61 int status = p.mainloop();
62 exit(status);
63 }
64
// If every earlier child has already been reaped, the group pgid is re-read from
// the new child. Otherwise the child is moved into the existing group.
// NOTE(review): no setpgid(newpid, 0) is done in the empty case, so (unless
// Process::launch() does it) the child presumably stays in the mother's
// process group -- confirm, since wait()/wait_once() wait on -m_pgid.
65 if(m_processes.empty())
66 m_pgid = getpgid(newpid);
67 else
68 setpgid(newpid, m_pgid);
69
70 m_processes.push_back(p);
71 m_processesCreated++;
72 return newpid;
73}
74
75int ProcessGroup::map_async ATLAS_NOT_THREAD_SAFE (const IMessageDecoder* func, const ScheduledWork* args, pid_t pid) {
76 // If pid=0, map the function-object 'func' onto all current child processes. Does
77 // not wait for the results, but will return success only if the writing to
78 // all the child queues succeeds.
79 //
80 // If pid!=0 then map only onto the process with given pid
81 if(m_processes.empty()) {
82 if(!create())
83 return -1;
84 }
85
86 if(pid==0) { // Map on all processes in the group
87 for(std::vector<Process>::iterator iproc = m_processes.begin();
88 iproc!=m_processes.end(); ++iproc) {
89
90 if(!iproc->schedule(func, args)) {
91 // stopping the scheduling on all other processes is debatable ...
92 return -1;
93 }
94 }
95 return 0;
96 }
97 else { // Map only onto the given pid
98 for(std::vector<Process>::iterator iproc=m_processes.begin();
99 iproc!=m_processes.end(); ++iproc) {
100 if(iproc->getProcessID()==pid)
101 return (iproc->schedule(func,args) ? 0 : -1);
102 }
103 // pid not found in the group
104 return -1;
105 }
106}
107
108int ProcessGroup::wait ATLAS_NOT_THREAD_SAFE (int options)
109{
110 // Wait for all child processes and store their status codes in m_statuses
111 if(m_processes.empty())
112 return 0;
113
114 // Schedule an exit if we are to wait forever
115 if(!(options & WNOHANG) && map_async(0,0))
116 return -1;
117
118 int result = 0;
119 while(m_processes.size()) {
120 int child_status = 0;
121 pid_t child = waitpid(-m_pgid, &child_status, options);
122 if(child < 0) {
123 result = errno;
124 break;
125 }
126 else if(child==0) {
127 continue;
128 }
129
130 if(WIFSIGNALED(child_status))
131 std::cout << "SIGNAL! " << child << " (" << child_status << "," << WTERMSIG(child_status) << ")" << std::endl;
132
133 child_status = WIFSIGNALED(child_status) ? WTERMSIG(child_status) : WEXITSTATUS(child_status);
134 ProcessStatus p = {child, child_status};
135 m_statuses.push_back(p);
136
137 // remove the process from m_processes
138 for(std::vector<Process>::iterator iproc=m_processes.begin();
139 iproc!=m_processes.end(); ++iproc) {
140 if(iproc->getProcessID()==child) {
141 m_processes.erase(iproc);
142 break;
143 }
144 }
145 }
146 return result;
147}
148
// Non-blocking single reap (the signature, original line 149, is absent from
// this listing; from the body it presumably takes a bool& out-parameter `flag`
// and returns pid_t -- confirm).
// Returns 0 if the group is empty or no child has exited yet, the pid of the
// child that was reaped, or a negative value if waitpid() failed.
// `flag` stays true unless the reaped child failed (killed by a signal or
// non-zero exit status) or waitpid() itself failed.
// A reaped child is recorded in m_statuses (exit code, or signal number) and
// removed from m_processes, exactly as wait() does.
150{
151 flag = true;
152 if(m_processes.empty()) return 0;
153 int child_status = 0;
154 pid_t child = waitpid(-m_pgid, &child_status,WNOHANG);
155 if(child>0) {
156 // Check if the process has failed
157 flag = !(WIFSIGNALED(child_status) || WEXITSTATUS(child_status));
158 // save the status
159 child_status = WIFSIGNALED(child_status) ? WTERMSIG(child_status) : WEXITSTATUS(child_status);
160 ProcessStatus p = {child, child_status};
161 m_statuses.push_back(p);
162 // remove the process from m_processes
163 for(std::vector<Process>::iterator iproc=m_processes.begin();
164 iproc!=m_processes.end(); ++iproc) {
165 if(iproc->getProcessID()==child) {
166 m_processes.erase(iproc);
167 break;
168 }
169 }
170 }
171 else if(child<0) {
172 flag = false;
173 }
174 return child;
175}
176
// Non-blocking poll of the mother's inbox (the signature, original line 177,
// and the declaration of `result`, original line 179, are absent from this
// listing; presumably `result` starts out null -- confirm).
// If a message is waiting (try_receive() sets pid > 0), this returns a new
// ProcessResult holding the sender pid and a malloc'd copy of the payload.
// Otherwise `result` is returned unchanged.
// NOTE(review): ownership presumably passes to the caller, who must delete the
// result and free() output.data -- confirm. The malloc() return value is not
// checked, and memcpy() needs <cstring>, which is not among the includes
// visible in this listing.
178{
180 pid_t pid = -1;
181 std::string buf = m_inbox.try_receive( pid );
182 if(pid>0) {
183 result = new ProcessResult;
184 result->pid = pid;
185 result->output.data = malloc(buf.size());
186 memcpy(result->output.data,buf.data(),buf.size());
187 result->output.size = buf.size();
188 }
189 return result;
190}
191
// Accessor for the process-group id of the workers (the signature, original
// line 192, is absent from this listing). The value is -1 until the group has
// been created; after that it is the pgid obtained from getpgid() on the leader.
193{
194 return m_pgid;
195}
196
197const std::vector<Process>& ProcessGroup::getChildren() const
198{
199// Give access to the processes; this is temporarily needed for the python
200// interface, but it is better broken up into the actual needs).
201 return m_processes;
202}
203
204const std::vector<ProcessStatus>& ProcessGroup::getStatuses() const
205{
206 return m_statuses;
207}
208
// Fork the whole worker group: m_nprocs children, each with its own input
// queue ("child", i), all posting results back to the single mother inbox
// m_inbox. The first child becomes the process-group leader (setpgid(lpid, 0))
// and the others are moved into its group. This lets the group be reaped via
// waitpid(-m_pgid, ...). In each child the worker runs mainloop() and the
// process exits with its status, so create() only returns in the mother.
// Returns false if m_nprocs <= 0 or the leader could not be launched
// (lpid == -1); returns true otherwise.
209bool ProcessGroup::create ATLAS_NOT_THREAD_SAFE ()
210{
211// TODO: this code leaves the queues from the previous children visible to all
212// their subsequent siblings. This can be helped by creating the queue first
213// in the children, but the current code at least requires no synchronization,
214// and has a minimum chance of leaving resources on failure.
215
216 if ( m_nprocs <= 0 )
217 return false;
218
219// a single queue for posting back onto the mother
220 if ( ! m_inbox ) {
221 std::ostringstream s;
222 s << "athenamp_mother_" << getpid() << '_' << AthenaInterprocess::randString() << std::ends;
// NOTE(review): original line 223 is absent from this listing; presumably it
// creates m_inbox from s.str() -- confirm.
224 }
225
226// create process group leader
227 SharedQueue queue = create_queue( "child", 0 );
228 Process leader = Process::launch();
229 leader.connectIn( queue );
230 leader.connectOut( m_inbox );
231 pid_t lpid = leader.getProcessID();
232 if ( lpid == 0 ) {
233 //m_tool->setRankId(0);
234 int status = leader.mainloop();
235 exit( status );
236 } else if ( lpid == -1 )
237 return false;
238
239 setpgid( lpid, 0 );
240 m_processes.push_back( leader );
241 m_pgid = getpgid( lpid );
242
243// create rest of the group
244 for ( int i = 1; i < m_nprocs; ++i ) {
// NOTE(review): this `queue` shadows the leader's `queue` declared above.
245 SharedQueue queue = create_queue( "child", i );
// NOTE(review): original line 246 is absent from this listing; presumably
// `Process p = Process::launch();` as for the leader -- confirm.
247 p.connectIn( queue );
248 p.connectOut( m_inbox );
249 if ( p.getProcessID() == 0 ) {
250 int status = p.mainloop();
251 exit( status );
252 }
// NOTE(review): unlike the leader, a failed launch (pid == -1) is not detected
// here; the bogus pid would be passed to setpgid() and kept in m_processes.
253 setpgid( p.getProcessID(), m_pgid );
254 m_processes.push_back( p );
255 }
256 m_processesCreated = m_nprocs;
257 return true;
258}
259
260} // namespace AthenaInterprocess
int32_t pid_t
static AthenaInterprocess::SharedQueue create_queue(const std::string &owner, int count)
const std::vector< ProcessStatus > & getStatuses() const
IdentifiedSharedQueue m_inbox
const std::vector< Process > & getChildren() const
std::vector< Process > m_processes
std::vector< ProcessStatus > m_statuses
static Process launch()
Definition Process.cxx:20
int count(std::string s, const std::string &regx)
count how many occurrences of a regex are in a string
Definition hcg.cxx:146
pid_t ProcessGroup::launchProcess ATLAS_NOT_THREAD_SAFE()
status
Definition merge.py:16