ATLAS Offline Software
JiveXMLServer.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 
7 //tdaq includes
8 #include <ers/ers.h>
9 
10 //JiveXML includes
13 #include <JiveXML/ONCRPCXDRProcs.h>
14 #include <JiveXML/ONCRPCServer.h>
15 
16 #include <signal.h>
17 
18 //Define warning and error
19 #define ERS_WARNING( message ) \
20 { \
21  ERS_REPORT_IMPL( ers::warning, ers::Message, message, ); \
22 }
23 
24 #define ERS_ERROR( message ) \
25 { \
26  ERS_REPORT_IMPL( ers::error, ers::Message, message, ); \
27 }
28 
29 namespace {
30  std::string fmterror(int code) {
31  char buf[256];
32  return std::string(strerror_r(code, buf, sizeof(buf)));
33  }
34 }
35 
36 namespace JiveXML {
37 
42  portNumber(port){
43 
44  //Make sure ServerThread does not start unexpectedly
45  m_runServerThread = false ;
46 
47  //And then start it
48  StartServingThread().ignore();
49 
50  //Also register the signal handlers
53  }
54 
59 
60  //Just stop the serving thread
61  StopServingThread().ignore();
62  }
63 
71 
72  ERS_DEBUG(MSG::VERBOSE,"StartServingThread()");
73 
74  //Initialize access lock mechanism to ensure that the data map is never
75  //accessed by more than one thread at the same time. NULL means default
76  int retVal = pthread_mutex_init(&m_accessLock,NULL);
77  if (retVal != 0){
78  ERS_WARNING("Unable to initialize access lock while starting server: " << fmterror(retVal));
79  return StatusCode::FAILURE;
80  }
81 
82  //The arguments passed on to the server - create new object on the heap that
83  //is persistent through the lifetime of the thread
85 
86  //set runServer flag to true, so the thread will start
87  m_runServerThread = true ;
88 
89  //create thread itself
90  if ( ( pthread_create(&m_ServerThreadHandle, NULL , &JiveXML::ONCRPCServerThread, (void *) args )) != 0){
91 
92  //Create thread failed
93  ERS_WARNING("Thread creation failed");
94  return StatusCode::FAILURE;
95  }
96 
97  return StatusCode::SUCCESS;
98 
99  }
100 
108 
109  ERS_DEBUG(MSG::VERBOSE,"StopServingThread()");
110 
116  //Create a client - this will already cause an update of the file
117  //descriptors on the sockets
118  CLIENT* client = clnt_create("localhost", ONCRPCSERVERPROG,ONCRPCSERVERVERS, "tcp");
119  if (!client){
120  ERS_ERROR("Unable to create shutdown client for local server");
121  return StatusCode::FAILURE;
122  }
123 
124  // Next unset the runServerThread flag, which will cause the loop to stop
125  // This needs to happen after client creation, otherwise the next call won't
126  // be serverd anymore
127  m_runServerThread = false ;
128 
129  // Now issue the call with a timeout
130  struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0;
131 // xdr_void is defined inconsistently in xdr.h and gets a warning from gcc8.
132 #if __GNUC__ >= 8
133 # pragma GCC diagnostic push
134 # pragma GCC diagnostic ignored "-Wcast-function-type"
135 #endif
136  clnt_call(client, NULLPROC, (xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL, timeout);
137 #if __GNUC__ >= 8
138 # pragma GCC diagnostic pop
139 #endif
140 
141  // A pointer to the return value of the thread
142  void* ret = NULL;
143  // wait till the server thread has finished
144  ERS_INFO("Waiting for server thread to terminate ...");
145  pthread_join(m_ServerThreadHandle, &ret);
146  ERS_INFO(" ... finished server thread");
147 
148  //check if there was a return value
149  if (ret){
150  //Get the return value
151  unsigned long NRequests = *(unsigned long*)ret;
152  ERS_DEBUG(MSG::DEBUG,"Server thread stopped after handling " << NRequests << " requests");
153  } else
154  ERS_WARNING("Server thread stopped unexpectedly");
155 
156  //Destroy the access lock
157  int retVal = pthread_mutex_destroy(&m_accessLock);
158  if (retVal != 0){
159  ERS_WARNING("Unable to destroy access lock after stopping server: " << fmterror(retVal));
160  return StatusCode::FAILURE;
161  }
162 
163  return StatusCode::SUCCESS;
164  }
165 
170  //Store signal
172  //finish semaphore lock
173  lock.post();
174  }
175 
182  //just wait for the lock
183  lock.wait();
184  //Tell why the lock was released
185  ERS_INFO("Reached post-condition after received signal " << m_receivedSignal );
186  }
187 
193  //call the signal handler, so we will also reach post condition
194  signalHandler(-1);
195  }
196 
201  StatusCode JiveXMLServer::UpdateEventForStream( const EventStreamID& evtStreamID, const std::string & event) {
202 
203  ERS_DEBUG(MSG::VERBOSE,"UpdateEventForStream");
204 
205  //Check that the event stream id is valid
206  if (!evtStreamID.isValid()){
207  ERS_ERROR("Invalid event stream identifier - cannot add event");
208  return StatusCode::FAILURE;
209  }
210 
211  //Make sure we don't have already exceeded the maximum number of streams
212  if (m_eventStreamMap.size() > NSTREAMMAX ){
213  ERS_ERROR("Reached max. allowed number of streams " << NSTREAMMAX << " - cannot add event");
214  return StatusCode::FAILURE;
215  }
216 
217  //Make sure the event is not larger than the allowed maximal size
218  if (event.length() > NBYTESMAX ){
219  ERS_ERROR("Event is larger than allowed max. of " << NBYTESMAX << " bytes - cannot add event");
220  return StatusCode::FAILURE;
221  }
222 
223  //Make sure we are the only one accessing the data right now, by trying to
224  //obtain a lock. If the lock can not be obtained after a certain time, an
225  //error is reported
226 
227  //Timeout of 30 second and 0 nanoseconds
228  struct timespec timeout = { 30, 0 };
229  //Try to obtain the lock
230  int retVal = pthread_mutex_timedlock(&m_accessLock, &timeout);
231  if ( retVal != 0 ){
232  ERS_ERROR("Unable to obtain access lock to update event: " << fmterror(retVal));
233  return StatusCode::FAILURE;
234  }
235 
236  //Using try/catch to ensure the mutex gets unlocked in any case
237  try {
238 
239  //Using std::map::operator[] and std::map::insert() will create a new event
240  //if it did not exist, otherwise just replace the existing entry (making a
241  //copy of the std::string) but would not update the key which holds new
242  //event/run number. Therefore delete existing entry first.
243 
244  //Delete old entry if there is one
245  EventStreamMap::iterator OldEvtItr = m_eventStreamMap.find(evtStreamID);
246  if (OldEvtItr != m_eventStreamMap.end())
247  m_eventStreamMap.erase(OldEvtItr);
248 
249  //Now add the new event
250  m_eventStreamMap.insert(EventStreamPair(evtStreamID,event));
251 
252  } catch ( const std::exception& e ) {
253  ERS_ERROR("Exception caught while updating event for stream "
254  << evtStreamID.StreamName() << ": " << e.what());
255  //Also release the lock in this case
256  pthread_mutex_unlock(&m_accessLock);
257  //before we return
258  return StatusCode::FAILURE;
259  }
260 
261  //Finally release the lock again
262  retVal = pthread_mutex_unlock(&m_accessLock);
263  if ( retVal != 0 ){
264  ERS_ERROR("Unable to release access lock after updating event: " << fmterror(retVal));
265  return StatusCode::FAILURE;
266  }
267 
268  ERS_DEBUG(MSG::DEBUG, "Updated stream " << evtStreamID.StreamName()
269  << " with event Nr. " << evtStreamID.EventNumber()
270  << " from run Nr. " << evtStreamID.RunNumber());
271 
272  return StatusCode::SUCCESS;
273  }
274 
275 
284  return 3;
285  }
286 
290  std::vector<std::string> JiveXMLServer::GetStreamNames() const {
291 
292  //Create a vector that can be returned
293  std::vector<std::string> StreamNames;
294 
295  //Obtain an exclusive access lock
296  int retVal = pthread_mutex_lock(&m_accessLock);
297  if ( retVal != 0 ){
298  ERS_ERROR("Unable to obtain access lock to get stream names: " << fmterror(retVal));
299  return StreamNames;
300  }
301 
302  // Iterate over map to get entries
303  EventStreamMap::const_iterator MapItr = m_eventStreamMap.begin();
304  for ( ; MapItr != m_eventStreamMap.end(); ++MapItr){
305 
306  //Get the EventStreamID object
307  EventStreamID EvtStrID = (*MapItr).first;
308 
309  //Add the name of this EventStreamID to the list of stream names
310  StreamNames.push_back(EvtStrID.StreamName());
311  }
312 
313  //Release the lock
314  retVal = pthread_mutex_unlock(&m_accessLock);
315  if ( retVal != 0 )
316  ERS_ERROR("Unable to release access lock after getting stream names: " << fmterror(retVal));
317 
318  //Return the list of names
319  return StreamNames;
320  }
321 
325  const EventStreamID JiveXMLServer::GetEventStreamID( const std::string& StreamName) const {
326 
327  //Obtain an exclusive access lock
328  int retVal = pthread_mutex_lock(&m_accessLock);
329  if ( retVal != 0 ){
330  ERS_ERROR("Unable to obtain access lock to get stream ID: " << fmterror(retVal));
331  return EventStreamID("");
332  }
333 
334  // Search the entry in the map
335  EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(EventStreamID(StreamName));
336 
337  //Initialize with an invalid event stream identifier
338  EventStreamID streamID = EventStreamID("");
339 
340  //If the element is found, get a copy of the found event stream identifier
341  if ( MapItr != m_eventStreamMap.end()){
342  streamID = EventStreamID((*MapItr).first);
343  }
344 
345  //Release the lock
346  retVal = pthread_mutex_unlock(&m_accessLock);
347  if ( retVal != 0 )
348  ERS_ERROR("Unable to release access lock after getting stream ID: " << fmterror(retVal));
349 
350  return streamID;
351 
352  }
353 
357  const std::string JiveXMLServer::GetEvent( const EventStreamID& evtStreamID ) const {
358 
359  //Obtain an exclusive access lock
360  int retVal = pthread_mutex_lock(&m_accessLock);
361  if ( retVal != 0 ){
362  ERS_ERROR("Unable to obtain access lock to get event: " << fmterror(retVal));
363  return std::string("");
364  }
365 
366 
367  // Search the entry in the map
368  EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(evtStreamID);
369 
370  //Initialize with an empty event stream
371  std::string event;
372 
373  //If the element is found, get a copy of the found event string
374  if ( MapItr != m_eventStreamMap.end()){
375  event = std::string((*MapItr).second);
376  }
377 
378  //Release the lock
379  retVal = pthread_mutex_unlock(&m_accessLock);
380  if ( retVal != 0 )
381  ERS_ERROR("Unable to release access lock after getting stream event: " << fmterror(retVal));
382 
383  return event;
384  }
385 
389  void JiveXMLServer::Message( const MSG::Level level, const std::string& msg) const {
390  //Deliver message to the proper stream
391  if (level <= MSG::DEBUG) ERS_REPORT_IMPL( ers::debug, ers::Message, msg, level);
392  if (level == MSG::INFO) ERS_REPORT_IMPL( ers::info, ers::Message, msg, );
393  if (level == MSG::WARNING) ERS_REPORT_IMPL( ers::warning, ers::Message, msg, );
394  if (level == MSG::ERROR) ERS_REPORT_IMPL( ers::error, ers::Message, msg, );
395  if (level >= MSG::FATAL) ERS_REPORT_IMPL( ers::fatal, ers::Message, msg, );
396  }
397 
402  //set to fixed value for now
403  return MSG::DEBUG;
404  }
405 }
grepfile.info
info
Definition: grepfile.py:38
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
JiveXML::JiveXMLServer::signalHandler
static void signalHandler(int signum)
When the signal handler is called, switch the lock to the post condition.
Definition: JiveXMLServer.cxx:169
JiveXML::JiveXMLServer::m_eventStreamMap
EventStreamMap m_eventStreamMap
Definition: JiveXMLServer.h:107
python.Constants.FATAL
int FATAL
Definition: Control/AthenaCommon/python/Constants.py:19
JiveXML::JiveXMLServer::GetEvent
virtual const std::string GetEvent(const EventStreamID &evtStreamID) const override
get the current event for a particular stream
Definition: JiveXMLServer.cxx:357
JiveXML::JiveXMLServer::portNumber
int portNumber
Definition: JiveXMLServer.h:104
JiveXMLServer.h
JiveXML::EventStreamID::EventNumber
unsigned long EventNumber() const
Definition: EventStream.h:41
ONCRPCThreadCollection.h
JiveXML::JiveXMLServer::GetState
virtual int GetState() const override
get the Status of the application
Definition: JiveXMLServer.cxx:279
JiveXML::JiveXMLServer::StopServingThread
StatusCode StopServingThread()
Stop the serving thread.
Definition: JiveXMLServer.cxx:107
JiveXML::ServerThreadArguments
struct ServerThreadArguments_t ServerThreadArguments
Arguments handed over fromt the main (Athena) thread to the server thread.
Definition: ONCRPCServerThreads.h:22
JiveXML::JiveXMLServer::GetStreamNames
virtual std::vector< std::string > GetStreamNames() const override
get the names of all the streams
Definition: JiveXMLServer.cxx:290
JiveXML::JiveXMLServer::Message
virtual void Message(const MSG::Level level, const std::string &msg) const override
This function is exposed to allow using ERS messaging service from other threads.
Definition: JiveXMLServer.cxx:389
ONCRPCServer.h
python.iconfTool.models.loaders.level
level
Definition: loaders.py:20
rerun_display.client
client
Definition: rerun_display.py:31
StateLessPT_NewConfig.StreamNames
StreamNames
Definition: StateLessPT_NewConfig.py:315
JiveXML::ONCRPCServerThread
void * ONCRPCServerThread(void *args)
This is the actual server thread, which takes above arguments.
Definition: ONCRPCServerThreads.cxx:252
JiveXML::JiveXMLServer::m_runServerThread
bool m_runServerThread
Definition: JiveXMLServer.h:117
JiveXML::JiveXMLServer::JiveXMLServer
JiveXMLServer(int port=0)
Constructor.
Definition: JiveXMLServer.cxx:41
JiveXML::ServerThreadArguments_t
Definition: ONCRPCServerThreads.h:23
TrigConf::MSGTC::Level
Level
Definition: Trigger/TrigConfiguration/TrigConfBase/TrigConfBase/MsgStream.h:21
EventInfoWrite.StreamName
string StreamName
Definition: EventInfoWrite.py:28
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
ret
T ret(T t)
Definition: rootspy.cxx:260
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
calibdata.exception
exception
Definition: calibdata.py:496
JiveXML::EventStreamID::isValid
bool isValid() const
Definition: EventStream.h:55
JiveXML::JiveXMLServer::LogLevel
virtual MSG::Level LogLevel() const override
Get the logging level.
Definition: JiveXMLServer.cxx:401
JiveXML::EventStreamPair
std::pair< const EventStreamID, const std::string > EventStreamPair
A map that stores events according to their EventStreamID Due to the way EventStreamID is build,...
Definition: EventStream.h:86
ONCRPCXDRProcs.h
JiveXML::NBYTESMAX
const unsigned int NBYTESMAX
Definition: ONCRPCServer.h:40
ERS_WARNING
#define ERS_WARNING(message)
Definition: JiveXMLServer.cxx:19
ONCRPCServerThreads.h
JiveXML::EventStreamID
For the client-server communication, each event is uniquely identified by the run number,...
Definition: EventStream.h:19
ONCRPCSERVERPROG
#define ONCRPCSERVERPROG
Definition: ONCRPCServer.h:30
JiveXML
This header is shared inbetween the C-style server thread and the C++ Athena ServerSvc.
Definition: BadLArRetriever.cxx:21
JiveXML::EventStreamID::StreamName
std::string StreamName() const
Definition: EventStream.h:43
JiveXML::JiveXMLServer::GetEventStreamID
virtual const EventStreamID GetEventStreamID(const std::string &streamName) const override
get the current EventStreamID for a particular stream
Definition: JiveXMLServer.cxx:325
pmontree.code
code
Definition: pmontree.py:443
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:64
debug
const bool debug
Definition: MakeUncertaintyPlots.cxx:53
ERS_ERROR
#define ERS_ERROR(message)
Definition: JiveXMLServer.cxx:24
JiveXML::JiveXMLServer::m_receivedSignal
static std::atomic< int > m_receivedSignal
Definition: JiveXMLServer.h:100
ONCRPCSERVERVERS
#define ONCRPCSERVERVERS
Definition: ONCRPCServer.h:31
DiTauMassTools::MaxHistStrategyV2::e
e
Definition: PhysicsAnalysis/TauID/DiTauMassTools/DiTauMassTools/HelperFunctions.h:26
JiveXML::JiveXMLServer::StartServingThread
StatusCode StartServingThread()
Start the serving thread.
Definition: JiveXMLServer.cxx:70
DEBUG
#define DEBUG
Definition: page_access.h:11
JiveXML::JiveXMLServer::m_ServerThreadHandle
pthread_t m_ServerThreadHandle
Definition: JiveXMLServer.h:113
JiveXML::JiveXMLServer::ServerThreadStopped
virtual void ServerThreadStopped() override
Callback whenever the server thread is stopped.
Definition: JiveXMLServer.cxx:192
python.Constants.VERBOSE
int VERBOSE
Definition: Control/AthenaCommon/python/Constants.py:14
JiveXML::JiveXMLServer::Wait
void Wait()
Wait for the server finish.
Definition: JiveXMLServer.cxx:181
get_generator_info.error
error
Definition: get_generator_info.py:40
JiveXML::JiveXMLServer::~JiveXMLServer
virtual ~JiveXMLServer()
Destructor.
Definition: JiveXMLServer.cxx:58
JiveXML::EventStreamID::RunNumber
unsigned int RunNumber() const
Definition: EventStream.h:42
python.TrigInDetArtSteps.timeout
timeout
Definition: TrigInDetArtSteps.py:35
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
python.CaloScaleNoiseConfig.args
args
Definition: CaloScaleNoiseConfig.py:80
run.Message
Message
Definition: run.py:57
JiveXML::JiveXMLServer::UpdateEventForStream
virtual StatusCode UpdateEventForStream(const EventStreamID &evtStreamID, const std::string &event) override
Put this event as new current event for stream given by name.
Definition: JiveXMLServer.cxx:201
JiveXML::NSTREAMMAX
const unsigned int NSTREAMMAX
Definition: ONCRPCServer.h:38