ATLAS Offline Software
EvtRangeScatterer.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "EvtRangeScatterer.h"
8 
10 #include "GaudiKernel/IEvtSelector.h"
11 #include "GaudiKernel/IIoComponentMgr.h"
12 #include "GaudiKernel/IFileMgr.h"
13 #include "CxxUtils/starts_with.h"
14 
15 #include <sys/stat.h>
16 #include <sstream>
17 #include <unistd.h>
18 #include <stdio.h>
19 #include <stdint.h>
20 
21 #include "yampl/SocketFactory.h"
22 #include <queue>
23 #include <fstream>
24 #include <cstdlib>
25 #include <filesystem>
26 
28  , const std::string& name
29  , const IInterface* parent)
31  , m_processorChannel("")
32  , m_eventRangeChannel("")
33  , m_doCaching(false)
34 {
  // Constructor body: fixes the sub-process directory prefix and declares the
  // three job-configurable properties of this tool.
  // NOTE(review): the first line of the signature and the base-class
  // initialiser are not visible in this listing.
35  m_subprocDirPrefix = "range_scatterer";
  // Channel names; exec_func() uses them to build the yampl sockets.
36  declareProperty("ProcessorChannel", m_processorChannel);
37  declareProperty("EventRangeChannel", m_eventRangeChannel);
  // Selects the caching / no-caching request ordering in exec_func().
38  declareProperty("DoCaching",m_doCaching);
39 }
40 
42 {
  // Destructor body is empty: nothing is released explicitly here.
43 }
44 
46 {
47  ATH_MSG_DEBUG("In initialize");
48 
  // NOTE(review): the statement that defines `sc` is not visible in this
  // listing; presumably it holds the result of AthenaMPToolBase::initialize()
  // -- confirm against the repository.
  // Any non-success status is propagated unchanged to the caller.
50  if(!sc.isSuccess()) return sc;
51 
52  return StatusCode::SUCCESS;
53 }
54 
56 {
  // Nothing to finalize in this tool: always reports success.
57  return StatusCode::SUCCESS;
58 }
59 
// Validate the arguments, record the pool configuration and schedule the
// bootstrap function.
//
// maxevt : requested number of events; values below -1 are rejected. It is
//          not used for anything else in the visible code.
// nprocs : number of processes; -1 means "number of online processors"
//          as reported by sysconf(_SC_NPROCESSORS_ONLN).
// topdir : top directory for sub-process run directories; must not be empty.
//
// Returns 1 on success and -1 on invalid arguments or when
// mapAsyncFlag(FUNC_BOOTSTRAP) reports a failure (non-zero).
60 int EvtRangeScatterer::makePool(int maxevt, int nprocs, const std::string& topdir)
61 {
62  ATH_MSG_DEBUG("In makePool " << getpid());
63 
64  if(maxevt < -1) {
65  ATH_MSG_ERROR("Invalid number of events requested: " << maxevt);
66  return -1;
67  }
68 
69  if(topdir.empty()) {
70  ATH_MSG_ERROR("Empty name for the top directory!");
71  return -1;
72  }
73 
74  m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
75  m_subprocTopDir = topdir;
76 
77  // Create the process group with only one process and map_async bootstrap
  // NOTE(review): the statement that actually creates the process group is
  // not visible in this listing -- confirm against the repository.
79  ATH_MSG_INFO("Token Scatterer process created");
80  if(mapAsyncFlag(AthenaMPToolBase::FUNC_BOOTSTRAP))
81  return -1;
82 
  // 1 is returned on success; presumably the number of processes created
  // (this tool uses a single scatterer process) -- TODO confirm.
83  return 1;
84 }
85 
87 {
  // Schedules the exec function on the process group and then asks the
  // sub-process to exit once it is done.
  // NOTE(review): the signature line is not visible in this listing; the
  // return statements show that a StatusCode is returned.
88  ATH_MSG_DEBUG("In exec " << getpid());
89 
  // A non-zero result from mapAsyncFlag is treated as failure.
90  if(mapAsyncFlag(AthenaMPToolBase::FUNC_EXEC))
91  return StatusCode::FAILURE;
92 
93  // Set exit flag on reader
  // map_async is called with null arguments here; per the comment above this
  // acts as the exit request. A non-zero result is treated as failure.
94  if(m_processGroup->map_async(0,0)){
95  ATH_MSG_ERROR("Unable to set exit to the reader");
96  return StatusCode::FAILURE;
97  }
98  return StatusCode::SUCCESS;
99 }
100 
101 void EvtRangeScatterer::subProcessLogs(std::vector<std::string>& filenames)
102 {
103  filenames.clear();
104  std::filesystem::path reader_rundir(m_subprocTopDir);
105  reader_rundir/= std::filesystem::path(m_subprocDirPrefix);
106  filenames.push_back(reader_rundir.string()+std::string("/AthenaMP.log"));
107 }
108 
110 {
  // NOTE(review): the signature and the declaration of `jobOutputs` are not
  // visible in this listing; nothing in the visible code fills the report
  // before it is returned -- confirm against the repository.
112  return jobOutputs;
113 }
114 
115 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::bootstrap_func()
116 {
117  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
118  outwork->data = malloc(sizeof(int));
119  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
120  outwork->size = sizeof(int);
121 
122  // ...
123  // (possible) TODO: extend outwork with some error message, which will be eventually
124  // reported in the master proces
125  // ...
126 
127  // Reader dir: mkdir
128  std::filesystem::path reader_rundir(m_subprocTopDir);
129  reader_rundir /= std::filesystem::path(m_subprocDirPrefix);
130 
131  if(mkdir(reader_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
132  ATH_MSG_ERROR("Unable to make reader run directory: " << reader_rundir.string() << ". " << fmterror(errno));
133  return outwork;
134  }
135 
136  // Redirect logs
137  if(redirectLog(reader_rundir.string()))
138  return outwork;
139 
140  ATH_MSG_INFO("Logs redirected in the AthenaMP Event Range Scatterer PID=" << getpid());
141 
142  // Update Io Registry
143  if(updateIoReg(reader_rundir.string()))
144  return outwork;
145 
146  ATH_MSG_INFO("Io registry updated in the AthenaMP Event Range Scatterer PID=" << getpid());
147 
148  // _______________________ Handle saved PFC (if any) ______________________
149  std::filesystem::path abs_reader_rundir = std::filesystem::absolute(reader_rundir);
150  if(handleSavedPfc(abs_reader_rundir))
151  return outwork;
152 
153  // Reopen file descriptors
154  if(reopenFds())
155  return outwork;
156 
157  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP Event Range Scatterer PID=" << getpid());
158 
159  // ________________________ I/O reinit ________________________
160  if(!m_ioMgr->io_reinitialize().isSuccess()) {
161  ATH_MSG_ERROR("Failed to reinitialize I/O");
162  return outwork;
163  } else {
164  ATH_MSG_DEBUG("Successfully reinitialized I/O");
165  }
166 
167  // Start the event selector
168  IService* evtSelSvc = dynamic_cast<IService*>(m_evtSelector);
169  if(!evtSelSvc) {
170  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
171  return outwork;
172  }
173  if(!evtSelSvc->start().isSuccess()) {
174  ATH_MSG_ERROR("Failed to restart the event selector");
175  return outwork;
176  } else {
177  ATH_MSG_DEBUG("Successfully restarted the event selector");
178  }
179 
180  // Reader dir: chdir
181  if(chdir(reader_rundir.string().c_str())==-1) {
182  ATH_MSG_ERROR("Failed to chdir to " << reader_rundir.string());
183  return outwork;
184  }
185 
186  // Declare success and return
187  *(int*)(outwork->data) = 0;
188  return outwork;
189 }
190 
// Main function of the scatterer sub-process: relays event ranges from the
// Pilot to the processors and relays processor reports back to the Pilot.
//
// Outline, as implemented below:
//  1. Create a yampl client socket to the Pilot and a yampl server socket
//     for the processors, and retrieve the shared queue of failed PIDs.
//  2. Main loop: announce readiness to the Pilot, receive an event range,
//     parse and validate it, and hand it to a processor that asked for work.
//     The loop ends when the Pilot's message contains "No more events".
//  3. Release the processors by answering each further request with a
//     one-byte message, until pollFailedPidQueue() returns -1.
//  4. Stop and finalize the application manager.
//
// Returns a ScheduledWork whose payload is a single int: 0 on success, 1 on
// failure.
191 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func()
192 {
193  ATH_MSG_INFO("Exec function in the AthenaMP Token Scatterer PID=" << getpid());
194 
  // The factory and both sockets are raw owning pointers; they are deleted at
  // the end of this function.
195  yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
196  // Create a socket to communicate with the Pilot
197  yampl::ISocket* socket2Pilot = socketFactory->createClientSocket(yampl::Channel(m_eventRangeChannel.value(),yampl::LOCAL),yampl::MOVE_DATA);
198  ATH_MSG_INFO("Created CLIENT socket for communicating Event Ranges with the Pilot");
199  // Create a socket to communicate with EvtRangeProcessors
  // The channel name is the ProcessorChannel property suffixed with "_" and m_randStr.
200  std::string socket2ProcessorName = m_processorChannel.value() + std::string("_") + m_randStr;
201  yampl::ISocket* socket2Processor = socketFactory->createServerSocket(yampl::Channel(socket2ProcessorName,yampl::LOCAL),yampl::MOVE_DATA);
202  ATH_MSG_INFO("Created SERVER socket to token processors: " << socket2ProcessorName);
203 
204  bool all_ok=true;
205  int procReportPending(0); // Keep track of how many output files are yet to be reported by Token Processors
206 
  // Queue of failed worker PIDs, looked up in the detector store. If it cannot
  // be retrieved, both the main loop and the release loop below are skipped.
207  AthenaInterprocess::SharedQueue* sharedFailedPidQueue(0);
208  if(detStore()->retrieve(sharedFailedPidQueue,"AthenaMPFailedPidQueue_"+m_randStr).isFailure()) {
209  ATH_MSG_ERROR("Unable to retrieve the pointer to Shared Failed PID Queue");
210  all_ok=false;
211  }
212 
213  // Communication protocol with the Pilot
214  std::string strReady("Ready for events");
215  std::string strStopProcessing("No more events");
  // Pending request from a processor (empty means no processor is waiting)
216  std::string processorWaitRequest("");
217  int workerPid{-1};
218 
219  ATH_MSG_INFO("Starting main loop");
220 
221  while(all_ok) {
222  // NO CACHING MODE: first get a request from one of the processors and only after that request the next event range from the pilot
223  if(!m_doCaching && processorWaitRequest.empty()) {
224  ATH_MSG_DEBUG("Waiting for event range request from one of the processors ... ");
  // Poll once per millisecond; each pass also forwards processor reports and
  // failed-PID notifications to the Pilot.
225  while(processorWaitRequest.empty()) {
226  processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
227  pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
228  usleep(1000);
229  }
230  ATH_MSG_INFO("One of the processors is ready for the next range");
231  // Get PID from the request and Update m_pid2RangeID
  // The request text is parsed as the worker PID; the worker's previous
  // range entry (if any) is dropped.
232  workerPid = std::atoi(processorWaitRequest.c_str());
233  auto it = m_pid2RangeID.find(workerPid);
234  if(it!=m_pid2RangeID.end()) {
235  m_pid2RangeID.erase(it);
236  }
237  }
238 
239  // Signal the Pilot that AthenaMP is ready for event processing
  // The message is copied into a malloc'ed buffer which is not freed here.
  // NOTE(review): presumably the socket takes ownership of the buffer in
  // MOVE_DATA mode -- confirm against the yampl documentation.
240  void* ready_message = malloc(strReady.size());
241  memcpy(ready_message,strReady.data(),strReady.size());
242  socket2Pilot->send(ready_message,strReady.size());
243  void* eventRangeMessage;
244  std::string strPeerId;
  // NOTE(review): the size returned by recv is used without any check before
  // the string is built from the received buffer.
245  ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage,strPeerId);
246  std::string eventRange((const char*)eventRangeMessage,eventRangeSize);
  // Keep only the text before the first newline
247  size_t carRet = eventRange.find('\n');
248  if(carRet!=std::string::npos)
249  eventRange.resize(carRet);
250 
251  // Break the loop if no more ranges are expected
252  if(eventRange.find(strStopProcessing)!=std::string::npos) {
253  ATH_MSG_INFO("Stopped the loop. Last message from the Event Range Channel: " << eventRange);
254  break;
255  }
256  ATH_MSG_INFO("Got Event Range from the pilot: " << eventRange);
257 
258  // Parse the Event Range string
259  // Expected the following format: [{KEY:VALUE[,KEY:VALUE]}]
260  // First get rid of the leading '[{' and the trailing '}]'
261  if(CxxUtils::starts_with (eventRange, "[{")) eventRange=eventRange.substr(2);
262  if(CxxUtils::ends_with (eventRange, "}]")) eventRange.resize(eventRange.size()-2);
263 
  // Split on ',' into KEY:VALUE pairs; each pair is split at its first ':'
  // and both parts are cleaned with trimRangeStrings().
264  std::map<std::string,std::string> eventRangeMap;
265  size_t startpos(0);
266  size_t endpos = eventRange.find(',');
267  while(endpos!=std::string::npos) {
268  // Get the Key-Value pair
269  std::string keyValue(eventRange.substr(startpos,endpos-startpos));
270  size_t colonPos = keyValue.find(':');
271  std::string strKey = keyValue.substr(0,colonPos);
272  std::string strVal = keyValue.substr(colonPos+1);
273  trimRangeStrings(strKey);
274  trimRangeStrings(strVal);
275  eventRangeMap[strKey]=strVal;
276  // Next iteration
277  startpos = endpos+1;
278  endpos = eventRange.find(',',startpos);
279  }
280  // Get the final Key-Value pair
281  std::string keyValue(eventRange.substr(startpos));
282  size_t colonPos = keyValue.find(':');
283  std::string strKey = keyValue.substr(0,colonPos);
284  std::string strVal = keyValue.substr(colonPos+1);
285  trimRangeStrings(strKey);
286  trimRangeStrings(strVal);
287  eventRangeMap[strKey]=strVal;
288 
  // All four keys are mandatory. A range without them is reported to the
  // Pilot as ERR_ATHENAMP_PARSE and skipped.
289  if(eventRangeMap.find("eventRangeID")==eventRangeMap.end()
290  || eventRangeMap.find("startEvent")==eventRangeMap.end()
291  || eventRangeMap.find("lastEvent")==eventRangeMap.end()
292  || eventRangeMap.find("GUID")==eventRangeMap.end()) {
293  std::string errorStr("ERR_ATHENAMP_PARSE \"" + eventRange + "\": Wrong format");
  // NOTE(review): one statement of the original file is not visible at this
  // point of the listing.
295  ATH_MSG_INFO("Ignoring this event range ");
296  void* errorMessage = malloc(errorStr.size());
297  memcpy(errorMessage,errorStr.data(),errorStr.size());
298  socket2Pilot->send(errorMessage,errorStr.size());
299  continue;
300  }
301  else {
302  ATH_MSG_DEBUG("*** Decoded Event Range ***");
303  std::map<std::string,std::string>::const_iterator it = eventRangeMap.begin(),
304  itEnd = eventRangeMap.end();
305  for(;it!=itEnd;++it)
306  ATH_MSG_DEBUG(it->first << ":" << it->second);
307  ATH_MSG_DEBUG("*** ***");
308  }
309 
  // Value checks: the ID and GUID must be non-empty and the event numbers
  // (converted with atoi) must satisfy lastEvent >= startEvent.
310  std::string rangeID = eventRangeMap["eventRangeID"];
311  std::string guid = eventRangeMap["GUID"];
312  int startEvent = std::atoi(eventRangeMap["startEvent"].c_str());
313  int lastEvent = std::atoi(eventRangeMap["lastEvent"].c_str());
314  if(rangeID.empty()
315  || guid.empty()
316  || lastEvent < startEvent) {
317  std::string errorStr("ERR_ATHENAMP_PARSE \"" + eventRange + "\": Wrong values of range fields");
  // NOTE(review): one statement of the original file is not visible at this
  // point of the listing.
319  ATH_MSG_INFO("Ignoring this event range ");
320  void* errorMessage = malloc(errorStr.size());
321  memcpy(errorMessage,errorStr.data(),errorStr.size());
322  socket2Pilot->send(errorMessage,errorStr.size());
323  continue;
324  }
325 
326  std::string message2ProcessorStr;
327  char* message2Processor(0);
328 
  // Compose the message for the processor:
  //   rangeID[,PFN:<pfn>],<startEvent>,<lastEvent>
  // The event numbers are forwarded as the original strings.
329  std::ostringstream ostr;
330  ostr << rangeID;
331  if(eventRangeMap.find("PFN")!=eventRangeMap.end()) {
332  ostr << "," << "PFN:" << eventRangeMap["PFN"];
333  }
334  ostr << "," << eventRangeMap["startEvent"]
335  << "," << eventRangeMap["lastEvent"];
336  message2ProcessorStr = ostr.str();
337 
338  // CACHING MODE: first get an event range from the pilot, transform it into the tokens
339  // and only after that wait for a new range request by one of the processors
340  if(m_doCaching) {
341  ATH_MSG_DEBUG("Waiting for event range request from one of the processors");
342  while(processorWaitRequest.empty()) {
343  processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
344  pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
345  usleep(1000);
346  }
347  ATH_MSG_INFO("One of the processors is ready for the next range");
348  // Get PID from the request and Update m_pid2RangeID
349  workerPid = std::atoi(processorWaitRequest.c_str());
350  auto it = m_pid2RangeID.find(workerPid);
351  if(it!=m_pid2RangeID.end()) {
352  m_pid2RangeID.erase(it);
353  }
354  }
355 
356  // Send to the Processor the message composed above
357  message2Processor = (char*)malloc(message2ProcessorStr.size());
358  memcpy(message2Processor,message2ProcessorStr.data(),message2ProcessorStr.size());
359  socket2Processor->send(message2Processor,message2ProcessorStr.size());
  // One more report is now expected back from the processors
360  procReportPending++;
361 
362  // Remember which range this worker is processing and mark the request as served
363  m_pid2RangeID[workerPid] = rangeID;
364  processorWaitRequest.clear();
365 
366  ATH_MSG_INFO("Sent response to the processor : " << message2ProcessorStr);
367  }
368 
369  if(all_ok) {
370  // We are done distributing event tokens.
371  // Tell the workers that the event processing is over
  // Each waiting worker is answered with a one-byte message whose content is
  // never set; the loop below ends when pollFailedPidQueue() returns -1.
373  void* emptyMess4Processor(0);
374  if(!processorWaitRequest.empty()) {
375  // We already have one processor waiting for the answer
376  emptyMess4Processor = malloc(1);
377  socket2Processor->send(emptyMess4Processor,1);
378  ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
379  processorWaitRequest.clear();
380  }
381  bool endLoop{false};
382  while(true) {
383  ATH_MSG_DEBUG("Going to set another processor free");
384  while(processorWaitRequest.empty()) {
385  processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
  // A PID of -1 read from the failed-PID queue ends the release loop
386  if(pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending)==-1) {
387  endLoop = true;
388  break;
389  }
390  usleep(1000);
391  }
392  if(endLoop) break;
393  // Remove worker from m_pid2RangeID
394  workerPid = std::atoi(processorWaitRequest.c_str());
395  auto it = m_pid2RangeID.find(workerPid);
396  if(it!=m_pid2RangeID.end()) {
397  m_pid2RangeID.erase(it);
398  }
399  emptyMess4Processor = malloc(1);
400  socket2Processor->send(emptyMess4Processor,1);
401  ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
402  ATH_MSG_INFO("Still " << procReportPending << " pending reports");
403  processorWaitRequest.clear();
404  }
405  }
406 
  // Stop and then finalize the application manager; finalize is attempted
  // only if stop succeeded. The finalize failure goes to std::cerr rather
  // than to the message stream.
