ATLAS Offline Software
Loading...
Searching...
No Matches
JiveXML::ONCRPCServerSvc Class Reference

This athena service will create an ONC/RPC (aka SunRPC) server, that can provide the athena status as well as data strings from events. More...

#include <ONCRPCServerSvc.h>

Inheritance diagram for JiveXML::ONCRPCServerSvc:
Collaboration diagram for JiveXML::ONCRPCServerSvc:

Public Member Functions

 ONCRPCServerSvc (const std::string &name, ISvcLocator *sl)
 Default constructor.
virtual ~ONCRPCServerSvc ()
 Destructor.
virtual StatusCode initialize () override
 Gaudi default methods.
virtual StatusCode finalize () override
 Finalize - called once at the end.

Server methods

int m_portNumber
EventStreamMap m_eventStreamMap
pthread_mutex_t m_accessLock ATLAS_THREAD_SAFE {}
pthread_t m_ServerThreadHandle {}
bool m_runServerThread
virtual int GetState () const override
 get the Status of the application
virtual std::vector< std::string > GetStreamNames () const override
 get the names of all the streams
virtual const EventStreamID GetEventStreamID (const std::string &streamName) const override
 get the current EventStreamID for a particular stream
virtual const std::string GetEvent (const EventStreamID &evtStreamID) const override
 get the current event for a particular stream
virtual StatusCode UpdateEventForStream (const EventStreamID &evtStreamID, const std::string &event) override
 Put this event as new current event for stream given by name.
virtual void Message (const MSG::Level level, const std::string &mesg) const override
 This function is exposed to allow using athena messaging service from other threads.
virtual MSG::Level LogLevel () const override
 Get the logging level.
virtual bool GetRunServerFlag () const override
 The server thread will stop once this flag is set to false.
StatusCode StartServer ()
 Start the server thread.
StatusCode StopServer ()
 Stop the server thread.
virtual void ServerThreadStopped () override
 Callback when server thread terminates.

Detailed Description

This athena service will create an ONC/RPC (aka SunRPC) server, that can provide the athena status as well as data strings from events.

It can handle several different event streams, but for each stream only the latest event is available. This service is usually used by the Athena JiveXML algorithm providing XML events for an Atlantis Java client.

The server will create a single server thread, which loops over the sockets until the m_runServerThread flag is set to false. On each request, it will call the ONCRPCRequestHandler, which will generate a new thread for each request. The actual response to the requests happens in these ONCRPCDispatchThread. A list of the thread handles is kept in the a ThreadCollection, where each thread will add and remove itself. In the finalization stage, the server will wait for all dispatch threats to finish before exiting.

On all stages, the server thread will have a handle to the athena service via the thread specific key ServerSvcKey. It is thus possible to have many servers running simultaneously. Note however, that currently RPC will forbid you to start several servers with the same ONCRPCSERVERPROG.

Definition at line 42 of file ONCRPCServerSvc.h.

Constructor & Destructor Documentation

◆ ONCRPCServerSvc()

JiveXML::ONCRPCServerSvc::ONCRPCServerSvc ( const std::string & name,
ISvcLocator * sl )

Default constructor.

Constructor.

Definition at line 275 of file ONCRPCServerSvc.cxx.

275 :
276 base_class( name, sl )
277 {
278 //declare port number property
279 declareProperty("PortNumber",m_portNumber=0,"Port number to bind - assigned dynamically if set to zero [default: 0]");
280
281 //Make sure ServerThread does not start unexpectedly
282 m_runServerThread = false ;
283
284 ATH_MSG_VERBOSE( "Output level is set to " << (int)msg().level() );
285
286 }
#define ATH_MSG_VERBOSE(x)
MsgStream & msg
Definition testRead.cxx:32

◆ ~ONCRPCServerSvc()

JiveXML::ONCRPCServerSvc::~ONCRPCServerSvc ( )
virtual

Destructor.

Definition at line 291 of file ONCRPCServerSvc.cxx.

291 {
292
293 ATH_MSG_VERBOSE( "Destructor() " );
294 }

Member Function Documentation

◆ finalize()

StatusCode JiveXML::ONCRPCServerSvc::finalize ( )
overridevirtual

Finalize - called once at the end.

  • shut down server

