ATLAS Offline Software
Functions
Driver.cxx File Reference
#include <EventLoop/Driver.h>
#include <EventLoop/BaseManager.h>
#include <EventLoop/DriverManager.h>
#include <EventLoop/Job.h>
#include <EventLoop/ManagerData.h>
#include <EventLoop/ManagerOrder.h>
#include <EventLoop/ManagerStep.h>
#include <EventLoop/MessageCheck.h>
#include <EventLoop/MetricsSvc.h>
#include <EventLoop/OutputStream.h>
#include <EventLoop/RetrieveManager.h>
#include <EventLoop/SubmitDirManager.h>
#include <EventLoop/SubmitManager.h>
#include <RootCoreUtils/RootUtils.h>
#include <RootCoreUtils/ThrowMsg.h>
#include <SampleHandler/DiskListLocal.h>
#include <SampleHandler/Sample.h>
#include <SampleHandler/SampleHist.h>
#include <SampleHandler/SampleLocal.h>
#include <TFile.h>
#include <TObjString.h>
#include <TSystem.h>
#include <cstddef>
#include <fstream>
#include <iostream>
#include <memory>
#include <signal.h>

Go to the source code of this file.

Functions

 ClassImp (EL::Driver) namespace EL
 

Function Documentation

◆ ClassImp()

ClassImp ( EL::Driver  )

Definition at line 47 of file Driver.cxx.

