ATLAS Offline Software
Public Member Functions | List of all members
JiveXML::ONCRPCServerSvc Class Reference

This athena service will create an ONC/RPC (aka SunRPC) server, that can provide the athena status as well as data strings from events. More...

#include <ONCRPCServerSvc.h>

Inheritance diagram for JiveXML::ONCRPCServerSvc:
Collaboration diagram for JiveXML::ONCRPCServerSvc:

Public Member Functions

 ONCRPCServerSvc (const std::string &name, ISvcLocator *sl)
 Default constructor. More...
 
virtual ~ONCRPCServerSvc ()
 Destructor. More...
 
virtual StatusCode initialize () override
 Gaudi default methods. More...
 
virtual StatusCode finalize () override
 Finalize - called once at the end. More...
 

Server methods

int m_portNumber
 
EventStreamMap m_eventStreamMap
 
pthread_mutex_t m_accessLock ATLAS_THREAD_SAFE
 
pthread_t m_ServerThreadHandle
 
bool m_runServerThread
 
virtual int GetState () const override
 get the Status of the application More...
 
virtual std::vector< std::string > GetStreamNames () const override
 get the names of all the streams More...
 
virtual const EventStreamID GetEventStreamID (const std::string &streamName) const override
 get the current EventStreamID for a particular stream More...
 
virtual const std::string GetEvent (const EventStreamID &evtStreamID) const override
 get the current event for a particular stream More...
 
virtual StatusCode UpdateEventForStream (const EventStreamID &evtStreamID, const std::string &event) override
 Put this event as new current event for stream given by name. More...
 
virtual void Message (const MSG::Level level, const std::string &mesg) const override
 This function is exposed to allow using athena messaging service from other threads. More...
 
virtual MSG::Level LogLevel () const override
 Get the logging level. More...
 
virtual bool GetRunServerFlag () const override
 The server thread will stop once this flag is set to false. More...
 
StatusCode StartServer ()
 Start the server thread. More...
 
StatusCode StopServer ()
 Stop the server thread. More...
 
virtual void ServerThreadStopped () override
 Callback when server thread terminates. More...
 

Detailed Description

This athena service will create an ONC/RPC (aka SunRPC) server, that can provide the athena status as well as data strings from events.

It can handle several different event streams, but for each stream only the latest event is available. This service is usually used by the Athena JiveXML algorithm providing XML events for an Atlantis Java client.

The server will create a single server thread, which loops over the sockets until the m_runServerThread flag is set to false. On each request, it will call the ONCRPCRequestHandler, which will generate a new thread for each request. The actual response to the requests happens in these ONCRPCDispatchThread. A list of the thread handles is kept in the a ThreadCollection, where each thread will add and remove itself. In the finalization stage, the server will wait for all dispatch threats to finish before exiting.

On all stages, the server thread will have a handle to the athena service via the thread specific key ServerSvcKey. It is thus possible to have many servers running simultaneously. Note however, that currently RPC will forbid you to start several servers with the same ONCRPCSERVERPROG.

Definition at line 42 of file ONCRPCServerSvc.h.

Constructor & Destructor Documentation

◆ ONCRPCServerSvc()

JiveXML::ONCRPCServerSvc::ONCRPCServerSvc ( const std::string &  name,
ISvcLocator *  sl 
)

Default constructor.

Constructor.

Definition at line 268 of file ONCRPCServerSvc.cxx.

268  :
269  base_class( name, sl )
270  {
271  //declare port number property
272  declareProperty("PortNumber",m_portNumber=0,"Port number to bind - assigned dynamically if set to zero [default: 0]");
273 
274  //Make sure ServerThread does not start unexpectedly
275  m_runServerThread = false ;
276 
277  ATH_MSG_VERBOSE( "Output level is set to " << (int)msg().level() );
278 
279  }

◆ ~ONCRPCServerSvc()

JiveXML::ONCRPCServerSvc::~ONCRPCServerSvc ( )
virtual