Definition at line 321 of file ONCRPCServerSvc.cxx.

321 {
322
323 ATH_MSG_VERBOSE( "Finalize()" );
324
325 //Stop the server thread and return status code
326 return StopServer();
327 }
StatusCode StopServer()
Stop the server thread.

◆ GetEvent()

const std::string JiveXML::ONCRPCServerSvc::GetEvent ( const EventStreamID & evtStreamID) const
overridevirtual

get the current event for a particular stream

Return the event for a given stream.

Implements JiveXML::IEventServer.

Definition at line 248 of file ONCRPCServerSvc.cxx.

248 {
249
250 //Obtain an exclusive access lock
251 int retVal = pthread_mutex_lock(&m_accessLock);
252 if ( retVal != 0 ){
253 ATH_MSG_ERROR( "Unable to obtain access lock to get event: " << fmterror(retVal) );
254 return std::string("");
255 }
256
257 // Search the entry in the map
258 EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(evtStreamID);
259
260 //Release the lock
261 retVal = pthread_mutex_unlock(&m_accessLock);
262 if ( retVal != 0 )
263 ATH_MSG_ERROR( "Unable to release access lock after getting stream event: " << fmterror(retVal) );
264
265 //If the element was not found return an empty string
266 if ( MapItr == m_eventStreamMap.end()) return std::string("");
267
268 //Return a copy of the found event string
269 return std::string((*MapItr).second);
270 }
#define ATH_MSG_ERROR(x)
EventStreamMap m_eventStreamMap

◆ GetEventStreamID()

const EventStreamID JiveXML::ONCRPCServerSvc::GetEventStreamID ( const std::string & streamName) const
overridevirtual

get the current EventStreamID for a particular stream

Return the EventStreamID for the last event of a given stream.

Implements JiveXML::IEventServer.

Definition at line 221 of file ONCRPCServerSvc.cxx.

221 {
222
223 //Obtain an exclusive access lock
224 int retVal = pthread_mutex_lock(&m_accessLock);
225 if ( retVal != 0 ){
226 ATH_MSG_ERROR( "Unable to obtain access lock to get stream ID: " << fmterror(retVal) );
227 return EventStreamID("");
228 }
229
230 // Search the entry in the map
231 EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(EventStreamID(StreamName));
232
233 //Release the lock
234 retVal = pthread_mutex_unlock(&m_accessLock);
235 if ( retVal != 0 )
236 ATH_MSG_ERROR( "Unable to release access lock after getting stream ID: " << fmterror(retVal) );
237
238 //If the element was not found return an invalid ID
239 if ( MapItr == m_eventStreamMap.end()) return EventStreamID("");
240
241 //Return the found event stream identifier
242 return (*MapItr).first;
243 }

◆ GetRunServerFlag()

virtual bool JiveXML::ONCRPCServerSvc::GetRunServerFlag ( ) const
inlineoverridevirtual

The server thread will stop once this flag is set to false.

Implements JiveXML::IEventServer.

Definition at line 82 of file ONCRPCServerSvc.h.

82{ return m_runServerThread; };

◆ GetState()

int JiveXML::ONCRPCServerSvc::GetState ( ) const
overridevirtual

get the Status of the application

Return the current athena state.

at the moment simply return state machine state - we might consider checking for state transitions (m_state != m_targetState) and return a "busy" value. At the moment, all state transitions should be instantanious, so this is not necessary

Implements JiveXML::IEventServer.

Definition at line 173 of file ONCRPCServerSvc.cxx.

173 {
180 return (int)FSMState();
181 }

◆ GetStreamNames()

std::vector< std::string > JiveXML::ONCRPCServerSvc::GetStreamNames ( ) const
overridevirtual

get the names of all the streams

Return an array with all the stream names.

Implements JiveXML::IEventServer.

Definition at line 186 of file ONCRPCServerSvc.cxx.

