|
ATLAS Offline Software
|
Go to the documentation of this file.
33 #include <TObjString.h>
41 using namespace EL::msgEventLoop;
90 submit (
const Job&
job,
const std::string& location)
const
94 std::string actualLocation = submitOnly (
job, location);
96 wait (actualLocation);
97 return actualLocation;
103 submitOnly (
const Job&
job,
const std::string& location)
const
107 Detail::ManagerData
data;
108 data.addManager (std::make_unique<Detail::BaseManager> ());
109 data.addManager (std::make_unique<Detail::SubmitDirManager> ());
110 data.addManager (std::make_unique<Detail::DriverManager> ());
111 data.addManager (std::make_unique<Detail::SubmitManager> ());
115 data.submitDir = location;
117 if (
data.run().isFailure())
118 throw std::runtime_error (
"failed to submit job");
119 return data.submitDir;
125 resubmit (
const std::string& location,
126 const std::string& option)
128 Detail::ManagerData
data;
129 data.addManager (std::make_unique<Detail::BaseManager> ());
130 data.addManager (std::make_unique<Detail::SubmitDirManager> ());
131 data.addManager (std::make_unique<Detail::DriverManager> ());
132 data.addManager (std::make_unique<Detail::SubmitManager> ());
133 data.submitDir = location;
135 std::unique_ptr<TFile>
file (TFile::Open ((location +
"/driver.root").c_str(),
"READ"));
136 std::unique_ptr<Driver>
driver (
dynamic_cast<Driver*
>(
file->Get (
"driver")));
140 data.resubmit =
true;
141 data.resubmitOption = option;
142 if (
data.run().isFailure())
143 throw std::runtime_error (
"failed to resubmit job");
149 retrieve (
const std::string& location)
151 Detail::ManagerData
data;
152 data.addManager (std::make_unique<Detail::BaseManager> ());
153 data.addManager (std::make_unique<Detail::SubmitDirManager> ());
154 data.addManager (std::make_unique<Detail::DriverManager> ());
155 data.addManager (std::make_unique<Detail::RetrieveManager> ());
156 data.submitDir = location;
158 std::unique_ptr<TFile>
file (TFile::Open ((location +
"/driver.root").c_str(),
"READ"));
160 throw std::runtime_error (
"failed to open driver file");
161 std::unique_ptr<Driver>
driver (
dynamic_cast<Driver*
>(
file->Get (
"driver")));
165 if (
data.run().isFailure())
166 throw std::runtime_error (
"failed to retrieve job");
167 return data.completed;
173 wait (
const std::string& location,
unsigned time)
189 if (abortRetrieve) {
return false; }
190 ANA_MSG_INFO (
"not all worker jobs finished yet, waiting " << time <<
" seconds");
191 for (
unsigned i = 0;
i != time; ++
i)
193 if (abortRetrieve) {
return false; }
208 std::ifstream
file ((location +
"/location").c_str());
209 if (!std::getline (
file, from))
210 RCU_THROW_MSG (
"failed to read submit location from " + location +
"/location");
212 std::string
to = location;
213 while (!
to.empty() &&
to[
to.size()-1] ==
'/')
214 to.resize (
to.size()-1);
217 sh.load (location +
"/hist");
218 sh.updateLocation (from,
to);
219 sh.save (location +
"/hist");
224 if (
list.fileName().find (
"output-") == 0)
228 sh.updateLocation (from,
to);
233 std::ofstream
file ((location +
"/location").c_str());
246 const std::string
name
247 =
data.submitDir +
"/data-" +
out->label();
253 const std::string name2 =
name +
"/" + (*sample)->name() +
".root";
254 std::unique_ptr<SH::SampleLocal> mysample
256 mysample->
add (name2);
257 sh.add (mysample.release());
259 sh.fetch (
data.job->sampleHandler());
260 sh.save (
data.submitDir +
"/output-" +
out->label());
270 sh_hist.
load (
data.submitDir +
"/hist");
281 std::unique_ptr<SH::SampleLocal> mysample
283 TList *
list =
dynamic_cast<TList*
>(histSample->
readHist (
"EventLoop_OutputStream_" +
out->label()));
287 for (TIter iter (
list); (
obj = iter.Next ()); )
289 TObjString *
str =
dynamic_cast<TObjString*
>(
obj);
291 mysample->
add (
str->GetString().Data());
295 sh.add (mysample.release());
297 sh.fetch (
data.job->sampleHandler());
298 sh.save (
data.submitDir +
"/output-" +
out->label());
307 return ::StatusCode::SUCCESS;
def retrieve(aClass, aKey=None)
SH::MetaObject * options()
the list of options for jobs run with this driver
char data[hepevt_bytes_allocation_ATLAS]
virtual ::StatusCode doManagerStep(Detail::ManagerData &data) const
std::vector< Sample * >::const_iterator iterator
the iterator to use
static bool retrieve(const std::string &location)
retrieve all the output for the job in the given location
std::string submitOnly(const Job &job, const std::string &location) const
submit the given job with the given output location and return immediately
TObject * readHist(const std::string &name) const
read an object from a histogram file
the base class for the various EventLoop drivers that allow jobs to be run on different backends
static void mergedOutputSave(Detail::ManagerData &data)
create and save a sample handler assuming we created all the merged files at the requested locations
void handler(int sig)
signal handler
#define RCU_ASSERT2_SOFT(x, y)
::StatusCode StatusCode
StatusCode definition for legacy code.
void testInvariant() const
test the invariant of this object
This module defines the arguments passed from the BATCH driver to the BATCH worker.
MetaObject * meta()
the meta-information for this sample
Driver()
standard default constructor
a base class that manages a set of files belonging to a particular data set and the associated meta-d...
void load(const std::string &directory)
load all the samples from the given directory
static bool wait(const std::string &location, unsigned time=60)
retrieve all the output for the job in the given location and wait until it is finished completely.
static void diskOutputSave(Detail::ManagerData &data)
make the output sample handler for the given job or stream from the information stored in the histogr...
ClassImp(EL::Driver) namespace EL
a DiskList implementation for local directories
const OutputStream * outputIter
A Sample based on a simple file list.
static bool abortRetrieve
this flag is set to true when the wait() function is running and a SIGINT is caught,...
static void updateLocation(const std::string &location)
update the internal location of files, after moving the submission directory
A class that manages a list of Sample objects.
#define RCU_THROW_MSG(message)
Sample * get(const std::string &name)
get the sample with the given name
void add(const std::string &file)
add a file to the list
#define RCU_READ_INVARIANT(x)
static void resubmit(const std::string &location, const std::string &option)
resubmit all failed sub-jobs for the job in the given location
std::string submit(const Job &job, const std::string &location) const
submit the given job with the given output location and wait for it to finish
#define RCU_NEW_INVARIANT(x)