Destructor.

Definition at line 284 of file ONCRPCServerSvc.cxx.

284  {
285 
286  ATH_MSG_VERBOSE( "Destructor() " );
287  }

Member Function Documentation

◆ finalize()

StatusCode JiveXML::ONCRPCServerSvc::finalize ( )
overridevirtual

Finalize - called once at the end.

  • shut down server

Definition at line 314 of file ONCRPCServerSvc.cxx.

314  {
315 
316  ATH_MSG_VERBOSE( "Finalize()" );
317 
318  //Stop the server thread and return status code
319  return StopServer();
320  }

◆ GetEvent()

const std::string JiveXML::ONCRPCServerSvc::GetEvent ( const EventStreamID evtStreamID) const
overridevirtual

get the current event for a particular stream

Return the event for a given stream.

Implements JiveXML::IEventServer.

Definition at line 241 of file ONCRPCServerSvc.cxx.

241  {
242 
243  //Obtain an exclusive access lock
244  int retVal = pthread_mutex_lock(&m_accessLock);
245  if ( retVal != 0 ){
246  ATH_MSG_ERROR( "Unable to obtain access lock to get event: " << fmterror(retVal) );
247  return std::string("");
248  }
249 
250  // Search the entry in the map
251  EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(evtStreamID);
252 
253  //Release the lock
254  retVal = pthread_mutex_unlock(&m_accessLock);
255  if ( retVal != 0 )
256  ATH_MSG_ERROR( "Unable to release access lock after getting stream event: " << fmterror(retVal) );
257 
258  //If the element was not found return an empty string
259  if ( MapItr == m_eventStreamMap.end()) return std::string("");
260 
261  //Return a copy of the found event string
262  return std::string((*MapItr).second);
263  }

◆ GetEventStreamID()

const EventStreamID JiveXML::ONCRPCServerSvc::GetEventStreamID ( const std::string &  streamName) const
overridevirtual

get the current EventStreamID for a particular stream

Return the EventStreamID for the last event of a given stream.

Implements JiveXML::IEventServer.

Definition at line 214 of file ONCRPCServerSvc.cxx.

214  {
215 
216  //Obtain an exclusive access lock
217  int retVal = pthread_mutex_lock(&m_accessLock);
218  if ( retVal != 0 ){
219  ATH_MSG_ERROR( "Unable to obtain access lock to get stream ID: " << fmterror(retVal) );
220  return EventStreamID("");
221  }
222 
223  // Search the entry in the map
224  EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(EventStreamID(StreamName));
225 
226  //Release the lock
227  retVal = pthread_mutex_unlock(&m_accessLock);
228  if ( retVal != 0 )
229  ATH_MSG_ERROR( "Unable to release access lock after getting stream ID: " << fmterror(retVal) );
230 
231  //If the element was not found return an invalid ID
232  if ( MapItr == m_eventStreamMap.end()) return EventStreamID("");
233 
234  //Return the found event stream identifier
235  return (*MapItr).first;
236  }

◆ GetRunServerFlag()

virtual bool JiveXML::ONCRPCServerSvc::GetRunServerFlag ( ) const
inlineoverridevirtual

The server thread will stop once this flag is set to false.

Implements JiveXML::IEventServer.

Definition at line 82 of file ONCRPCServerSvc.h.

82 { return m_runServerThread; };

◆ GetState()

int JiveXML::ONCRPCServerSvc::GetState ( ) const
overridevirtual

get the Status of the application

Return the current athena state.

at the moment simply return state machine state - we might consider checking for state transitions (m_state != m_targetState) and return a "busy" value. At the moment, all state transitions should be instantanious, so this is not necessary

Implements JiveXML::IEventServer.

Definition at line 166 of file ONCRPCServerSvc.cxx.

166  {
173  return (int)FSMState();
174  }

◆ GetStreamNames()

std::vector< std::string > JiveXML::ONCRPCServerSvc::GetStreamNames ( ) const
overridevirtual

