ATLAS Offline Software
Loading...
Searching...
No Matches
PrunDriver.cxx File Reference
#include <EventLoopGrid/PrunDriver.h>
#include <EventLoop/Algorithm.h>
#include <EventLoop/ManagerData.h>
#include <EventLoop/ManagerStep.h>
#include <EventLoop/Job.h>
#include <EventLoop/MessageCheck.h>
#include <EventLoop/OutputStream.h>
#include <PathResolver/PathResolver.h>
#include <RootCoreUtils/Assert.h>
#include <RootCoreUtils/hadd.h>
#include <RootCoreUtils/ExceptionMsg.h>
#include <RootCoreUtils/ShellExec.h>
#include <SampleHandler/MetaObject.h>
#include <SampleHandler/Sample.h>
#include <SampleHandler/SampleGrid.h>
#include <SampleHandler/SampleHandler.h>
#include <SampleHandler/GridTools.h>
#include <TList.h>
#include <TPython.h>
#include <TROOT.h>
#include <TFile.h>
#include <TSystem.h>
#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <stdexcept>
#include <ranges>
#include "pool.h"
#include <mutex>

Go to the source code of this file.

Functions

 ClassImp (EL::PrunDriver) namespace
static JobState::Enum sampleState (SH::Sample *sample)
static JobState::Enum nextState (JobState::Enum state, Status::Enum status)
static SH::MetaObject defaultOpts ()
static bool downloadContainer (const std::string &name, const std::string &location)
static Status::Enum submit (SH::Sample *const sample, const bool isFirstSample)
static Status::Enum checkPandaTask (SH::Sample *const sample)
static Status::Enum download (SH::Sample *const sample)
static Status::Enum merge (SH::Sample *const sample)
static void processTask (SH::Sample *const sample, const bool isFirstSample)
static void processAllInState (const SH::SampleHandler &sh, JobState::Enum state, const size_t nThreads)
static std::string formatOutputName (const SH::MetaObject &sampleMeta, const std::string &pattern)
std::string outputFileNames (const EL::Job &job)
static void saveJobDef (const std::string &fileName, const EL::Job &job, const SH::SampleHandler sh)
static SH::SampleHandler outputSH (const SH::SampleHandler &in, const std::string &outputLabel)

Function Documentation

◆ checkPandaTask()

Status::Enum checkPandaTask ( SH::Sample *const sample)
static

Definition at line 231 of file PrunDriver.cxx.

232{
233 RCU_REQUIRE(sample);
234 RCU_REQUIRE(static_cast<int>(sample->meta()->castDouble("nc_jediTaskID",0, SH::MetaObject::CAST_NOCAST_DEFAULT)) > 100);
235
236 static bool loaded = false;
237 if (not loaded) {
238 std::string path = PathResolverFindCalibFile("EventLoopGrid/ELG_jediState.py");
239 TPython::LoadMacro(path.c_str());
240 loaded = true;
241 }
242
243 TPython::Bind(dynamic_cast<TObject*>(sample), "ELG_SAMPLE");
244#if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
245 std::any result;
246 TPython::Exec("_anyresult = ROOT.std.make_any['int'](ELG_jediState(ELG_SAMPLE))", &result);
247 int ret = std::any_cast<int>(result);
248#else
249 int ret = TPython::Eval("ELG_jediState(ELG_SAMPLE)");
250#endif
251 TPython::Bind(0, "ELG_SAMPLE");
252
253 if (ret == Status::DONE) return Status::DONE;
254 if (ret == Status::FAIL) return Status::FAIL;
255
256 // Value 90 corresponds to `running` state of the job
257 if (ret != 90) { sample->meta()->setString("nc_ELG_state_details", "task status other than done/finished/failed/running"); }
258 // Value 99 is returned if there is error in the script (import, missing ID)
259 if (ret == 99) {
260 sample->meta()->setString("nc_ELG_state_details",
261 "problem checking jedi task status");
262 }
263
264 return Status::PENDING;
265}
#define RCU_REQUIRE(x)
Definition Assert.h:208
std::string PathResolverFindCalibFile(const std::string &logical_file_name)
@ CAST_NOCAST_DEFAULT
cast and return the default value if the input has the wrong type
Definition MetaObject.h:78
path
python interpreter configuration
Definition athena.py:128

◆ ClassImp()

ClassImp ( EL::PrunDriver )
Author
Alexander Madsen
Nils Krumnack

Definition at line 48 of file PrunDriver.cxx.

50 {
// Processing states of a sample. The string form of the current state is
// what sampleState() reads from, and processTask() writes to, the
// "nc_ELG_state" meta field.
51 namespace JobState {
// Number of enumerators in Enum; also the length of name[].
52 static const unsigned int NSTATES = 6;
53 enum Enum { INIT=0, RUN=1, DOWNLOAD=2, MERGE=3, FINISHED=4, FAILED=5 };
// String form of each state, indexed by the enum value. Note that RUN is
// spelled "RUNNING" here.
54 static const char* name[NSTATES] =
55 { "INIT", "RUNNING", "DOWNLOAD", "MERGE", "FINISHED", "FAILED" };
// Convert a state string back to its enum value by linear search of name[].
// An unknown string triggers RCU_ASSERT0 and then a std::runtime_error.
56 Enum parse(const std::string& what)
57 {
58 for (unsigned int i = 0; i != NSTATES; ++i) {
59 if (what == name[i]) { return static_cast<Enum>(i); }
60 }
61 RCU_ASSERT0("Failed to parse job state string");
62 throw std::runtime_error("PrunDriver.cxx: Failed to parse job state string"); //compiler dummy
63 }
64 }
65
66 // When changing the values in the enum make sure
67 // corresponding values in `data/ELG_jediState.py` script
68 // are changed accordingly
// Outcome of a single processing step; together with the current JobState
// it selects the next state in nextState().
69 namespace Status {
70 //static const int NSTATES = 3;
71 enum Enum { DONE=0, PENDING=1, FAIL=2 };
72 }
73
// One row of the state-transition table used by nextState():
// (fromState, status) -> toState.
74 struct TransitionRule {
75 JobState::Enum fromState;
76 Status::Enum status;
77 JobState::Enum toState;
78 TransitionRule(JobState::Enum fromState,
79 Status::Enum status,
80 JobState::Enum toState)
81 : fromState(fromState)
82 , status(status)
83 , toState(toState)
84 {
85 }
86 };
87
// Scope guard for the working directory: the constructor records
// gSystem->pwd() and changes into `dir`; the destructor changes back to
// the recorded directory.
88 struct TmpCd {
// Directory that was current when the guard was constructed.
89 const std::string origDir;
90 TmpCd(const std::string & dir)
91 : origDir(gSystem->pwd())
92 {
93 gSystem->cd(dir.c_str());
94 }
95 ~TmpCd()
96 {
97 gSystem->cd(origDir.c_str());
98 }
99 };
100}
#define RCU_ASSERT0(y)
Definition Assert.h:226
#define INIT(__TYPE)
std::map< std::string, std::string > parse(const std::string &list)
Status
Athena specific StatusCode values.
status
Definition merge.py:16

◆ defaultOpts()

SH::MetaObject defaultOpts ( )
static

Definition at line 139 of file PrunDriver.cxx.

140{
142 o.setString("nc_nGBPerJob", "MAX");
143 o.setString("nc_mergeOutput", "true");
144 o.setString("nc_cmtConfig", gSystem->ExpandPathName("$AnalysisBase_PLATFORM"));
145 o.setString("nc_useAthenaPackages", "true");
146 const std::string mergestr = "elg_merge jobdef.root %OUT %IN";
147 o.setString("nc_mergeScript", mergestr);
148 return o;
149}
A class that manages meta-data to be associated with an object.
Definition MetaObject.h:56
void setString(const std::string &name, const std::string &value)
set the meta-data string with the given name

◆ download()

Status::Enum download ( SH::Sample *const sample)
static

Definition at line 267 of file PrunDriver.cxx.

