ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaInterprocess::ProcessGroup Class Reference

#include <ProcessGroup.h>

Collaboration diagram for AthenaInterprocess::ProcessGroup:

Public Member Functions

 ProcessGroup (int nprocs=-1)
virtual ~ProcessGroup ()
pid_t launchProcess ATLAS_NOT_THREAD_SAFE ()
int map_async ATLAS_NOT_THREAD_SAFE (const IMessageDecoder *func, const ScheduledWork *args, pid_t pid=0)
int wait ATLAS_NOT_THREAD_SAFE (int options=0)
pid_t wait_once (bool &flag)
ProcessResultpullOneResult ()
pid_t getGroupID () const
const std::vector< Process > & getChildren () const
const std::vector< ProcessStatus > & getStatuses () const

Private Member Functions

bool create ATLAS_NOT_THREAD_SAFE ()

Private Attributes

std::vector< Processm_processes
std::vector< ProcessStatusm_statuses
IdentifiedSharedQueue m_inbox
int m_nprocs
pid_t m_pgid
int m_processesCreated

Detailed Description

Definition at line 27 of file ProcessGroup.h.

Constructor & Destructor Documentation

◆ ProcessGroup()

AthenaInterprocess::ProcessGroup::ProcessGroup ( int nprocs = -1)
explicit

Definition at line 34 of file ProcessGroup.cxx.

35 : m_nprocs(nprocs)
36 , m_pgid(-1)
38{
39 if ( m_nprocs < 0 ) {
40 // determine number of cores from system (available ones only)
41 m_nprocs = sysconf( _SC_NPROCESSORS_ONLN );
42 }
43}

◆ ~ProcessGroup()

AthenaInterprocess::ProcessGroup::~ProcessGroup ( )
virtual

Definition at line 45 of file ProcessGroup.cxx.

46{
47}

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE() [1/4]

pid_t launchProcess AthenaInterprocess::ProcessGroup::ATLAS_NOT_THREAD_SAFE ( )

◆ ATLAS_NOT_THREAD_SAFE() [2/4]

bool create AthenaInterprocess::ProcessGroup::ATLAS_NOT_THREAD_SAFE ( )
private

◆ ATLAS_NOT_THREAD_SAFE() [3/4]

int map_async AthenaInterprocess::ProcessGroup::ATLAS_NOT_THREAD_SAFE ( const IMessageDecoder * func,
const ScheduledWork * args,
pid_t pid = 0 )

◆ ATLAS_NOT_THREAD_SAFE() [4/4]

int wait AthenaInterprocess::ProcessGroup::ATLAS_NOT_THREAD_SAFE ( int options = 0)

◆ getChildren()

const std::vector< Process > & AthenaInterprocess::ProcessGroup::getChildren ( ) const

Definition at line 197 of file ProcessGroup.cxx.

198{
199// Give access to the processes; this is temporarily needed for the python
200// interface, but it is better broken up into the actual needs).
201 return m_processes;
202}
std::vector< Process > m_processes

◆ getGroupID()

pid_t AthenaInterprocess::ProcessGroup::getGroupID ( ) const

Definition at line 192 of file ProcessGroup.cxx.

193{
194 return m_pgid;
195}

◆ getStatuses()

const std::vector< ProcessStatus > & AthenaInterprocess::ProcessGroup::getStatuses ( ) const

Definition at line 204 of file ProcessGroup.cxx.

205{
206 return m_statuses;
207}
std::vector< ProcessStatus > m_statuses

◆ pullOneResult()

ProcessResult * AthenaInterprocess::ProcessGroup::pullOneResult ( )

Definition at line 177 of file ProcessGroup.cxx.

178{
179 ProcessResult* result(0);
180 pid_t pid = -1;
181 std::string buf = m_inbox.try_receive( pid );
182 if(pid>0) {
183 result = new ProcessResult;
184 result->pid = pid;
185 result->output.data = malloc(buf.size());
186 memcpy(result->output.data,buf.data(),buf.size());
187 result->output.size = buf.size();
188 }
189 return result;
190}
int32_t pid_t
IdentifiedSharedQueue m_inbox

◆ wait_once()

pid_t AthenaInterprocess::ProcessGroup::wait_once ( bool & flag)

Definition at line 149 of file ProcessGroup.cxx.

150{
151 flag = true;
152 if(m_processes.empty()) return 0;
153 int child_status = 0;
154 pid_t child = waitpid(-m_pgid, &child_status,WNOHANG);
155 if(child>0) {
156 // Check if the process has failed
157 flag = !(WIFSIGNALED(child_status) || WEXITSTATUS(child_status));
158 // save the status
159 child_status = WIFSIGNALED(child_status) ? WTERMSIG(child_status) : WEXITSTATUS(child_status);
160 ProcessStatus p = {child, child_status};
161 m_statuses.push_back(p);
162 // remove the process from m_processes
163 for(std::vector<Process>::iterator iproc=m_processes.begin();
164 iproc!=m_processes.end(); ++iproc) {
165 if(iproc->getProcessID()==child) {
166 m_processes.erase(iproc);
167 break;
168 }
169 }
170 }
171 else if(child<0) {
172 flag = false;
173 }
174 return child;
175}
bool flag
Definition master.py:29

Member Data Documentation

◆ m_inbox

IdentifiedSharedQueue AthenaInterprocess::ProcessGroup::m_inbox
private

Definition at line 49 of file ProcessGroup.h.

◆ m_nprocs

int AthenaInterprocess::ProcessGroup::m_nprocs
private

Definition at line 50 of file ProcessGroup.h.

◆ m_pgid

pid_t AthenaInterprocess::ProcessGroup::m_pgid
private

Definition at line 51 of file ProcessGroup.h.

◆ m_processes

std::vector<Process> AthenaInterprocess::ProcessGroup::m_processes
private

Definition at line 47 of file ProcessGroup.h.

◆ m_processesCreated

int AthenaInterprocess::ProcessGroup::m_processesCreated
private

Definition at line 52 of file ProcessGroup.h.

◆ m_statuses

std::vector<ProcessStatus> AthenaInterprocess::ProcessGroup::m_statuses
private

Definition at line 48 of file ProcessGroup.h.


The documentation for this class was generated from the following files: