15#include <netinet/in.h>
17#include <sys/socket.h>
24extern "C" void get_myaddress (
struct sockaddr_in *);
58 std::ostringstream
msg;
msg <<
"Created new dispatch thread " << pthread_self();
62 switch (rqstp->rq_proc){
77 ReturnEvent(rqstp->rq_xprt, eventRequest, ServerSvc);
84 std::ostringstream
msg;
85 msg <<
"Client request for server procedure #" << rqstp->rq_proc <<
" which is not defined";
87 svcerr_noproc(rqstp->rq_xprt);
101 }
catch (std::exception &e){
102 std::ostringstream
msg;
msg <<
"Caught exception in DispatchThread: " << e.what();
120 std::ostringstream
msg;
msg <<
"Request handler in thread " << pthread_self();
126 if ( ServerSvc->
LogLevel() <= MSG::DEBUG ){
129 auto caller = (
struct sockaddr* )svc_getcaller(rqstp->rq_xprt);
130 char port[NI_MAXSERV];
131 char host[NI_MAXHOST];
132 char IPAddr[INET6_ADDRSTRLEN];
135 msg <<
"Request from host ";
137 if(getnameinfo(caller, rqstp->rq_xprt->xp_addrlen,
143 if(getnameinfo(caller, rqstp->rq_xprt->xp_addrlen,
144 IPAddr,
sizeof IPAddr,
146 NI_NUMERICSERV | NI_NUMERICHOST) == 0)
147 msg <<
"(" << IPAddr <<
") on port " << port;
157 switch (rqstp->rq_proc){
168 if (!svc_getargs(rqstp->rq_xprt,(xdrproc_t)
xdr_event_req,(caddr_t)evtReq)){
169 checkResult(errno,
"creating dispatch tread arguments from GETEVENT call",ServerSvc);
171 svcerr_decode(rqstp->rq_xprt);
174 delete DpThreadArgs.
rqstp;
179 DpThreadArgs.
evtReq = evtReq;
188 evt->EventNumber=-2;evt->RunNumber=-2;evt->StreamName=NULL;evt->NBytes=0;evt->EventData=NULL;
191 if (!svc_getargs(rqstp->rq_xprt,(xdrproc_t)
xdr_event,(caddr_t)evt)){
192 checkResult(errno,
"creating dispatch tread arguments from SETEVENT call",ServerSvc);
194 svcerr_decode(rqstp->rq_xprt);
197 delete DpThreadArgs.
rqstp;
198 DpThreadArgs.
rqstp =
nullptr;
203 DpThreadArgs.
evt = evt;
210 auto ok = [&ServerSvc, &DpThreadArgs](
int retVal,
const char *
msg)->
bool{
213 delete DpThreadArgs.
rqstp;
214 DpThreadArgs.
rqstp =
nullptr;
221 retVal = pthread_attr_init (&attr);
222 if ( ! ok(retVal,
"request handler initializing thread attributes"))
return;
226 if ( ! ok(retVal,
"request handler setting thread stacksize"))
return;
232 retVal = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
233 if ( ! ok(retVal,
"request handler setting thread detach state"))
return;
235 pthread_t dispatchThread;
237 if ( ! ok(retVal,
"request handler creating dispatch thread"))
return;
243 retVal = pthread_attr_destroy(&attr);
244 if ( ! ok(retVal,
"request handler destroying thread attributes"))
return;
246 }
catch (std::exception &e){
247 std::ostringstream
msg;
msg <<
"Caught exception in RequestHandler: " << e.what();
258 ((
JiveXML::IServer*)args)->Message(MSG::INFO,
"Finished ONCRPC server thread");
281 if (!
checkResult(retVal,
"server thread creating thread-specific key",ServerSvc)) pthread_exit(NULL);
282 retVal = pthread_setspecific(ServerSvcKey, ServerSvc);
283 if (!
checkResult(retVal,
"server thread setting thread-specific key",ServerSvc)) pthread_exit(NULL);
290 ServerSvc->
Message(MSG::INFO,
"Started ONCRPC server thread");
294 ServerSvc->
Message(MSG::WARNING,
"The run server flag is not set - stopping server thread immediately");
303 struct sockaddr_in my_addr; get_myaddress(&my_addr);
310 <<
" already registered with portmapper on local host";
317# pragma GCC diagnostic push
318# pragma GCC diagnostic ignored "-Wcast-function-type"
320#if defined(__clang__) && __clang_major__ >= 19
321# pragma clang diagnostic push
322# pragma clang diagnostic ignored "-Wcast-function-type-mismatch"
324 struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0;
325 clnt_stat ret = clnt_call(client, NULLPROC, (xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL, timeout);
326#if defined(__clang__) && __clang_major__ >= 19
327# pragma clang diagnostic pop
330# pragma GCC diagnostic pop
332 if (ret == RPC_SUCCESS){
334 ServerSvc->
Message(MSG::ERROR,
"Server exists and is alive on local host - stopping this thread");
336 }
else ServerSvc->
Message(MSG::WARNING,
"Existing server does not respond");
337 }
else ServerSvc->
Message(MSG::WARNING,
"Can not create client for existing server");
340 ServerSvc->
Message(MSG::WARNING,
"Clearing existing portmap entry!");
350 if ((rpc_createerr.cf_stat != RPC_PROGNOTREGISTERED) &&
351 (rpc_createerr.cf_stat != RPC_SYSTEMERROR )){
352 ServerSvc->
Message(MSG::ERROR,clnt_spcreateerror(
"Failed querying portmapper on local host for existing servers"));
357 int server_socket = RPC_ANYSOCK;
360 if (PortNumber != 0){
363 if ((server_socket = socket(AF_INET, SOCK_STREAM, 0)) != 0) {
364 checkResult(errno,
"server thread creating socket",ServerSvc);
369 struct sockaddr_in server_addr;
370 server_addr.sin_family = AF_INET;
371 server_addr.sin_addr.s_addr = INADDR_ANY;
372 server_addr.sin_port = htons(PortNumber);
373 memset(&server_addr.sin_zero, 0,
sizeof(server_addr.sin_zero));
376 if (bind(server_socket,
reinterpret_cast<sockaddr *
>(&server_addr),
sizeof(
struct sockaddr)) != 0){
377 std::ostringstream
msg;
msg <<
"server thread binding socket to port " << PortNumber;
381 std::ostringstream
msg;
msg <<
"Successfully bound port " << PortNumber;
386 SVCXPRT * transp = svctcp_create(server_socket, 0, 0);
389 if (transp == NULL) {
390 ServerSvc->
Message(MSG::ERROR,
"Opening TCP socket failed");
400 ServerSvc->
Message(MSG::ERROR,
"Could not register ONCRPC Server with RPC daemon");
401 checkResult(errno,
"registering ONCRPC Server with RPC daemon",ServerSvc);
406 fd_set SocketFileDescriptorSet;
408 int NFileDescriptors = FD_SETSIZE;
411 unsigned long NRequests = 0;
415 ServerSvc->
Message(MSG::VERBOSE,
"Waiting for call...");
418 SocketFileDescriptorSet = svc_fdset;
424 int ret = select(NFileDescriptors,&SocketFileDescriptorSet,NULL,NULL,NULL);
430 if ( errno == EINTR ) continue ;
432 checkResult(errno,
"server thread returning from select",ServerSvc);
442 svc_getreqset(&SocketFileDescriptorSet);
446 ServerSvc->
Message(MSG::DEBUG,
"Serving loop finished");
456 std::ostringstream
msg;
msg <<
"Waiting for " <<
dispatchThreads->NumberOfThreads() <<
" open dispatch threads to terminate ... ";
461 ServerSvc->
Message(MSG::INFO,
" ... finished all dispatch threads");
469 unsigned long* NReqPtr =
new unsigned long(NRequests);
471 pthread_exit(NReqPtr);
473 }
catch (std::exception &e){
474 std::ostringstream
msg;
msg <<
"Caught exception in ServerThread: " << e.what();
#define ONCRPC_SETEVENT_PROC
#define ONCRPC_GETSTATUS_PROC
#define ONCRPC_GETSTREAMS_PROC
#define ONCRPC_GETEVENT_PROC
Define macros for attributes used to control the static checker.
virtual bool GetRunServerFlag() const =0
The server thread will stop once this flag is set to false.
virtual MSG::Level LogLevel() const =0
Get the logging level.
virtual void Message(const MSG::Level level, const std::string &msg) const =0
Pure abstract interface for all full server implementations.
This class handles a collection of threads.
This header is shared between the C-style server thread and the C++ Athena ServerSvc.
void ReturnState(SVCXPRT *transp, IServer *const ServerSvc)
Implementation of ONCRPC_ATHENASTATUS_PROC Return the current athena status in XDR representation.
void * ONCRPCDispatchThread(void *args)
This is the thread handling the request - one thread per request.
bool_t xdr_event_req(XDR *xdrsp, EventRequest *eventReq)
De-/Encoding of EventRequest_t.
void ReturnEvent(SVCXPRT *transp, const EventRequest *eventReq, IServer *const ServerSvc)
Implementation of ONCRPC_GETEVENT_PROC: return an event from a certain stream.
struct DispatchThreadArguments_t DispatchThreadArguments
Arguments handed over from the main server thread to the thread dispatching the request (one for eac...
void ReturnStreams(SVCXPRT *transp, IServer *const ServerSvc)
Implementation of ONCRPC_GETSTREAMS_PROC Return the currently available event streams.
bool checkResult(const int RetVal, const std::string &Module, IMessage *const ServerSvc)
Simple result checking routine that will output an error message through the ServerSvc if there was an e...
ThreadCollection *const dispatchThreads
struct EventRequest_t EventRequest
Data structures for GetEvent functions.
void ONCRPCCleanupHandler(void *args)
This cleanup handler is called whenever the server thread exits.
void SetNewEvent(SVCXPRT *transp, const Event *event, IServer *const ServerSvc)
Implementation of ONCRPC_SETEVENT_PROC: set a new event for a certain stream.
void ReturnNull(SVCXPRT *transp, IServer *const ServerSvc)
Implementation of NULLPROC Return nothing - server has just been pinged.
pthread_key_t ServerSvcKey ATLAS_THREAD_SAFE
void ONCRPCRequestHandler(struct svc_req *rqstp, SVCXPRT *transp)
The request handler is called from the RPC kernel routines - hence extern C. It creates a new dispatch...
struct ServerThreadArguments_t ServerThreadArguments
Arguments handed over from the main (Athena) thread to the server thread.
void * ONCRPCServerThread(void *args)
This is the actual server thread, which takes above arguments.
bool_t xdr_event(XDR *xdrsp, Event *event)
De-/Encoding of Event_t.
const struct svc_req * rqstp
const EventRequest * evtReq