ATLAS Offline Software
ONCRPCServerSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "GaudiKernel/ServiceHandle.h"
6 #include "GaudiKernel/StateMachine.h"
7 #include "GaudiKernel/IAppMgrUI.h"
8 #include "GaudiKernel/GaudiHandle.h"
9 
13 #include "JiveXML/ONCRPCXDRProcs.h"
14 #include "JiveXML/ONCRPCServer.h"
15 
16 
namespace {

  // strerror_r exists in two flavours: the GNU one returns a pointer to the
  // message (which may or may not live in the caller's buffer), the XSI one
  // returns an int status and always writes into the caller's buffer.
  // These two overloads let the compiler pick the matching case so the same
  // source builds against either flavour.

  /// GNU flavour: the returned pointer is the message.
  inline const char* strerrorMessage(const char* gnuResult, const char* /*buf*/) {
    return gnuResult;
  }

  /// XSI flavour: the message (if any) was written to the buffer.
  inline const char* strerrorMessage(int /*xsiStatus*/, const char* buf) {
    return buf;
  }

  /**
   * Translate an errno-style error code into a human readable message.
   * @param code the error code, e.g. the return value of a pthread_* call
   * @return the message reported by strerror_r, or "Unknown error <code>"
   *         if strerror_r did not provide any text
   */
  std::string fmterror(int code) {
    //Zero-initialised so that an untouched buffer reads as an empty string
    char buf[256] = {0};
    const char* text = strerrorMessage(strerror_r(code, buf, sizeof(buf)), buf);
    //Never build a std::string from a null pointer, and never return nothing
    if (text == nullptr || *text == '\0') {
      return "Unknown error " + std::to_string(code);
    }
    return std::string(text);
  }
}
23 
24 
25 namespace JiveXML {
26 
34 
35  //Initialize access lock mechanism to ensure that the data map is never
36  //accessed by more than one thread at the same time. NULL means default
37  int retVal = pthread_mutex_init(&m_accessLock,NULL);
38  if (retVal != 0){
39  ATH_MSG_ERROR( "Unable to initialize access lock while starting server: " << fmterror(retVal) );
40  return StatusCode::FAILURE;
41  }
42 
43  //The arguments passed on to the server - create new object on the heap that
44  //is persistent through the lifetime of the thread
46 
47  //set runServer flag to true, so the thread will start
48  m_runServerThread = true ;
49 
50  //create thread itself
51  if ( ( pthread_create(&m_ServerThreadHandle, NULL , &JiveXML::ONCRPCServerThread, (void *) args )) != 0){
52 
53  //Create thread failed
54  ATH_MSG_ERROR( "Thread creation failed" );
55  return StatusCode::FAILURE;
56  }
57 
58  return StatusCode::SUCCESS;
59 
60  }
61 
69 
70  ATH_MSG_VERBOSE( "StopServer()" );
71 
77  //Create a client - this will already cause an update of the file
78  //descriptors on the sockets
79  CLIENT* client = clnt_create("localhost", ONCRPCSERVERPROG,ONCRPCSERVERVERS, "tcp");
80  if (!client){
81  ATH_MSG_ERROR( "Unable to create shutdown client for local server" );
82  return StatusCode::FAILURE;
83  }
84 
85  // Next unset the runServerThread flag, which will cause the loop to stop
86  // This needs to happen after client creation, otherwise the next call won't
87  // be served anymore
88  m_runServerThread = false ;
89 
90 // xdr_void is defined inconsistently in xdr.h and gets a warning from gcc8.
91 #if __GNUC__ >= 8
92 # pragma GCC diagnostic push
93 # pragma GCC diagnostic ignored "-Wcast-function-type"
94 #endif
95  // Now issue the call with a timeout
96  struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0;
97  clnt_call(client, NULLPROC, (xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL, timeout);
98 #if __GNUC__ >= 8
99 # pragma GCC diagnostic pop
100 #endif
101 
102  // A pointer to the return value of the thread
103  void* ret;
104  // wait till the server thread has finished
105  ATH_MSG_INFO( "Waiting for server thread to terminate ..." );
106  pthread_join(m_ServerThreadHandle, &ret);
107  ATH_MSG_INFO( " ... finished server thread" );
108 
109  //check if there was a return value
110  if (ret){
111  //Get the return value
112  unsigned long NRequests = *(unsigned long*)ret;
113  ATH_MSG_DEBUG( "Server thread stopped after handling " << NRequests << " requests" );
114  } else
115  ATH_MSG_WARNING( "Server thread had stopped unexpectedly" );
116 
117  //Destroy the access lock
118  int retVal = pthread_mutex_destroy(&m_accessLock);
119  if (retVal != 0){
120  ATH_MSG_ERROR( "Unable to destroy access lock after stopping server: " << fmterror(retVal) );
121  return StatusCode::FAILURE;
122  }
123 
124  return StatusCode::SUCCESS;
125  }
126 
131  //Check if the thread stopped while the run server flag was set
132  if (m_runServerThread){
133  //Deliver an error message
134  ATH_MSG_ERROR( "Server thread stopped while run-server-flag is set!" );
135  //since we can't return statusCode failure (we're in a different thread!)
136  //set the flag to mark the server as dead
137  m_runServerThread = false;
138  }
139  }
140 
145  const std::string& mesg) const
146  {
147  //Deliver message
148  msg() << level << mesg << endmsg;
149  //Spit it out immediately
150  msg().flush();
151  }
152 
158  //return the current logging level
159  return msg().level();
160  }
161 
162 
173  return (int)FSMState();
174  }
175 
179  std::vector<std::string> ONCRPCServerSvc::GetStreamNames() const {
180 
181  //Create a vector that can be returned
182  std::vector<std::string> StreamNames;
183 
184  //Obtain an exclusive access lock
185  int retVal = pthread_mutex_lock(&m_accessLock);
186  if ( retVal != 0 ){
187  ATH_MSG_ERROR( "Unable to obtain access lock to get stream names: " << fmterror(retVal) );
188  return StreamNames;
189  }
190 
191  // Iterate over map to get entries
192  EventStreamMap::const_iterator MapItr = m_eventStreamMap.begin();
193  for ( ; MapItr != m_eventStreamMap.end(); ++MapItr){
194 
195  //Get the EventStreamID object
196  EventStreamID EvtStrID = (*MapItr).first;
197 
198  //Add the name of this EventStreamID to the list of stream names
199  StreamNames.push_back(EvtStrID.StreamName());
200  }
201 
202  //Release the lock
203  retVal = pthread_mutex_unlock(&m_accessLock);
204  if ( retVal != 0 )
205  ATH_MSG_ERROR( "Unable to release access lock after getting stream names: " << fmterror(retVal) );
206 
207  //Return the list of names
208  return StreamNames;
209  }
210 
214  const EventStreamID ONCRPCServerSvc::GetEventStreamID( const std::string& StreamName) const {
215 
216  //Obtain an exclusive access lock
217  int retVal = pthread_mutex_lock(&m_accessLock);
218  if ( retVal != 0 ){
219  ATH_MSG_ERROR( "Unable to obtain access lock to get stream ID: " << fmterror(retVal) );
220  return EventStreamID("");
221  }
222 
223  // Search the entry in the map
224  EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(EventStreamID(StreamName));
225 
226  //Release the lock
227  retVal = pthread_mutex_unlock(&m_accessLock);
228  if ( retVal != 0 )
229  ATH_MSG_ERROR( "Unable to release access lock after getting stream ID: " << fmterror(retVal) );
230 
231  //If the element was not found return an invalid ID
232  if ( MapItr == m_eventStreamMap.end()) return EventStreamID("");
233 
234  //Return the found event stream identifier
235  return (*MapItr).first;
236  }
237 
241  const std::string ONCRPCServerSvc::GetEvent( const EventStreamID& evtStreamID ) const {
242 
243  //Obtain an exclusive access lock
244  int retVal = pthread_mutex_lock(&m_accessLock);
245  if ( retVal != 0 ){
246  ATH_MSG_ERROR( "Unable to obtain access lock to get event: " << fmterror(retVal) );
247  return std::string("");
248  }
249 
250  // Search the entry in the map
251  EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(evtStreamID);
252 
253  //Release the lock
254  retVal = pthread_mutex_unlock(&m_accessLock);
255  if ( retVal != 0 )
256  ATH_MSG_ERROR( "Unable to release access lock after getting stream event: " << fmterror(retVal) );
257 
258  //If the element was not found return an empty string
259  if ( MapItr == m_eventStreamMap.end()) return std::string("");
260 
261  //Return a copy of the found event string
262  return std::string((*MapItr).second);
263  }
264 
268  ONCRPCServerSvc::ONCRPCServerSvc( const std::string& name, ISvcLocator* sl) :
269  base_class( name, sl )
270  {
271  //declare port number property
272  declareProperty("PortNumber",m_portNumber=0,"Port number to bind - assigned dynamically if set to zero [default: 0]");
273 
274  //Make sure ServerThread does not start unexpectedly
275  m_runServerThread = false ;
276 
277  ATH_MSG_VERBOSE( "Output level is set to " << (int)msg().level() );
278 
279  }
280 
285 
286  ATH_MSG_VERBOSE( "Destructor() " );
287  }
288 
294 
295  //Initialize Service base
296  if (Service::initialize().isFailure()) return StatusCode::FAILURE;
297 
298  //Initialize message stream level
299  msg().setLevel(outputLevel());
300  ATH_MSG_VERBOSE( "Initialize()" );
301  ATH_MSG_VERBOSE( "Output level is set to " << (int)msg().level() );
302 
306  ATH_CHECK( StartServer() );
307  return StatusCode::SUCCESS;
308  }
309 
315 
316  ATH_MSG_VERBOSE( "Finalize()" );
317 
318  //Stop the server thread and return status code
319  return StopServer();
320  }
321 
326  StatusCode ONCRPCServerSvc::UpdateEventForStream( const EventStreamID& evtStreamID, const std::string & event) {
327 
328  ATH_MSG_VERBOSE( "UpdateEventForStream()" );
329 
330  //Make sure the server is still running
331  if (! m_runServerThread ){
332  ATH_MSG_ERROR( "Server thread is not running - refusing to update event" );
333  return StatusCode::FAILURE;
334  }
335 
336  //Check that the event stream id is valid
337  if (!evtStreamID.isValid()){
338  ATH_MSG_ERROR( "Invalid event stream identifier - cannot add event" );
339  return StatusCode::FAILURE;
340  }
341 
342  //Make sure we don't have already exceeded the maximum number of streams
343  if (m_eventStreamMap.size() > NSTREAMMAX ){
344  ATH_MSG_ERROR( "Reached max. allowed number of streams " << NSTREAMMAX << " - cannot add event" );
345  return StatusCode::FAILURE;
346  }
347 
348  //Make sure the event is not larger than the allowed maximal size
349  if (event.length() > NBYTESMAX ){
350  ATH_MSG_ERROR( "Event is larger than allowed max. of " << NBYTESMAX << " bytes - cannot add event" );
351  return StatusCode::FAILURE;
352  }
353 
354  //Make sure we are the only one accessing the data right now, by trying to
355  //obtain a lock. If the lock can not be obtained after a certain time, an
356  //error is reported
357 
358  //Timeout of 5 second and 0 nanoseconds
359  struct timespec timeout = { 5, 0 };
360  //Try to obtain the lock
361 #ifndef __APPLE__
362  int retVal = pthread_mutex_timedlock(&m_accessLock, &timeout);
363 #else
364  int retVal = pthread_mutex_lock(&m_accessLock);
365 #endif
366  if ( retVal != 0 ){
367  ATH_MSG_ERROR( "Unable to obtain access lock to update event: " << fmterror(retVal) );
368  return StatusCode::FAILURE;
369  }
370 
371  //Using try/catch to ensure the mutex gets unlocked in any case
372  try {
373 
374  //Using std::map::operator[] and std::map::insert() will create a new event
375  //if it did not exist, otherwise just replace the existing entry (making a
376  //copy of the std::string) but would not update the key which holds new
377  //event/run number. Therefore delete existing entry first.
378 
379  //Delete old entry if there is one
380  EventStreamMap::iterator OldEvtItr = m_eventStreamMap.find(evtStreamID);
381  if (OldEvtItr != m_eventStreamMap.end())
382  m_eventStreamMap.erase(OldEvtItr);
383 
384  //Now add the new event
385  m_eventStreamMap.insert(EventStreamPair(evtStreamID,event));
386 
387  } catch ( const std::exception& e ) {
388  ATH_MSG_ERROR( "Exception caught while updating event for stream " << evtStreamID.StreamName()
389  << ": " << e.what() );
390  //Also release the lock in this case
391  pthread_mutex_unlock(&m_accessLock);
392  //before we return
393  return StatusCode::FAILURE;
394  }
395 
396  //Finally release the lock again
397  retVal = pthread_mutex_unlock(&m_accessLock);
398  if ( retVal != 0 ){
399  ATH_MSG_ERROR( "Unable to release access lock after updating event: " << fmterror(retVal) );
400  return StatusCode::FAILURE;
401  }
402 
403  ATH_MSG_DEBUG( "Updated stream " << evtStreamID.StreamName()
404  << " with event Nr. " << evtStreamID.EventNumber()
405  << " from run Nr. " << evtStreamID.RunNumber() );
406 
407  return StatusCode::SUCCESS;
408  }
409 
410 } //namespace
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
JiveXML::ONCRPCServerSvc::m_ServerThreadHandle
pthread_t m_ServerThreadHandle
Definition: ONCRPCServerSvc.h:103
JiveXML::ONCRPCServerSvc::GetEvent
virtual const std::string GetEvent(const EventStreamID &evtStreamID) const override
get the current event for a particular stream
Definition: ONCRPCServerSvc.cxx:241
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
JiveXML::EventStreamID::EventNumber
unsigned long EventNumber() const
Definition: EventStream.h:41
initialize
void initialize()
Definition: run_EoverP.cxx:894
ONCRPCThreadCollection.h
JiveXML::ServerThreadArguments
struct ServerThreadArguments_t ServerThreadArguments
Arguments handed over from the main (Athena) thread to the server thread.
Definition: ONCRPCServerThreads.h:22
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
JiveXML::ONCRPCServerSvc::ServerThreadStopped
virtual void ServerThreadStopped() override
Callback when server thread terminates.
Definition: ONCRPCServerSvc.cxx:130
ONCRPCServer.h
python.iconfTool.models.loaders.level
level
Definition: loaders.py:20
rerun_display.client
client
Definition: rerun_display.py:31
StateLessPT_NewConfig.StreamNames
StreamNames
Definition: StateLessPT_NewConfig.py:315
JiveXML::ONCRPCServerThread
void * ONCRPCServerThread(void *args)
This is the actual server thread, which takes above arguments.
Definition: ONCRPCServerThreads.cxx:252
ONCRPCServerSvc.h
JiveXML::ServerThreadArguments_t
Definition: ONCRPCServerThreads.h:23
JiveXML::ONCRPCServerSvc::GetStreamNames
virtual std::vector< std::string > GetStreamNames() const override
get the names of all the streams
Definition: ONCRPCServerSvc.cxx:179
JiveXML::ONCRPCServerSvc::ONCRPCServerSvc
ONCRPCServerSvc(const std::string &name, ISvcLocator *sl)
Default constructor.
Definition: ONCRPCServerSvc.cxx:268
TrigConf::MSGTC::Level
Level
Definition: Trigger/TrigConfiguration/TrigConfBase/TrigConfBase/MsgStream.h:21
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EventInfoWrite.StreamName
string StreamName
Definition: EventInfoWrite.py:28
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
ret
T ret(T t)
Definition: rootspy.cxx:260
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
JiveXML::ONCRPCServerSvc::finalize
virtual StatusCode finalize() override
Finalize - called once at the end.
Definition: ONCRPCServerSvc.cxx:314
calibdata.exception
exception
Definition: calibdata.py:496
JiveXML::ONCRPCServerSvc::StartServer
StatusCode StartServer()
Start the server thread.
Definition: ONCRPCServerSvc.cxx:33
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
JiveXML::EventStreamID::isValid
bool isValid() const
Definition: EventStream.h:55
JiveXML::EventStreamPair
std::pair< const EventStreamID, const std::string > EventStreamPair
A map that stores events according to their EventStreamID. Due to the way EventStreamID is built,...
Definition: EventStream.h:86
ONCRPCXDRProcs.h
JiveXML::NBYTESMAX
const unsigned int NBYTESMAX
Definition: ONCRPCServer.h:40
ONCRPCServerThreads.h
JiveXML::EventStreamID
For the client-server communication, each event is uniquely identified by the run number,...
Definition: EventStream.h:19
ONCRPCSERVERPROG
#define ONCRPCSERVERPROG
Definition: ONCRPCServer.h:30
JiveXML
This header is shared between the C-style server thread and the C++ Athena ServerSvc.
Definition: BadLArRetriever.cxx:21
JiveXML::ONCRPCServerSvc::GetState
virtual int GetState() const override
get the Status of the application
Definition: ONCRPCServerSvc.cxx:166
JiveXML::EventStreamID::StreamName
std::string StreamName() const
Definition: EventStream.h:43
JiveXML::ONCRPCServerSvc::Message
virtual void Message(const MSG::Level level, const std::string &mesg) const override
This function is exposed to allow using athena messaging service from other threads.
Definition: ONCRPCServerSvc.cxx:144
pmontree.code
code
Definition: pmontree.py:443
JiveXML::ONCRPCServerSvc::m_eventStreamMap
EventStreamMap m_eventStreamMap
Definition: ONCRPCServerSvc.h:97
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
JiveXML::ONCRPCServerSvc::m_runServerThread
bool m_runServerThread
Definition: ONCRPCServerSvc.h:107
python.TrigPSCPythonDbSetup.outputLevel
outputLevel
Definition: TrigPSCPythonDbSetup.py:30
JiveXML::ONCRPCServerSvc::initialize
virtual StatusCode initialize() override
Gaudi default methods.
Definition: ONCRPCServerSvc.cxx:293
JiveXML::ONCRPCServerSvc::StopServer
StatusCode StopServer()
Stop the server thread.
Definition: ONCRPCServerSvc.cxx:68
ONCRPCSERVERVERS
#define ONCRPCSERVERVERS
Definition: ONCRPCServer.h:31
DiTauMassTools::MaxHistStrategyV2::e
e
Definition: PhysicsAnalysis/TauID/DiTauMassTools/DiTauMassTools/HelperFunctions.h:26
JiveXML::ONCRPCServerSvc::~ONCRPCServerSvc
virtual ~ONCRPCServerSvc()
Destructor.
Definition: ONCRPCServerSvc.cxx:284
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
JiveXML::ONCRPCServerSvc::GetEventStreamID
virtual const EventStreamID GetEventStreamID(const std::string &streamName) const override
get the current EventStreamID for a particular stream
Definition: ONCRPCServerSvc.cxx:214
declareProperty
#define declareProperty(n, p, h)
Definition: BaseFakeBkgTool.cxx:15
JiveXML::ONCRPCServerSvc::UpdateEventForStream
virtual StatusCode UpdateEventForStream(const EventStreamID &evtStreamID, const std::string &event) override
Put this event as new current event for stream given by name.
Definition: ONCRPCServerSvc.cxx:326
JiveXML::ONCRPCServerSvc::LogLevel
virtual MSG::Level LogLevel() const override
Get the logging level.
Definition: ONCRPCServerSvc.cxx:157
JiveXML::EventStreamID::RunNumber
unsigned int RunNumber() const
Definition: EventStream.h:42
python.TrigInDetArtSteps.timeout
timeout
Definition: TrigInDetArtSteps.py:35
JiveXML::ONCRPCServerSvc::m_portNumber
int m_portNumber
Definition: ONCRPCServerSvc.h:94
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
python.CaloScaleNoiseConfig.args
args
Definition: CaloScaleNoiseConfig.py:80
JiveXML::NSTREAMMAX
const unsigned int NSTREAMMAX
Definition: ONCRPCServer.h:38