ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaInterprocess Namespace Reference

Classes

struct  FdsRegistryEntry
class  IdentifiedSharedQueue
class  IMessageDecoder
class  IMPRunStop
class  Process
class  ProcessGroup
struct  ProcessResult
struct  ProcessStatus
struct  ScheduledWork
class  SharedQueue
class  UpdateAfterFork

Typedefs

typedef std::vector< FdsRegistryEntry > FdsRegistry

Functions

std::string randString ()
std::string add_pid (const std::string &buf)
std::string get_pid (const std::string &buf, pid_t &pid)
pid_t ProcessGroup::launchProcess ATLAS_NOT_THREAD_SAFE ()
int ProcessGroup::map_async ATLAS_NOT_THREAD_SAFE (const IMessageDecoder *func, const ScheduledWork *args, pid_t pid)
int ProcessGroup::wait ATLAS_NOT_THREAD_SAFE (int options)
static bool do_send (message_queue *mq, const std::string &buf, bool block)
static std::string do_receive (message_queue *mq, bool block)

Variables

static const int SHAREDQUEUE_MAX_MSG = 1000
static const std::size_t MAX_MSG_SIZE = 256

Typedef Documentation

◆ FdsRegistry

Definition at line 22 of file FdsRegistry.h.

Function Documentation

◆ add_pid()

std::string AthenaInterprocess::add_pid ( const std::string & buf)
inline

Definition at line 31 of file IdentifiedSharedQueue.cxx.

32{
33 std::ostringstream s;
34 s << (long)getpid() << '\0' << buf;
35 return s.str();
36}

◆ ATLAS_NOT_THREAD_SAFE() [1/3]

pid_t ProcessGroup::launchProcess AthenaInterprocess::ATLAS_NOT_THREAD_SAFE ( )

Definition at line 49 of file ProcessGroup.cxx.

50{
51 if(m_pgid==-1)
52 return -1;
53
54 // Create new process
55 SharedQueue queue = create_queue("child", m_processesCreated);
56 Process p = Process::launch();
57 p.connectIn(queue);
58 p.connectOut(m_inbox);
59 pid_t newpid = p.getProcessID();
60 if(newpid==0) {
61 int status = p.mainloop();
62 exit(status);
63 }
64
65 if(m_processes.empty())
66 m_pgid = getpgid(newpid);
67 else
68 setpgid(newpid, m_pgid);
69
70 m_processes.push_back(p);
71 m_processesCreated++;
72 return newpid;
73}
int32_t pid_t
static AthenaInterprocess::SharedQueue create_queue(const std::string &owner, int count)
static Process launch()
Definition Process.cxx:20

◆ ATLAS_NOT_THREAD_SAFE() [2/3]

int ProcessGroup::map_async AthenaInterprocess::ATLAS_NOT_THREAD_SAFE ( const IMessageDecoder * func,
const ScheduledWork * args,
pid_t pid )

Definition at line 75 of file ProcessGroup.cxx.

75 {
76 // If pid=0, map the function-object 'func' onto all current child processes. Does
77 // not wait for the results, but will return success only if the writing to
78 // all the child queues succeeds.
79 //
80 // If pid!=0 then map only onto the process with given pid
81 if(m_processes.empty()) {
82 if(!create())
83 return -1;
84 }
85
86 if(pid==0) { // Map on all processes in the group
87 for(std::vector<Process>::iterator iproc = m_processes.begin();
88 iproc!=m_processes.end(); ++iproc) {
89
90 if(!iproc->schedule(func, args)) {
91 // stopping the scheduling on all other processes is debatable ...
92 return -1;
93 }
94 }
95 return 0;
96 }
97 else { // Map only onto the given pid
98 for(std::vector<Process>::iterator iproc=m_processes.begin();
99 iproc!=m_processes.end(); ++iproc) {
100 if(iproc->getProcessID()==pid)
101 return (iproc->schedule(func,args) ? 0 : -1);
102 }
103 // pid not found in the group
104 return -1;
105 }
106}

◆ ATLAS_NOT_THREAD_SAFE() [3/3]

int ProcessGroup::wait AthenaInterprocess::ATLAS_NOT_THREAD_SAFE ( int options)