186 {
187
188 //Create a vector that can be returned
189 std::vector<std::string> StreamNames;
190
191 //Obtain an exclusive access lock
192 int retVal = pthread_mutex_lock(&m_accessLock);
193 if ( retVal != 0 ){
194 ATH_MSG_ERROR( "Unable to obtain access lock to get stream names: " << fmterror(retVal) );
195 return StreamNames;
196 }
197
198 // Iterate over map to get entries
199 EventStreamMap::const_iterator MapItr = m_eventStreamMap.begin();
200 for ( ; MapItr != m_eventStreamMap.end(); ++MapItr){
201
202 //Get the EventStreamID object
203 EventStreamID EvtStrID = (*MapItr).first;
204
205 //Add the name of this EventStreamID to the list of stream names
206 StreamNames.push_back(EvtStrID.StreamName());
207 }
208
209 //Release the lock
210 retVal = pthread_mutex_unlock(&m_accessLock);
211 if ( retVal != 0 )
212 ATH_MSG_ERROR( "Unable to release access lock after getting stream names: " << fmterror(retVal) );
213
214 //Return the list of names
215 return StreamNames;
216 }

◆ initialize()

StatusCode JiveXML::ONCRPCServerSvc::initialize ( )
overridevirtual

Gaudi default methods.

Initialize - called once in beginning.

  • create server

Create the server itself

Definition at line 300 of file ONCRPCServerSvc.cxx.

300 {
301
302 //Initialize Service base
303 if (Service::initialize().isFailure()) return StatusCode::FAILURE;
304
305 //Initialize message stream level
306 msg().setLevel(outputLevel());
307 ATH_MSG_VERBOSE( "Initialize()" );
308 ATH_MSG_VERBOSE( "Output level is set to " << (int)msg().level() );
309
314 return StatusCode::SUCCESS;
315 }
#define ATH_CHECK
Evaluate an expression and check for errors.
StatusCode StartServer()
Start the server thread.

◆ LogLevel()

MSG::Level JiveXML::ONCRPCServerSvc::LogLevel ( ) const
overridevirtual

Get the logging level.

Return the logging level - possibly to detect whether it is worth retrieving the information to be logged.

Implements JiveXML::IMessage.

Definition at line 164 of file ONCRPCServerSvc.cxx.

164 {
165 //return the current logging level
166 return msg().level();
167 }

◆ Message()

void JiveXML::ONCRPCServerSvc::Message ( const MSG::Level level,
const std::string & mesg ) const
overridevirtual

This function is exposed to allow using athena messaging service from other threads.

Deliver a message - possibly from another thread - to the athena msgSvc;.

Implements JiveXML::IMessage.

Definition at line 151 of file ONCRPCServerSvc.cxx.

153 {
154 //Deliver message
155 msg() << level << mesg << endmsg;
156 //Spit it out immediately
157 msg().flush();
158 }
#define endmsg

◆ ServerThreadStopped()

void JiveXML::ONCRPCServerSvc::ServerThreadStopped ( )
overridevirtual

Callback when server thread terminates.

This callback will get called whenever the server thread stops.

Implements JiveXML::IServer.

Definition at line 137 of file ONCRPCServerSvc.cxx.

137 {
138 //Check if the thread stopped while the run server flag was set
140 //Deliver an error message
141 ATH_MSG_ERROR( "Server thread stopped while run-server-flag is set!" );
142 //since we can't return statusCode failure (we're in a different thread!)
143 //set the flag to mark the server as dead
144 m_runServerThread = false;
145 }
146 }

◆ StartServer()

StatusCode JiveXML::ONCRPCServerSvc::StartServer ( )

Start the server thread.

Create the server by.

  • passing a set of arguments including a this-pointer
  • setting the thread-running flag
  • starting the server thread

Definition at line 33 of file ONCRPCServerSvc.cxx.

33 {
34
35 //Initialize access lock mechanism to ensure that the data map is never
36 //accessed by more than one thread at the same time. NULL means default
37 int retVal = pthread_mutex_init(&m_accessLock,NULL);
38 if (retVal != 0){
39 ATH_MSG_ERROR( "Unable to initialize access lock while starting server: " << fmterror(retVal) );
40 return StatusCode::FAILURE;
41 }
42
43 //The arguments passed on to the server - create new object on the heap that
44 //is persistent through the lifetime of the thread
46
47 //set runServer flag to true, so the thread will start
48 m_runServerThread = true ;
49
50 //create thread itself
51 if ( ( pthread_create(&m_ServerThreadHandle, NULL , &JiveXML::ONCRPCServerThread, (void *) args )) != 0){
52
53 //Create thread failed
54 ATH_MSG_ERROR( "Thread creation failed" );
55 return StatusCode::FAILURE;
56 }
57
58 return StatusCode::SUCCESS;
59
60 }
struct ServerThreadArguments_t ServerThreadArguments
Arguments handed over fromt the main (Athena) thread to the server thread.
void * ONCRPCServerThread(void *args)
This is the actual server thread, which takes above arguments.