50 {
51  bool EL::Driver::abortRetrieve(false);
52 
53 
54 
55  void Driver ::
56  testInvariant () const
57  {
58  RCU_INVARIANT (this != 0);
59  }
60 
61 
62 
63  Driver ::
64  Driver ()
65  {
66  RCU_NEW_INVARIANT (this);
67  }
68 
69 
70 
72  options ()
73  {
74  RCU_READ_INVARIANT (this);
75  return &m_options;
76  }
77 
78 
79 
81  options () const
82  {
83  RCU_READ_INVARIANT (this);
84  return &m_options;
85  }
86 
87 
88 
89  std::string Driver ::
90  submit (const Job& job, const std::string& location) const
91  {
92  // no invariant used
93 
94  std::string actualLocation = submitOnly (job, location);
95  ANA_MSG_DEBUG ("wait on: " << actualLocation);
96  wait (actualLocation);
97  return actualLocation;
98  }
99 
100 
101 
102  std::string Driver ::
103  submitOnly (const Job& job, const std::string& location) const
104  {
105  RCU_READ_INVARIANT (this);
106 
107  Detail::ManagerData data;
108  data.addManager (std::make_unique<Detail::BaseManager> ());
109  data.addManager (std::make_unique<Detail::SubmitDirManager> ());
110  data.addManager (std::make_unique<Detail::DriverManager> ());
111  data.addManager (std::make_unique<Detail::SubmitManager> ());
112 
113  Job myjob = job;
114  data.driver = this;
115  data.submitDir = location;
116  data.job = &myjob;
117  if (data.run().isFailure())
118  throw std::runtime_error ("failed to submit job");
119  return data.submitDir;
120  }
121 
122 
123 
124  void Driver ::
125  resubmit (const std::string& location,
126  const std::string& option)
127  {
128  Detail::ManagerData data;
129  data.addManager (std::make_unique<Detail::BaseManager> ());
130  data.addManager (std::make_unique<Detail::SubmitDirManager> ());
131  data.addManager (std::make_unique<Detail::DriverManager> ());
132  data.addManager (std::make_unique<Detail::SubmitManager> ());
133  data.submitDir = location;
134 
135  std::unique_ptr<TFile> file (TFile::Open ((location + "/driver.root").c_str(), "READ"));
136  std::unique_ptr<Driver> driver (dynamic_cast<Driver*>(file->Get ("driver")));
137  RCU_ASSERT2_SOFT (driver.get() != 0, "failed to read driver");
138  data.driver = driver.get();
139 
140  data.resubmit = true;
141  data.resubmitOption = option;
142  if (data.run().isFailure())
143  throw std::runtime_error ("failed to resubmit job");
144  }
145 
146 
147 
148  bool Driver ::
149  retrieve (const std::string& location)
150  {
151  Detail::ManagerData data;
152  data.addManager (std::make_unique<Detail::BaseManager> ());
153  data.addManager (std::make_unique<Detail::SubmitDirManager> ());
154  data.addManager (std::make_unique<Detail::DriverManager> ());
155  data.addManager (std::make_unique<Detail::RetrieveManager> ());
156  data.submitDir = location;
157 
158  std::unique_ptr<TFile> file (TFile::Open ((location + "/driver.root").c_str(), "READ"));
159  if (!file || file->IsZombie())
160  throw std::runtime_error ("failed to open driver file");
161  std::unique_ptr<Driver> driver (dynamic_cast<Driver*>(file->Get ("driver")));
162  RCU_ASSERT2_SOFT (driver.get() != 0, "failed to read driver");
163  data.driver = driver.get();
164 
165  if (data.run().isFailure())
166  throw std::runtime_error ("failed to retrieve job");
167  return data.completed;
168  }
169 
170 
171 
172  bool Driver ::
173  wait (const std::string& location, unsigned time)
174  {
175  // no invariant used
176 
177  struct SigTrap {
178  static void handler (int)
179  {
181  ANA_MSG_INFO ("\nAborting...");
182  }
183  SigTrap() { signal (SIGINT, &handler); }
184  ~SigTrap() { signal (SIGINT, SIG_DFL); EL::Driver::abortRetrieve = false; }
185  } sigTrap;
186 
187  while (!retrieve (location))
188  {
189  if (abortRetrieve) { return false; }
190  ANA_MSG_INFO ("not all worker jobs finished yet, waiting " << time << " seconds");
191  for (unsigned i = 0; i != time; ++i)
192  {
193  if (abortRetrieve) { return false; }
194  sleep (1);
195  }
196  ANA_MSG_INFO ("rechecking jobs");
197  }
198  return true;
199  }
200 
201 
202 
203  void Driver ::
204  updateLocation (const std::string& location)
205  {
206  std::string from;
207  {
208  std::ifstream file ((location + "/location").c_str());
209  if (!std::getline (file, from))
210  RCU_THROW_MSG ("failed to read submit location from " + location + "/location");
211  }
212  std::string to = location;
213  while (!to.empty() && to[to.size()-1] == '/')
214  to.resize (to.size()-1);
215  {
217  sh.load (location + "/hist");
218  sh.updateLocation (from, to);
219  sh.save (location + "/hist");
220  }
221  SH::DiskListLocal list (location);
222  while (list.next())
223  {
224  if (list.fileName().find ("output-") == 0)
225  {
227  sh.load (list.path());
228  sh.updateLocation (from, to);
229  sh.save (list.path());
230  }
231  }
232  {
233  std::ofstream file ((location + "/location").c_str());
234  file << to << std::endl;
235  }
236  }
237 
238 
239 
240  void Driver ::
241  mergedOutputSave (Detail::ManagerData& data)
242  {
243  for (Job::outputIter out = data.job->outputBegin(),
244  end = data.job->outputEnd(); out != end; ++ out)
245  {
246  const std::string name
247  = data.submitDir + "/data-" + out->label();
248 
250  for (SH::SampleHandler::iterator sample = data.job->sampleHandler().begin(),
251  end = data.job->sampleHandler().end(); sample != end; ++ sample)
252  {
253  const std::string name2 = name + "/" + (*sample)->name() + ".root";
254  std::unique_ptr<SH::SampleLocal> mysample
255  (new SH::SampleLocal ((*sample)->name()));
256  mysample->add (name2);
257  sh.add (mysample.release());
258  }
259  sh.fetch (data.job->sampleHandler());
260  sh.save (data.submitDir + "/output-" + out->label());
261  }
262  }
263 
264 
265 
266  void Driver ::
267  diskOutputSave (Detail::ManagerData& data)
268  {
269  SH::SampleHandler sh_hist;
270  sh_hist.load (data.submitDir + "/hist");
271 
272  for (Job::outputIter out = data.job->outputBegin(),
273  end = data.job->outputEnd(); out != end; ++ out)
274  {
276  for (SH::SampleHandler::iterator sample = data.job->sampleHandler().begin(),
277  end = data.job->sampleHandler().end(); sample != end; ++ sample)
278  {
279  SH::Sample *histSample = sh_hist.get ((*sample)->name());
280  RCU_ASSERT (histSample != 0);
281  std::unique_ptr<SH::SampleLocal> mysample
282  (new SH::SampleLocal ((*sample)->name()));
283  TList *list = dynamic_cast<TList*>(histSample->readHist ("EventLoop_OutputStream_" + out->label()));
284  if (list != 0)
285  {
286  TObject *obj = 0;
287  for (TIter iter (list); (obj = iter.Next ()); )
288  {
289  TObjString *str = dynamic_cast<TObjString*>(obj);
290  RCU_ASSERT (str != 0);
291  mysample->add (str->GetString().Data());
292  }
293  }
294  mysample->meta()->fetch (*out->options());
295  sh.add (mysample.release());
296  }
297  sh.fetch (data.job->sampleHandler());
298  sh.save (data.submitDir + "/output-" + out->label());
299  }
300  }
301 
302 
303 
304  ::StatusCode Driver ::
305  doManagerStep (Detail::ManagerData& /*data*/) const
306  {
307  return ::StatusCode::SUCCESS;
308  }
309 }
python.PyKernel.retrieve
def retrieve(aClass, aKey=None)
Definition: PyKernel.py:110
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
SH::SampleHandler::iterator
std::vector< Sample * >::const_iterator iterator
the iterator to use
Definition: SampleHandler.h:475
SH::MetaObject
A class that manages meta-data to be associated with an object.
Definition: MetaObject.h:56
python.AthDsoLogger.out
out
Definition: AthDsoLogger.py:71
SH::Sample::readHist
TObject * readHist(const std::string &name) const
read an object from a histogram file
FullCPAlgorithmsTest_eljob.driver
driver
Definition: FullCPAlgorithmsTest_eljob.py:171
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:93
handler
void handler(int sig)
signal handler
Definition: rmain.cxx:98
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:67
RCU_ASSERT2_SOFT
#define RCU_ASSERT2_SOFT(x, y)
Definition: Assert.h:169
FullCPAlgorithmsTest_eljob.sh
sh
Definition: FullCPAlgorithmsTest_eljob.py:111
FullCPAlgorithmsTest_eljob.sample
sample
Definition: FullCPAlgorithmsTest_eljob.py:113
lumiFormat.i
int i
Definition: lumiFormat.py:85
RCU::Shell
Definition: ShellExec.cxx:28
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ANA_MSG_INFO
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:290
file
TFile * file
Definition: tile_monitor.h:29
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.AtlRunQueryLib.options
options
Definition: AtlRunQueryLib.py:379
RCU_INVARIANT
#define RCU_INVARIANT(x)
Definition: Assert.h:201
SH::Sample
a base class that manages a set of files belonging to a particular data set and the associated meta-d...
Definition: Sample.h:54
SH::SampleHandler::load
void load(const std::string &directory)
load all the samples from the given directory
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
SH::DiskListLocal
a DiskList implementation for local directories
Definition: DiskListLocal.h:27
CxxUtils::to
CONT to(RANGE &&r)
Definition: ranges.h:39
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
SH::SampleLocal
A Sample based on a simple file list.
Definition: SampleLocal.h:38
EL::Driver::abortRetrieve
static bool abortRetrieve
this flag is set to true when the wait() function is running and a SIGINT is caught,...
Definition: Driver.h:212
SH::SampleHandler
A class that manages a list of Sample objects.
Definition: SampleHandler.h:60
str
Definition: BTagTrackIpAccessor.cxx:11
RCU_THROW_MSG
#define RCU_THROW_MSG(message)
Definition: PrintMsg.h:58
SH::SampleHandler::get
Sample * get(const std::string &name)
get the sample with the given name
test_interactive_athena.job
job
Definition: test_interactive_athena.py:6
python.PyAthena.obj
obj
Definition: PyAthena.py:132
RCU_ASSERT
#define RCU_ASSERT(x)
Definition: Assert.h:222
RCU_READ_INVARIANT
#define RCU_READ_INVARIANT(x)
Definition: Assert.h:229
ANA_MSG_DEBUG
#define ANA_MSG_DEBUG(xmsg)
Macro printing debug messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:288
RCU_NEW_INVARIANT
#define RCU_NEW_INVARIANT(x)
Definition: Assert.h:233