268{
269 RCU_REQUIRE(sample);
270
271 {
272 static std::mutex mutex;
273 std::lock_guard<std::mutex> lock(mutex);
274 std::cout << "Downloading output from: "
275 << sample->name() << "..." << std::endl;
276 }
277
278 std::string container = sample->meta()->castString("nc_outDS", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
279 RCU_ASSERT(not container.empty());
280 if (container[container.size()-1] == '/') {
281 container.resize(container.size() - 1);
282 }
283 container += "_hist/";
284
285 bool downloadOk = downloadContainer(container, "elg/download/" + container);
286
287 if (not downloadOk) {
288 std::cerr << "Failed to download one or more files" << std::endl;
289 sample->meta()->setString("nc_ELG_state_details",
290 "error, check log for details");
291 return Status::PENDING;
292 }
293
294 return Status::DONE;
295}
#define RCU_ASSERT(x)
Definition Assert.h:222
static bool downloadContainer(const std::string &name, const std::string &location)
STL class.

◆ downloadContainer()

bool downloadContainer ( const std::string & name,
const std::string & location )
static

Definition at line 151 of file PrunDriver.cxx.

153{
154 RCU_ASSERT(not name.empty());
155 RCU_ASSERT(name[name.size()-1] == '/');
156 RCU_ASSERT(not location.empty());
157
158 try {
159 gSystem->Exec(Form("mkdir -p %s", location.c_str()));
160
161 std::vector<std::string> datasets;
162 for (auto& entry : SH::rucioListDids (name))
163 {
164 if (entry.type == "CONTAINER" || entry.type == "DIDType.CONTAINER")
165 datasets.push_back (entry.name);
166 }
167
168 auto downloadResult = SH::rucioDownloadList (location, datasets);
169 for (const auto& result : downloadResult)
170 {
171 if (result.notDownloaded != 0)
172 return false;
173 }
174 } catch (...) {
175 return false;
176 }
177 return true;
178}
std::vector< RucioDownloadResult > rucioDownloadList(const std::string &location, const std::vector< std::string > &datasets)
run rucio-download with multiple datasets
std::vector< RucioListDidsEntry > rucioListDids(const std::string &dataset)
run rucio-list-dids for the given dataset

◆ formatOutputName()

std::string formatOutputName ( const SH::MetaObject & sampleMeta,
const std::string & pattern )
static

Definition at line 396 of file PrunDriver.cxx.

398{
399 const std::string sampleName = sampleMeta.castString("sample_name");
400 RCU_REQUIRE(not pattern.empty());
401 using namespace EL::msgEventLoop;
402
403 static const std::string nickname =
404 gSystem->GetFromPipe(Form("python -c \"%s\" 2>/dev/null",
405 "from pandatools import PsubUtils;"
406 "print(PsubUtils.getNickname());")).Data();
407
408 TString out = pattern.c_str();
409
410 // Handle case of no proxy; will create a proxy later in the submission
411 if (nickname.length()>20){
412 ANA_MSG_WARNING( "No proxy available - cannot use nickname yet. Will try a late replacement.");
413 } else {
414 out.ReplaceAll("%nickname%", nickname);
415 }
416
417 out.ReplaceAll("%in:name%", sampleName);
418
419 std::stringstream ss(sampleName);
420 std::string item;
421 int field = 0;
422 while(std::getline(ss, item, '.')) {
423 std::stringstream sskey;
424 sskey << "%in:name[" << ++field << "]%";
425 out.ReplaceAll(sskey.str(), item);
426 }
427 while (out.Index("%in:") != -1) {
428 int i1 = out.Index("%in:");
429 int i2 = out.Index("%", i1+1);
430 TString metaName = out(i1+4, i2-i1-4);
431 out.ReplaceAll("%in:"+metaName+"%",
432 sampleMeta.castString(std::string(metaName.Data())));
433 }
434 out.ReplaceAll("/", "");
435 return out.Data();
436}
#define ANA_MSG_WARNING(xmsg)
Macro printing warning messages.
static Double_t ss
std::string castString(const std::string &name, const std::string &def_val="", CastMode mode=CAST_ERROR_THROW) const
the meta-data string with the given name

◆ merge()

Status::Enum merge ( SH::Sample *const sample)
static

Definition at line 297 of file PrunDriver.cxx.

298{
299 RCU_REQUIRE(sample);
300
301 std::string container = sample->meta()->castString("nc_outDS", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
302 RCU_ASSERT(not container.empty());
303 if (container[container.size()-1] == '/') {
304 container.resize(container.size() - 1);
305 }
306 container += "_hist/";
307 const std::string dir = "elg/download/" + container;
308
309 const std::string fileName = "hist-output.root";
310
311 const std::string target = Form("hist-%s.root", sample->name().c_str());
312
313 const std::string findCmd(Form("find %s -name \"*.%s*\" | tr '\n' ' '",
314 dir.c_str(), fileName.c_str()));
315 std::istringstream input(gSystem->GetFromPipe(findCmd.c_str()).Data());
316 std::vector<std::string> files((std::istream_iterator<std::string>(input)),
317 std::istream_iterator<std::string>());
318
319 std::sort(files.begin(), files.end());
320 RCU_ASSERT(std::unique(files.begin(), files.end()) == files.end());
321
322 if (not files.size()) {
323 std::cerr << "Found no input files for merging! "
324 << "Requeueing sample for download..." << std::endl;
325 sample->meta()->setString("nc_ELG_state_details", "retry, files were lost");
326 return Status::FAIL;
327 }
328
329 try {
330 RCU::hadd(target.c_str(), files);
331 } catch (...) {
332 sample->meta()->setString("nc_ELG_state_details",
333 "error, check log for details");
334 gSystem->Exec(Form("rm -f %s", target.c_str()));
335 return Status::PENDING;
336 }
337
338 for (size_t i = 0; i != files.size(); ++i) {
339 gSystem->Exec(Form("rm %s", files[i].c_str()));
340 }
341 gSystem->Exec(Form("rmdir %s/*", dir.c_str()));
342 gSystem->Exec(Form("rmdir %s", dir.c_str()));
343
344 return Status::DONE;
345}
std::vector< std::string > files
file names and file pointers
Definition hcg.cxx:50
void hadd(const std::string &output_file, const std::vector< std::string > &input_files, unsigned max_files)
effects: perform the hadd functionality; guarantee: basic; failures: out of memory III; failures: i/o er...
Definition hadd.cxx:28
DataModel_detail::iterator< DVL > unique(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of unique for DataVector/List.
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.

◆ nextState()

JobState::Enum nextState ( JobState::Enum state,
Status::Enum status )
static

Definition at line 110 of file PrunDriver.cxx.

111{
112 RCU_REQUIRE(state != JobState::FINISHED);
113 RCU_REQUIRE(state != JobState::FAILED);
114 static const TransitionRule TABLE[] =
115 {
116 TransitionRule(JobState::INIT, Status::DONE, JobState::RUN),
117 TransitionRule(JobState::INIT, Status::PENDING, JobState::INIT),
118 TransitionRule(JobState::INIT, Status::FAIL, JobState::FAILED),
119 TransitionRule(JobState::RUN, Status::DONE, JobState::DOWNLOAD),
120 TransitionRule(JobState::RUN, Status::PENDING, JobState::RUN),
121 TransitionRule(JobState::RUN, Status::FAIL, JobState::FAILED),
122 TransitionRule(JobState::DOWNLOAD, Status::DONE, JobState::MERGE),
123 TransitionRule(JobState::DOWNLOAD, Status::PENDING, JobState::DOWNLOAD),
124 TransitionRule(JobState::DOWNLOAD, Status::FAIL, JobState::FAILED),
125 TransitionRule(JobState::MERGE, Status::DONE, JobState::FINISHED),
126 TransitionRule(JobState::MERGE, Status::PENDING, JobState::MERGE),
127 TransitionRule(JobState::MERGE, Status::FAIL, JobState::DOWNLOAD)
128 };
129 static const unsigned int TABLE_SIZE = sizeof(TABLE) / sizeof(TABLE[0]);
130 for (unsigned int i = 0; i != TABLE_SIZE; ++i) {
131 if (TABLE[i].fromState == state && TABLE[i].status == status) {
132 return TABLE[i].toState;
133 }
134 }
135 RCU_ASSERT0("Missing state transition rule");
136 throw std::logic_error("PrunDriver.cxx: Missing state transition rule");
137}

◆ outputFileNames()

std::string outputFileNames ( const EL::Job & job)

Definition at line 438 of file PrunDriver.cxx.

439{
440 TList outputs;
441 for (EL::Job::outputIter out = job.outputBegin(),
442 end = job.outputEnd(); out != end; ++out) {
443 outputs.Add(out->Clone());
444 }
445 std::string out = "hist:hist-output.root";
446 TIter itr(&outputs);
447 TObject *obj = 0;
448 while ((obj = itr())) {
449 EL::OutputStream *os = dynamic_cast<EL::OutputStream*>(obj);
450 const std::string name = os->label() + ".root";
451 const std::string ds =
452 os->options()->castString(EL::OutputStream::optContainerSuffix);
453 out += "," + (ds.empty() ? name : ds + ":" + name);
454 }
455 return out;
456}
const OutputStream * outputIter
Definition Job.h:143
static const std::string optContainerSuffix

◆ outputSH()

SH::SampleHandler outputSH ( const SH::SampleHandler & in,
const std::string & outputLabel )
static

Definition at line 482 of file PrunDriver.cxx.

484{
486 const std::string outputFile = "*" + outputLabel + ".root*";
487 const std::string outDSSuffix = '_' + outputLabel + ".root/";
488 for (SH::SampleHandler::iterator s = in.begin(); s != in.end(); ++s) {
489 SH::SampleGrid* outSample = new SH::SampleGrid((*s)->name());
490 const std::string outputDS = (*s)->meta()->castString("nc_outDS", "", SH::MetaObject::CAST_NOCAST_DEFAULT) + outDSSuffix;
491 outSample->meta()->setString("nc_grid", outputDS);
492 outSample->meta()->setString("nc_grid_filter", outputFile);
493 out.add(outSample);
494 }
495 out.fetch(in);
496 return out;
497}
const std::string outputLabel
This class implements a Sample located on the grid.
Definition SampleGrid.h:44
A class that manages a list of Sample objects.
iterator begin() const
the begin iterator to use
std::vector< Sample * >::const_iterator iterator
the iterator to use
iterator end() const
the end iterator to use
MetaObject * meta()
the meta-information for this sample

◆ processAllInState()

void processAllInState ( const SH::SampleHandler & sh,
JobState::Enum state,
const size_t nThreads )
static

Definition at line 378 of file PrunDriver.cxx.

380{
381 RCU_REQUIRE(sh.size());
382
383 WorkList workList;
384
385 bool isFirstSample = true;
386 for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
387 if (sampleState(*s) == state) {
388 workList.push_back([s, isFirstSample]()->void{ processTask(*s, isFirstSample); });
389 // Change boolean to false as already processed one sample
390 isFirstSample = false;
391 }
392 }
393 process(workList, nThreads);
394}
std::vector< WorkUnit > WorkList
static JobState::Enum sampleState(SH::Sample *sample)
static void processTask(SH::Sample *const sample, const bool isFirstSample)
const std::string process

◆ processTask()

void processTask ( SH::Sample *const sample,
const bool isFirstSample )
static

Definition at line 347 of file PrunDriver.cxx.

348{
349 RCU_REQUIRE(sample);
350
351 JobState::Enum state = sampleState(sample);
352
353 sample->meta()->setString("nc_ELG_state_details", "");
354
355 Status::Enum status = Status::PENDING;
356 switch (state) {
357 case JobState::INIT:
358 status = submit(sample, isFirstSample);
359 break;
360 case JobState::RUN:
361 status = checkPandaTask(sample);
362 break;
363 case JobState::DOWNLOAD:
364 status = download(sample);
365 break;
366 case JobState::MERGE:
367 status = merge(sample);
368 break;
369 case JobState::FINISHED:
370 case JobState::FAILED:
371 break;
372 }
373
374 state = nextState(state, status);
375 sample->meta()->setString("nc_ELG_state", JobState::name[state]);
376}
static JobState::Enum nextState(JobState::Enum state, Status::Enum status)
static Status::Enum checkPandaTask(SH::Sample *const sample)
static Status::Enum download(SH::Sample *const sample)
Definition merge.py:1

◆ sampleState()

JobState::Enum sampleState ( SH::Sample * sample)
static

Definition at line 102 of file PrunDriver.cxx.

103{
104 RCU_REQUIRE(sample);
105 static const std::string defaultState = JobState::name[JobState::INIT];
106 std::string label = sample->meta()->castString ("nc_ELG_state", defaultState, SH::MetaObject::CAST_NOCAST_DEFAULT);
107 return JobState::parse(label);
108}
std::string label(const std::string &format, int i)
Definition label.h:19

◆ saveJobDef()

void saveJobDef ( const std::string & fileName,
const EL::Job & job,
const SH::SampleHandler sh )
static

Definition at line 459 of file PrunDriver.cxx.

462{
463 TFile file(fileName.c_str(), "RECREATE");
464 TList outputs;
465 for (EL::Job::outputIter o = job.outputBegin(); o !=job.outputEnd(); ++o)
466 outputs.Add(o->Clone());
467 file.WriteTObject(&job.jobConfig(), "jobConfig", "SingleKey");
468 file.WriteTObject(&outputs, "outputs", "SingleKey");
469 bool haveDefault = false;
470 for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
471 const SH::MetaObject& meta = *((*s)->meta());
472 file.WriteObject(&meta, meta.castString("sample_name").c_str());
473 if (!haveDefault)
474 {
475 file.WriteObject (&meta, "defaultMetaObject");
476 haveDefault = true;
477 }
478 }
479}
-diff
TFile * file

◆ submit()

Status::Enum submit ( SH::Sample *const sample,
const bool isFirstSample )
static

Definition at line 180 of file PrunDriver.cxx.

181{
182 RCU_REQUIRE(sample);
183 using namespace EL::msgEventLoop;
184
185 ANA_MSG_INFO( "Submitting " << sample->name() << "..." );
186
187 static bool loaded = false;
188 if (not loaded) {
189 // TString path = "$ROOTCOREBIN/python/EventLoopGrid/ELG_prun.py";
190 // gSystem->ExpandPathName(path);
191 // TPython::LoadMacro(path.Data());
192 std::string path = PathResolverFindCalibFile("EventLoopGrid/ELG_prun.py");
193 TPython::LoadMacro(path.c_str());
194 loaded = true;
195 }
196
197 TPython::Bind(dynamic_cast<TObject*>(sample), "ELG_SAMPLE");
198#if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
199 std::any result;
200 TPython::Exec("_anyresult = ROOT.std.make_any['int'](ELG_prun(ELG_SAMPLE))", &result);
201 int ret = std::any_cast<int>(result);
202#else
203 int ret = TPython::Eval("ELG_prun(ELG_SAMPLE)");
204#endif
205 TPython::Bind(0, "ELG_SAMPLE");
206
207 // Tarball is created for the first sample to be submitted
208 // then the tarball is simply reused for the other samples
209 // If the returned value is 1 it implies the tarball creation failed
210 // See EventLoopGrid/data/ELG_prun.py script
211 // Abort any further processing as the tarball was not succesfully created
212 if (isFirstSample && ret == 1){
213 ANA_MSG_ERROR("Failed to create tarball");
214 throw std::runtime_error("PrunDriver.cxx: aborting due to tarball creation issue");
215 }
216
217 if (ret < 100) {
218 sample->meta()->setString("nc_ELG_state_details",
219 "problem submitting");
220 return Status::FAIL;
221 }
222
223 sample->meta()->setDouble("nc_jediTaskID", ret);
224
225 // Let's also tell people about their task ID
226 ANA_MSG_INFO( "Task submitted; jediTaskID=" << ret );
227
228 return Status::DONE;
229}
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.