ATLAS Offline Software
Loading...
Searching...
No Matches
Driver.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7
8//
9// includes
10//
11
12#include <EventLoop/Driver.h>
13
16#include <EventLoop/Job.h>
32#include <TFile.h>
33#include <TObjString.h>
34#include <TSystem.h>
35#include <cstddef>
36#include <fstream>
37#include <iostream>
38#include <memory>
39#include <signal.h>
40
41using namespace EL::msgEventLoop;
42
43//
44// method implementations
45//
46
48
49namespace EL
50{
51 bool EL::Driver::abortRetrieve(false);
52
53
54
55 void Driver ::
56 testInvariant () const
57 {
58 RCU_INVARIANT (this != 0);
59 }
60
61
62
63 Driver ::
64 Driver ()
65 {
66 RCU_NEW_INVARIANT (this);
67 }
68
69
70
71 SH::MetaObject *Driver ::
72 options ()
73 {
74 RCU_READ_INVARIANT (this);
75 return &m_options;
76 }
77
78
79
80 const SH::MetaObject *Driver ::
81 options () const
82 {
83 RCU_READ_INVARIANT (this);
84 return &m_options;
85 }
86
87
88
89 std::string Driver ::
90 submit (const Job& job, const std::string& location) const
91 {
92 // no invariant used
93
94 std::string actualLocation = submitOnly (job, location);
95 ANA_MSG_DEBUG ("wait on: " << actualLocation);
96 wait (actualLocation);
97 return actualLocation;
98 }
99
100
101
102 std::string Driver ::
103 submitOnly (const Job& job, const std::string& location) const
104 {
105 RCU_READ_INVARIANT (this);
106
107 Detail::ManagerData data;
108 data.addManager (std::make_unique<Detail::BaseManager> ());
109 data.addManager (std::make_unique<Detail::SubmitDirManager> ());
110 data.addManager (std::make_unique<Detail::DriverManager> ());
111 data.addManager (std::make_unique<Detail::SubmitManager> ());
112
113 Job myjob = job;
114 data.driver = this;
115 data.submitDir = location;
116 data.job = &myjob;
117 if (data.run().isFailure())
118 throw std::runtime_error ("failed to submit job");
119 return data.submitDir;
120 }
121
122
123
124 void Driver ::
125 resubmit (const std::string& location,
126 const std::string& option)
127 {
128 Detail::ManagerData data;
129 data.addManager (std::make_unique<Detail::BaseManager> ());
130 data.addManager (std::make_unique<Detail::SubmitDirManager> ());
131 data.addManager (std::make_unique<Detail::DriverManager> ());
132 data.addManager (std::make_unique<Detail::SubmitManager> ());
133 data.submitDir = location;
134
135 std::unique_ptr<TFile> file (TFile::Open ((location + "/driver.root").c_str(), "READ"));
136 std::unique_ptr<Driver> driver (dynamic_cast<Driver*>(file->Get ("driver")));
137 RCU_ASSERT2_SOFT (driver.get() != 0, "failed to read driver");
138 data.driver = driver.get();
139
140 data.resubmit = true;
141 data.resubmitOption = option;
142 if (data.run().isFailure())
143 throw std::runtime_error ("failed to resubmit job");
144 }
145
146
147
148 bool Driver ::
149 retrieve (const std::string& location)
150 {
151 Detail::ManagerData data;
152 data.addManager (std::make_unique<Detail::BaseManager> ());
153 data.addManager (std::make_unique<Detail::SubmitDirManager> ());
154 data.addManager (std::make_unique<Detail::DriverManager> ());
155 data.addManager (std::make_unique<Detail::RetrieveManager> ());
156 data.submitDir = location;
157
158 std::unique_ptr<TFile> file (TFile::Open ((location + "/driver.root").c_str(), "READ"));
159 if (!file || file->IsZombie())
160 throw std::runtime_error ("failed to open driver file");
161 std::unique_ptr<Driver> driver (dynamic_cast<Driver*>(file->Get ("driver")));
162 RCU_ASSERT2_SOFT (driver.get() != 0, "failed to read driver");
163 data.driver = driver.get();
164
165 if (data.run().isFailure())
166 throw std::runtime_error ("failed to retrieve job");
167 return data.completed;
168 }
169
170
171
172 bool Driver ::
173 wait (const std::string& location, unsigned time)
174 {
175 // no invariant used
176
177 struct SigTrap {
178 static void handler (int)
179 {
181 ANA_MSG_INFO ("\nAborting...");
182 }
183 SigTrap() { signal (SIGINT, &handler); }
184 ~SigTrap() { signal (SIGINT, SIG_DFL); EL::Driver::abortRetrieve = false; }
185 } sigTrap;
186
187 while (!retrieve (location))
188 {
189 if (abortRetrieve) { return false; }
190 ANA_MSG_INFO ("not all worker jobs finished yet, waiting " << time << " seconds");
191 for (unsigned i = 0; i != time; ++i)
192 {
193 if (abortRetrieve) { return false; }
194 sleep (1);
195 }
196 ANA_MSG_INFO ("rechecking jobs");
197 }
198 return true;
199 }
200
201
202
203 void Driver ::
204 updateLocation (const std::string& location)
205 {
206 std::string from;
207 {
208 std::ifstream file ((location + "/location").c_str());
209 if (!std::getline (file, from))
210 RCU_THROW_MSG ("failed to read submit location from " + location + "/location");
211 }
212 std::string to = location;
213 while (!to.empty() && to[to.size()-1] == '/')
214 to.resize (to.size()-1);
215 {
217 sh.load (location + "/hist");
218 sh.updateLocation (from, to);
219 sh.save (location + "/hist");
220 }
221 SH::DiskListLocal list (location);
222 while (list.next())
223 {
224 if (list.fileName().find ("output-") == 0)
225 {
227 sh.load (list.path());
228 sh.updateLocation (from, to);
229 sh.save (list.path());
230 }
231 }
232 {
233 std::ofstream file ((location + "/location").c_str());
234 file << to << std::endl;
235 }
236 }
237
238
239
240 void Driver ::
241 mergedOutputSave (Detail::ManagerData& data)
242 {
243 for (Job::outputIter out = data.job->outputBegin(),
244 end = data.job->outputEnd(); out != end; ++ out)
245 {
246 const std::string name
247 = data.submitDir + "/data-" + out->label();
248
250 for (SH::SampleHandler::iterator sample = data.job->sampleHandler().begin(),
251 end = data.job->sampleHandler().end(); sample != end; ++ sample)
252 {
253 const std::string name2 = name + "/" + (*sample)->name() + ".root";
254 std::unique_ptr<SH::SampleLocal> mysample
255 (new SH::SampleLocal ((*sample)->name()));
256 mysample->add (name2);
257 sh.add (mysample.release());
258 }
259 sh.fetch (data.job->sampleHandler());
260 sh.save (data.submitDir + "/output-" + out->label());
261 }
262 }
263
264
265
266 void Driver ::
267 diskOutputSave (Detail::ManagerData& data)
268 {
269 SH::SampleHandler sh_hist;
270 sh_hist.load (data.submitDir + "/hist");
271
272 for (Job::outputIter out = data.job->outputBegin(),
273 end = data.job->outputEnd(); out != end; ++ out)
274 {
276 for (SH::SampleHandler::iterator sample = data.job->sampleHandler().begin(),
277 end = data.job->sampleHandler().end(); sample != end; ++ sample)
278 {
279 SH::Sample *histSample = sh_hist.get ((*sample)->name());
280 RCU_ASSERT (histSample != 0);
281 std::unique_ptr<SH::SampleLocal> mysample
282 (new SH::SampleLocal ((*sample)->name()));
283 TList *list = dynamic_cast<TList*>(histSample->readHist ("EventLoop_OutputStream_" + out->label()));
284 if (list != 0)
285 {
286 TObject *obj = 0;
287 for (TIter iter (list); (obj = iter.Next ()); )
288 {
289 TObjString *str = dynamic_cast<TObjString*>(obj);
290 RCU_ASSERT (str != 0);
291 mysample->add (str->GetString().Data());
292 }
293 }
294 mysample->meta()->fetch (*out->options());
295 sh.add (mysample.release());
296 }
297 sh.fetch (data.job->sampleHandler());
298 sh.save (data.submitDir + "/output-" + out->label());
299 }
300 }
301
302
303
304 ::StatusCode Driver ::
305 doManagerStep (Detail::ManagerData& /*data*/) const
306 {
307 return ::StatusCode::SUCCESS;
308 }
309}
#define RCU_INVARIANT(x)
Definition Assert.h:201
#define RCU_ASSERT(x)
Definition Assert.h:222
#define RCU_ASSERT2_SOFT(x, y)
Definition Assert.h:169
#define RCU_NEW_INVARIANT(x)
Definition Assert.h:233
#define RCU_READ_INVARIANT(x)
Definition Assert.h:229
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
#define ANA_MSG_DEBUG(xmsg)
Macro printing debug messages.
ClassImp(EL::Driver) namespace EL
Definition Driver.cxx:47
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
#define RCU_THROW_MSG(message)
Definition PrintMsg.h:58
the base class for the various EventLoop drivers that allow to run jobs on different backends
Definition Driver.h:28
static bool abortRetrieve
this flag is set to true when the wait() function is running and a SIGINT is caught,...
Definition Driver.h:212
const OutputStream * outputIter
Definition Job.h:143
a DiskList implementation for local directories
A class that manages meta-data to be associated with an object.
Definition MetaObject.h:56
A class that manages a list of Sample objects.
Sample * get(const std::string &name)
get the sample with the given name
std::vector< Sample * >::const_iterator iterator
the iterator to use
void load(const std::string &directory)
load all the samples from the given directory
A Sample based on a simple file list.
Definition SampleLocal.h:38
a base class that manages a set of files belonging to a particular data set and the associated meta-d...
Definition Sample.h:54
TObject * readHist(const std::string &name) const
read an object from a histogram file
This module defines the arguments passed from the BATCH driver to the BATCH worker.
void handler(int sig)
signal handler
Definition rmain.cxx:99
TFile * file