ATLAS Offline Software
JiveXML::JiveXMLServer Class Reference

#include <JiveXMLServer.h>


Public Member Functions

 JiveXMLServer (int port=0)
 Constructor.
virtual ~JiveXMLServer ()
 Destructor.
IEventReceiver methods
virtual StatusCode UpdateEventForStream (const EventStreamID &evtStreamID, const std::string &event) override
 Put this event as new current event for stream given by name.
IEventServer methods
virtual std::vector< std::string > GetStreamNames () const override
 get the names of all the streams
virtual const EventStreamID GetEventStreamID (const std::string &streamName) const override
 get the current EventStreamID for a particular stream
virtual const std::string GetEvent (const EventStreamID &evtStreamID) const override
 get the current event for a particular stream
virtual int GetState () const override
 get the Status of the application
virtual void Message (const MSG::Level level, const std::string &msg) const override
 This function is exposed to allow using ERS messaging service from other threads.
virtual MSG::Level LogLevel () const override
 Get the logging level.

Event serving thread control

static std::promise< int > m_receivedSignal ATLAS_THREAD_SAFE
int m_portNumber
EventStreamMap m_eventStreamMap
pthread_mutex_t m_accessLock ATLAS_THREAD_SAFE
pthread_t m_ServerThreadHandle
bool m_runServerThread
StatusCode StartServingThread ()
 Start the serving thread.
StatusCode StopServingThread ()
 Stop the serving thread.
virtual bool GetRunServerFlag () const override
 The server thread will stop once this flag is set to false.
virtual void ServerThreadStopped () override
 Callback whenever the server thread is stopped.
void Wait ()
 Wait for the server to finish.
static void signalHandler (int signum)
 When the signal handler is called, switch the lock to the post condition.

Detailed Description

Definition at line 36 of file JiveXMLServer.h.

Constructor & Destructor Documentation

◆ JiveXMLServer()

JiveXMLServer::JiveXMLServer ( int port = 0)

Constructor.

Definition at line 41 of file JiveXMLServer.cxx.

41 :
42 m_portNumber(port){
43
44 //Make sure ServerThread does not start unexpectedly
45 m_runServerThread = false ;
46
47 //And then start it
48 StartServingThread().ignore();
49
50 //Also register the signal handlers
53 }

◆ ~JiveXMLServer()

JiveXMLServer::~JiveXMLServer ( )
virtual

Destructor.

Definition at line 58 of file JiveXMLServer.cxx.

58 {
59
60 //Just stop the serving thread
61 StopServingThread().ignore();
62 }

Member Function Documentation

◆ GetEvent()

const std::string JiveXMLServer::GetEvent ( const EventStreamID & evtStreamID) const
overridevirtual

get the current event for a particular stream

Return the event for a given stream.

Implements JiveXML::IEventServer.

Definition at line 363 of file JiveXMLServer.cxx.

363 {
364
365 //Obtain an exclusive access lock
366 int retVal = pthread_mutex_lock(&m_accessLock);
367 if ( retVal != 0 ){
368 ERS_ERROR("Unable to obtain access lock to get event: " << fmterror(retVal));
369 return std::string("");
370 }
371
372
373 // Search the entry in the map
374 EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(evtStreamID);
375
376 //Initialize with an empty event stream
377 std::string event;
378
379 //If the element is found, get a copy of the found event string
380 if ( MapItr != m_eventStreamMap.end()){
381 event = std::string((*MapItr).second);
382 }
383
384 //Release the lock
385 retVal = pthread_mutex_unlock(&m_accessLock);
386 if ( retVal != 0 )
387 ERS_ERROR("Unable to release access lock after getting stream event: " << fmterror(retVal));
388
389 return event;
390 }
#define ERS_ERROR(message)
EventStreamMap m_eventStreamMap
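
The listing above locks the mutex, copies the entry and unlocks by hand on each path. As a minimal sketch of the same lookup with a scoped guard, the following uses std::mutex and std::lock_guard instead of the pthread calls. EventStore, its plain string key and the method names are hypothetical stand-ins, not part of JiveXML.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical stand-in for the server's stream map; a plain string key is
// used instead of EventStreamID to keep the sketch self-contained.
class EventStore {
public:
  // Replace the event stored for a stream.
  void update(const std::string& stream, const std::string& event) {
    std::lock_guard<std::mutex> guard(m_lock);
    m_events[stream] = event;
  }

  // Return a copy of the current event, or an empty string if the stream
  // is unknown. The guard releases the lock on every return path.
  std::string get(const std::string& stream) const {
    std::lock_guard<std::mutex> guard(m_lock);
    auto it = m_events.find(stream);
    return it != m_events.end() ? it->second : std::string();
  }

private:
  mutable std::mutex m_lock;  // mutable so that const readers can lock it
  std::map<std::string, std::string> m_events;
};
```

Returning a copy rather than a reference matters here: a reference into the map could be invalidated by a writer as soon as the lock is released.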

◆ GetEventStreamID()

const EventStreamID JiveXMLServer::GetEventStreamID ( const std::string & streamName) const
overridevirtual

get the current EventStreamID for a particular stream

Return the EventStreamID for the last event of a given stream.

Implements JiveXML::IEventServer.

Definition at line 331 of file JiveXMLServer.cxx.

331 {
332
333 //Obtain an exclusive access lock
334 int retVal = pthread_mutex_lock(&m_accessLock);
335 if ( retVal != 0 ){
336 ERS_ERROR("Unable to obtain access lock to get stream ID: " << fmterror(retVal));
337 return EventStreamID("");
338 }
339
340 // Search the entry in the map
341 EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(EventStreamID(StreamName));
342
343 //Initialize with an invalid event stream identifier
344 EventStreamID streamID = EventStreamID("");
345
346 //If the element is found, get a copy of the found event stream identifier
347 if ( MapItr != m_eventStreamMap.end()){
348 streamID = EventStreamID((*MapItr).first);
349 }
350
351 //Release the lock
352 retVal = pthread_mutex_unlock(&m_accessLock);
353 if ( retVal != 0 )
354 ERS_ERROR("Unable to release access lock after getting stream ID: " << fmterror(retVal));
355
356 return streamID;
357
358 }

◆ GetRunServerFlag()

virtual bool JiveXML::JiveXMLServer::GetRunServerFlag ( ) const
inlineoverridevirtual

The server thread will stop once this flag is set to false.

Implements JiveXML::IEventServer.

Definition at line 78 of file JiveXMLServer.h.

78{ return m_runServerThread; };

◆ GetState()

int JiveXMLServer::GetState ( ) const
overridevirtual

get the Status of the application

Return the current athena state.

At the moment this returns a fixed value; it will be changed to some timeout condition soon.

Implements JiveXML::IEventServer.

Definition at line 285 of file JiveXMLServer.cxx.

285 {
290 return 3;
291 }

◆ GetStreamNames()

std::vector< std::string > JiveXMLServer::GetStreamNames ( ) const
overridevirtual

get the names of all the streams

Return an array with all the stream names.

Implements JiveXML::IEventServer.

Definition at line 296 of file JiveXMLServer.cxx.

296 {
297
298 //Create a vector that can be returned
299 std::vector<std::string> StreamNames;
300
301 //Obtain an exclusive access lock
302 int retVal = pthread_mutex_lock(&m_accessLock);
303 if ( retVal != 0 ){
304 ERS_ERROR("Unable to obtain access lock to get stream names: " << fmterror(retVal));
305 return StreamNames;
306 }
307
308 // Iterate over map to get entries
309 EventStreamMap::const_iterator MapItr = m_eventStreamMap.begin();
310 for ( ; MapItr != m_eventStreamMap.end(); ++MapItr){
311
312 //Get the EventStreamID object
313 EventStreamID EvtStrID = (*MapItr).first;
314
315 //Add the name of this EventStreamID to the list of stream names
316 StreamNames.push_back(EvtStrID.StreamName());
317 }
318
319 //Release the lock
320 retVal = pthread_mutex_unlock(&m_accessLock);
321 if ( retVal != 0 )
322 ERS_ERROR("Unable to release access lock after getting stream names: " << fmterror(retVal));
323
324 //Return the list of names
325 return StreamNames;
326 }

◆ LogLevel()

MSG::Level JiveXMLServer::LogLevel ( ) const
overridevirtual

Get the logging level.

Currently only used to suppress client hostname lookup if not in debug mode.

Implements JiveXML::IMessage.

Definition at line 407 of file JiveXMLServer.cxx.

407 {
408 //set to fixed value for now
409 return MSG::DEBUG;
410 }

◆ Message()

void JiveXMLServer::Message ( const MSG::Level level,
const std::string & msg ) const
overridevirtual

This function is exposed to allow using ERS messaging service from other threads.

Deliver a message, possibly from another thread, to ERS.

Implements JiveXML::IMessage.

Definition at line 395 of file JiveXMLServer.cxx.

395 {
396 //Deliver message to the proper stream
397 if (level <= MSG::DEBUG) ERS_REPORT_IMPL( ers::debug, ers::Message, msg, level);
398 if (level == MSG::INFO) ERS_REPORT_IMPL( ers::info, ers::Message, msg, );
399 if (level == MSG::WARNING) ERS_REPORT_IMPL( ers::warning, ers::Message, msg, );
400 if (level == MSG::ERROR) ERS_REPORT_IMPL( ers::error, ers::Message, msg, );
401 if (level >= MSG::FATAL) ERS_REPORT_IMPL( ers::fatal, ers::Message, msg, );
402 }
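
The routing in the listing above amounts to a mapping from severity level to ERS stream. The following is a rough sketch of that mapping only: the Level enum is a hypothetical stand-in assumed to be ordered like MSG::Level, and channelFor returns a name instead of calling any ERS macro.

```cpp
#include <cassert>
#include <string>

// Hypothetical severity scale, assumed to be ordered like MSG::Level.
enum class Level { NIL = 0, VERBOSE, DEBUG, INFO, WARNING, ERROR, FATAL, ALWAYS };

// Name of the stream a message of this level would be routed to,
// mirroring the comparisons in the listing above.
std::string channelFor(Level level) {
  if (level <= Level::DEBUG) return "debug";
  if (level == Level::INFO) return "info";
  if (level == Level::WARNING) return "warning";
  if (level == Level::ERROR) return "error";
  return "fatal";  // FATAL and anything above it
}
```

Because the first and last comparisons are ranges, every level lands in exactly one stream, including levels below DEBUG and above FATAL.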

◆ ServerThreadStopped()

void JiveXMLServer::ServerThreadStopped ( )
overridevirtual

Callback whenever the server thread is stopped.

When the server thread has stopped, we also call the signal handler with the special value -1.

Implements JiveXML::IServer.

Definition at line 198 of file JiveXMLServer.cxx.

198 {
199 //call the signal handler, so we will also reach post condition
200 signalHandler(-1);
201 }

◆ signalHandler()

void JiveXMLServer::signalHandler ( int signum)
staticprivate

When the signal handler is called, switch the lock to the post condition.

Definition at line 176 of file JiveXMLServer.cxx.

176 {
177 //Store signal and notify thread
178 m_receivedSignal.set_value(signal);
179 }

◆ StartServingThread()

StatusCode JiveXMLServer::StartServingThread ( )

Start the serving thread.

Create the server by:

  • passing a this-pointer as an argument
  • setting the thread-running flag
  • starting the server thread

Definition at line 70 of file JiveXMLServer.cxx.

70 {
71
72 ERS_DEBUG(MSG::VERBOSE,"StartServingThread()");
73
74 //Initialize access lock mechanism to ensure that the data map is never
75 //accessed by more than one thread at the same time. NULL means default
76 int retVal = pthread_mutex_init(&m_accessLock,NULL);
77 if (retVal != 0){
78 ERS_WARNING("Unable to initialize access lock while starting server: " << fmterror(retVal));
79 return StatusCode::FAILURE;
80 }
81
82 //The arguments passed on to the server - create new object on the heap that
83 //is persistent through the lifetime of the thread
85
86 //set runServer flag to true, so the thread will start
87 m_runServerThread = true ;
88
89 //create thread itself
90 if ( ( pthread_create(&m_ServerThreadHandle, NULL , &JiveXML::ONCRPCServerThread, (void *) args )) != 0){
91
92 //Create thread failed
93 ERS_WARNING("Thread creation failed");
94 return StatusCode::FAILURE;
95 }
96
97 return StatusCode::SUCCESS;
98
99 }
#define ERS_WARNING(message)
struct ServerThreadArguments_t ServerThreadArguments
Arguments handed over from the main (Athena) thread to the server thread.
void * ONCRPCServerThread(void *args)
This is the actual server thread, which takes above arguments.
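
The three steps listed above (bundle the arguments, set the flag, create the thread) follow the usual pthread pattern of passing a heap-allocated struct through the void* parameter. A minimal sketch of that hand-over is given below. ThreadArgs, serverThread and startThread are hypothetical names, and the real ServerThreadArguments layout is not shown on this page.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical argument bundle; the real ServerThreadArguments layout is
// not shown on this page.
struct ThreadArgs {
  int port;
  unsigned long* handled;  // where the thread reports its result
};

// Thread entry point with the signature pthread_create expects.
void* serverThread(void* raw) {
  ThreadArgs* args = static_cast<ThreadArgs*>(raw);
  // Placeholder "work": store the port so the caller can check the hand-over.
  *args->handled = static_cast<unsigned long>(args->port);
  void* result = args->handled;
  delete args;  // the thread owns the heap-allocated arguments
  return result;
}

// Start the thread; returns 0 on success, like pthread_create itself.
int startThread(pthread_t& handle, int port, unsigned long& handled) {
  ThreadArgs* args = new ThreadArgs{port, &handled};
  int rc = pthread_create(&handle, nullptr, &serverThread, args);
  if (rc != 0) delete args;  // the thread never ran, so clean up here
  return rc;
}
```

Allocating the arguments on the heap keeps them valid after the starting function returns. The pointer returned by the thread function is what pthread_join later hands back to the caller.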

◆ StopServingThread()

StatusCode JiveXMLServer::StopServingThread ( )

Stop the serving thread.

Stop the server by:

  • unsetting the thread-running flag
  • waiting till the server thread has finished
  • destroying the thread handle

Ping the server, which causes one more request to be handled; otherwise the server would never re-evaluate its loop condition.

Definition at line 107 of file JiveXMLServer.cxx.

107 {
108
109 ERS_DEBUG(MSG::VERBOSE,"StopServingThread()");
110
115
116 //Create a client - this will already cause an update of the file
117 //descriptors on the sockets
118 CLIENT* client = clnt_create("localhost", ONCRPCSERVERPROG,ONCRPCSERVERVERS, "tcp");
119 if (!client){
120 ERS_ERROR("Unable to create shutdown client for local server");
121 return StatusCode::FAILURE;
122 }
123
124 // Next unset the runServerThread flag, which will cause the loop to stop
125 // This needs to happen after client creation, otherwise the next call won't
126 // be served anymore
127 m_runServerThread = false ;
128
129 // Now issue the call with a timeout
130 struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0;
131// xdr_void is defined inconsistently in xdr.h and gets a warning from gcc8.
132#if __GNUC__ >= 8
133# pragma GCC diagnostic push
134# pragma GCC diagnostic ignored "-Wcast-function-type"
135#endif
136#if defined(__clang__) && __clang_major__ >= 19
137# pragma clang diagnostic push
138# pragma clang diagnostic ignored "-Wcast-function-type-mismatch"
139#endif
140 clnt_call(client, NULLPROC, (xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL, timeout);
141#if defined(__clang__) && __clang_major__ >= 19
142# pragma clang diagnostic pop
143#endif
144#if __GNUC__ >= 8
145# pragma GCC diagnostic pop
146#endif
147
148 // A pointer to the return value of the thread
149 void* ret = NULL;
150 // wait till the server thread has finished
151 ERS_INFO("Waiting for server thread to terminate ...");
152 pthread_join(m_ServerThreadHandle, &ret);
153 ERS_INFO(" ... finished server thread");
154
155 //check if there was a return value
156 if (ret){
157 //Get the return value
158 unsigned long NRequests = *(unsigned long*)ret;
159 ERS_DEBUG(MSG::DEBUG,"Server thread stopped after handling " << NRequests << " requests");
160 } else
161 ERS_WARNING("Server thread stopped unexpectedly");
162
163 //Destroy the access lock
164 int retVal = pthread_mutex_destroy(&m_accessLock);
165 if (retVal != 0){
166 ERS_WARNING("Unable to destroy access lock after stopping server: " << fmterror(retVal));
167 return StatusCode::FAILURE;
168 }
169
170 return StatusCode::SUCCESS;
171 }
#define ONCRPCSERVERVERS
#define ONCRPCSERVERPROG
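
The reason for the extra ping can be shown without any RPC machinery. In the sketch below, a hypothetical Worker blocks until a request arrives and only then re-checks its run flag, so stop() has to unset the flag, send one more request and then join. All names are illustrative, and a condition variable stands in for the ONC RPC call used in the listing.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical worker that blocks until a request arrives, much like a
// server blocking on its sockets. stop() must be called before destruction.
class Worker {
public:
  void start() {
    m_run = true;
    m_thread = std::thread([this] { loop(); });
  }

  // Deliver one (empty) request to the worker.
  void ping() {
    {
      std::lock_guard<std::mutex> guard(m_lock);
      ++m_pending;
    }
    m_wakeup.notify_one();
  }

  // Unset the flag, ping so the loop re-evaluates it, then join.
  // Returns the number of requests the worker handled.
  unsigned long stop() {
    m_run = false;
    ping();
    m_thread.join();
    return m_handled;
  }

private:
  void loop() {
    while (m_run) {
      std::unique_lock<std::mutex> guard(m_lock);
      m_wakeup.wait(guard, [this] { return m_pending > 0; });
      --m_pending;
      ++m_handled;
    }
  }

  std::atomic<bool> m_run{false};
  std::mutex m_lock;
  std::condition_variable m_wakeup;
  unsigned long m_pending = 0;
  unsigned long m_handled = 0;
  std::thread m_thread;
};
```

Without the ping() inside stop(), a worker that is already waiting would never wake up and join() would block forever. The listing differs in one detail: it creates the RPC client before unsetting the flag, because client creation itself already triggers server activity.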

◆ UpdateEventForStream()

StatusCode JiveXMLServer::UpdateEventForStream ( const EventStreamID & evtStreamID,
const std::string & event )
overridevirtual

Put this event as new current event for stream given by name.

Get one event and store it as the new current event for the stream identified by the given EventStreamID.

Implements JiveXML::IEventReceiver.

Definition at line 207 of file JiveXMLServer.cxx.

207 {
208
209 ERS_DEBUG(MSG::VERBOSE,"UpdateEventForStream");
210
211 //Check that the event stream id is valid
212 if (!evtStreamID.isValid()){
213 ERS_ERROR("Invalid event stream identifier - cannot add event");
214 return StatusCode::FAILURE;
215 }
216
217 //Make sure we don't have already exceeded the maximum number of streams
218 if (m_eventStreamMap.size() > NSTREAMMAX ){
219 ERS_ERROR("Reached max. allowed number of streams " << NSTREAMMAX << " - cannot add event");
220 return StatusCode::FAILURE;
221 }
222
223 //Make sure the event is not larger than the allowed maximal size
224 if (event.length() > NBYTESMAX ){
225 ERS_ERROR("Event is larger than allowed max. of " << NBYTESMAX << " bytes - cannot add event");
226 return StatusCode::FAILURE;
227 }
228
229 //Make sure we are the only one accessing the data right now, by trying to
230 //obtain a lock. If the lock can not be obtained after a certain time, an
231 //error is reported
232
233 //Timeout of 30 seconds and 0 nanoseconds
234 struct timespec timeout = { 30, 0 };
235 //Try to obtain the lock
236 int retVal = pthread_mutex_timedlock(&m_accessLock, &timeout);
237 if ( retVal != 0 ){
238 ERS_ERROR("Unable to obtain access lock to update event: " << fmterror(retVal));
239 return StatusCode::FAILURE;
240 }
241
242 //Using try/catch to ensure the mutex gets unlocked in any case
243 try {
244
245 //Using std::map::operator[] and std::map::insert() will create a new event
246 //if it did not exist, otherwise just replace the existing entry (making a
247 //copy of the std::string) but would not update the key which holds new
248 //event/run number. Therefore delete existing entry first.
249
250 //Delete old entry if there is one
251 EventStreamMap::iterator OldEvtItr = m_eventStreamMap.find(evtStreamID);
252 if (OldEvtItr != m_eventStreamMap.end())
253 m_eventStreamMap.erase(OldEvtItr);
254
255 //Now add the new event
256 m_eventStreamMap.insert(EventStreamPair(evtStreamID,event));
257
258 } catch ( const std::exception& e ) {
259 ERS_ERROR("Exception caught while updating event for stream "
260 << evtStreamID.StreamName() << ": " << e.what());
261 //Also release the lock in this case
262 pthread_mutex_unlock(&m_accessLock);
263 //before we return
264 return StatusCode::FAILURE;
265 }
266
267 //Finally release the lock again
268 retVal = pthread_mutex_unlock(&m_accessLock);
269 if ( retVal != 0 ){
270 ERS_ERROR("Unable to release access lock after updating event: " << fmterror(retVal));
271 return StatusCode::FAILURE;
272 }
273
274 ERS_DEBUG(MSG::DEBUG, "Updated stream " << evtStreamID.StreamName()
275 << " with event Nr. " << evtStreamID.EventNumber()
276 << " from run Nr. " << evtStreamID.RunNumber());
277
278 return StatusCode::SUCCESS;
279 }
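
A note on the timed lock used in the listing above: POSIX specifies the second argument of pthread_mutex_timedlock as an absolute CLOCK_REALTIME time, not a duration, so a value of { 30, 0 } denotes 30 seconds after the epoch. The following sketch shows one way to compute a deadline that lies a given number of seconds in the future. deadlineFromNow and lockWithTimeout are hypothetical helper names, not part of JiveXML.

```cpp
#include <cassert>
#include <cerrno>
#include <pthread.h>
#include <time.h>

// Absolute CLOCK_REALTIME deadline lying `seconds` from now.
timespec deadlineFromNow(long seconds) {
  timespec deadline;
  clock_gettime(CLOCK_REALTIME, &deadline);
  deadline.tv_sec += seconds;
  return deadline;
}

// Try to lock `mutex`, giving up once `seconds` have elapsed.
// Returns 0 on success or an error number such as ETIMEDOUT.
int lockWithTimeout(pthread_mutex_t& mutex, long seconds) {
  const timespec deadline = deadlineFromNow(seconds);
  return pthread_mutex_timedlock(&mutex, &deadline);
}
```

With a deadline that is already in the past, the call still succeeds when the mutex is free, but returns ETIMEDOUT without waiting when it is held.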
const unsigned int NBYTESMAX
std::pair< const EventStreamID, const std::string > EventStreamPair
A map that stores events according to their EventStreamID. Due to the way EventStreamID is built,...
Definition EventStream.h:86
const unsigned int NSTREAMMAX
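
The comment in the listing explains why the old entry is erased before the new one is inserted: assigning through operator[] replaces the value but leaves the stored key untouched. A small sketch of that effect follows, assuming a hypothetical StreamKey that is ordered by stream name only while also carrying an event number. The real EventStreamID comparison is not shown on this page.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical key: ordered by stream name only, but also carrying an
// event number (an assumption modelled on the comment in the listing).
struct StreamKey {
  std::string name;
  unsigned long eventNumber;
  bool operator<(const StreamKey& other) const { return name < other.name; }
};

using StreamMap = std::map<StreamKey, std::string>;

// Replace the entry for a stream so that both key and value are refreshed.
void replaceEvent(StreamMap& streams, const StreamKey& key,
                  const std::string& event) {
  auto old = streams.find(key);
  if (old != streams.end()) streams.erase(old);
  streams.insert({key, event});
}
```

With such a key, streams[StreamKey{"physics", 2}] = "second" applied to a map that already holds an entry for "physics" with event number 1 updates the string but keeps event number 1 in the key, whereas replaceEvent leaves the map holding the new event number.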

◆ Wait()

void JiveXMLServer::Wait ( )

Wait for the server to finish.

Wait for the server thread to finish.

This can happen because (a) we received a SIGTERM or SIGINT signal, or (b) the server thread stopped by itself.

Definition at line 186 of file JiveXMLServer.cxx.

186 {
187 auto signal = m_receivedSignal.get_future();
188 //just wait for a signal
189 signal.wait();
190 //Tell why the lock was released
191 ERS_INFO("Reached post-condition after received signal " << signal.get() );
192 }
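
The interplay between signalHandler() and Wait() is a one-shot std::promise / std::future hand-over. A minimal sketch of that pattern is shown below, with a hypothetical global g_receivedSignal standing in for the static m_receivedSignal member and hypothetical function names.

```cpp
#include <cassert>
#include <future>

// Hypothetical stand-in for the class's static m_receivedSignal member.
static std::promise<int> g_receivedSignal;

// Fulfil the promise with the signal number (or -1 when the server thread
// stopped by itself). Returns false if the promise was already fulfilled.
bool notifyStop(int signum) {
  try {
    g_receivedSignal.set_value(signum);
    return true;
  } catch (const std::future_error&) {
    return false;  // set_value can succeed only once per promise
  }
}

// Block until notifyStop() has been called, then return the stored value.
// get_future() may be called only once, so this is a one-shot wait.
int waitForStop() {
  std::future<int> signal = g_receivedSignal.get_future();
  signal.wait();
  return signal.get();
}
```

Two properties of std::promise are worth keeping in mind with this design: a second set_value on the same promise throws std::future_error, and so does a second get_future. The sketch catches the former so that a late notification is reported rather than propagated.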

Member Data Documentation

◆ ATLAS_THREAD_SAFE [1/2]

std::promise<int> m_receivedSignal JiveXML::JiveXMLServer::ATLAS_THREAD_SAFE
inlinestaticprivate

Definition at line 95 of file JiveXMLServer.h.

◆ ATLAS_THREAD_SAFE [2/2]

pthread_mutex_t m_accessLock JiveXML::JiveXMLServer::ATLAS_THREAD_SAFE
mutableprivate

Definition at line 105 of file JiveXMLServer.h.

◆ m_eventStreamMap

EventStreamMap JiveXML::JiveXMLServer::m_eventStreamMap
private

Definition at line 102 of file JiveXMLServer.h.

◆ m_portNumber

int JiveXML::JiveXMLServer::m_portNumber
private

Definition at line 99 of file JiveXMLServer.h.

◆ m_runServerThread

bool JiveXML::JiveXMLServer::m_runServerThread
private

Definition at line 112 of file JiveXMLServer.h.

◆ m_ServerThreadHandle

pthread_t JiveXML::JiveXMLServer::m_ServerThreadHandle
private

Definition at line 108 of file JiveXMLServer.h.


The documentation for this class was generated from the following files: