ATLAS Offline Software
Loading...
Searching...
No Matches
Process.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7#include <boost/interprocess/ipc/message_queue.hpp>
8
9#include <sys/prctl.h>
10#include <dlfcn.h>
11#include <signal.h>
12
13#include <iostream>
14
15using namespace boost::interprocess;
16
17
18namespace AthenaInterprocess {
19
21{
22 std::cout.flush();
23 std::cerr.flush();
24
25 pid_t pid = fork();
26 if ( pid == 0 )
27 prctl( PR_SET_PDEATHSIG, SIGHUP ); // Linux only
28
29 return Process(pid);
30}
31
32
33//- construction/destruction -------------------------------------------------
35{
36
37}
38
39Process::Process( const Process& other ) :
40 m_inbox( other.m_inbox ), m_outbox( other.m_outbox ), m_pid( other.m_pid )
41{
42
43}
44
46{
47 if ( this != &other ) {
48 m_inbox = other.m_inbox;
49 m_outbox = other.m_outbox;
50 m_pid = other.m_pid;
51 }
52 return *this;
53}
54
56{
57
58}
59
60
61//- public member functions --------------------------------------------------
63{
64 return m_pid;
65}
66
67bool Process::connectIn( const SharedQueue& queue )
68{
69 if ( ! queue.name().empty() ) {
70 m_inbox = queue;
71 /* TODO: check that the queue is valid and can receive messages */
72 return true;
73 }
74
75 return false;
76}
77
79{
80 if ( ! queue.name().empty() ) {
81 m_outbox = queue;
82 /* TODO: check that the queue is valid and can send messages */
83 return true;
84 }
85
86 return false;
87}
88
89bool Process::schedule(const IMessageDecoder* func, const ScheduledWork* args)
90{
91 //Send func
92 std::string strptr("");
93 if(func) {
94 size_t ptrsize = sizeof(func);
95 char* charptr = new char[ptrsize];
96 memcpy(charptr,&func,ptrsize);
97 strptr=std::string(charptr,ptrsize);
98 delete[] charptr;
99 }
100 bool send_ok = m_inbox.try_send(strptr);
101
102 //Send params
103 if(send_ok) {
104 std::string strparam = (args?std::string((char*)args->data,args->size):std::string(""));
105 send_ok = m_inbox.try_send(strparam);
106 }
107
108 return send_ok;
109}
110
111int Process::mainloop() {
112 // TODO: the exit codes used here continue where the "miscellaneous" exit codes
113 // in AthenaCommon/ExitCodes.py end. There's nothing to enforce that, though,
114 // and it's not documented either.
115 if(m_pid!=0)
116 return 1;
117
118 int exit_code = 0;
119
120 while(true) {
121 std::string message = m_inbox.receive();
122
123 // One way to stop the loop: From "outside" (i.e. originated by ProcessGroup)
124 // Just send an empty string as first message in (ptr,params) message pair
125 if(message.empty()) break;
126
127 // Decode first message into pointer
128 IMessageDecoder* decoder(0);
129 memcpy(&decoder,message.data(),message.size());
130
131 message = m_inbox.receive();
132 // Decode second message into params
133 ScheduledWork params;
134 params.data = (void*)message.data();
135 params.size = message.size();
136
137 if(decoder) {
138 std::unique_ptr<ScheduledWork> outwork = (*decoder)(params);
139
140 if(outwork) {
141 bool posted = m_outbox.try_send(std::string((char*)outwork->data,outwork->size));
142
143 // Convention: first int in the outwork->data buffer is an error flag: 0 - success, 1 - failure
144 int errflag = *((int*)outwork->data);
145 free(outwork->data);
146
147 if(errflag) {
148 exit_code = 1; // TODO: what should be the exit code here ????
149 break;
150 }
151
152 if(!posted) {
153 exit_code = 0x0A;
154 break;
155 }
156 }
157 else {
158 // Another way to stop the loop. From "inside" (i.e. originated by Message Receiver)
159 // Return 0 pointer to ScheduledWork
160 break;
161 }
162 }//if(decoder)
163 }//loop
164 return exit_code;
165}
166
167} // namespace AthenaInterprocess
int32_t pid_t
virtual bool try_send(const std::string &)
IdentifiedSharedQueue m_outbox
Definition Process.h:37
bool connectOut(const IdentifiedSharedQueue &queue)
Definition Process.cxx:78
static Process launch()
Definition Process.cxx:20
bool connectIn(const SharedQueue &queue)
Definition Process.cxx:67
bool schedule(const IMessageDecoder *func, const ScheduledWork *args)
Definition Process.cxx:89
Process & operator=(const Process &other)
Definition Process.cxx:45
pid_t getProcessID() const
Definition Process.cxx:62
virtual std::string receive()