get the names of all the streams

Return an array with all the stream names.

Implements JiveXML::IEventServer.

Definition at line 179 of file ONCRPCServerSvc.cxx.

179  {
180 
181  //Create a vector that can be returned
182  std::vector<std::string> StreamNames;
183 
184  //Obtain an exclusive access lock
185  int retVal = pthread_mutex_lock(&m_accessLock);
186  if ( retVal != 0 ){
187  ATH_MSG_ERROR( "Unable to obtain access lock to get stream names: " << fmterror(retVal) );
188  return StreamNames;
189  }
190 
191  // Iterate over map to get entries
192  EventStreamMap::const_iterator MapItr = m_eventStreamMap.begin();
193  for ( ; MapItr != m_eventStreamMap.end(); ++MapItr){
194 
195  //Get the EventStreamID object
196  EventStreamID EvtStrID = (*MapItr).first;
197 
198  //Add the name of this EventStreamID to the list of stream names
199  StreamNames.push_back(EvtStrID.StreamName());
200  }
201 
202  //Release the lock
203  retVal = pthread_mutex_unlock(&m_accessLock);
204  if ( retVal != 0 )
205  ATH_MSG_ERROR( "Unable to release access lock after getting stream names: " << fmterror(retVal) );
206 
207  //Return the list of names
208  return StreamNames;
209  }

◆ initialize()

StatusCode JiveXML::ONCRPCServerSvc::initialize ( )
overridevirtual

Gaudi default methods.

Initialize - called once in beginning.

  • create server

Create the server itself

Definition at line 293 of file ONCRPCServerSvc.cxx.

293  {
294 
295  //Initialize Service base
296  if (Service::initialize().isFailure()) return StatusCode::FAILURE;
297 
298  //Initialize message stream level
299  msg().setLevel(outputLevel());
300  ATH_MSG_VERBOSE( "Initialize()" );
301  ATH_MSG_VERBOSE( "Output level is set to " << (int)msg().level() );
302 
306  ATH_CHECK( StartServer() );
307  return StatusCode::SUCCESS;
308  }

◆ LogLevel()

MSG::Level JiveXML::ONCRPCServerSvc::LogLevel ( ) const
overridevirtual

Get the logging level.

Return the logging level - possibly to detect whether it is worth retrieving the information to be logged.

Implements JiveXML::IMessage.

Definition at line 157 of file ONCRPCServerSvc.cxx.

157  {
158  //return the current logging level
159  return msg().level();
160  }

◆ Message()

void JiveXML::ONCRPCServerSvc::Message ( const MSG::Level  level,
const std::string &  mesg 
) const
overridevirtual

This function is exposed to allow using athena messaging service from other threads.

Deliver a message - possibly from another thread - to the athena msgSvc;.

Implements JiveXML::IMessage.

Definition at line 144 of file ONCRPCServerSvc.cxx.

146  {
147  //Deliver message
148  msg() << level << mesg << endmsg;
149  //Spit it out immediately
150  msg().flush();
151  }

◆ ServerThreadStopped()

void JiveXML::ONCRPCServerSvc::ServerThreadStopped ( )
overridevirtual

Callback when server thread terminates.

This callback will get called whenever the server thread stops.

Implements JiveXML::IServer.

Definition at line 130 of file ONCRPCServerSvc.cxx.

130  {
131  //Check if the thread stopped while the run server flag was set
132  if (m_runServerThread){
133  //Deliver an error message
134  ATH_MSG_ERROR( "Server thread stopped while run-server-flag is set!" );
135  //since we can't return statusCode failure (we're in a different thread!)
136  //set the flag to mark the server as dead
137  m_runServerThread = false;
138  }
139  }

◆ StartServer()

StatusCode JiveXML::ONCRPCServerSvc::StartServer ( )

Start the server thread.

Create the server by.

  • passing a set of arguments including a this-pointer
  • setting the thread-running flag
  • starting the server thread

Definition at line 33 of file ONCRPCServerSvc.cxx.

