ATLAS Offline Software
Loading...
Searching...
No Matches
ONCRPCServerSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "GaudiKernel/ServiceHandle.h"
6#include "GaudiKernel/StateMachine.h"
7#include "GaudiKernel/IAppMgrUI.h"
8#include "GaudiKernel/GaudiHandle.h"
9
15
16
17namespace {
18 std::string fmterror(int code) {
19 char buf[256];
20 return std::string(strerror_r(code, buf, sizeof(buf)));
21 }
22}
23
24
25namespace JiveXML {
26
34
35 //Initialize access lock mechanism to ensure that the data map is never
36 //accessed by more than one thread at the same time. NULL means default
37 int retVal = pthread_mutex_init(&m_accessLock,NULL);
38 if (retVal != 0){
39 ATH_MSG_ERROR( "Unable to initialize access lock while starting server: " << fmterror(retVal) );
40 return StatusCode::FAILURE;
41 }
42
43 //The arguments passed on to the server - create new object on the heap that
44 //is persistent through the lifetime of the thread
46
47 //set runServer flag to true, so the thread will start
48 m_runServerThread = true ;
49
50 //create thread itself
51 if ( ( pthread_create(&m_ServerThreadHandle, NULL , &JiveXML::ONCRPCServerThread, (void *) args )) != 0){
52
53 //Create thread failed
54 ATH_MSG_ERROR( "Thread creation failed" );
55 return StatusCode::FAILURE;
56 }
57
58 return StatusCode::SUCCESS;
59
60 }
61
69
70 ATH_MSG_VERBOSE( "StopServer()" );
71
76
77 //Create a client - this will already cause an update of the file
78 //descriptors on the sockets
79 CLIENT* client = clnt_create("localhost", ONCRPCSERVERPROG,ONCRPCSERVERVERS, "tcp");
80 if (!client){
81 ATH_MSG_ERROR( "Unable to create shutdown client for local server" );
82 return StatusCode::FAILURE;
83 }
84
85 // Next unset the runServerThread flag, which will cause the loop to stop
86 // This needs to happen after client creation, otherwise the next call won't
87 // be serverd anymore
88 m_runServerThread = false ;
89
90// xdr_void is defined inconsistently in xdr.h and gets a warning from gcc8.
91#if __GNUC__ >= 8
92# pragma GCC diagnostic push
93# pragma GCC diagnostic ignored "-Wcast-function-type"
94#endif
95#if defined(__clang__) && __clang_major__ >= 19
96# pragma clang diagnostic push
97# pragma clang diagnostic ignored "-Wcast-function-type-mismatch"
98#endif
99 // Now issue the call with a timeout
100 struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0;
101 clnt_call(client, NULLPROC, (xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL, timeout);
102#if defined(__clang__) && __clang_major__ >= 19
103# pragma clang diagnostic pop
104#endif
105#if __GNUC__ >= 8
106# pragma GCC diagnostic pop
107#endif
108
109 // A pointer to the return value of the thread
110 void* ret;
111 // wait till the server thread has finished
112 ATH_MSG_INFO( "Waiting for server thread to terminate ..." );
113 pthread_join(m_ServerThreadHandle, &ret);
114 ATH_MSG_INFO( " ... finished server thread" );
115
116 //check if there was a return value
117 if (ret){
118 //Get the return value
119 unsigned long NRequests = *(unsigned long*)ret;
120 ATH_MSG_DEBUG( "Server thread stopped after handling " << NRequests << " requests" );
121 } else
122 ATH_MSG_WARNING( "Server thread had stopped unexpectedly" );
123
124 //Destroy the access lock
125 int retVal = pthread_mutex_destroy(&m_accessLock);
126 if (retVal != 0){
127 ATH_MSG_ERROR( "Unable to destroy access lock after stopping server: " << fmterror(retVal) );
128 return StatusCode::FAILURE;
129 }
130
131 return StatusCode::SUCCESS;
132 }
133
138 //Check if the thread stopped while the run server flag was set
140 //Deliver an error message
141 ATH_MSG_ERROR( "Server thread stopped while run-server-flag is set!" );
142 //since we can't return statusCode failure (we're in a different thread!)
143 //set the flag to mark the server as dead
144 m_runServerThread = false;
145 }
146 }
147
151 void ONCRPCServerSvc::Message( const MSG::Level level,
152 const std::string& mesg) const
153 {
154 //Deliver message
155 msg() << level << mesg << endmsg;
156 //Spit it out immediately
157 msg().flush();
158 }
159
164 MSG::Level ONCRPCServerSvc::LogLevel() const {
165 //return the current logging level
166 return msg().level();
167 }
168
169
180 return (int)FSMState();
181 }
182
186 std::vector<std::string> ONCRPCServerSvc::GetStreamNames() const {
187
188 //Create a vector that can be returned
189 std::vector<std::string> StreamNames;
190
191 //Obtain an exclusive access lock
192 int retVal = pthread_mutex_lock(&m_accessLock);
193 if ( retVal != 0 ){
194 ATH_MSG_ERROR( "Unable to obtain access lock to get stream names: " << fmterror(retVal) );
195 return StreamNames;
196 }
197
198 // Iterate over map to get entries
199 EventStreamMap::const_iterator MapItr = m_eventStreamMap.begin();
200 for ( ; MapItr != m_eventStreamMap.end(); ++MapItr){
201
202 //Get the EventStreamID object
203 EventStreamID EvtStrID = (*MapItr).first;
204
205 //Add the name of this EventStreamID to the list of stream names
206 StreamNames.push_back(EvtStrID.StreamName());
207 }
208
209 //Release the lock
210 retVal = pthread_mutex_unlock(&m_accessLock);
211 if ( retVal != 0 )
212 ATH_MSG_ERROR( "Unable to release access lock after getting stream names: " << fmterror(retVal) );
213
214 //Return the list of names
215 return StreamNames;
216 }
217
221 const EventStreamID ONCRPCServerSvc::GetEventStreamID( const std::string& StreamName) const {
222
223 //Obtain an exclusive access lock
224 int retVal = pthread_mutex_lock(&m_accessLock);
225 if ( retVal != 0 ){
226 ATH_MSG_ERROR( "Unable to obtain access lock to get stream ID: " << fmterror(retVal) );
227 return EventStreamID("");
228 }
229
230 // Search the entry in the map
231 EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(EventStreamID(StreamName));
232
233 //Release the lock
234 retVal = pthread_mutex_unlock(&m_accessLock);
235 if ( retVal != 0 )
236 ATH_MSG_ERROR( "Unable to release access lock after getting stream ID: " << fmterror(retVal) );
237
238 //If the element was not found return an invalid ID
239 if ( MapItr == m_eventStreamMap.end()) return EventStreamID("");
240
241 //Return the found event stream identifier
242 return (*MapItr).first;
243 }
244
248 const std::string ONCRPCServerSvc::GetEvent( const EventStreamID& evtStreamID ) const {
249
250 //Obtain an exclusive access lock
251 int retVal = pthread_mutex_lock(&m_accessLock);
252 if ( retVal != 0 ){
253 ATH_MSG_ERROR( "Unable to obtain access lock to get event: " << fmterror(retVal) );
254 return std::string("");
255 }
256
257 // Search the entry in the map
258 EventStreamMap::const_iterator MapItr = m_eventStreamMap.find(evtStreamID);
259
260 //Release the lock
261 retVal = pthread_mutex_unlock(&m_accessLock);
262 if ( retVal != 0 )
263 ATH_MSG_ERROR( "Unable to release access lock after getting stream event: " << fmterror(retVal) );
264
265 //If the element was not found return an empty string
266 if ( MapItr == m_eventStreamMap.end()) return std::string("");
267
268 //Return a copy of the found event string
269 return std::string((*MapItr).second);
270 }
271
275 ONCRPCServerSvc::ONCRPCServerSvc( const std::string& name, ISvcLocator* sl) :
276 base_class( name, sl )
277 {
278 //declare port number property
279 declareProperty("PortNumber",m_portNumber=0,"Port number to bind - assigned dynamically if set to zero [default: 0]");
280
281 //Make sure ServerThread does not start unexpectedly
282 m_runServerThread = false ;
283
284 ATH_MSG_VERBOSE( "Output level is set to " << (int)msg().level() );
285
286 }
287
292
293 ATH_MSG_VERBOSE( "Destructor() " );
294 }
295
301
302 //Initialize Service base
303 if (Service::initialize().isFailure()) return StatusCode::FAILURE;
304
305 //Initialize message stream level
306 msg().setLevel(outputLevel());
307 ATH_MSG_VERBOSE( "Initialize()" );
308 ATH_MSG_VERBOSE( "Output level is set to " << (int)msg().level() );
309
314 return StatusCode::SUCCESS;
315 }
316
322
323 ATH_MSG_VERBOSE( "Finalize()" );
324
325 //Stop the server thread and return status code
326 return StopServer();
327 }
328
333 StatusCode ONCRPCServerSvc::UpdateEventForStream( const EventStreamID& evtStreamID, const std::string & event) {
334
335 ATH_MSG_VERBOSE( "UpdateEventForStream()" );
336
337 //Make sure the server is still running
338 if (! m_runServerThread ){
339 ATH_MSG_ERROR( "Server thread is not running - refusing to update event" );
340 return StatusCode::FAILURE;
341 }
342
343 //Check that the event stream id is valid
344 if (!evtStreamID.isValid()){
345 ATH_MSG_ERROR( "Invalid event stream identifier - cannot add event" );
346 return StatusCode::FAILURE;
347 }
348
349 //Make sure we don't have already exceeded the maximum number of streams
350 if (m_eventStreamMap.size() > NSTREAMMAX ){
351 ATH_MSG_ERROR( "Reached max. allowed number of streams " << NSTREAMMAX << " - cannot add event" );
352 return StatusCode::FAILURE;
353 }
354
355 //Make sure the event is not larger than the allowed maximal size
356 if (event.length() > NBYTESMAX ){
357 ATH_MSG_ERROR( "Event is larger than allowed max. of " << NBYTESMAX << " bytes - cannot add event" );
358 return StatusCode::FAILURE;
359 }
360
361 //Make sure we are the only one accessing the data right now, by trying to
362 //obtain a lock. If the lock can not be obtained after a certain time, an
363 //error is reported
364
365 //Timeout of 5 second and 0 nanoseconds
366 struct timespec timeout = { 5, 0 };
367 //Try to obtain the lock
368#ifndef __APPLE__
369 int retVal = pthread_mutex_timedlock(&m_accessLock, &timeout);
370#else
371 int retVal = pthread_mutex_lock(&m_accessLock);
372#endif
373 if ( retVal != 0 ){
374 ATH_MSG_ERROR( "Unable to obtain access lock to update event: " << fmterror(retVal) );
375 return StatusCode::FAILURE;
376 }
377
378 //Using try/catch to ensure the mutex gets unlocked in any case
379 try {
380
381 //Using std::map::operator[] and std::map::insert() will create a new event
382 //if it did not exist, otherwise just replace the existing entry (making a
383 //copy of the std::string) but would not update the key which holds new
384 //event/run number. Therefore delete existing entry first.
385
386 //Delete old entry if there is one
387 EventStreamMap::iterator OldEvtItr = m_eventStreamMap.find(evtStreamID);
388 if (OldEvtItr != m_eventStreamMap.end())
389 m_eventStreamMap.erase(OldEvtItr);
390
391 //Now add the new event
392 m_eventStreamMap.insert(EventStreamPair(evtStreamID,event));
393
394 } catch ( const std::exception& e ) {
395 ATH_MSG_ERROR( "Exception caught while updating event for stream " << evtStreamID.StreamName()
396 << ": " << e.what() );
397 //Also release the lock in this case
398 pthread_mutex_unlock(&m_accessLock);
399 //before we return
400 return StatusCode::FAILURE;
401 }
402
403 //Finally release the lock again
404 retVal = pthread_mutex_unlock(&m_accessLock);
405 if ( retVal != 0 ){
406 ATH_MSG_ERROR( "Unable to release access lock after updating event: " << fmterror(retVal) );
407 return StatusCode::FAILURE;
408 }
409
410 ATH_MSG_DEBUG( "Updated stream " << evtStreamID.StreamName()
411 << " with event Nr. " << evtStreamID.EventNumber()
412 << " from run Nr. " << evtStreamID.RunNumber() );
413
414 return StatusCode::SUCCESS;
415 }
416
417} //namespace
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
#define ONCRPCSERVERVERS
#define ONCRPCSERVERPROG
For the client-server communication, each event is uniquely identified by the run number,...
Definition EventStream.h:19
unsigned int RunNumber() const
Definition EventStream.h:42
unsigned long EventNumber() const
Definition EventStream.h:41
const std::string & StreamName() const
Definition EventStream.h:43
virtual int GetState() const override
get the Status of the application
virtual MSG::Level LogLevel() const override
Get the logging level.
virtual const EventStreamID GetEventStreamID(const std::string &streamName) const override
get the current EventStreamID for a particular stream
virtual std::vector< std::string > GetStreamNames() const override
get the names of all the streams
ONCRPCServerSvc(const std::string &name, ISvcLocator *sl)
Default constructor.
virtual void Message(const MSG::Level level, const std::string &mesg) const override
This function is exposed to allow using athena messaging service from other threads.
StatusCode StopServer()
Stop the server thread.
EventStreamMap m_eventStreamMap
virtual StatusCode finalize() override
Finalize - called once at the end.
virtual const std::string GetEvent(const EventStreamID &evtStreamID) const override
get the current event for a particular stream
virtual StatusCode UpdateEventForStream(const EventStreamID &evtStreamID, const std::string &event) override
Put this event as new current event for stream given by name.
virtual ~ONCRPCServerSvc()
Destructor.
virtual void ServerThreadStopped() override
Callback when server thread terminates.
virtual StatusCode initialize() override
Gaudi default methods.
StatusCode StartServer()
Start the server thread.
This header is shared inbetween the C-style server thread and the C++ Athena ServerSvc.
const unsigned int NBYTESMAX
std::pair< const EventStreamID, const std::string > EventStreamPair
A map that stores events according to their EventStreamID Due to the way EventStreamID is build,...
Definition EventStream.h:86
const unsigned int NSTREAMMAX
struct ServerThreadArguments_t ServerThreadArguments
Arguments handed over fromt the main (Athena) thread to the server thread.
void * ONCRPCServerThread(void *args)
This is the actual server thread, which takes above arguments.
MsgStream & msg
Definition testRead.cxx:32