Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
ONCRPCServerSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "GaudiKernel/ServiceHandle.h"
6 #include "GaudiKernel/StateMachine.h"
7 #include "GaudiKernel/IAppMgrUI.h"
8 #include "GaudiKernel/GaudiHandle.h"
9 
13 #include "JiveXML/ONCRPCXDRProcs.h"
14 #include "JiveXML/ONCRPCServer.h"
15 
16 
17 namespace {
18  std::string fmterror(int code) {
19  char buf[256];
20  return std::string(strerror_r(code, buf, sizeof(buf)));
21  }
22 }
23 
24 
25 namespace JiveXML {
26 
34 
35  //Initialize access lock mechanism to ensure that the data map is never
36  //accessed by more than one thread at the same time. NULL means default
37  int retVal = pthread_mutex_init(&m_accessLock,NULL);
38  if (retVal != 0){
39  ATH_MSG_ERROR( "Unable to initialize access lock while starting server: " << fmterror(retVal) );
40  return StatusCode::FAILURE;
41  }
42 
43  //The arguments passed on to the server - create new object on the heap that
44  //is persistent through the lifetime of the thread
46 
47  //set runServer flag to true, so the thread will start
48  m_runServerThread = true ;
49 
50  //create thread itself
51  if ( ( pthread_create(&m_ServerThreadHandle, NULL , &JiveXML::ONCRPCServerThread, (void *) args )) != 0){
52 
53  //Create thread failed
54  ATH_MSG_ERROR( "Thread creation failed" );
55  return StatusCode::FAILURE;
56  }
57 
58  return StatusCode::SUCCESS;
59 
60  }
61 
69 
70  ATH_MSG_VERBOSE( "StopServer()" );
71 
77  //Create a client - this will already cause an update of the file
78  //descriptors on the sockets
79  CLIENT* client = clnt_create("localhost", ONCRPCSERVERPROG,ONCRPCSERVERVERS, "tcp");
80  if (!client){
81  ATH_MSG_ERROR( "Unable to create shutdown client for local server" );
82  return StatusCode::FAILURE;
83  }
84 
85  // Next unset the runServerThread flag, which will cause the loop to stop
86  // This needs to happen after client creation, otherwise the next call won't
87  // be serverd anymore
88  m_runServerThread = false ;
89 
90 // xdr_void is defined inconsistently in xdr.h and gets a warning from gcc8.
91 #if __GNUC__ >= 8
92 # pragma GCC diagnostic push
93 # pragma GCC diagnostic ignored "-Wcast-function-type"
94 #endif
95 #if defined(__clang__) && __clang_major__ >= 19
96 # pragma clang diagnostic push
97 # pragma clang diagnostic ignored "-Wcast-function-type-mismatch"
98 #endif
99  // Now issue the call with a timeout
100  struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0;
101  clnt_call(client, NULLPROC, (xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL, timeout);
102 #if defined(__clang__) && __clang_major__ >= 19
103 # pragma clang diagnostic pop
104 #endif
105 #if __GNUC__ >= 8
106 # pragma GCC diagnostic pop
107 #endif
108 
109  // A pointer to the return value of the thread
110  void* ret;
111  // wait till the server thread has finished
112  ATH_MSG_INFO( "Waiting for server thread to terminate ..." );
113  pthread_join(m_ServerThreadHandle, &ret);
114  ATH_MSG_INFO( " ... finished server thread" );
115 
116  //check if there was a return value
117  if (ret){
118  //Get the return value
119  unsigned long NRequests = *(unsigned long*)ret;
120  ATH_MSG_DEBUG( "Server thread stopped after handling " << NRequests << " requests" );
121  } else
122  ATH_MSG_WARNING( "Server thread had stopped unexpectedly" );
123 
124  //Destroy the access lock
125  int retVal = pthread_mutex_destroy(&m_accessLock);
126  if (retVal != 0){
127  ATH_MSG_ERROR( "Unable to destroy access lock after stopping server: " << fmterror(retVal) );
128  return StatusCode::FAILURE;
129  }
130 
131  return StatusCode::SUCCESS;
132  }
133 
138  //Check if the thread stopped while the run server flag was set
139  if (m_runServerThread){
140  //Deliver an error message
141  ATH_MSG_ERROR( "Server thread stopped while run-server-flag is set!" );
142  //since we can't return statusCode failure (we're in a different thread!)
143  //set the flag to mark the server as dead
144  m_runServerThread = false;
145  }
146  }
147 
152  const std::string& mesg) const
153  {
154  //Deliver message
155  msg() << level << mesg << endmsg;
156  //Spit it out immediately
157  msg().flush();
158  }
159 
165  //return the current logging level
166  return msg().level();
167  }
168 
169 
180  return (int)FSMState();
181  }
182 
186  std::vector<std::string> ONCRPCServerSvc::GetStreamNames() const {
187 
188  //Create a vector that can be returned
189  std::vector<std::string> StreamNames;
190 
191  //Obtain an exclusive access lock
192  int retVal = pthread_mutex_lock(&m_accessLock);
193  if ( retVal != 0 ){
194  ATH_MSG_ERROR( "Unable to obtain access lock to get stream names: " << fmterror(retVal) );
195  return StreamNames;
196  }
197 
198  // Iterate over map to get entries
199  EventStreamMap::const_iterator MapItr = m_eventStreamMap.begin();
200  for ( ; MapItr != m_eventStreamMap.end(); ++MapItr){
201 
202  //Get the EventStreamID object
203  EventStreamID EvtStrID = (*MapItr).first;
204 
205  //Add the name of this EventStreamID to the list of stream names
206  StreamNames.push_back(EvtStrID.StreamName());
207  }
208 
209  //Release the lock
210  retVal = pthread_mutex_unlock(&m_accessLock);
211  if ( retVal != 0 )
212  ATH_MSG_ERROR( "Unable to release access lock after getting stream names: " << fmterror(retVal) );
213 
214  //Return the list of names
215  return StreamNames;
216  }
217 
221  const EventStreamID ONCRPCServerSvc::GetEventStreamID( const std::string& StreamName) const {
222 
223  //Obtain an exclusive access lock
224  int retVal = pthread_mutex_lock(&m_accessLock);
225  if ( retVal != 0 ){
226  ATH_MSG_ERROR( "Unable to obtain access lock to get stream ID: " << fmterror(retVal) );
227  return EventStreamID("");
228  }
229 
230  // Search the entry in the map
231  EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(EventStreamID(StreamName));
232 
233  //Release the lock
234  retVal = pthread_mutex_unlock(&m_accessLock);
235  if ( retVal != 0 )
236  ATH_MSG_ERROR( "Unable to release access lock after getting stream ID: " << fmterror(retVal) );
237 
238  //If the element was not found return an invalid ID
239  if ( MapItr == m_eventStreamMap.end()) return EventStreamID("");
240 
241  //Return the found event stream identifier
242  return (*MapItr).first;
243  }
244 
248  const std::string ONCRPCServerSvc::GetEvent( const EventStreamID& evtStreamID ) const {
249 
250  //Obtain an exclusive access lock
251  int retVal = pthread_mutex_lock(&m_accessLock);
252  if ( retVal != 0 ){
253  ATH_MSG_ERROR( "Unable to obtain access lock to get event: " << fmterror(retVal) );
254  return std::string("");
255  }
256 
257  // Search the entry in the map
258  EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(evtStreamID);
259 
260  //Release the lock
261  retVal = pthread_mutex_unlock(&m_accessLock);
262  if ( retVal != 0 )
263  ATH_MSG_ERROR( "Unable to release access lock after getting stream event: " << fmterror(retVal) );
264 
265  //If the element was not found return an empty string
266  if ( MapItr == m_eventStreamMap.end()) return std::string("");
267 
268  //Return a copy of the found event string
269  return std::string((*MapItr).second);
270  }
271 
275  ONCRPCServerSvc::ONCRPCServerSvc( const std::string& name, ISvcLocator* sl) :
276  base_class( name, sl )
277  {
278  //declare port number property
279  declareProperty("PortNumber",m_portNumber=0,"Port number to bind - assigned dynamically if set to zero [default: 0]");
280 
281  //Make sure ServerThread does not start unexpectedly
282  m_runServerThread = false ;
283 
284  ATH_MSG_VERBOSE( "Output level is set to " << (int)msg().level() );
285 
286  }
287 
292 
293  ATH_MSG_VERBOSE( "Destructor() " );
294  }
295 
301 
302  //Initialize Service base
303  if (Service::initialize().isFailure()) return StatusCode::FAILURE;
304 
305  //Initialize message stream level
306  msg().setLevel(outputLevel());
307  ATH_MSG_VERBOSE( "Initialize()" );
308  ATH_MSG_VERBOSE( "Output level is set to " << (int)msg().level() );
309 
313  ATH_CHECK( StartServer() );
314  return StatusCode::SUCCESS;
315  }
316 
322 
323  ATH_MSG_VERBOSE( "Finalize()" );
324 
325  //Stop the server thread and return status code
326  return StopServer();
327  }
328 
333  StatusCode ONCRPCServerSvc::UpdateEventForStream( const EventStreamID& evtStreamID, const std::string & event) {
334 
335  ATH_MSG_VERBOSE( "UpdateEventForStream()" );
336 
337  //Make sure the server is still running
338  if (! m_runServerThread ){
339  ATH_MSG_ERROR( "Server thread is not running - refusing to update event" );
340  return StatusCode::FAILURE;
341  }
342 
343  //Check that the event stream id is valid
344  if (!evtStreamID.isValid()){
345  ATH_MSG_ERROR( "Invalid event stream identifier - cannot add event" );
346  return StatusCode::FAILURE;
347  }
348 
349  //Make sure we don't have already exceeded the maximum number of streams
350  if (m_eventStreamMap.size() > NSTREAMMAX ){
351  ATH_MSG_ERROR( "Reached max. allowed number of streams " << NSTREAMMAX << " - cannot add event" );
352  return StatusCode::FAILURE;
353  }
354 
355  //Make sure the event is not larger than the allowed maximal size
356  if (event.length() > NBYTESMAX ){
357  ATH_MSG_ERROR( "Event is larger than allowed max. of " << NBYTESMAX << " bytes - cannot add event" );
358  return StatusCode::FAILURE;
359  }
360 
361  //Make sure we are the only one accessing the data right now, by trying to
362  //obtain a lock. If the lock can not be obtained after a certain time, an
363  //error is reported
364 
365  //Timeout of 5 second and 0 nanoseconds
366  struct timespec timeout = { 5, 0 };
367  //Try to obtain the lock
368 #ifndef __APPLE__
369  int retVal = pthread_mutex_timedlock(&m_accessLock, &timeout);
370 #else
371  int retVal = pthread_mutex_lock(&m_accessLock);
372 #endif
373  if ( retVal != 0 ){
374  ATH_MSG_ERROR( "Unable to obtain access lock to update event: " << fmterror(retVal) );
375  return StatusCode::FAILURE;
376  }
377 
378  //Using try/catch to ensure the mutex gets unlocked in any case
379  try {
380 
381  //Using std::map::operator[] and std::map::insert() will create a new event
382  //if it did not exist, otherwise just replace the existing entry (making a
383  //copy of the std::string) but would not update the key which holds new
384  //event/run number. Therefore delete existing entry first.
385 
386  //Delete old entry if there is one
387  EventStreamMap::iterator OldEvtItr = m_eventStreamMap.find(evtStreamID);
388  if (OldEvtItr != m_eventStreamMap.end())
389  m_eventStreamMap.erase(OldEvtItr);
390 
391  //Now add the new event
392  m_eventStreamMap.insert(EventStreamPair(evtStreamID,event));
393 
394  } catch ( const std::exception& e ) {
395  ATH_MSG_ERROR( "Exception caught while updating event for stream " << evtStreamID.StreamName()
396  << ": " << e.what() );
397  //Also release the lock in this case
398  pthread_mutex_unlock(&m_accessLock);
399  //before we return
400  return StatusCode::FAILURE;
401  }
402 
403  //Finally release the lock again
404  retVal = pthread_mutex_unlock(&m_accessLock);
405  if ( retVal != 0 ){
406  ATH_MSG_ERROR( "Unable to release access lock after updating event: " << fmterror(retVal) );
407  return StatusCode::FAILURE;
408  }
409 
410  ATH_MSG_DEBUG( "Updated stream " << evtStreamID.StreamName()
411  << " with event Nr. " << evtStreamID.EventNumber()
412  << " from run Nr. " << evtStreamID.RunNumber() );
413 
414  return StatusCode::SUCCESS;
415  }
416 
417 } //namespace
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
AllowedVariables::e
e
Definition: AsgElectronSelectorTool.cxx:37
JiveXML::ONCRPCServerSvc::m_ServerThreadHandle
pthread_t m_ServerThreadHandle
Definition: ONCRPCServerSvc.h:103
JiveXML::ONCRPCServerSvc::GetEvent
virtual const std::string GetEvent(const EventStreamID &evtStreamID) const override
get the current event for a particular stream
Definition: ONCRPCServerSvc.cxx:248
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
python.CaloAddPedShiftConfig.args
args
Definition: CaloAddPedShiftConfig.py:45
JiveXML::EventStreamID::EventNumber
unsigned long EventNumber() const
Definition: EventStream.h:41
initialize
void initialize()
Definition: run_EoverP.cxx:894
ONCRPCThreadCollection.h
JiveXML::ServerThreadArguments
struct ServerThreadArguments_t ServerThreadArguments
Arguments handed over fromt the main (Athena) thread to the server thread.
Definition: ONCRPCServerThreads.h:22
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
JiveXML::ONCRPCServerSvc::ServerThreadStopped
virtual void ServerThreadStopped() override
Callback when server thread terminates.
Definition: ONCRPCServerSvc.cxx:137
ONCRPCServer.h
python.iconfTool.models.loaders.level
level
Definition: loaders.py:20
rerun_display.client
client
Definition: rerun_display.py:31
StateLessPT_NewConfig.StreamNames
StreamNames
Definition: StateLessPT_NewConfig.py:318
JiveXML::ONCRPCServerThread
void * ONCRPCServerThread(void *args)
This is the actual server thread, which takes above arguments.
Definition: ONCRPCServerThreads.cxx:252
ONCRPCServerSvc.h
JiveXML::ServerThreadArguments_t
Definition: ONCRPCServerThreads.h:23
histSizes.code
code
Definition: histSizes.py:129
JiveXML::ONCRPCServerSvc::GetStreamNames
virtual std::vector< std::string > GetStreamNames() const override
get the names of all the streams
Definition: ONCRPCServerSvc.cxx:186
JiveXML::ONCRPCServerSvc::ONCRPCServerSvc
ONCRPCServerSvc(const std::string &name, ISvcLocator *sl)
Default constructor.
Definition: ONCRPCServerSvc.cxx:275
TrigConf::MSGTC::Level
Level
Definition: Trigger/TrigConfiguration/TrigConfBase/TrigConfBase/MsgStream.h:21
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EventInfoWrite.StreamName
string StreamName
Definition: EventInfoWrite.py:28
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
JiveXML::ONCRPCServerSvc::finalize
virtual StatusCode finalize() override
Finalize - called once at the end.
Definition: ONCRPCServerSvc.cxx:321
calibdata.exception
exception
Definition: calibdata.py:496
JiveXML::ONCRPCServerSvc::StartServer
StatusCode StartServer()
Start the server thread.
Definition: ONCRPCServerSvc.cxx:33
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
JiveXML::EventStreamID::isValid
bool isValid() const
Definition: EventStream.h:55
JiveXML::EventStreamPair
std::pair< const EventStreamID, const std::string > EventStreamPair
A map that stores events according to their EventStreamID Due to the way EventStreamID is build,...
Definition: EventStream.h:86
ONCRPCXDRProcs.h
JiveXML::NBYTESMAX
const unsigned int NBYTESMAX
Definition: ONCRPCServer.h:40
ONCRPCServerThreads.h
JiveXML::EventStreamID
For the client-server communication, each event is uniquely identified by the run number,...
Definition: EventStream.h:19
JiveXML::EventStreamID::StreamName
const std::string & StreamName() const
Definition: EventStream.h:43
ONCRPCSERVERPROG
#define ONCRPCSERVERPROG
Definition: ONCRPCServer.h:30
JiveXML
This header is shared inbetween the C-style server thread and the C++ Athena ServerSvc.
Definition: BadLArRetriever.cxx:22
JiveXML::ONCRPCServerSvc::GetState
virtual int GetState() const override
get the Status of the application
Definition: ONCRPCServerSvc.cxx:173
JiveXML::ONCRPCServerSvc::Message
virtual void Message(const MSG::Level level, const std::string &mesg) const override
This function is exposed to allow using athena messaging service from other threads.
Definition: ONCRPCServerSvc.cxx:151
JiveXML::ONCRPCServerSvc::m_eventStreamMap
EventStreamMap m_eventStreamMap
Definition: ONCRPCServerSvc.h:97
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
JiveXML::ONCRPCServerSvc::m_runServerThread
bool m_runServerThread
Definition: ONCRPCServerSvc.h:107
python.TrigPSCPythonDbSetup.outputLevel
outputLevel
Definition: TrigPSCPythonDbSetup.py:30
JiveXML::ONCRPCServerSvc::initialize
virtual StatusCode initialize() override
Gaudi default methods.
Definition: ONCRPCServerSvc.cxx:300
JiveXML::ONCRPCServerSvc::StopServer
StatusCode StopServer()
Stop the server thread.
Definition: ONCRPCServerSvc.cxx:68
ONCRPCSERVERVERS
#define ONCRPCSERVERVERS
Definition: ONCRPCServer.h:31
JiveXML::ONCRPCServerSvc::~ONCRPCServerSvc
virtual ~ONCRPCServerSvc()
Destructor.
Definition: ONCRPCServerSvc.cxx:291
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
JiveXML::ONCRPCServerSvc::GetEventStreamID
virtual const EventStreamID GetEventStreamID(const std::string &streamName) const override
get the current EventStreamID for a particular stream
Definition: ONCRPCServerSvc.cxx:221
JiveXML::ONCRPCServerSvc::UpdateEventForStream
virtual StatusCode UpdateEventForStream(const EventStreamID &evtStreamID, const std::string &event) override
Put this event as new current event for stream given by name.
Definition: ONCRPCServerSvc.cxx:333
JiveXML::ONCRPCServerSvc::LogLevel
virtual MSG::Level LogLevel() const override
Get the logging level.
Definition: ONCRPCServerSvc.cxx:164
JiveXML::EventStreamID::RunNumber
unsigned int RunNumber() const
Definition: EventStream.h:42
python.TrigInDetArtSteps.timeout
timeout
Definition: TrigInDetArtSteps.py:36
JiveXML::ONCRPCServerSvc::m_portNumber
int m_portNumber
Definition: ONCRPCServerSvc.h:94
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
JiveXML::NSTREAMMAX
const unsigned int NSTREAMMAX
Definition: ONCRPCServer.h:38