Definition at line 108 of file ProcessGroup.cxx.

109{
110 // Wait for all child processes and store their status codes in m_statuses
111 if(m_processes.empty())
112 return 0;
113
114 // Schedule an exit if we are to wait forever
115 if(!(options & WNOHANG) && map_async(0,0))
116 return -1;
117
118 int result = 0;
119 while(m_processes.size()) {
120 int child_status = 0;
121 pid_t child = waitpid(-m_pgid, &child_status, options);
122 if(child < 0) {
123 result = errno;
124 break;
125 }
126 else if(child==0) {
127 continue;
128 }
129
130 if(WIFSIGNALED(child_status))
131 std::cout << "SIGNAL! " << child << " (" << child_status << "," << WTERMSIG(child_status) << ")" << std::endl;
132
133 child_status = WIFSIGNALED(child_status) ? WTERMSIG(child_status) : WEXITSTATUS(child_status);
134 ProcessStatus p = {child, child_status};
135 m_statuses.push_back(p);
136
137 // remove the process from m_processes
138 for(std::vector<Process>::iterator iproc=m_processes.begin();
139 iproc!=m_processes.end(); ++iproc) {
140 if(iproc->getProcessID()==child) {
141 m_processes.erase(iproc);
142 break;
143 }
144 }
145 }
146 return result;
147}

◆ do_receive()

std::string AthenaInterprocess::do_receive ( message_queue * mq,
bool block )
inline static

Definition at line 112 of file SharedQueue.cxx.

113{
114 char buf[ MAX_MSG_SIZE ];
115
116 try {
117 std::size_t recvd_size = 0;
118 unsigned int priority = 0;
119
120 bool result = true;
121 if ( block )
122 mq->receive( buf, MAX_MSG_SIZE, recvd_size, priority );
123 else
124 result = mq->try_receive( buf, MAX_MSG_SIZE, recvd_size, priority );
125
126 if ( result ) {
127 if ( recvd_size > MAX_MSG_SIZE )
128 recvd_size = MAX_MSG_SIZE; // this is debatable, but send
129 // should have failed already, so
130 // "can not happen" applies
131 return std::string( buf, recvd_size );
132 }
133 } catch ( interprocess_exception& e ) {
134 /* silent for now: should return a callable error function */
135 }
136
137 return "";
138}
static const std::size_t MAX_MSG_SIZE
Definition SharedQueue.h:19

◆ do_send()

bool AthenaInterprocess::do_send ( message_queue * mq,
const std::string & buf,
bool block )
inline static

Definition at line 86 of file SharedQueue.cxx.

87{
88 bool send_ok = true;
89
90 try {
91 if ( block )
92 mq->send( buf.data(), buf.size(), 0 );
93 else
94 send_ok = mq->try_send( buf.data(), buf.size(), 0 );
95 } catch ( interprocess_exception& e ) {
96 send_ok = false;
97 }
98
99 return send_ok;
100}

◆ get_pid()

std::string AthenaInterprocess::get_pid ( const std::string & buf,
pid_t & pid )
inline

Definition at line 48 of file IdentifiedSharedQueue.cxx.

49{
50 std::istringstream s( buf );
51 long id = -1;
52 s >> id;
53 pid = (pid_t)id;
54 std::string::size_type pos = s.tellg();
55 if ( (pos+1) <= buf.size() )
56 return buf.substr( pos+1 );
57 return "";
58}

◆ randString()

std::string AthenaInterprocess::randString ( )
inline

Definition at line 13 of file Control/AthenaInterprocess/AthenaInterprocess/Utilities.h.

14 {
15 uuid_t uuid;
16 char out[SSIZE];
17 uuid_generate(uuid);
18 uuid_unparse(uuid, out);
19 return std::string(out);
20 }

Variable Documentation

◆ MAX_MSG_SIZE

const std::size_t AthenaInterprocess::MAX_MSG_SIZE = 256
static

Definition at line 19 of file SharedQueue.h.

◆ SHAREDQUEUE_MAX_MSG

const int AthenaInterprocess::SHAREDQUEUE_MAX_MSG = 1000
static

Definition at line 18 of file SharedQueue.h.