33  {
34 
35  //Initialize access lock mechanism to ensure that the data map is never
36  //accessed by more than one thread at the same time. NULL means default
37  int retVal = pthread_mutex_init(&m_accessLock,NULL);
38  if (retVal != 0){
39  ATH_MSG_ERROR( "Unable to initialize access lock while starting server: " << fmterror(retVal) );
40  return StatusCode::FAILURE;
41  }
42 
43  //The arguments passed on to the server - create new object on the heap that
44  //is persistent through the lifetime of the thread
46 
47  //set runServer flag to true, so the thread will start
48  m_runServerThread = true ;
49 
50  //create thread itself
51  if ( ( pthread_create(&m_ServerThreadHandle, NULL , &JiveXML::ONCRPCServerThread, (void *) args )) != 0){
52 
53  //Create thread failed
54  ATH_MSG_ERROR( "Thread creation failed" );
55  return StatusCode::FAILURE;
56  }
57 
58  return StatusCode::SUCCESS;
59 
60  }

◆ StopServer()

StatusCode JiveXML::ONCRPCServerSvc::StopServer ( )

Stop the server thread.

Stop the server by.

  • unsetting the thread-running flag
  • waiting till the server thread has finished
  • destroying the thread handle

Ping the server which will cause another request Otherwise the server won't update its loop condition

Definition at line 68 of file ONCRPCServerSvc.cxx.

