ATLAS Offline Software
Loading...
Searching...
No Matches
ONCRPCServerThreads.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
10
11#include "JiveXML/IServer.h"
12
13//Decoding of caller IP address
14#include <climits>
15#include <netinet/in.h>
16#include <arpa/inet.h>
17#include <sys/socket.h>
18#include <netdb.h>
19
20#include <sstream>
21#include <errno.h>
22#ifdef __APPLE__
23#include <rpc/rpc.h>
24extern "C" void get_myaddress (struct sockaddr_in *);
25#endif
26
27namespace JiveXML {
28
29 //Global data
30 //This variable is visible in all threads
32
33 //Thread specific data keys
34 //These variables are global only within a single thread
35 pthread_key_t ServerSvcKey ATLAS_THREAD_SAFE; //<! A pointer to the the Athena Service associated with this thread
36
42 void* ONCRPCDispatchThread ( void* args ){
43
44 //Get the thread arguments
45 const struct svc_req* rqstp = ((DispatchThreadArguments*)args)->rqstp;
46 const EventRequest* eventRequest = ((DispatchThreadArguments*)args)->evtReq;
47 const Event* event = ((DispatchThreadArguments*)args)->evt;
48 IServer* const ServerSvc = ((DispatchThreadArguments*)args)->ServerSvcPtr;
49
50 //we have stored the arguments, so delete the object
51 delete (DispatchThreadArguments*)args;
52
53 try {
54
55 //Now first of all register myself with the ServerSvc;
56 dispatchThreads->AddThread(pthread_self());
57 //Tell them the thread number if they want to know
58 std::ostringstream msg; msg << "Created new dispatch thread " << pthread_self();
59 ServerSvc->Message(MSG::VERBOSE,msg.str());
60
61 //Process the request
62 switch (rqstp->rq_proc){
63 //NULLPROC - no data exchanged, just pinging for response
64 case NULLPROC:
65 ReturnNull(rqstp->rq_xprt, ServerSvc);
66 break ;
67 //GETSTATUS - return the current athena status
69 ReturnState(rqstp->rq_xprt, ServerSvc);
70 break ;
71 //GETSTREAMS - return the names of the available streams
73 ReturnStreams(rqstp->rq_xprt, ServerSvc);
74 break ;
75 //GETEVENT - return the current event in a given stream
77 ReturnEvent(rqstp->rq_xprt, eventRequest, ServerSvc);
78 break ;
79 //GETEVENT - return the current event in a given stream
81 SetNewEvent(rqstp->rq_xprt, event, ServerSvc);
82 break ;
83 default:
84 std::ostringstream msg;
85 msg << "Client request for server procedure #" << rqstp->rq_proc << " which is not defined";
86 ServerSvc->Message(MSG::WARNING, msg.str());
87 svcerr_noproc(rqstp->rq_xprt);
88 break ;
89 }
90
91 //We have dealt with the request, delete it
92 delete rqstp;
93 delete event;
94 delete eventRequest;
95
96 //Finally unregister myself with the ServerSvc
97 //Note that this will also detach the thread, so this has to be the last
98 //statement before the thread returns
99 dispatchThreads->RemoveThread(pthread_self());
100
101 } catch (std::exception &e){
102 std::ostringstream msg; msg << "Caught exception in DispatchThread: " << e.what();
103 ServerSvc->Message(MSG::ERROR,msg.str());
104 }
105
106 return NULL;
107 }
108
109
116 extern "C" void ONCRPCRequestHandler( struct svc_req* rqstp, SVCXPRT* /*transp*/){
117
118 //Get the ServerSvc pointer for this thread
119 JiveXML::IServer* const ServerSvc = (JiveXML::IServer*)pthread_getspecific(ServerSvcKey);
120 std::ostringstream msg; msg << "Request handler in thread " << pthread_self();
121 ServerSvc->Message(MSG::VERBOSE,msg.str()); msg.str("");
122
123 try {
124
125 //Check wether it is worth retrievin caller information
126 if ( ServerSvc->LogLevel() <= MSG::DEBUG ){
127
128 //Get information about the requester
129 auto caller = (struct sockaddr* )svc_getcaller(rqstp->rq_xprt);
130 char port[NI_MAXSERV];
131 char host[NI_MAXHOST];
132 char IPAddr[INET6_ADDRSTRLEN];
133
134 //assemble a message
135 msg << "Request from host ";
136 //Add host name if we have one
137 if(getnameinfo(caller, rqstp->rq_xprt->xp_addrlen,
138 host, sizeof host,
139 nullptr, 0,
140 0) == 0)
141 msg << host << " ";
142 //Add ip-address and port
143 if(getnameinfo(caller, rqstp->rq_xprt->xp_addrlen,
144 IPAddr, sizeof IPAddr,
145 port, sizeof port,
146 NI_NUMERICSERV | NI_NUMERICHOST) == 0)
147 msg << "(" << IPAddr << ") on port " << port;
148 //Deliver the message
149 ServerSvc->Message(MSG::DEBUG,msg.str()); msg.str("");
150 }
151
152 //Prepare the dispatch thread argument structure making a copy of the
153 //request, so it sticks around if this thread is gone
154 DispatchThreadArguments DpThreadArgs(ServerSvc, new svc_req(*rqstp), NULL, NULL);
155
156 //Check for input data to the request
157 switch (rqstp->rq_proc){
158
160 //Create structure to hold request arguments on the head so we can
161 //forward it to the dispatch thread.
162 //NOTE: This is a C-struct that MUST be initalized.
163 //It will be cleaned up by the dispatch thread
164 EventRequest* evtReq = new EventRequest;
165 evtReq->EventNumber=-2;evtReq->RunNumber=-2;evtReq->StreamName=NULL;
166
167 //Retrieve the request arguments
168 if (!svc_getargs(rqstp->rq_xprt,(xdrproc_t)xdr_event_req,(caddr_t)evtReq)){
169 checkResult(errno,"creating dispatch tread arguments from GETEVENT call",ServerSvc);
170 //tell the client his request failed
171 svcerr_decode(rqstp->rq_xprt);
172 //return immediately from this request
173 //clean up
174 delete DpThreadArgs.rqstp;
175 return ;
176 }
177
178 //Set the event request argument
179 DpThreadArgs.evtReq = evtReq;
180 break;
181
182 } case ONCRPC_SETEVENT_PROC: {
183 //Create structure to hold request arguments on the HEAP so we can
184 //forward it to the dispatch thread.
185 //NOTE: This is a C-struct that MUST be initalized
186 //It will be cleaned up by the dispatch thread
187 Event* evt = new Event;
188 evt->EventNumber=-2;evt->RunNumber=-2;evt->StreamName=NULL;evt->NBytes=0;evt->EventData=NULL;
189
190 //Retrieve the request arguments
191 if (!svc_getargs(rqstp->rq_xprt,(xdrproc_t)xdr_event,(caddr_t)evt)){
192 checkResult(errno,"creating dispatch tread arguments from SETEVENT call",ServerSvc);
193 //tell the client his request failed
194 svcerr_decode(rqstp->rq_xprt);
195 //return immediately from this request
196 //clean up
197 delete DpThreadArgs.rqstp;
198 DpThreadArgs.rqstp = nullptr;
199 return ;
200 }
201
202 //Set the event request argument
203 DpThreadArgs.evt = evt;
204 break;
205 }
206 }
207
208 //Check for errors in all pthread functions
209 int retVal = 0;
210 auto ok = [&ServerSvc, &DpThreadArgs](int retVal, const char * msg)->bool{
211 if ( ! checkResult(retVal,msg,ServerSvc)){
212 //clean up
213 delete DpThreadArgs.rqstp;
214 DpThreadArgs.rqstp = nullptr;
215 return false;
216 }
217 return true;
218 };
219 //Generate thread attributes
220 pthread_attr_t attr;
221 retVal = pthread_attr_init (&attr);
222 if ( ! ok(retVal,"request handler initializing thread attributes")) return;
223
224 //Removing the limit on the thread memory usage as a test. Suspect that some threads do not have enough memory to finish and therefore eat up all the memory.
225 //retVal = pthread_attr_setstacksize(&attr,10*PTHREAD_STACK_MIN);
226 if ( ! ok(retVal,"request handler setting thread stacksize")) return;
227
228
229 //NOTE: All threads are first created joinable, so we can wait for the to
230 //finish using pthread_detach. Yet, when the thread removes itself from
231 //the ThreadCollection, it will detach itself, so no memory is lost.
232 retVal = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
233 if ( ! ok(retVal,"request handler setting thread detach state")) return;
234 //Create a new thread
235 pthread_t dispatchThread;
236 retVal = pthread_create(&dispatchThread,&attr,&ONCRPCDispatchThread,new DispatchThreadArguments(DpThreadArgs));
237 if ( ! ok(retVal,"request handler creating dispatch thread")) return;
238
239 //And wait till it has registered itself with the ThreadCollection
240 dispatchThreads->WaitAdd();
241
242 //release thread attributs
243 retVal = pthread_attr_destroy(&attr);
244 if ( ! ok(retVal,"request handler destroying thread attributes")) return;
245
246 } catch (std::exception &e){
247 std::ostringstream msg; msg << "Caught exception in RequestHandler: " << e.what();
248 ServerSvc->Message(MSG::ERROR,msg.str());
249 }
250 }
251
255 void ONCRPCCleanupHandler(void* args){
256 //Since this cleanup handler is associated with the IServer*
257 //we will get exactly this as an argument
258 ((JiveXML::IServer*)args)->Message(MSG::INFO,"Finished ONCRPC server thread");
259 //Also call the servers callback
260 ((JiveXML::IServer*)args)->ServerThreadStopped();
261 }
262
266 void* ONCRPCServerThread( void* args ){
267
268 //Get a pointer to the Service that started this thread
269 JiveXML::IServer* const ServerSvc = (*( ServerThreadArguments* )args).ServerPtr;
270 //Get the port number from the thread arguments
271 const int PortNumber = (*( ServerThreadArguments* )args).PortNumber;
272
273 //We've got the arguments and can delete the struct
274 delete (ServerThreadArguments*)args;
275
276 //Save the arguments in a thread-specific keys
277 //Once the thread exits, the specified cleanup handler is called,
278 //which in our case is used to notify the IServer of the thread termination
279 int retVal = 0;
280 retVal = pthread_key_create(&ServerSvcKey,ONCRPCCleanupHandler);
281 if (! checkResult(retVal,"server thread creating thread-specific key",ServerSvc)) pthread_exit(NULL);
282 retVal = pthread_setspecific(ServerSvcKey, ServerSvc);
283 if (! checkResult(retVal,"server thread setting thread-specific key",ServerSvc)) pthread_exit(NULL);
284
285 try {
286
287 //Also install cleanup handlers if they are given as arguments
288
289 //Send a message to the messaging function of the service
290 ServerSvc->Message(MSG::INFO,"Started ONCRPC server thread");
291
292 //Check wether the runServerThread flag is set
293 if (!ServerSvc->GetRunServerFlag()){
294 ServerSvc->Message(MSG::WARNING,"The run server flag is not set - stopping server thread immediately");
295 pthread_exit(NULL);
296 }
297
302 //Get my address
303 struct sockaddr_in my_addr; get_myaddress(&my_addr);
304 //Try get the portnumber of that process
305 unsigned short port = pmap_getport(&my_addr, ONCRPCSERVERPROG,ONCRPCSERVERVERS,IPPROTO_TCP);
306 //Check if some program has already been registered
307 if ( port != 0 ){
308 //First report it
309 std::ostringstream msg; msg << "Program with program number " << ONCRPCSERVERPROG
310 << " already registered with portmapper on local host";
311 ServerSvc->Message(MSG::INFO,msg.str());
312 //Next check if it is alive by creating a client and calling its NULLPROC
313 CLIENT* client = clnt_create("localhost", ONCRPCSERVERPROG,ONCRPCSERVERVERS, "tcp");
314 if (client != NULL){
315// xdr_void is defined inconsistently in xdr.h and gets a warning from gcc8.
316#if __GNUC__ >= 8
317# pragma GCC diagnostic push
318# pragma GCC diagnostic ignored "-Wcast-function-type"
319#endif
320#if defined(__clang__) && __clang_major__ >= 19
321# pragma clang diagnostic push
322# pragma clang diagnostic ignored "-Wcast-function-type-mismatch"
323#endif
324 struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0;
325 clnt_stat ret = clnt_call(client, NULLPROC, (xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL, timeout);
326#if defined(__clang__) && __clang_major__ >= 19
327# pragma clang diagnostic pop
328#endif
329#if __GNUC__ >= 8
330# pragma GCC diagnostic pop
331#endif
332 if (ret == RPC_SUCCESS){
333 //So we already have a server, and it is alive -- then we don't start
334 ServerSvc->Message(MSG::ERROR,"Server exists and is alive on local host - stopping this thread");
335 pthread_exit(NULL);
336 } else ServerSvc->Message(MSG::WARNING,"Existing server does not respond");
337 } else ServerSvc->Message(MSG::WARNING,"Can not create client for existing server");
338
339 //Now just force a clearing of the portmap entry
340 ServerSvc->Message(MSG::WARNING,"Clearing existing portmap entry!");
342
343 //Finally clear the error entry by trying to get the port number again
344 port = pmap_getport(&my_addr, ONCRPCSERVERPROG,ONCRPCSERVERVERS,IPPROTO_TCP);
345 }
346
347 //We should have left either the first or the second call to pmap_getport
348 // with "program not registered" , which is returned as RPC_PROGNOTREGISTERD
349 //or RPC_SYSTEMERROR. However, we could also have failed contacting the portmapper
350 if ((rpc_createerr.cf_stat != RPC_PROGNOTREGISTERED) &&
351 (rpc_createerr.cf_stat != RPC_SYSTEMERROR )){
352 ServerSvc->Message(MSG::ERROR,clnt_spcreateerror("Failed querying portmapper on local host for existing servers"));
353 pthread_exit(NULL);
354 }
355
356 //The socket to connect to, not bound to a specifc port by default
357 int server_socket = RPC_ANYSOCK;
358
359 //Check if a specific port was requested
360 if (PortNumber != 0){
361
362 //Create a socket
363 if ((server_socket = socket(AF_INET, SOCK_STREAM, 0)) != 0) {
364 checkResult(errno,"server thread creating socket",ServerSvc);
365 pthread_exit(NULL);
366 }
367
368 //Create an inet address with the given port number
369 struct sockaddr_in server_addr;
370 server_addr.sin_family = AF_INET; //IPv4 address
371 server_addr.sin_addr.s_addr = INADDR_ANY; //will bind all interfaces
372 server_addr.sin_port = htons(PortNumber); //port number in network byte order
373 memset(&server_addr.sin_zero, 0, sizeof(server_addr.sin_zero)); //zero padding
374
375 //Now bind the socket to that inet address
376 if (bind(server_socket, reinterpret_cast<sockaddr *>(&server_addr), sizeof(struct sockaddr)) != 0){
377 std::ostringstream msg; msg << "server thread binding socket to port " << PortNumber;
378 checkResult(errno,msg.str(),ServerSvc);
379 pthread_exit(NULL);
380 }
381 std::ostringstream msg; msg << "Successfully bound port " << PortNumber;
382 ServerSvc->Message(MSG::INFO,msg.str());
383 }
384
385 //Get a transport handle using default buffer sizes
386 SVCXPRT * transp = svctcp_create(server_socket, 0, 0);
387
388 //Check result
389 if (transp == NULL) {
390 ServerSvc->Message(MSG::ERROR,"Opening TCP socket failed");
391 pthread_exit(NULL);
392 }
393
394 //Next register our server with the RPC daemon
395#ifndef __APPLE__
396 if (! svc_register(transp,ONCRPCSERVERPROG,ONCRPCSERVERVERS,ONCRPCRequestHandler,IPPROTO_TCP)){
397#else
398 if (! svc_register(transp,ONCRPCSERVERPROG,ONCRPCSERVERVERS,(void (*)())ONCRPCRequestHandler,IPPROTO_TCP)){
399#endif
400 ServerSvc->Message(MSG::ERROR,"Could not register ONCRPC Server with RPC daemon");
401 checkResult(errno,"registering ONCRPC Server with RPC daemon",ServerSvc);
402 pthread_exit(NULL);
403 }
404
405 //A set of socket file descriptors
406 fd_set SocketFileDescriptorSet;
407 //Size of the socket handle table
408 int NFileDescriptors = FD_SETSIZE;
409
410 //Count the number of requests we have received while running
411 unsigned long NRequests = 0;
412
413 do {
414
415 ServerSvc->Message(MSG::VERBOSE,"Waiting for call...");
416
417 //Get the file descriptor set
418 SocketFileDescriptorSet = svc_fdset;
419
420 //Wait for input on any of these using select:
421 // - first NULL: not cecking for readiness to write to socket
422 // - second NULL: not checking for socket conditions
423 // - third NULL: timeout - wait forever
424 int ret = select(NFileDescriptors,&SocketFileDescriptorSet,NULL,NULL,NULL);
425 // Check result
426 switch (ret){
427 // an error occured
428 case -1:
429 // If just an interrupt continue
430 if ( errno == EINTR ) continue ;
431 // Else show error
432 checkResult(errno,"server thread returning from select",ServerSvc);
433 return NULL;
434 // a timeout occured - break the loop
435 case 0:
436 break ;
437 // activity on one of the sockets
438 default:
439 //Count number of requests
440 ++NRequests;
441 // Get the requests from the sockets
442 svc_getreqset(&SocketFileDescriptorSet);
443 }
444 } while ( ServerSvc->GetRunServerFlag() );
445
446 ServerSvc->Message(MSG::DEBUG,"Serving loop finished");
447
448 //Unregister the service, so no clients can connect anymore
449 svc_unregister(ONCRPCSERVERPROG,ONCRPCSERVERVERS);
450
451 //Also stop all the dispatching threads, which need to finish before this
452 //one. Otherwise the server will become invalid, before all requests are
453 //served
454
455 //Tell how many threads we are waiting for
456 std::ostringstream msg; msg << "Waiting for " << dispatchThreads->NumberOfThreads() << " open dispatch threads to terminate ... ";
457 ServerSvc->Message(MSG::INFO,msg.str());
458 //Then wait for them all to finish
459 dispatchThreads->JoinAll();
460 //Tell when we are done
461 ServerSvc->Message(MSG::INFO, " ... finished all dispatch threads");
462
463 //Now that the threads are stopped we can finish the transport protocol and
464 //close the port
465 svc_destroy(transp);
466
467 //Return number of requests - as all local variables will disappear,
468 //the return value needs to be allocated on the heap;
469 unsigned long* NReqPtr = new unsigned long(NRequests);
470 //Terminate this thread
471 pthread_exit(NReqPtr);
472
473 } catch (std::exception &e){
474 std::ostringstream msg; msg << "Caught exception in ServerThread: " << e.what();
475 ServerSvc->Message(MSG::ERROR,msg.str());
476 }
477
478 return NULL;
479 }
480} //namespace
481
#define ONCRPCSERVERVERS
#define ONCRPC_SETEVENT_PROC
#define ONCRPC_GETSTATUS_PROC
#define ONCRPC_GETSTREAMS_PROC
#define ONCRPC_GETEVENT_PROC
#define ONCRPCSERVERPROG
Define macros for attributes used to control the static checker.
virtual bool GetRunServerFlag() const =0
The server thread will stop once this flag is set to false.
virtual MSG::Level LogLevel() const =0
Get the logging level.
virtual void Message(const MSG::Level level, const std::string &msg) const =0
Pure abstract interface for all full server implementations.
Definition IServer.h:22
This class handles a collection of threads.
This header is shared inbetween the C-style server thread and the C++ Athena ServerSvc.
void ReturnState(SVCXPRT *transp, IServer *const ServerSvc)
Implementation of ONCRPC_ATHENASTATUS_PROC Return the current athena status in XDR representation.
void * ONCRPCDispatchThread(void *args)
This is the thread handling the request - one thread per request.
bool_t xdr_event_req(XDR *xdrsp, EventRequest *eventReq)
De-/Encoding of EventRequest_t.
void ReturnEvent(SVCXPRT *transp, const EventRequest *eventReq, IServer *const ServerSvc)
Implementation of ONCRPC_GETEVENT_PROC Return an event from a certain streams.
struct DispatchThreadArguments_t DispatchThreadArguments
Arguments handed over fromt the main server thread to the thread dispatching the request (one for eac...
void ReturnStreams(SVCXPRT *transp, IServer *const ServerSvc)
Implementation of ONCRPC_GETSTREAMS_PROC Return the currently available event streams.
bool checkResult(const int RetVal, const std::string &Module, IMessage *const ServerSvc)
Simple result checking routine, that will output an errorMsg throught the ServerSvc if there was an e...
ThreadCollection *const dispatchThreads
struct EventRequest_t EventRequest
Data structures for GetEvent functions.
void ONCRPCCleanupHandler(void *args)
This cleanup handler is called whenever the server thread exits.
void SetNewEvent(SVCXPRT *transp, const Event *event, IServer *const ServerSvc)
Implementation of ONCRPC_SETEVENT_PROC Set a new event for a certain streams.
void ReturnNull(SVCXPRT *transp, IServer *const ServerSvc)
Implementation of NULLPROC Return nothing - server has just been pinged.
struct Event_t Event
pthread_key_t ServerSvcKey ATLAS_THREAD_SAFE
void ONCRPCRequestHandler(struct svc_req *rqstp, SVCXPRT *transp)
The Request handler is called from the RPC kernel routines - hence extern C It creates a new dispatch...
struct ServerThreadArguments_t ServerThreadArguments
Arguments handed over fromt the main (Athena) thread to the server thread.
void * ONCRPCServerThread(void *args)
This is the actual server thread, which takes above arguments.
bool_t xdr_event(XDR *xdrsp, Event *event)
De-/Encoding of Event_t.
MsgStream & msg
Definition testRead.cxx:32