9 #include <boost/interprocess/ipc/message_queue.hpp>
41 m_nprocs = sysconf( _SC_NPROCESSORS_ONLN );
58 p.connectOut(m_inbox);
59 pid_t newpid = p.getProcessID();
61 int status = p.mainloop();
65 if(m_processes.empty())
66 m_pgid = getpgid(newpid);
68 setpgid(newpid, m_pgid);
70 m_processes.push_back(p);
81 if(m_processes.empty()) {
87 for(std::vector<Process>::iterator iproc = m_processes.begin();
88 iproc!=m_processes.end(); ++iproc) {
90 if(!iproc->schedule(func, args)) {
98 for(std::vector<Process>::iterator iproc=m_processes.begin();
99 iproc!=m_processes.end(); ++iproc) {
100 if(iproc->getProcessID()==pid)
101 return (iproc->schedule(func,args) ? 0 : -1);
111 if(m_processes.empty())
115 if(!(options & WNOHANG) && map_async(0,0))
119 while(m_processes.size()) {
120 int child_status = 0;
121 pid_t child = waitpid(-m_pgid, &child_status, options);
130 if(WIFSIGNALED(child_status))
131 std::cout <<
"SIGNAL! " << child <<
" (" << child_status <<
"," << WTERMSIG(child_status) <<
")" << std::endl;
133 child_status = WIFSIGNALED(child_status) ? WTERMSIG(child_status) : WEXITSTATUS(child_status);
135 m_statuses.push_back(p);
138 for(std::vector<Process>::iterator iproc=m_processes.begin();
139 iproc!=m_processes.end(); ++iproc) {
140 if(iproc->getProcessID()==child) {
141 m_processes.erase(iproc);
153 int child_status = 0;
154 pid_t child = waitpid(-
m_pgid, &child_status,WNOHANG);
157 flag = !(WIFSIGNALED(child_status) || WEXITSTATUS(child_status));
159 child_status = WIFSIGNALED(child_status) ? WTERMSIG(child_status) : WEXITSTATUS(child_status);
163 for(std::vector<Process>::iterator iproc=
m_processes.begin();
165 if(iproc->getProcessID()==child) {
181 std::string buf =
m_inbox.try_receive( pid );
185 result->output.data = malloc(buf.size());
186 memcpy(
result->output.data,buf.data(),buf.size());
187 result->output.size = buf.size();
221 std::ostringstream s;
229 leader.connectIn( queue );
230 leader.connectOut( m_inbox );
231 pid_t lpid = leader.getProcessID();
234 int status = leader.mainloop();
236 }
else if ( lpid == -1 )
240 m_processes.push_back( leader );
241 m_pgid = getpgid( lpid );
244 for (
int i = 1;
i < m_nprocs; ++
i ) {
247 p.connectIn( queue );
248 p.connectOut( m_inbox );
249 if (
p.getProcessID() == 0 ) {
253 setpgid(
p.getProcessID(), m_pgid );
254 m_processes.push_back( p );
256 m_processesCreated = m_nprocs;
static AthenaInterprocess::SharedQueue create_queue(const std::string &owner, int count)
const std::vector< ProcessStatus > & getStatuses() const
ProcessGroup(int nprocs=-1)
pid_t wait_once(bool &flag)
ProcessResult * pullOneResult()
IdentifiedSharedQueue m_inbox
const std::vector< Process > & getChildren() const
std::vector< Process > m_processes
std::vector< ProcessStatus > m_statuses
int count(std::string s, const std::string &regx)
count how many occurrences of a regx are in a string
pid_t ProcessGroup::launchProcess ATLAS_NOT_THREAD_SAFE()