407  if(m_appMgr->stop().isFailure()) {
408  ATH_MSG_ERROR("Unable to stop AppMgr");
409  all_ok=false;
410  }
411  else {
412  if(m_appMgr->finalize().isFailure()) {
413  std::cerr << "Unable to finalize AppMgr" << std::endl;
414  all_ok=false;
415  }
416  }
417 
418  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
419  outwork->data = malloc(sizeof(int));
420  *(int*)(outwork->data) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
421  outwork->size = sizeof(int);
422 
423  // ...
424  // (possible) TODO: extend outwork with some error message, which will be eventually
425  // reported in the master process
426  // ...
  // Release the sockets before the factory that created them
427  delete socket2Processor;
428  delete socket2Pilot;
429  delete socketFactory;
430 
431  return outwork;
432 }
433 
434 std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::fin_func()
435 {
436  // Dummy
437  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
438  outwork->data = malloc(sizeof(int));
439  *(int*)(outwork->data) = 0; // Error code: for now use 0 success, 1 failure
440  outwork->size = sizeof(int);
441  return outwork;
442 }
443 
445 {
446  size_t i(0);
447  // get rid of leading spaces
448  while(i<str.size() && str[i]==' ') i++;
449  if(i) str = str.substr(i);
450 
451  if(str.empty()) return; // Corner case: string consists only of spaces
452 
453  // get rid of trailing spaces
454  i=str.size()-1;
455  while(str[i]==' ') i--;
456  if(i) str.resize(i+1);
457 
458  // the string might be enclosed by either
459  // "u\'" and "\'"
460  // or
461  // "\"" and "\""
462  // Get rid of them!
463  if(CxxUtils::starts_with (str, "u\'")) {
464  str = str.substr(2);
465  if(CxxUtils::ends_with (str, "\'")) {
466  str.resize(str.size()-1);
467  }
468  }
469  else if(CxxUtils::starts_with (str, "\"")) {
470  str = str.substr(1);
471  if(CxxUtils::ends_with (str, "\"")) {
472  str.resize(str.size()-1);
473  }
474  }
475 }
476 
// Poll the processor socket once (non-blocking) and classify what arrived.
//
// socket2Processor  : socket on which processors send requests and reports
// socket2Pilot      : socket used to forward reports to the Pilot
// procReportPending : decremented for every report forwarded to the Pilot
//
// Returns the request text when a processor asks for a new event range.
// Returns an empty string when nothing was received, or when the message was
// an error report or an output file name (both are forwarded to the Pilot).
477 std::string EvtRangeScatterer::getNewRangeRequest(yampl::ISocket* socket2Processor
478  , yampl::ISocket* socket2Pilot
479  , int& procReportPending)
480 {
481  void* processor_request(0);
482  std::string strPeerId;
483  ssize_t processorRequestSize = socket2Processor->tryRecv(processor_request,0,strPeerId);
484 
  // -1 from tryRecv is treated as "nothing received"
485  if(processorRequestSize==-1) return std::string("");
  // A message of exactly this size is taken to be an error report made of a
  // PID followed by a status code.
486  if(processorRequestSize==sizeof(pid_t)+sizeof(AthenaMPToolBase::ESRange_Status)) {
487  ATH_MSG_INFO("Processor reported event range processing error");
488  pid_t pid = *((pid_t*)processor_request);
  // NOTE(review): the declaration of `status` and the `case` labels of the
  // switch below are not visible in this listing; presumably they use the
  // AthenaMPToolBase::ESRange_Status enumerators -- confirm against the
  // repository.
  // Note that operator[] inserts an empty range ID if the PID is unknown.
490  std::string errorStr("ERR_ATHENAMP_PROCESS "+ m_pid2RangeID[pid] + ": ");
491  switch(status) {
493  errorStr+=std::string("Not found in the input file");
494  break;
496  errorStr+=std::string("Seek failed");
497  break;
499  errorStr+=std::string("Failed to process event range");
500  break;
502  errorStr+=std::string("Failed to make output file");
503  break;
505  errorStr+=std::string("Failed to set input file");
506  break;
507  default:
508  break;
509  }
  // Forward the error text to the Pilot in a malloc'ed copy
510  void* errorMessage = malloc(errorStr.size());
511  memcpy(errorMessage,errorStr.data(),errorStr.size());
512  socket2Pilot->send(errorMessage,errorStr.size());
513  procReportPending--;
514  ATH_MSG_INFO("Error reported to the pilot. Reports pending: " << procReportPending);
515  return std::string("");
516  }
517  std::string strProcessorRequest((const char*)processor_request,processorRequestSize);
518  ATH_MSG_INFO("Received request from a processor: " << strProcessorRequest);
519  // Decode the request. If it contains output file name then pass it over to the pilot and return empty string
  // A message starting with '/' is treated as an output file name
520  if(CxxUtils::starts_with (strProcessorRequest, "/")) {
521  void* outpFileNameMessage = malloc(strProcessorRequest.size());
522  memcpy(outpFileNameMessage,strProcessorRequest.data(),strProcessorRequest.size());
523  socket2Pilot->send(outpFileNameMessage,strProcessorRequest.size());
524  procReportPending--;
525  ATH_MSG_INFO("Output file reported to the pilot. Reports pending: " << procReportPending);
526  return std::string("");
527  }
  // Anything else is a request for a new range; callers parse it as the worker PID
528  return strProcessorRequest;
529 }
530 
532  , yampl::ISocket* socket2Pilot
533  , int& procReportPending)
534 {
535  pid_t pid{0};
536  if(sharedFailedPidQueue->try_receive_basic<pid_t>(pid)
537  && pid!=-1) {
538  ATH_MSG_INFO("Procesor with PID=" << pid << " has failed!");
539  auto itPid = m_pid2RangeID.find(pid);
540  if(itPid!=m_pid2RangeID.end()) {
541  ATH_MSG_WARNING("The failed RangeID = " << m_pid2RangeID[pid] << " will be reported to Pilot");
542 
543  std::string errorStr("ERR_ATHENAMP_PROCESS " + m_pid2RangeID[pid] + ": Failed to process event range");
544  void* errorMessage = malloc(errorStr.size());
545  memcpy(errorMessage,errorStr.data(),errorStr.size());
546  socket2Pilot->send(errorMessage,errorStr.size());
547  --procReportPending;
548  m_pid2RangeID.erase(pid);
549  }
550  ATH_MSG_INFO("Reports pending: " << procReportPending);
551  }
552  return pid;
553 }
python.PyKernel.retrieve
def retrieve(aClass, aKey=None)
Definition: PyKernel.py:110
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
EvtRangeScatterer::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: EvtRangeScatterer.cxx:101
EvtRangeScatterer::initialize
virtual StatusCode initialize() override
Definition: EvtRangeScatterer.cxx:45
CxxUtils::starts_with
bool starts_with(const char *s, const char *prefix)
Test whether one null-terminated byte string starts with another.
EvtRangeScatterer::getNewRangeRequest
std::string getNewRangeRequest(yampl::ISocket *socket2Processor, yampl::ISocket *socket2Pilot, int &procReportPending)
Definition: EvtRangeScatterer.cxx:477
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:126
AthenaMPToolBase::ESRANGE_BADINPFILE
@ ESRANGE_BADINPFILE
Definition: AthenaMPToolBase.h:62
AthenaMPToolBase::m_evtSelector
IEvtSelector * m_evtSelector
Definition: AthenaMPToolBase.h:94
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
LArBadChanBlobUtils::Channel
Identifier32::value_type Channel
Definition: LArBadChanBlobUtils.h:24
EvtRangeScatterer::pollFailedPidQueue
pid_t pollFailedPidQueue(AthenaInterprocess::SharedQueue *sharedFailedPidQueue, yampl::ISocket *socket2Pilot, int &procReportPending)
Definition: EvtRangeScatterer.cxx:531
IEventShare.h
AthenaInterprocess::ScheduledWork::size
int size
Definition: IMessageDecoder.h:14
CxxUtils::ends_with
bool ends_with(const char *s, const char *suffix)
Test whether one null-terminated byte string ends with another.
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:88
AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
Definition: AthCommonDataStore.h:145
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
EvtRangeScatterer::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: EvtRangeScatterer.cxx:109
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:83
AthenaMPToolBase::ESRange_Status
ESRange_Status
Definition: AthenaMPToolBase.h:56
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:32
skel.it
it
Definition: skel.GENtoEVGEN.py:423
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:362
EvtRangeScatterer::trimRangeStrings
void trimRangeStrings(std::string &)
Definition: EvtRangeScatterer.cxx:444
EvtRangeScatterer::finalize
virtual StatusCode finalize() override
Definition: EvtRangeScatterer.cxx:55
AthenaInterprocess::ScheduledWork::data
void * data
Definition: IMessageDecoder.h:13
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
EvtRangeScatterer::~EvtRangeScatterer
virtual ~EvtRangeScatterer() override
Definition: EvtRangeScatterer.cxx:41
ProcessGroup.h
AthenaMPToolBase::ESRANGE_SEEKFAILED
@ ESRANGE_SEEKFAILED
Definition: AthenaMPToolBase.h:59
EvtRangeScatterer.h
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:67
AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore
const ServiceHandle< StoreGateSvc > & detStore() const
The standard StoreGateSvc/DetectorStore Returns (kind of) a pointer to the StoreGateSvc.
Definition: AthCommonDataStore.h:95
EvtRangeScatterer::m_doCaching
bool m_doCaching
Definition: EvtRangeScatterer.h:65
EvtRangeScatterer::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: EvtRangeScatterer.cxx:434
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
EvtRangeScatterer::m_eventRangeChannel
StringProperty m_eventRangeChannel
Definition: EvtRangeScatterer.h:64
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
RunTileMonitoring.keyValue
keyValue
Definition: RunTileMonitoring.py:150
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:91
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EvtRangeScatterer::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: EvtRangeScatterer.cxx:191
lumiFormat.i
int i
Definition: lumiFormat.py:92
EvtRangeScatterer::EvtRangeScatterer
EvtRangeScatterer()
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaInterprocess::ProcessGroup
Definition: ProcessGroup.h:27
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
EvtRangeScatterer::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: EvtRangeScatterer.cxx:115
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:129
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:341
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:55
starts_with.h
C++20-like starts_with/ends_with for strings.
pool_uuid.guid
guid
Definition: pool_uuid.py:112
AthenaMPToolBase::ESRANGE_PROCFAILED
@ ESRANGE_PROCFAILED
Definition: AthenaMPToolBase.h:60
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
checkRpcDigits.errorStr
string errorStr
Definition: checkRpcDigits.py:182
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:93
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:278
grepfile.filenames
list filenames
Definition: grepfile.py:34
EvtRangeScatterer::m_processorChannel
StringProperty m_processorChannel
Definition: EvtRangeScatterer.h:63
EvtRangeScatterer::m_pid2RangeID
Pid2RangeID m_pid2RangeID
Definition: EvtRangeScatterer.h:66
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:28
SharedQueue.h
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
str
Definition: BTagTrackIpAccessor.cxx:11
merge.status
status
Definition: merge.py:17
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:84
AthenaMPToolBase::ESRANGE_NOTFOUND
@ ESRANGE_NOTFOUND
Definition: AthenaMPToolBase.h:58
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:425
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:25
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:66
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:369
AthenaMPToolBase::ESRANGE_FILENOTMADE
@ ESRANGE_FILENOTMADE
Definition: AthenaMPToolBase.h:61