ATLAS Offline Software
Loading...
Searching...
No Matches
JiveXMLServer.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7//tdaq includes
8#include <ers/ers.h>
9
10//JiveXML includes
15
16#include <signal.h>
17
18//Define warning and error
19#define ERS_WARNING( message ) \
20{ \
21 ERS_REPORT_IMPL( ers::warning, ers::Message, message, ); \
22}
23
24#define ERS_ERROR( message ) \
25{ \
26 ERS_REPORT_IMPL( ers::error, ers::Message, message, ); \
27}
28
29namespace {
30 std::string fmterror(int code) {
31 char buf[256];
32 return std::string(strerror_r(code, buf, sizeof(buf)));
33 }
34}
35
36namespace JiveXML {
37
42 m_portNumber(port){
43
44 //Make sure ServerThread does not start unexpectedly
45 m_runServerThread = false ;
46
47 //And then start it
48 StartServingThread().ignore();
49
50 //Also register the signal handlers
51 signal( SIGINT , JiveXMLServer::signalHandler );
52 signal( SIGTERM, JiveXMLServer::signalHandler );
53 }
54
59
60 //Just stop the serving thread
61 StopServingThread().ignore();
62 }
63
71
72 ERS_DEBUG(MSG::VERBOSE,"StartServingThread()");
73
74 //Initialize access lock mechanism to ensure that the data map is never
75 //accessed by more than one thread at the same time. NULL means default
76 int retVal = pthread_mutex_init(&m_accessLock,NULL);
77 if (retVal != 0){
78 ERS_WARNING("Unable to initialize access lock while starting server: " << fmterror(retVal));
79 return StatusCode::FAILURE;
80 }
81
82 //The arguments passed on to the server - create new object on the heap that
83 //is persistent through the lifetime of the thread
85
86 //set runServer flag to true, so the thread will start
87 m_runServerThread = true ;
88
89 //create thread itself
90 if ( ( pthread_create(&m_ServerThreadHandle, NULL , &JiveXML::ONCRPCServerThread, (void *) args )) != 0){
91
92 //Create thread failed
93 ERS_WARNING("Thread creation failed");
94 return StatusCode::FAILURE;
95 }
96
97 return StatusCode::SUCCESS;
98
99 }
100
108
109 ERS_DEBUG(MSG::VERBOSE,"StopServingThread()");
110
115
116 //Create a client - this will already cause an update of the file
117 //descriptors on the sockets
118 CLIENT* client = clnt_create("localhost", ONCRPCSERVERPROG,ONCRPCSERVERVERS, "tcp");
119 if (!client){
120 ERS_ERROR("Unable to create shutdown client for local server");
121 return StatusCode::FAILURE;
122 }
123
124 // Next unset the runServerThread flag, which will cause the loop to stop
125 // This needs to happen after client creation, otherwise the next call won't
126 // be serverd anymore
127 m_runServerThread = false ;
128
129 // Now issue the call with a timeout
130 struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0;
131// xdr_void is defined inconsistently in xdr.h and gets a warning from gcc8.
132#if __GNUC__ >= 8
133# pragma GCC diagnostic push
134# pragma GCC diagnostic ignored "-Wcast-function-type"
135#endif
136#if defined(__clang__) && __clang_major__ >= 19
137# pragma clang diagnostic push
138# pragma clang diagnostic ignored "-Wcast-function-type-mismatch"
139#endif
140 clnt_call(client, NULLPROC, (xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL, timeout);
141#if defined(__clang__) && __clang_major__ >= 19
142# pragma clang diagnostic pop
143#endif
144#if __GNUC__ >= 8
145# pragma GCC diagnostic pop
146#endif
147
148 // A pointer to the return value of the thread
149 void* ret = NULL;
150 // wait till the server thread has finished
151 ERS_INFO("Waiting for server thread to terminate ...");
152 pthread_join(m_ServerThreadHandle, &ret);
153 ERS_INFO(" ... finished server thread");
154
155 //check if there was a return value
156 if (ret){
157 //Get the return value
158 unsigned long NRequests = *(unsigned long*)ret;
159 ERS_DEBUG(MSG::DEBUG,"Server thread stopped after handling " << NRequests << " requests");
160 } else
161 ERS_WARNING("Server thread stopped unexpectedly");
162
163 //Destroy the access lock
164 int retVal = pthread_mutex_destroy(&m_accessLock);
165 if (retVal != 0){
166 ERS_WARNING("Unable to destroy access lock after stopping server: " << fmterror(retVal));
167 return StatusCode::FAILURE;
168 }
169
170 return StatusCode::SUCCESS;
171 }
172
177 //Store signal
178 m_receivedSignal=signal;
179 //finish semaphore lock
180 m_lock.post();
181 }
182
189 //just wait for the lock
190 m_lock.wait();
191 //Tell why the lock was released
192 ERS_INFO("Reached post-condition after received signal " << m_receivedSignal );
193 }
194
200 //call the signal handler, so we will also reach post condition
201 signalHandler(-1);
202 }
203
208 StatusCode JiveXMLServer::UpdateEventForStream( const EventStreamID& evtStreamID, const std::string & event) {
209
210 ERS_DEBUG(MSG::VERBOSE,"UpdateEventForStream");
211
212 //Check that the event stream id is valid
213 if (!evtStreamID.isValid()){
214 ERS_ERROR("Invalid event stream identifier - cannot add event");
215 return StatusCode::FAILURE;
216 }
217
218 //Make sure we don't have already exceeded the maximum number of streams
219 if (m_eventStreamMap.size() > NSTREAMMAX ){
220 ERS_ERROR("Reached max. allowed number of streams " << NSTREAMMAX << " - cannot add event");
221 return StatusCode::FAILURE;
222 }
223
224 //Make sure the event is not larger than the allowed maximal size
225 if (event.length() > NBYTESMAX ){
226 ERS_ERROR("Event is larger than allowed max. of " << NBYTESMAX << " bytes - cannot add event");
227 return StatusCode::FAILURE;
228 }
229
230 //Make sure we are the only one accessing the data right now, by trying to
231 //obtain a lock. If the lock can not be obtained after a certain time, an
232 //error is reported
233
234 //Timeout of 30 second and 0 nanoseconds
235 struct timespec timeout = { 30, 0 };
236 //Try to obtain the lock
237 int retVal = pthread_mutex_timedlock(&m_accessLock, &timeout);
238 if ( retVal != 0 ){
239 ERS_ERROR("Unable to obtain access lock to update event: " << fmterror(retVal));
240 return StatusCode::FAILURE;
241 }
242
243 //Using try/catch to ensure the mutex gets unlocked in any case
244 try {
245
246 //Using std::map::operator[] and std::map::insert() will create a new event
247 //if it did not exist, otherwise just replace the existing entry (making a
248 //copy of the std::string) but would not update the key which holds new
249 //event/run number. Therefore delete existing entry first.
250
251 //Delete old entry if there is one
252 EventStreamMap::iterator OldEvtItr = m_eventStreamMap.find(evtStreamID);
253 if (OldEvtItr != m_eventStreamMap.end())
254 m_eventStreamMap.erase(OldEvtItr);
255
256 //Now add the new event
257 m_eventStreamMap.insert(EventStreamPair(evtStreamID,event));
258
259 } catch ( const std::exception& e ) {
260 ERS_ERROR("Exception caught while updating event for stream "
261 << evtStreamID.StreamName() << ": " << e.what());
262 //Also release the lock in this case
263 pthread_mutex_unlock(&m_accessLock);
264 //before we return
265 return StatusCode::FAILURE;
266 }
267
268 //Finally release the lock again
269 retVal = pthread_mutex_unlock(&m_accessLock);
270 if ( retVal != 0 ){
271 ERS_ERROR("Unable to release access lock after updating event: " << fmterror(retVal));
272 return StatusCode::FAILURE;
273 }
274
275 ERS_DEBUG(MSG::DEBUG, "Updated stream " << evtStreamID.StreamName()
276 << " with event Nr. " << evtStreamID.EventNumber()
277 << " from run Nr. " << evtStreamID.RunNumber());
278
279 return StatusCode::SUCCESS;
280 }
281
282
291 return 3;
292 }
293
297 std::vector<std::string> JiveXMLServer::GetStreamNames() const {
298
299 //Create a vector that can be returned
300 std::vector<std::string> StreamNames;
301
302 //Obtain an exclusive access lock
303 int retVal = pthread_mutex_lock(&m_accessLock);
304 if ( retVal != 0 ){
305 ERS_ERROR("Unable to obtain access lock to get stream names: " << fmterror(retVal));
306 return StreamNames;
307 }
308
309 // Iterate over map to get entries
310 EventStreamMap::const_iterator MapItr = m_eventStreamMap.begin();
311 for ( ; MapItr != m_eventStreamMap.end(); ++MapItr){
312
313 //Get the EventStreamID object
314 EventStreamID EvtStrID = (*MapItr).first;
315
316 //Add the name of this EventStreamID to the list of stream names
317 StreamNames.push_back(EvtStrID.StreamName());
318 }
319
320 //Release the lock
321 retVal = pthread_mutex_unlock(&m_accessLock);
322 if ( retVal != 0 )
323 ERS_ERROR("Unable to release access lock after getting stream names: " << fmterror(retVal));
324
325 //Return the list of names
326 return StreamNames;
327 }
328
332 const EventStreamID JiveXMLServer::GetEventStreamID( const std::string& StreamName) const {
333
334 //Obtain an exclusive access lock
335 int retVal = pthread_mutex_lock(&m_accessLock);
336 if ( retVal != 0 ){
337 ERS_ERROR("Unable to obtain access lock to get stream ID: " << fmterror(retVal));
338 return EventStreamID("");
339 }
340
341 // Search the entry in the map
342 EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(EventStreamID(StreamName));
343
344 //Initialize with an invalid event stream identifier
345 EventStreamID streamID = EventStreamID("");
346
347 //If the element is found, get a copy of the found event stream identifier
348 if ( MapItr != m_eventStreamMap.end()){
349 streamID = EventStreamID((*MapItr).first);
350 }
351
352 //Release the lock
353 retVal = pthread_mutex_unlock(&m_accessLock);
354 if ( retVal != 0 )
355 ERS_ERROR("Unable to release access lock after getting stream ID: " << fmterror(retVal));
356
357 return streamID;
358
359 }
360
364 const std::string JiveXMLServer::GetEvent( const EventStreamID& evtStreamID ) const {
365
366 //Obtain an exclusive access lock
367 int retVal = pthread_mutex_lock(&m_accessLock);
368 if ( retVal != 0 ){
369 ERS_ERROR("Unable to obtain access lock to get event: " << fmterror(retVal));
370 return std::string("");
371 }
372
373
374 // Search the entry in the map
375 EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(evtStreamID);
376
377 //Initialize with an empty event stream
378 std::string event;
379
380 //If the element is found, get a copy of the found event string
381 if ( MapItr != m_eventStreamMap.end()){
382 event = std::string((*MapItr).second);
383 }
384
385 //Release the lock
386 retVal = pthread_mutex_unlock(&m_accessLock);
387 if ( retVal != 0 )
388 ERS_ERROR("Unable to release access lock after getting stream event: " << fmterror(retVal));
389
390 return event;
391 }
392
396 void JiveXMLServer::Message( const MSG::Level level, const std::string& msg) const {
397 //Deliver message to the proper stream
398 if (level <= MSG::DEBUG) ERS_REPORT_IMPL( ers::debug, ers::Message, msg, level);
399 if (level == MSG::INFO) ERS_REPORT_IMPL( ers::info, ers::Message, msg, );
400 if (level == MSG::WARNING) ERS_REPORT_IMPL( ers::warning, ers::Message, msg, );
401 if (level == MSG::ERROR) ERS_REPORT_IMPL( ers::error, ers::Message, msg, );
402 if (level >= MSG::FATAL) ERS_REPORT_IMPL( ers::fatal, ers::Message, msg, );
403 }
404
408 MSG::Level JiveXMLServer::LogLevel() const {
409 //set to fixed value for now
410 return MSG::DEBUG;
411 }
412}
#define ERS_WARNING(message)
#define ERS_ERROR(message)
#define ONCRPCSERVERVERS
#define ONCRPCSERVERPROG
For the client-server communication, each event is uniquely identified by the run number,...
Definition EventStream.h:19
unsigned int RunNumber() const
Definition EventStream.h:42
unsigned long EventNumber() const
Definition EventStream.h:41
const std::string & StreamName() const
Definition EventStream.h:43
virtual StatusCode UpdateEventForStream(const EventStreamID &evtStreamID, const std::string &event) override
Put this event as new current event for stream given by name.
virtual ~JiveXMLServer()
Destructor.
virtual std::vector< std::string > GetStreamNames() const override
get the names of all the streams
virtual void Message(const MSG::Level level, const std::string &msg) const override
This function is exposed to allow using ERS messaging service from other threads.
virtual MSG::Level LogLevel() const override
Get the logging level.
virtual void ServerThreadStopped() override
Callback whenever the server thread is stopped.
static std::atomic< int > m_receivedSignal
static void signalHandler(int signum)
When the signal handler is called, switch the lock to the post condition.
StatusCode StartServingThread()
Start the serving thread.
void Wait()
Wait for the server finish.
EventStreamMap m_eventStreamMap
StatusCode StopServingThread()
Stop the serving thread.
JiveXMLServer(int port=0)
Constructor.
virtual int GetState() const override
get the Status of the application
virtual const std::string GetEvent(const EventStreamID &evtStreamID) const override
get the current event for a particular stream
virtual const EventStreamID GetEventStreamID(const std::string &streamName) const override
get the current EventStreamID for a particular stream
This header is shared inbetween the C-style server thread and the C++ Athena ServerSvc.
const unsigned int NBYTESMAX
std::pair< const EventStreamID, const std::string > EventStreamPair
A map that stores events according to their EventStreamID Due to the way EventStreamID is build,...
Definition EventStream.h:86
const unsigned int NSTREAMMAX
struct ServerThreadArguments_t ServerThreadArguments
Arguments handed over fromt the main (Athena) thread to the server thread.
void * ONCRPCServerThread(void *args)
This is the actual server thread, which takes above arguments.
MsgStream & msg
Definition testRead.cxx:32