ATLAS Offline Software
Process.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 
7 #include <boost/interprocess/ipc/message_queue.hpp>
8 
9 #include <sys/prctl.h>
10 #include <dlfcn.h>
11 #include <signal.h>
12 
13 #include <iostream>
14 
15 using namespace boost::interprocess;
16 
17 
18 namespace AthenaInterprocess {
19 
21 {
22  std::cout.flush();
23  std::cerr.flush();
24 
25  pid_t pid = fork();
26  if ( pid == 0 )
27  prctl( PR_SET_PDEATHSIG, SIGHUP ); // Linux only
28 
29  return Process(pid);
30 }
31 
32 
33 //- construction/destruction -------------------------------------------------
34 Process::Process(pid_t pid ) : m_inbox(), m_outbox(), m_pid( pid )
35 {
36 
37 }
38 
40  m_inbox( other.m_inbox ), m_outbox( other.m_outbox ), m_pid( other.m_pid )
41 {
42 
43 }
44 
46 {
47  if ( this != &other ) {
48  m_inbox = other.m_inbox;
49  m_outbox = other.m_outbox;
50  m_pid = other.m_pid;
51  }
52  return *this;
53 }
54 
56 {
57 
58 }
59 
60 
61 //- public member functions --------------------------------------------------
63 {
64  return m_pid;
65 }
66 
68 {
69  if ( ! queue.name().empty() ) {
70  m_inbox = queue;
71  /* TODO: check that the queue is valid and can receive messages */
72  return true;
73  }
74 
75  return false;
76 }
77 
79 {
80  if ( ! queue.name().empty() ) {
81  m_outbox = queue;
82  /* TODO: check that the queue is valid and can send messages */
83  return true;
84  }
85 
86  return false;
87 }
88 
90 {
91  //Send func
92  std::string strptr("");
93  if(func) {
94  size_t ptrsize = sizeof(func);
95  char* charptr = new char[ptrsize];
96  memcpy(charptr,&func,ptrsize);
97  strptr=std::string(charptr,ptrsize);
98  delete[] charptr;
99  }
100  bool send_ok = m_inbox.try_send(strptr);
101 
102  //Send params
103  if(send_ok) {
104  std::string strparam = (args?std::string((char*)args->data,args->size):std::string(""));
105  send_ok = m_inbox.try_send(strparam);
106  }
107 
108  return send_ok;
109 }
110 
111 int Process::mainloop() {
112  // TODO: the exit codes used here continue where the "miscellaneous" exit codes
113  // in AthenaCommon/ExitCodes.py end. There's nothing to enforce that, though,
114  // and it's not documented either.
115  if(m_pid!=0)
116  return 1;
117 
118  int exit_code = 0;
119 
120  while(true) {
121  std::string message = m_inbox.receive();
122 
123  // One way to stop the loop: From "outside" (i.e. originated by ProcessGroup)
124  // Just send an empty string as first message in (ptr,params) message pair
125  if(message.empty()) break;
126 
127  // Decode first message into pointer
129  memcpy(&decoder,message.data(),message.size());
130 
131  message = m_inbox.receive();
132  // Decode second message into params
134  params.data = (void*)message.data();
135  params.size = message.size();
136 
137  if(decoder) {
138  std::unique_ptr<ScheduledWork> outwork = (*decoder)(params);
139 
140  if(outwork) {
141  bool posted = m_outbox.try_send(std::string((char*)outwork->data,outwork->size));
142 
143  // Convention: first int in the outwork->data buffer is an error flag: 0 - success, 1 - failure
144  int errflag = *((int*)outwork->data);
145  free(outwork->data);
146 
147  if(errflag) {
148  exit_code = 1; // TODO: what should be the exit code here ????
149  break;
150  }
151 
152  if(!posted) {
153  exit_code = 0x0A;
154  break;
155  }
156  }
157  else {
158  // Another way to stop the loop. From "inside" (i.e. originated by Message Receiver)
159  // Return 0 pointer to ScheduledWork
160  break;
161  }
162  }//if(decoder)
163  }//loop
164  return exit_code;
165 }
166 
167 } // namespace AthenaInterprocess
AthenaInterprocess::Process::connectOut
bool connectOut(const IdentifiedSharedQueue &queue)
Definition: Process.cxx:78
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthenaInterprocess::IdentifiedSharedQueue::try_send
virtual bool try_send(const std::string &)
Definition: IdentifiedSharedQueue.cxx:38
AthenaInterprocess
Definition: FdsRegistry.h:11
python.LArCondContChannels.decoder
decoder
def channelSelection(self, channelList, groupType): if groupType == self.SingleGroup: pass elif group...
Definition: LArCondContChannels.py:618
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
Process.h
ReweightUtils.message
message
Definition: ReweightUtils.py:15
AthenaInterprocess::Process::~Process
virtual ~Process()
Definition: Process.cxx:55
AthenaInterprocess::Process::schedule
bool schedule(const IMessageDecoder *func, const ScheduledWork *args)
Definition: Process.cxx:89
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaInterprocess::Process::getProcessID
pid_t getProcessID() const
Definition: Process.cxx:62
AthenaInterprocess::Process::m_outbox
IdentifiedSharedQueue m_outbox
Definition: Process.h:37
AthenaInterprocess::Process::Process
Process(pid_t pid)
Definition: Process.cxx:34
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
AthenaInterprocess::Process::m_pid
pid_t m_pid
Definition: Process.h:38
beamspotman.queue
string queue
Definition: beamspotman.py:347
AthenaInterprocess::Process::m_inbox
SharedQueue m_inbox
Definition: Process.h:36
AthenaInterprocess::IMessageDecoder
Definition: IMessageDecoder.h:18
AthenaInterprocess::Process::operator=
Process & operator=(const Process &other)
Definition: Process.cxx:45
AthenaInterprocess::Process
Definition: Process.h:17
AthenaInterprocess::Process::connectIn
bool connectIn(const SharedQueue &queue)
Definition: Process.cxx:67
InDetDD::other
@ other
Definition: InDetDD_Defs.h:16
AthenaInterprocess::IdentifiedSharedQueue
Definition: IdentifiedSharedQueue.h:14
PowhegControl_ttFCNC_NLO.params
params
Definition: PowhegControl_ttFCNC_NLO.py:226
AthenaInterprocess::SharedQueue::try_send
virtual bool try_send(const std::string &)
Definition: SharedQueue.cxx:102
boost::interprocess
Definition: AthenaSharedMemoryTool.h:24
python.CaloScaleNoiseConfig.args
args
Definition: CaloScaleNoiseConfig.py:80
AthenaInterprocess::SharedQueue::receive
virtual std::string receive()
Definition: SharedQueue.cxx:145
python.AnalysisApp.launch
def launch(flags, ca)
Definition: AnalysisApp.py:115