68  {
69 
70  ATH_MSG_VERBOSE( "StopServer()" );
71 
77  //Create a client - this will already cause an update of the file
78  //descriptors on the sockets
79  CLIENT* client = clnt_create("localhost", ONCRPCSERVERPROG,ONCRPCSERVERVERS, "tcp");
80  if (!client){
81  ATH_MSG_ERROR( "Unable to create shutdown client for local server" );
82  return StatusCode::FAILURE;
83  }
84 
85  // Next unset the runServerThread flag, which will cause the loop to stop
86  // This needs to happen after client creation, otherwise the next call won't
87  // be serverd anymore
88  m_runServerThread = false ;
89 
90 // xdr_void is defined inconsistently in xdr.h and gets a warning from gcc8.
91 #if __GNUC__ >= 8
92 # pragma GCC diagnostic push
93 # pragma GCC diagnostic ignored "-Wcast-function-type"
94 #endif
95  // Now issue the call with a timeout
96  struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0;
97  clnt_call(client, NULLPROC, (xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL, timeout);
98 #if __GNUC__ >= 8
99 # pragma GCC diagnostic pop
100 #endif
101 
102  // A pointer to the return value of the thread
103  void* ret;
104  // wait till the server thread has finished
105  ATH_MSG_INFO( "Waiting for server thread to terminate ..." );
106  pthread_join(m_ServerThreadHandle, &ret);
107  ATH_MSG_INFO( " ... finished server thread" );
108 
109  //check if there was a return value
110  if (ret){
111  //Get the return value
112  unsigned long NRequests = *(unsigned long*)ret;
113  ATH_MSG_DEBUG( "Server thread stopped after handling " << NRequests << " requests" );
114  } else
115  ATH_MSG_WARNING( "Server thread had stopped unexpectedly" );
116 
117  //Destroy the access lock
118  int retVal = pthread_mutex_destroy(&m_accessLock);
119  if (retVal != 0){
120  ATH_MSG_ERROR( "Unable to destroy access lock after stopping server: " << fmterror(retVal) );
121  return StatusCode::FAILURE;
122  }
123 
124  return StatusCode::SUCCESS;
125  }

◆ UpdateEventForStream()

StatusCode JiveXML::ONCRPCServerSvc::UpdateEventForStream ( const EventStreamID evtStreamID,
const std::string &  event 
)
overridevirtual

Put this event as new current event for stream given by name.

Implementation of IServerSvc method.

Get one event and put it as the new event for the this stream, which is identified by EventStreamID

Implements JiveXML::IEventReceiver.

Definition at line 326 of file ONCRPCServerSvc.cxx.

326  {
327 
328  ATH_MSG_VERBOSE( "UpdateEventForStream()" );
329 
330  //Make sure the server is still running
331  if (! m_runServerThread ){
332  ATH_MSG_ERROR( "Server thread is not running - refusing to update event" );
333  return StatusCode::FAILURE;
334  }
335 
336  //Check that the event stream id is valid
337  if (!evtStreamID.isValid()){
338  ATH_MSG_ERROR( "Invalid event stream identifier - cannot add event" );
339  return StatusCode::FAILURE;
340  }
341 
342  //Make sure we don't have already exceeded the maximum number of streams
343  if (m_eventStreamMap.size() > NSTREAMMAX ){
344  ATH_MSG_ERROR( "Reached max. allowed number of streams " << NSTREAMMAX << " - cannot add event" );
345  return StatusCode::FAILURE;
346  }
347 
348  //Make sure the event is not larger than the allowed maximal size
349  if (event.length() > NBYTESMAX ){
350  ATH_MSG_ERROR( "Event is larger than allowed max. of " << NBYTESMAX << " bytes - cannot add event" );
351  return StatusCode::FAILURE;
352  }
353 
354  //Make sure we are the only one accessing the data right now, by trying to
355  //obtain a lock. If the lock can not be obtained after a certain time, an
356  //error is reported
357 
358  //Timeout of 5 second and 0 nanoseconds
359  struct timespec timeout = { 5, 0 };
360  //Try to obtain the lock
361 #ifndef __APPLE__
362  int retVal = pthread_mutex_timedlock(&m_accessLock, &timeout);
363 #else
364  int retVal = pthread_mutex_lock(&m_accessLock);
365 #endif
366  if ( retVal != 0 ){
367  ATH_MSG_ERROR( "Unable to obtain access lock to update event: " << fmterror(retVal) );
368  return StatusCode::FAILURE;
369  }
370 
371  //Using try/catch to ensure the mutex gets unlocked in any case
372  try {
373 
374  //Using std::map::operator[] and std::map::insert() will create a new event
375  //if it did not exist, otherwise just replace the existing entry (making a
376  //copy of the std::string) but would not update the key which holds new
377  //event/run number. Therefore delete existing entry first.
378 
379  //Delete old entry if there is one
380  EventStreamMap::iterator OldEvtItr = m_eventStreamMap.find(evtStreamID);
381  if (OldEvtItr != m_eventStreamMap.end())
382  m_eventStreamMap.erase(OldEvtItr);
383 
384  //Now add the new event
385  m_eventStreamMap.insert(EventStreamPair(evtStreamID,event));
386 
387  } catch ( const std::exception& e ) {
388  ATH_MSG_ERROR( "Exception caught while updating event for stream " << evtStreamID.StreamName()
389  << ": " << e.what() );
390  //Also release the lock in this case
391  pthread_mutex_unlock(&m_accessLock);
392  //before we return
393  return StatusCode::FAILURE;
394  }
395 
396  //Finally release the lock again
397  retVal = pthread_mutex_unlock(&m_accessLock);
398  if ( retVal != 0 ){
399  ATH_MSG_ERROR( "Unable to release access lock after updating event: " << fmterror(retVal) );
400  return StatusCode::FAILURE;
401  }
402 
403  ATH_MSG_DEBUG( "Updated stream " << evtStreamID.StreamName()
404  << " with event Nr. " << evtStreamID.EventNumber()
405  << " from run Nr. " << evtStreamID.RunNumber() );
406 
407  return StatusCode::SUCCESS;
408  }

Member Data Documentation

◆ ATLAS_THREAD_SAFE

pthread_mutex_t m_accessLock JiveXML::ONCRPCServerSvc::ATLAS_THREAD_SAFE
mutableprivate

Definition at line 100 of file ONCRPCServerSvc.h.

◆ m_eventStreamMap

EventStreamMap JiveXML::ONCRPCServerSvc::m_eventStreamMap
private

Definition at line 97 of file ONCRPCServerSvc.h.

◆ m_portNumber

int JiveXML::ONCRPCServerSvc::m_portNumber
private

Definition at line 94 of file ONCRPCServerSvc.h.

◆ m_runServerThread

bool JiveXML::ONCRPCServerSvc::m_runServerThread
private

Definition at line 107 of file ONCRPCServerSvc.h.

◆ m_ServerThreadHandle

pthread_t JiveXML::ONCRPCServerSvc::m_ServerThreadHandle
private

Definition at line 103 of file ONCRPCServerSvc.h.


The documentation for this class was generated from the following files:
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
JiveXML::ONCRPCServerSvc::m_ServerThreadHandle
pthread_t m_ServerThreadHandle
Definition: ONCRPCServerSvc.h:103
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
initialize
void initialize()
Definition: run_EoverP.cxx:894
JiveXML::ServerThreadArguments
struct ServerThreadArguments_t ServerThreadArguments
Arguments handed over fromt the main (Athena) thread to the server thread.
Definition: ONCRPCServerThreads.h:22
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
python.iconfTool.models.loaders.level
level
Definition: loaders.py:20
rerun_display.client
client
Definition: rerun_display.py:31
StateLessPT_NewConfig.StreamNames
StreamNames
Definition: StateLessPT_NewConfig.py:315
JiveXML::ONCRPCServerThread
void * ONCRPCServerThread(void *args)
This is the actual server thread, which takes above arguments.
Definition: ONCRPCServerThreads.cxx:252
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EventInfoWrite.StreamName
string StreamName
Definition: EventInfoWrite.py:28
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
ret
T ret(T t)
Definition: rootspy.cxx:260
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
calibdata.exception
exception
Definition: calibdata.py:496
JiveXML::ONCRPCServerSvc::StartServer
StatusCode StartServer()
Start the server thread.
Definition: ONCRPCServerSvc.cxx:33
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
JiveXML::EventStreamPair
std::pair< const EventStreamID, const std::string > EventStreamPair
A map that stores events according to their EventStreamID Due to the way EventStreamID is build,...
Definition: EventStream.h:86
JiveXML::NBYTESMAX
const unsigned int NBYTESMAX
Definition: ONCRPCServer.h:40
ONCRPCSERVERPROG
#define ONCRPCSERVERPROG
Definition: ONCRPCServer.h:30
JiveXML::ONCRPCServerSvc::m_eventStreamMap
EventStreamMap m_eventStreamMap
Definition: ONCRPCServerSvc.h:97
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
JiveXML::ONCRPCServerSvc::m_runServerThread
bool m_runServerThread
Definition: ONCRPCServerSvc.h:107
python.TrigPSCPythonDbSetup.outputLevel
outputLevel
Definition: TrigPSCPythonDbSetup.py:30
JiveXML::ONCRPCServerSvc::StopServer
StatusCode StopServer()
Stop the server thread.
Definition: ONCRPCServerSvc.cxx:68
ONCRPCSERVERVERS
#define ONCRPCSERVERVERS
Definition: ONCRPCServer.h:31
DiTauMassTools::MaxHistStrategyV2::e
e
Definition: PhysicsAnalysis/TauID/DiTauMassTools/DiTauMassTools/HelperFunctions.h:26
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
declareProperty
#define declareProperty(n, p, h)
Definition: BaseFakeBkgTool.cxx:15
python.TrigInDetArtSteps.timeout
timeout
Definition: TrigInDetArtSteps.py:35
JiveXML::ONCRPCServerSvc::m_portNumber
int m_portNumber
Definition: ONCRPCServerSvc.h:94
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
python.CaloScaleNoiseConfig.args
args
Definition: CaloScaleNoiseConfig.py:80
JiveXML::NSTREAMMAX
const unsigned int NSTREAMMAX
Definition: ONCRPCServer.h:38