ATLAS Offline Software
Loading...
Searching...
No Matches
PrunDriver.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3*/
4
7
8
9
11#include <EventLoop/Algorithm.h>
14#include <EventLoop/Job.h>
19#include <RootCoreUtils/hadd.h>
27
28
29#include <TList.h>
30#include <TPython.h>
31#include <TROOT.h>
32#include <TFile.h>
33#include <TSystem.h>
34
35#include <algorithm>
36#include <cstdlib>
37#include <iostream>
38#include <sstream>
39#include <string>
40#include <vector>
41#include <stdexcept>
42
43#include <ranges>
44
45#include "pool.h"
46#include <mutex>
47
49
50namespace {
51 namespace JobState {
52 static const unsigned int NSTATES = 6;
53 enum Enum { INIT=0, RUN=1, DOWNLOAD=2, MERGE=3, FINISHED=4, FAILED=5 };
54 static const char* name[NSTATES] =
55 { "INIT", "RUNNING", "DOWNLOAD", "MERGE", "FINISHED", "FAILED" };
56 Enum parse(const std::string& what)
57 {
58 for (unsigned int i = 0; i != NSTATES; ++i) {
59 if (what == name[i]) { return static_cast<Enum>(i); }
60 }
61 RCU_ASSERT0("Failed to parse job state string");
62 throw std::runtime_error("PrunDriver.cxx: Failed to parse job state string"); //compiler dummy
63 }
64 }
65
66 // When changing the values in the enum make sure
67 // corresponding values in `data/ELG_jediState.py` script
68 // are changed accordingly
69 namespace Status {
70 //static const int NSTATES = 3;
71 enum Enum { DONE=0, PENDING=1, FAIL=2 };
72 }
73
74 struct TransitionRule {
75 JobState::Enum fromState;
76 Status::Enum status;
77 JobState::Enum toState;
78 TransitionRule(JobState::Enum fromState,
79 Status::Enum status,
80 JobState::Enum toState)
81 : fromState(fromState)
82 , status(status)
83 , toState(toState)
84 {
85 }
86 };
87
88 struct TmpCd {
89 const std::string origDir;
90 TmpCd(const std::string & dir)
91 : origDir(gSystem->pwd())
92 {
93 gSystem->cd(dir.c_str());
94 }
95 ~TmpCd()
96 {
97 gSystem->cd(origDir.c_str());
98 }
99 };
100}
101
102static JobState::Enum sampleState(SH::Sample* sample)
103{
104 RCU_REQUIRE(sample);
105 static const std::string defaultState = JobState::name[JobState::INIT];
106 std::string label = sample->meta()->castString ("nc_ELG_state", defaultState, SH::MetaObject::CAST_NOCAST_DEFAULT);
107 return JobState::parse(label);
108}
109
110static JobState::Enum nextState(JobState::Enum state, Status::Enum status)
111{
112 RCU_REQUIRE(state != JobState::FINISHED);
113 RCU_REQUIRE(state != JobState::FAILED);
114 static const TransitionRule TABLE[] =
115 {
116 TransitionRule(JobState::INIT, Status::DONE, JobState::RUN),
117 TransitionRule(JobState::INIT, Status::PENDING, JobState::INIT),
118 TransitionRule(JobState::INIT, Status::FAIL, JobState::FAILED),
119 TransitionRule(JobState::RUN, Status::DONE, JobState::DOWNLOAD),
120 TransitionRule(JobState::RUN, Status::PENDING, JobState::RUN),
121 TransitionRule(JobState::RUN, Status::FAIL, JobState::FAILED),
122 TransitionRule(JobState::DOWNLOAD, Status::DONE, JobState::MERGE),
123 TransitionRule(JobState::DOWNLOAD, Status::PENDING, JobState::DOWNLOAD),
124 TransitionRule(JobState::DOWNLOAD, Status::FAIL, JobState::FAILED),
125 TransitionRule(JobState::MERGE, Status::DONE, JobState::FINISHED),
126 TransitionRule(JobState::MERGE, Status::PENDING, JobState::MERGE),
127 TransitionRule(JobState::MERGE, Status::FAIL, JobState::DOWNLOAD)
128 };
129 static const unsigned int TABLE_SIZE = sizeof(TABLE) / sizeof(TABLE[0]);
130 for (unsigned int i = 0; i != TABLE_SIZE; ++i) {
131 if (TABLE[i].fromState == state && TABLE[i].status == status) {
132 return TABLE[i].toState;
133 }
134 }
135 RCU_ASSERT0("Missing state transition rule");
136 throw std::logic_error("PrunDriver.cxx: Missing state transition rule");
137}
138
140{
142 o.setString("nc_nGBPerJob", "MAX");
143 o.setString("nc_mergeOutput", "true");
144 o.setString("nc_cmtConfig", gSystem->ExpandPathName("$AnalysisBase_PLATFORM"));
145 o.setString("nc_useAthenaPackages", "true");
146 const std::string mergestr = "elg_merge jobdef.root %OUT %IN";
147 o.setString("nc_mergeScript", mergestr);
148 return o;
149}
150
151static bool downloadContainer(const std::string& name,
152 const std::string& location)
153{
154 RCU_ASSERT(not name.empty());
155 RCU_ASSERT(name[name.size()-1] == '/');
156 RCU_ASSERT(not location.empty());
157
158 try {
159 gSystem->Exec(Form("mkdir -p %s", location.c_str()));
160
161 std::vector<std::string> datasets;
162 for (auto& entry : SH::rucioListDids (name))
163 {
164 if (entry.type == "CONTAINER" || entry.type == "DIDType.CONTAINER")
165 datasets.push_back (entry.name);
166 }
167
168 auto downloadResult = SH::rucioDownloadList (location, datasets);
169 for (const auto& result : downloadResult)
170 {
171 if (result.notDownloaded != 0)
172 return false;
173 }
174 } catch (...) {
175 return false;
176 }
177 return true;
178}
179
180static Status::Enum submit(SH::Sample* const sample, const bool isFirstSample)
181{
182 RCU_REQUIRE(sample);
183 using namespace EL::msgEventLoop;
184
185 ANA_MSG_INFO( "Submitting " << sample->name() << "..." );
186
187 static bool loaded = false;
188 if (not loaded) {
189 // TString path = "$ROOTCOREBIN/python/EventLoopGrid/ELG_prun.py";
190 // gSystem->ExpandPathName(path);
191 // TPython::LoadMacro(path.Data());
192 std::string path = PathResolverFindCalibFile("EventLoopGrid/ELG_prun.py");
193 TPython::LoadMacro(path.c_str());
194 loaded = true;
195 }
196
197 TPython::Bind(dynamic_cast<TObject*>(sample), "ELG_SAMPLE");
198#if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
199 std::any result;
200 TPython::Exec("_anyresult = ROOT.std.make_any['int'](ELG_prun(ELG_SAMPLE))", &result);
201 int ret = std::any_cast<int>(result);
202#else
203 int ret = TPython::Eval("ELG_prun(ELG_SAMPLE)");
204#endif
205 TPython::Bind(0, "ELG_SAMPLE");
206
207 // Tarball is created for the first sample to be submitted
208 // then the tarball is simply reused for the other samples
209 // If the returned value is 1 it implies the tarball creation failed
210 // See EventLoopGrid/data/ELG_prun.py script
211 // Abort any further processing as the tarball was not succesfully created
212 if (isFirstSample && ret == 1){
213 ANA_MSG_ERROR("Failed to create tarball");
214 throw std::runtime_error("PrunDriver.cxx: aborting due to tarball creation issue");
215 }
216
217 if (ret < 100) {
218 sample->meta()->setString("nc_ELG_state_details",
219 "problem submitting");
220 return Status::FAIL;
221 }
222
223 sample->meta()->setDouble("nc_jediTaskID", ret);
224
225 // Let's also tell people about their task ID
226 ANA_MSG_INFO( "Task submitted; jediTaskID=" << ret );
227
228 return Status::DONE;
229}
230
231static Status::Enum checkPandaTask(SH::Sample* const sample)
232{
233 RCU_REQUIRE(sample);
234 RCU_REQUIRE(static_cast<int>(sample->meta()->castDouble("nc_jediTaskID",0, SH::MetaObject::CAST_NOCAST_DEFAULT)) > 100);
235
236 static bool loaded = false;
237 if (not loaded) {
238 std::string path = PathResolverFindCalibFile("EventLoopGrid/ELG_jediState.py");
239 TPython::LoadMacro(path.c_str());
240 loaded = true;
241 }
242
243 TPython::Bind(dynamic_cast<TObject*>(sample), "ELG_SAMPLE");
244#if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
245 std::any result;
246 TPython::Exec("_anyresult = ROOT.std.make_any['int'](ELG_jediState(ELG_SAMPLE))", &result);
247 int ret = std::any_cast<int>(result);
248#else
249 int ret = TPython::Eval("ELG_jediState(ELG_SAMPLE)");
250#endif
251 TPython::Bind(0, "ELG_SAMPLE");
252
253 if (ret == Status::DONE) return Status::DONE;
254 if (ret == Status::FAIL) return Status::FAIL;
255
256 // Value 90 corresponds to `running` state of the job
257 if (ret != 90) { sample->meta()->setString("nc_ELG_state_details", "task status other than done/finished/failed/running"); }
258 // Value 99 is returned if there is error in the script (import, missing ID)
259 if (ret == 99) {
260 sample->meta()->setString("nc_ELG_state_details",
261 "problem checking jedi task status");
262 }
263
264 return Status::PENDING;
265}
266
267static Status::Enum download(SH::Sample* const sample)
268{
269 RCU_REQUIRE(sample);
270
271 {
272 static std::mutex mutex;
273 std::lock_guard<std::mutex> lock(mutex);
274 std::cout << "Downloading output from: "
275 << sample->name() << "..." << std::endl;
276 }
277
278 std::string container = sample->meta()->castString("nc_outDS", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
279 RCU_ASSERT(not container.empty());
280 if (container[container.size()-1] == '/') {
281 container.resize(container.size() - 1);
282 }
283 container += "_hist/";
284
285 bool downloadOk = downloadContainer(container, "elg/download/" + container);
286
287 if (not downloadOk) {
288 std::cerr << "Failed to download one or more files" << std::endl;
289 sample->meta()->setString("nc_ELG_state_details",
290 "error, check log for details");
291 return Status::PENDING;
292 }
293
294 return Status::DONE;
295}
296
297static Status::Enum merge(SH::Sample* const sample)
298{
299 RCU_REQUIRE(sample);
300
301 std::string container = sample->meta()->castString("nc_outDS", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
302 RCU_ASSERT(not container.empty());
303 if (container[container.size()-1] == '/') {
304 container.resize(container.size() - 1);
305 }
306 container += "_hist/";
307 const std::string dir = "elg/download/" + container;
308
309 const std::string fileName = "hist-output.root";
310
311 const std::string target = Form("hist-%s.root", sample->name().c_str());
312
313 const std::string findCmd(Form("find %s -name \"*.%s*\" | tr '\n' ' '",
314 dir.c_str(), fileName.c_str()));
315 std::istringstream input(gSystem->GetFromPipe(findCmd.c_str()).Data());
316 std::vector<std::string> files((std::istream_iterator<std::string>(input)),
317 std::istream_iterator<std::string>());
318
319 std::sort(files.begin(), files.end());
320 RCU_ASSERT(std::unique(files.begin(), files.end()) == files.end());
321
322 if (not files.size()) {
323 std::cerr << "Found no input files for merging! "
324 << "Requeueing sample for download..." << std::endl;
325 sample->meta()->setString("nc_ELG_state_details", "retry, files were lost");
326 return Status::FAIL;
327 }
328
329 try {
330 RCU::hadd(target.c_str(), files);
331 } catch (...) {
332 sample->meta()->setString("nc_ELG_state_details",
333 "error, check log for details");
334 gSystem->Exec(Form("rm -f %s", target.c_str()));
335 return Status::PENDING;
336 }
337
338 for (size_t i = 0; i != files.size(); ++i) {
339 gSystem->Exec(Form("rm %s", files[i].c_str()));
340 }
341 gSystem->Exec(Form("rmdir %s/*", dir.c_str()));
342 gSystem->Exec(Form("rmdir %s", dir.c_str()));
343
344 return Status::DONE;
345}
346
347static void processTask(SH::Sample* const sample, const bool isFirstSample)
348{
349 RCU_REQUIRE(sample);
350
351 JobState::Enum state = sampleState(sample);
352
353 sample->meta()->setString("nc_ELG_state_details", "");
354
355 Status::Enum status = Status::PENDING;
356 switch (state) {
357 case JobState::INIT:
358 status = submit(sample, isFirstSample);
359 break;
360 case JobState::RUN:
361 status = checkPandaTask(sample);
362 break;
363 case JobState::DOWNLOAD:
364 status = download(sample);
365 break;
366 case JobState::MERGE:
367 status = merge(sample);
368 break;
369 case JobState::FINISHED:
370 case JobState::FAILED:
371 break;
372 }
373
374 state = nextState(state, status);
375 sample->meta()->setString("nc_ELG_state", JobState::name[state]);
376}
377
378static void processAllInState(const SH::SampleHandler& sh, JobState::Enum state,
379 const size_t nThreads)
380{
381 RCU_REQUIRE(sh.size());
382
383 WorkList workList;
384
385 bool isFirstSample = true;
386 for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
387 if (sampleState(*s) == state) {
388 workList.push_back([s, isFirstSample]()->void{ processTask(*s, isFirstSample); });
389 // Change boolean to false as already processed one sample
390 isFirstSample = false;
391 }
392 }
393 process(workList, nThreads);
394}
395
396static std::string formatOutputName(const SH::MetaObject& sampleMeta,
397 const std::string & pattern)
398{
399 const std::string sampleName = sampleMeta.castString("sample_name");
400 RCU_REQUIRE(not pattern.empty());
401 using namespace EL::msgEventLoop;
402
403 static const std::string nickname =
404 gSystem->GetFromPipe(Form("python -c \"%s\" 2>/dev/null",
405 "from pandatools import PsubUtils;"
406 "print(PsubUtils.getNickname());")).Data();
407
408 TString out = pattern.c_str();
409
410 // Handle case of no proxy; will create a proxy later in the submission
411 if (nickname.length()>20){
412 ANA_MSG_WARNING( "No proxy available - cannot use nickname yet. Will try a late replacement.");
413 } else {
414 out.ReplaceAll("%nickname%", nickname);
415 }
416
417 out.ReplaceAll("%in:name%", sampleName);
418
419 std::stringstream ss(sampleName);
420 std::string item;
421 int field = 0;
422 while(std::getline(ss, item, '.')) {
423 std::stringstream sskey;
424 sskey << "%in:name[" << ++field << "]%";
425 out.ReplaceAll(sskey.str(), item);
426 }
427 while (out.Index("%in:") != -1) {
428 int i1 = out.Index("%in:");
429 int i2 = out.Index("%", i1+1);
430 TString metaName = out(i1+4, i2-i1-4);
431 out.ReplaceAll("%in:"+metaName+"%",
432 sampleMeta.castString(std::string(metaName.Data())));
433 }
434 out.ReplaceAll("/", "");
435 return out.Data();
436}
437
438std::string outputFileNames(const EL::Job& job)
439{
440 TList outputs;
441 for (EL::Job::outputIter out = job.outputBegin(),
442 end = job.outputEnd(); out != end; ++out) {
443 outputs.Add(out->Clone());
444 }
445 std::string out = "hist:hist-output.root";
446 TIter itr(&outputs);
447 TObject *obj = 0;
448 while ((obj = itr())) {
449 EL::OutputStream *os = dynamic_cast<EL::OutputStream*>(obj);
450 const std::string name = os->label() + ".root";
451 const std::string ds =
452 os->options()->castString(EL::OutputStream::optContainerSuffix);
453 out += "," + (ds.empty() ? name : ds + ":" + name);
454 }
455 return out;
456}
457
458// Save algortihms and lists of inputs and outputs to a root file
459static void saveJobDef(const std::string& fileName,
460 const EL::Job& job,
461 const SH::SampleHandler sh)
462{
463 TFile file(fileName.c_str(), "RECREATE");
464 TList outputs;
465 for (EL::Job::outputIter o = job.outputBegin(); o !=job.outputEnd(); ++o)
466 outputs.Add(o->Clone());
467 file.WriteTObject(&job.jobConfig(), "jobConfig", "SingleKey");
468 file.WriteTObject(&outputs, "outputs", "SingleKey");
469 bool haveDefault = false;
470 for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
471 const SH::MetaObject& meta = *((*s)->meta());
472 file.WriteObject(&meta, meta.castString("sample_name").c_str());
473 if (!haveDefault)
474 {
475 file.WriteObject (&meta, "defaultMetaObject");
476 haveDefault = true;
477 }
478 }
479}
480
481// Create a sample handler with grid locations of outputs with given label
483 const std::string& outputLabel)
484{
486 const std::string outputFile = "*" + outputLabel + ".root*";
487 const std::string outDSSuffix = '_' + outputLabel + ".root/";
488 for (SH::SampleHandler::iterator s = in.begin(); s != in.end(); ++s) {
489 SH::SampleGrid* outSample = new SH::SampleGrid((*s)->name());
490 const std::string outputDS = (*s)->meta()->castString("nc_outDS", "", SH::MetaObject::CAST_NOCAST_DEFAULT) + outDSSuffix;
491 outSample->meta()->setString("nc_grid", outputDS);
492 outSample->meta()->setString("nc_grid_filter", outputFile);
493 out.add(outSample);
494 }
495 out.fetch(in);
496 return out;
497}
498
500{
501 RCU_INVARIANT(this != 0);
502}
503
508
509::StatusCode EL::PrunDriver ::
510doManagerStep (Detail::ManagerData& data) const
511{
512 using namespace msgEventLoop;
514 switch (data.step)
515 {
517 {
518 const std::string jobELGDir = data.submitDir + "/elg";
519 const std::string runShFile = jobELGDir + "/runjob.sh";
520 //const std::string runShOrig = "$ROOTCOREBIN/data/EventLoopGrid/runjob.sh";
521 const std::string mergeShFile = jobELGDir + "/elg_merge";
522 //const std::string mergeShOrig =
523 // "$ROOTCOREBIN/user_scripts/EventLoopGrid/elg_merge";
524 const std::string runShOrig = PathResolverFindCalibFile("EventLoopGrid/runjob.sh");
525 const std::string mergeShOrig = PathResolverFindCalibFile("EventLoopGrid/elg_merge");
526
527 const std::string jobDefFile = jobELGDir + "/jobdef.root";
528 gSystem->Exec(Form("mkdir -p %s", jobELGDir.c_str()));
529 gSystem->Exec(Form("cp %s %s", runShOrig.c_str(), runShFile.c_str()));
530 gSystem->Exec(Form("chmod +x %s", runShFile.c_str()));
531 gSystem->Exec(Form("cp %s %s", mergeShOrig.c_str(), mergeShFile.c_str()));
532 gSystem->Exec(Form("chmod +x %s", mergeShFile.c_str()));
533
534 // create symbolic links for additional files/directories, if any, to ship to the grid
535 std::string listToShipToGrid = data.options.castString(EL::Job::optGridPrunShipAdditionalFilesOrDirs, "");
536 // parse the list of comma separated files and/or directories to ship to the grid
537 if (listToShipToGrid.size()){
539 "Creating symbolic links for additional files or directories to be sent to grid.\n"
540 "For root or heavy files you should also add their name (not the full path) to EL::Job::optUserFiles.\n"
541 "Otherwise prun ignores those files."
542 );
543
544 std::vector<std::string> vect_filesOrDirToShip;
545 for (auto&& part : std::views::split(listToShipToGrid, ',')) vect_filesOrDirToShip.emplace_back(part.begin(), part.end());
546 // Create symbolic links of files or directories to the submission directory
547 for (const std::string & fileOrDirToShip: vect_filesOrDirToShip){
548 ANA_MSG_INFO (("Creating symbolic link for: " +fileOrDirToShip).c_str());
549 RCU::Shell::exec("ln -sf " + fileOrDirToShip + " " + jobELGDir);
550 }
551 ANA_MSG_INFO ("Finished creation of symbolic links");
552 }
553
554 const SH::SampleHandler& sh = data.job->sampleHandler();
555
556 for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
557 SH::MetaObject& meta = *(*s)->meta();
558 meta.fetchDefaults(data.options);
559 meta.fetchDefaults(defaultOpts());
560 meta.setString("nc_outputs", outputFileNames(*data.job));
561 std::string outputSampleName = meta.castString("nc_outputSampleName");
562 if (outputSampleName.empty()) {
563 outputSampleName = "user.%nickname%.%in:name%";
564 }
565 meta.setString("nc_outDS", formatOutputName(meta, outputSampleName));
566 meta.setString("nc_inDS", meta.castString("nc_grid", (*s)->name()));
567 meta.setString("nc_writeInputToTxt", "IN:input.txt");
568 meta.setString("nc_match", meta.castString("nc_grid_filter"));
569 const std::string execstr = "runjob.sh " + (*s)->name();
570 meta.setString("nc_exec", execstr);
571 meta.setString("nc_framework", "EventLoopGrid");
572 }
573
574 saveJobDef(jobDefFile, *data.job, sh);
575
576 for (EL::Job::outputIter out = data.job->outputBegin();
577 out != data.job->outputEnd(); ++out) {
578 SH::SampleHandler shOut = outputSH(sh, out->label());
579 shOut.save(data.submitDir + "/output-" + out->label());
580 }
581 SH::SampleHandler shHist = outputSH(sh, "hist-output");
582 shHist.save(data.submitDir + "/output-hist");
583
584 TmpCd keepDir(jobELGDir);
585
586 processAllInState(sh, JobState::INIT, 0);
587
588 sh.save(data.submitDir + "/input");
589 data.submitted = true;
590 }
591 break;
592
594 {
596 }
597 break;
598
599 default:
600 (void) true; // safe to do nothing
601 }
602 return ::StatusCode::SUCCESS;
603}
604
606{
607 RCU_READ_INVARIANT(this);
608 RCU_REQUIRE(not data.submitDir.empty());
609
610 TmpCd tmpDir(data.submitDir);
611
613 sh.load("input");
614 RCU_ASSERT(sh.size());
615
616 const size_t nRunThreads = options()->castDouble("nc_run_threads", 0);
617 const size_t nDlThreads = options()->castDouble("nc_download_threads", 0);
618 processAllInState(sh, JobState::INIT, 0);
619 processAllInState(sh, JobState::RUN, nRunThreads);
620 processAllInState(sh, JobState::DOWNLOAD, nDlThreads);
621 processAllInState(sh, JobState::MERGE, 0);
622
623 sh.save("input");
624
625 std::cout << std::endl;
626
627 bool allDone = true;
628 for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
629 JobState::Enum state = sampleState(*s);
630 std::string details = (*s)->meta()->castString("nc_ELG_state_details", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
631 if (not details.empty()) { details = '(' + details + ')'; }
632
633 std::cout << (*s)->name() << "\t";
634 switch (state) {
635 case JobState::INIT:
636 case JobState::RUN:
637 case JobState::DOWNLOAD:
638 case JobState::MERGE:
639 std::cout << JobState::name[state] << "\t";
640 break;
641 case JobState::FINISHED:
642 std::cout << "\033[1;32m" << JobState::name[state] << "\033[0m\t";
643 break;
644 case JobState::FAILED:
645 std::cout << "\033[1;31m" << JobState::name[state] << "\033[0m\t";
646 break;
647 }
648 std::cout << details << std::endl;
649
650 allDone &= (state == JobState::FINISHED || state == JobState::FAILED);
651 }
652
653 std::cout << std::endl;
654
655 data.retrieved = true;
656 data.completed = allDone;
657 return ::StatusCode::SUCCESS;
658}
659
660void EL::PrunDriver::status(const std::string& location)
661{
662 RCU_REQUIRE(not location.empty());
663 TmpCd tmpDir(location);
665 sh.load("input");
666 RCU_ASSERT(sh.size());
667 processAllInState(sh, JobState::RUN, 0);
668 sh.save("input");
669 for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
670 JobState::Enum state = sampleState(*s);
671 std::string details = (*s)->meta()->castString("nc_ELG_state_details", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
672 if (not details.empty()) { details = '(' + details + ')'; }
673 std::cout << (*s)->name() << "\t" << JobState::name[state]
674 << "\t" << details << std::endl;
675 }
676}
677
678void EL::PrunDriver::setState(const std::string& location,
679 const std::string& task,
680 const std::string& state)
681{
682 RCU_REQUIRE(not location.empty());
683 RCU_REQUIRE(not task.empty());
684 RCU_REQUIRE(not state.empty());
685 TmpCd tmpDir(location);
687 sh.load("input");
688 RCU_ASSERT(sh.size());
689 if (not sh.get(task)) {
690 std::cout << "Unknown task: " << task << std::endl;
691 std::cout << "Choose one of: " << std::endl;
692 sh.print();
693 return;
694 }
695 JobState::parse(state);
696 sh.get(task)->meta()->setString("nc_ELG_state", state);
697 sh.save("input");
698}
#define RCU_INVARIANT(x)
Definition Assert.h:201
#define RCU_ASSERT(x)
Definition Assert.h:222
#define RCU_NEW_INVARIANT(x)
Definition Assert.h:233
#define RCU_REQUIRE(x)
Definition Assert.h:208
#define RCU_ASSERT0(y)
Definition Assert.h:226
#define RCU_READ_INVARIANT(x)
Definition Assert.h:229
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
#define ANA_MSG_WARNING(xmsg)
Macro printing warning messages.
#define ANA_CHECK(EXP)
check whether the given expression was successful
#define INIT(__TYPE)
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
static Double_t ss
const std::string outputLabel
std::string PathResolverFindCalibFile(const std::string &logical_file_name)
std::vector< WorkUnit > WorkList
static SH::MetaObject defaultOpts()
static void processAllInState(const SH::SampleHandler &sh, JobState::Enum state, const size_t nThreads)
ClassImp(EL::PrunDriver) namespace
std::string outputFileNames(const EL::Job &job)
static bool downloadContainer(const std::string &name, const std::string &location)
static JobState::Enum nextState(JobState::Enum state, Status::Enum status)
static std::string formatOutputName(const SH::MetaObject &sampleMeta, const std::string &pattern)
static Status::Enum checkPandaTask(SH::Sample *const sample)
static void saveJobDef(const std::string &fileName, const EL::Job &job, const SH::SampleHandler sh)
static Status::Enum download(SH::Sample *const sample)
static JobState::Enum sampleState(SH::Sample *sample)
static SH::SampleHandler outputSH(const SH::SampleHandler &in, const std::string &outputLabel)
static void processTask(SH::Sample *const sample, const bool isFirstSample)
SH::MetaObject * options()
the list of options to jobs with this driver
virtual::StatusCode doManagerStep(Detail::ManagerData &data) const
Definition Job.h:51
const OutputStream * outputIter
Definition Job.h:143
static const std::string optGridPrunShipAdditionalFilesOrDirs
Enables shipping additional files in the tarball sent to the grid. Should be a list of comma separated p...
Definition Job.h:494
static const std::string optContainerSuffix
a Driver to submit jobs via prun
Definition PrunDriver.h:23
static void status(const std::string &location)
::StatusCode doRetrieve(Detail::ManagerData &data) const
static void setState(const std::string &location, const std::string &task, const std::string &state)
void testInvariant() const
A class that manages meta-data to be associated with an object.
Definition MetaObject.h:56
@ CAST_NOCAST_DEFAULT
cast and return the default value if the input has the wrong type
Definition MetaObject.h:78
void setString(const std::string &name, const std::string &value)
set the meta-data string with the given name
std::string castString(const std::string &name, const std::string &def_val="", CastMode mode=CAST_ERROR_THROW) const
the meta-data string with the given name
This class implements a Sample located on the grid.
Definition SampleGrid.h:44
A class that manages a list of Sample objects.
void save(const std::string &directory) const
save the list of samples to the given directory
iterator begin() const
the begin iterator to use
std::vector< Sample * >::const_iterator iterator
the iterator to use
iterator end() const
the end iterator to use
a base class that manages a set of files belonging to a particular data set and the associated meta-d...
Definition Sample.h:54
MetaObject * meta()
the meta-information for this sample
STL class.
std::map< std::string, std::string > parse(const std::string &list)
const std::string process
std::vector< std::string > files
file names and file pointers
Definition hcg.cxx:50
std::string label(const std::string &format, int i)
Definition label.h:19
@ doRetrieve
call the actual doRetrieve method
@ submitJob
do the actual job submission
Definition ManagerStep.h:92
::StatusCode StatusCode
StatusCode definition for legacy code.
void exec(const std::string &cmd)
effects: execute the given command guarantee: strong failures: out of memory II failures: system fail...
Definition ShellExec.cxx:29
void hadd(const std::string &output_file, const std::vector< std::string > &input_files, unsigned max_files)
effects: perform the hadd functionality guarantee: basic failures: out of memory III failures: i/o er...
Definition hadd.cxx:28
std::vector< RucioDownloadResult > rucioDownloadList(const std::string &location, const std::vector< std::string > &datasets)
run rucio-download with multiple datasets
std::vector< RucioListDidsEntry > rucioListDids(const std::string &dataset)
run rucio-list-dids for the given dataset
Definition merge.py:1
-diff
DataModel_detail::iterator< DVL > unique(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of unique for DataVector/List.
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.
an internal data structure for passing data between different manager objects and steps
Definition ManagerData.h:46
TFile * file