◆ StopServer()

StatusCode JiveXML::ONCRPCServerSvc::StopServer ( )

Stop the server thread.

Stop the server by.

  • unsetting the thread-running flag
  • waiting till the server thread has finished
  • destroying the thread handle

Ping the server which will cause another request Otherwise the server won't update its loop condition

Definition at line 68 of file ONCRPCServerSvc.cxx.

68 {
69
70 ATH_MSG_VERBOSE( "StopServer()" );
71
76
77 //Create a client - this will already cause an update of the file
78 //descriptors on the sockets
79 CLIENT* client = clnt_create("localhost", ONCRPCSERVERPROG,ONCRPCSERVERVERS, "tcp");
80 if (!client){
81 ATH_MSG_ERROR( "Unable to create shutdown client for local server" );
82 return StatusCode::FAILURE;
83 }
84
85 // Next unset the runServerThread flag, which will cause the loop to stop
86 // This needs to happen after client creation, otherwise the next call won't
87 // be serverd anymore
88 m_runServerThread = false ;
89
90// xdr_void is defined inconsistently in xdr.h and gets a warning from gcc8.
91#if __GNUC__ >= 8
92# pragma GCC diagnostic push
93# pragma GCC diagnostic ignored "-Wcast-function-type"
94#endif
95#if defined(__clang__) && __clang_major__ >= 19
96# pragma clang diagnostic push
97# pragma clang diagnostic ignored "-Wcast-function-type-mismatch"
98#endif
99 // Now issue the call with a timeout
100 struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0;
101 clnt_call(client, NULLPROC, (xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL, timeout);
102#if defined(__clang__) && __clang_major__ >= 19
103# pragma clang diagnostic pop
104#endif
105#if __GNUC__ >= 8
106# pragma GCC diagnostic pop
107#endif
108
109 // A pointer to the return value of the thread
110 void* ret;
111 // wait till the server thread has finished
112 ATH_MSG_INFO( "Waiting for server thread to terminate ..." );
113 pthread_join(m_ServerThreadHandle, &ret);
114 ATH_MSG_INFO( " ... finished server thread" );
115
116 //check if there was a return value
117 if (ret){
118 //Get the return value
119 unsigned long NRequests = *(unsigned long*)ret;
120 ATH_MSG_DEBUG( "Server thread stopped after handling " << NRequests << " requests" );
121 } else
122 ATH_MSG_WARNING( "Server thread had stopped unexpectedly" );
123
124 //Destroy the access lock
125 int retVal = pthread_mutex_destroy(&m_accessLock);
126 if (retVal != 0){
127 ATH_MSG_ERROR( "Unable to destroy access lock after stopping server: " << fmterror(retVal) );
128 return StatusCode::FAILURE;
129 }
130
131 return StatusCode::SUCCESS;
132 }
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
#define ONCRPCSERVERVERS
#define ONCRPCSERVERPROG

◆ UpdateEventForStream()

StatusCode JiveXML::ONCRPCServerSvc::UpdateEventForStream ( const EventStreamID & evtStreamID,
const std::string & event )
overridevirtual

Put this event as new current event for stream given by name.

Implementation of IServerSvc method.

Get one event and put it as the new event for the this stream, which is identified by EventStreamID

Implements JiveXML::IEventReceiver.

Definition at line 333 of file ONCRPCServerSvc.cxx.

333 {
334
335 ATH_MSG_VERBOSE( "UpdateEventForStream()" );
336
337 //Make sure the server is still running
338 if (! m_runServerThread ){
339 ATH_MSG_ERROR( "Server thread is not running - refusing to update event" );
340 return StatusCode::FAILURE;
341 }
342
343 //Check that the event stream id is valid
344 if (!evtStreamID.isValid()){
345 ATH_MSG_ERROR( "Invalid event stream identifier - cannot add event" );
346 return StatusCode::FAILURE;
347 }
348
349 //Make sure we don't have already exceeded the maximum number of streams
350 if (m_eventStreamMap.size() > NSTREAMMAX ){
351 ATH_MSG_ERROR( "Reached max. allowed number of streams " << NSTREAMMAX << " - cannot add event" );
352 return StatusCode::FAILURE;
353 }
354
355 //Make sure the event is not larger than the allowed maximal size
356 if (event.length() > NBYTESMAX ){
357 ATH_MSG_ERROR( "Event is larger than allowed max. of " << NBYTESMAX << " bytes - cannot add event" );
358 return StatusCode::FAILURE;
359 }
360
361 //Make sure we are the only one accessing the data right now, by trying to
362 //obtain a lock. If the lock can not be obtained after a certain time, an
363 //error is reported
364
365 //Timeout of 5 second and 0 nanoseconds
366 struct timespec timeout = { 5, 0 };
367 //Try to obtain the lock
368#ifndef __APPLE__
369 int retVal = pthread_mutex_timedlock(&m_accessLock, &timeout);
370#else
371 int retVal = pthread_mutex_lock(&m_accessLock);
372#endif
373 if ( retVal != 0 ){
374 ATH_MSG_ERROR( "Unable to obtain access lock to update event: " << fmterror(retVal) );
375 return StatusCode::FAILURE;
376 }
377
378 //Using try/catch to ensure the mutex gets unlocked in any case
379 try {
380
381 //Using std::map::operator[] and std::map::insert() will create a new event
382 //if it did not exist, otherwise just replace the existing entry (making a
383 //copy of the std::string) but would not update the key which holds new
384 //event/run number. Therefore delete existing entry first.
385
386 //Delete old entry if there is one
387 EventStreamMap::iterator OldEvtItr = m_eventStreamMap.find(evtStreamID);
388 if (OldEvtItr != m_eventStreamMap.end())
389 m_eventStreamMap.erase(OldEvtItr);
390
391 //Now add the new event
392 m_eventStreamMap.insert(EventStreamPair(evtStreamID,event));
393
394 } catch ( const std::exception& e ) {
395 ATH_MSG_ERROR( "Exception caught while updating event for stream " << evtStreamID.StreamName()
396 << ": " << e.what() );
397 //Also release the lock in this case
398 pthread_mutex_unlock(&m_accessLock);
399 //before we return
400 return StatusCode::FAILURE;
401 }
402
403 //Finally release the lock again
404 retVal = pthread_mutex_unlock(&m_accessLock);
405 if ( retVal != 0 ){
406 ATH_MSG_ERROR( "Unable to release access lock after updating event: " << fmterror(retVal) );
407 return StatusCode::FAILURE;
408 }
409
410 ATH_MSG_DEBUG( "Updated stream " << evtStreamID.StreamName()
411 << " with event Nr. " << evtStreamID.EventNumber()
412 << " from run Nr. " << evtStreamID.RunNumber() );
413
414 return StatusCode::SUCCESS;
415 }
const unsigned int NBYTESMAX
std::pair< const EventStreamID, const std::string > EventStreamPair
A map that stores events according to their EventStreamID Due to the way EventStreamID is build,...
Definition EventStream.h:86
const unsigned int NSTREAMMAX

Member Data Documentation

◆ ATLAS_THREAD_SAFE

pthread_mutex_t m_accessLock JiveXML::ONCRPCServerSvc::ATLAS_THREAD_SAFE {}
mutableprivate

Definition at line 100 of file ONCRPCServerSvc.h.

100{};

◆ m_eventStreamMap

EventStreamMap JiveXML::ONCRPCServerSvc::m_eventStreamMap
private

Definition at line 97 of file ONCRPCServerSvc.h.

◆ m_portNumber

int JiveXML::ONCRPCServerSvc::m_portNumber
private

Definition at line 94 of file ONCRPCServerSvc.h.

◆ m_runServerThread

bool JiveXML::ONCRPCServerSvc::m_runServerThread
private

Definition at line 107 of file ONCRPCServerSvc.h.

◆ m_ServerThreadHandle

pthread_t JiveXML::ONCRPCServerSvc::m_ServerThreadHandle {}
private

Definition at line 103 of file ONCRPCServerSvc.h.

103{};

The documentation for this class was generated from the following files: