ATLAS Offline Software
PrunDriver.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
7 
8 
9 
11 #include <EventLoop/Algorithm.h>
12 #include <EventLoop/ManagerData.h>
13 #include <EventLoop/ManagerStep.h>
14 #include <EventLoop/Job.h>
15 #include <EventLoop/MessageCheck.h>
16 #include <EventLoop/OutputStream.h>
18 #include <RootCoreUtils/Assert.h>
19 #include <RootCoreUtils/hadd.h>
23 #include <SampleHandler/Sample.h>
27 
28 
29 #include <TList.h>
30 #include <TPython.h>
31 #include <TROOT.h>
32 #include <TFile.h>
33 #include <TSystem.h>
34 
35 #include <algorithm>
36 #include <cstdlib>
37 #include <iostream>
38 #include <sstream>
39 #include <string>
40 #include <vector>
41 #include <stdexcept>
42 
43 #include <ranges>
44 
45 #include "pool.h"
46 #include <mutex>
47 
49 
50 namespace {
51  namespace JobState {
52  static const unsigned int NSTATES = 6;
53  enum Enum { INIT=0, RUN=1, DOWNLOAD=2, MERGE=3, FINISHED=4, FAILED=5 };
54  static const char* name[NSTATES] =
55  { "INIT", "RUNNING", "DOWNLOAD", "MERGE", "FINISHED", "FAILED" };
56  Enum parse(const std::string& what)
57  {
58  for (unsigned int i = 0; i != NSTATES; ++i) {
59  if (what == name[i]) { return static_cast<Enum>(i); }
60  }
61  RCU_ASSERT0("Failed to parse job state string");
62  throw std::runtime_error("PrunDriver.cxx: Failed to parse job state string"); //compiler dummy
63  }
64  }
65 
66  // When changing the values in the enum make sure
67  // corresponding values in `data/ELG_jediState.py` script
68  // are changed accordingly
69  namespace Status {
70  //static const int NSTATES = 3;
71  enum Enum { DONE=0, PENDING=1, FAIL=2 };
72  }
73 
74  struct TransitionRule {
75  JobState::Enum fromState;
76  Status::Enum status;
77  JobState::Enum toState;
78  TransitionRule(JobState::Enum fromState,
79  Status::Enum status,
80  JobState::Enum toState)
81  : fromState(fromState)
82  , status(status)
83  , toState(toState)
84  {
85  }
86  };
87 
88  struct TmpCd {
89  const std::string origDir;
90  TmpCd(const std::string & dir)
91  : origDir(gSystem->pwd())
92  {
93  gSystem->cd(dir.c_str());
94  }
95  ~TmpCd()
96  {
97  gSystem->cd(origDir.c_str());
98  }
99  };
100 }
101 
102 static JobState::Enum sampleState(SH::Sample* sample)
103 {
105  static const std::string defaultState = JobState::name[JobState::INIT];
106  std::string label = sample->meta()->castString ("nc_ELG_state", defaultState, SH::MetaObject::CAST_NOCAST_DEFAULT);
107  return JobState::parse(label);
108 }
109 
110 static JobState::Enum nextState(JobState::Enum state, Status::Enum status)
111 {
113  RCU_REQUIRE(state != JobState::FAILED);
114  static const TransitionRule TABLE[] =
115  {
116  TransitionRule(JobState::INIT, Status::DONE, JobState::RUN),
117  TransitionRule(JobState::INIT, Status::PENDING, JobState::INIT),
118  TransitionRule(JobState::INIT, Status::FAIL, JobState::FAILED),
119  TransitionRule(JobState::RUN, Status::DONE, JobState::DOWNLOAD),
120  TransitionRule(JobState::RUN, Status::PENDING, JobState::RUN),
121  TransitionRule(JobState::RUN, Status::FAIL, JobState::FAILED),
122  TransitionRule(JobState::DOWNLOAD, Status::DONE, JobState::MERGE),
123  TransitionRule(JobState::DOWNLOAD, Status::PENDING, JobState::DOWNLOAD),
124  TransitionRule(JobState::DOWNLOAD, Status::FAIL, JobState::FAILED),
125  TransitionRule(JobState::MERGE, Status::DONE, JobState::FINISHED),
126  TransitionRule(JobState::MERGE, Status::PENDING, JobState::MERGE),
127  TransitionRule(JobState::MERGE, Status::FAIL, JobState::DOWNLOAD)
128  };
129  static const unsigned int TABLE_SIZE = sizeof(TABLE) / sizeof(TABLE[0]);
130  for (unsigned int i = 0; i != TABLE_SIZE; ++i) {
131  if (TABLE[i].fromState == state && TABLE[i].status == status) {
132  return TABLE[i].toState;
133  }
134  }
135  RCU_ASSERT0("Missing state transition rule");
136  throw std::logic_error("PrunDriver.cxx: Missing state transition rule");
137 }
138 
139 static SH::MetaObject defaultOpts()
140 {
141  SH::MetaObject o;
142  o.setString("nc_nGBPerJob", "MAX");
143  o.setString("nc_mergeOutput", "true");
144  o.setString("nc_cmtConfig", gSystem->ExpandPathName("$AnalysisBase_PLATFORM"));
145  o.setString("nc_useAthenaPackages", "true");
146  const std::string mergestr = "elg_merge jobdef.root %OUT %IN";
147  o.setString("nc_mergeScript", mergestr);
148  return o;
149 }
150 
151 static bool downloadContainer(const std::string& name,
152  const std::string& location)
153 {
154  RCU_ASSERT(not name.empty());
155  RCU_ASSERT(name[name.size()-1] == '/');
156  RCU_ASSERT(not location.empty());
157 
158  try {
159  gSystem->Exec(Form("mkdir -p %s", location.c_str()));
160 
161  std::vector<std::string> datasets;
162  for (auto& entry : SH::rucioListDids (name))
163  {
164  if (entry.type == "CONTAINER" || entry.type == "DIDType.CONTAINER")
165  datasets.push_back (entry.name);
166  }
167 
168  auto downloadResult = SH::rucioDownloadList (location, datasets);
169  for (const auto& result : downloadResult)
170  {
171  if (result.notDownloaded != 0)
172  return false;
173  }
174  } catch (...) {
175  return false;
176  }
177  return true;
178 }
179 
180 static Status::Enum submit(SH::Sample* const sample, const bool isFirstSample)
181 {
183  using namespace EL::msgEventLoop;
184 
185  ANA_MSG_INFO( "Submitting " << sample->name() << "..." );
186 
187  static bool loaded = false;
188  if (not loaded) {
189  // TString path = "$ROOTCOREBIN/python/EventLoopGrid/ELG_prun.py";
190  // gSystem->ExpandPathName(path);
191  // TPython::LoadMacro(path.Data());
192  std::string path = PathResolverFindCalibFile("EventLoopGrid/ELG_prun.py");
193  TPython::LoadMacro(path.c_str());
194  loaded = true;
195  }
196 
197  TPython::Bind(dynamic_cast<TObject*>(sample), "ELG_SAMPLE");
198 #if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
199  std::any result;
200  TPython::Exec("_anyresult = ROOT.std.make_any['int'](ELG_prun(ELG_SAMPLE))", &result);
201  int ret = std::any_cast<int>(result);
202 #else
203  int ret = TPython::Eval("ELG_prun(ELG_SAMPLE)");
204 #endif
205  TPython::Bind(0, "ELG_SAMPLE");
206 
207  // Tarball is created for the first sample to be submitted
208  // then the tarball is simply reused for the other samples
209  // If the returned value is 1 it implies the tarball creation failed
210  // See EventLoopGrid/data/ELG_prun.py script
211  // Abort any further processing as the tarball was not succesfully created
212  if (isFirstSample && ret == 1){
213  ANA_MSG_ERROR("Failed to create tarball");
214  throw std::runtime_error("PrunDriver.cxx: aborting due to tarball creation issue");
215  }
216 
217  if (ret < 100) {
218  sample->meta()->setString("nc_ELG_state_details",
219  "problem submitting");
220  return Status::FAIL;
221  }
222 
223  sample->meta()->setDouble("nc_jediTaskID", ret);
224 
225  // Let's also tell people about their task ID
226  ANA_MSG_INFO( "Task submitted; jediTaskID=" << ret );
227 
228  return Status::DONE;
229 }
230 
231 static Status::Enum checkPandaTask(SH::Sample* const sample)
232 {
234  RCU_REQUIRE(static_cast<int>(sample->meta()->castDouble("nc_jediTaskID",0, SH::MetaObject::CAST_NOCAST_DEFAULT)) > 100);
235 
236  static bool loaded = false;
237  if (not loaded) {
238  std::string path = PathResolverFindCalibFile("EventLoopGrid/ELG_jediState.py");
239  TPython::LoadMacro(path.c_str());
240  loaded = true;
241  }
242 
243  TPython::Bind(dynamic_cast<TObject*>(sample), "ELG_SAMPLE");
244 #if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
245  std::any result;
246  TPython::Exec("_anyresult = ROOT.std.make_any['int'](ELG_jediState(ELG_SAMPLE))", &result);
247  int ret = std::any_cast<int>(result);
248 #else
249  int ret = TPython::Eval("ELG_jediState(ELG_SAMPLE)");
250 #endif
251  TPython::Bind(0, "ELG_SAMPLE");
252 
253  if (ret == Status::DONE) return Status::DONE;
254  if (ret == Status::FAIL) return Status::FAIL;
255 
256  // Value 90 corresponds to `running` state of the job
257  if (ret != 90) { sample->meta()->setString("nc_ELG_state_details", "task status other than done/finished/failed/running"); }
258  // Value 99 is returned if there is error in the script (import, missing ID)
259  if (ret == 99) {
260  sample->meta()->setString("nc_ELG_state_details",
261  "problem checking jedi task status");
262  }
263 
264  return Status::PENDING;
265 }
266 
267 static Status::Enum download(SH::Sample* const sample)
268 {
270 
271  {
272  static std::mutex mutex;
273  std::lock_guard<std::mutex> lock(mutex);
274  std::cout << "Downloading output from: "
275  << sample->name() << "..." << std::endl;
276  }
277 
278  std::string container = sample->meta()->castString("nc_outDS", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
279  RCU_ASSERT(not container.empty());
280  if (container[container.size()-1] == '/') {
281  container.resize(container.size() - 1);
282  }
283  container += "_hist/";
284 
285  bool downloadOk = downloadContainer(container, "elg/download/" + container);
286 
287  if (not downloadOk) {
288  std::cerr << "Failed to download one or more files" << std::endl;
289  sample->meta()->setString("nc_ELG_state_details",
290  "error, check log for details");
291  return Status::PENDING;
292  }
293 
294  return Status::DONE;
295 }
296 
297 static Status::Enum merge(SH::Sample* const sample)
298 {
300 
301  std::string container = sample->meta()->castString("nc_outDS", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
302  RCU_ASSERT(not container.empty());
303  if (container[container.size()-1] == '/') {
304  container.resize(container.size() - 1);
305  }
306  container += "_hist/";
307  const std::string dir = "elg/download/" + container;
308 
309  const std::string fileName = "hist-output.root";
310 
311  const std::string target = Form("hist-%s.root", sample->name().c_str());
312 
313  const std::string findCmd(Form("find %s -name \"*.%s*\" | tr '\n' ' '",
314  dir.c_str(), fileName.c_str()));
315  std::istringstream input(gSystem->GetFromPipe(findCmd.c_str()).Data());
316  std::vector<std::string> files((std::istream_iterator<std::string>(input)),
317  std::istream_iterator<std::string>());
318 
319  std::sort(files.begin(), files.end());
320  RCU_ASSERT(std::unique(files.begin(), files.end()) == files.end());
321 
322  if (not files.size()) {
323  std::cerr << "Found no input files for merging! "
324  << "Requeueing sample for download..." << std::endl;
325  sample->meta()->setString("nc_ELG_state_details", "retry, files were lost");
326  return Status::FAIL;
327  }
328 
329  try {
330  RCU::hadd(target.c_str(), files);
331  } catch (...) {
332  sample->meta()->setString("nc_ELG_state_details",
333  "error, check log for details");
334  gSystem->Exec(Form("rm -f %s", target.c_str()));
335  return Status::PENDING;
336  }
337 
338  for (size_t i = 0; i != files.size(); ++i) {
339  gSystem->Exec(Form("rm %s", files[i].c_str()));
340  }
341  gSystem->Exec(Form("rmdir %s/*", dir.c_str()));
342  gSystem->Exec(Form("rmdir %s", dir.c_str()));
343 
344  return Status::DONE;
345 }
346 
347 static void processTask(SH::Sample* const sample, const bool isFirstSample)
348 {
350 
351  JobState::Enum state = sampleState(sample);
352 
353  sample->meta()->setString("nc_ELG_state_details", "");
354 
355  Status::Enum status = Status::PENDING;
356  switch (state) {
357  case JobState::INIT:
358  status = submit(sample, isFirstSample);
359  break;
360  case JobState::RUN:
361  status = checkPandaTask(sample);
362  break;
363  case JobState::DOWNLOAD:
364  status = download(sample);
365  break;
366  case JobState::MERGE:
367  status = merge(sample);
368  break;
369  case JobState::FINISHED:
370  case JobState::FAILED:
371  break;
372  }
373 
374  state = nextState(state, status);
375  sample->meta()->setString("nc_ELG_state", JobState::name[state]);
376 }
377 
378 static void processAllInState(const SH::SampleHandler& sh, JobState::Enum state,
379  const size_t nThreads)
380 {
381  RCU_REQUIRE(sh.size());
382 
383  WorkList workList;
384 
385  bool isFirstSample = true;
386  for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
387  if (sampleState(*s) == state) {
388  workList.push_back([s, isFirstSample]()->void{ processTask(*s, isFirstSample); });
389  // Change boolean to false as already processed one sample
390  isFirstSample = false;
391  }
392  }
393  process(workList, nThreads);
394 }
395 
396 static std::string formatOutputName(const SH::MetaObject& sampleMeta,
397  const std::string & pattern)
398 {
399  const std::string sampleName = sampleMeta.castString("sample_name");
400  RCU_REQUIRE(not pattern.empty());
401  using namespace EL::msgEventLoop;
402 
403  static const std::string nickname =
404  gSystem->GetFromPipe(Form("python -c \"%s\" 2>/dev/null",
405  "from pandatools import PsubUtils;"
406  "print(PsubUtils.getNickname());")).Data();
407 
408  TString out = pattern.c_str();
409 
410  // Handle case of no proxy; will create a proxy later in the submission
411  if (nickname.length()>20){
412  ANA_MSG_WARNING( "No proxy available - cannot use nickname yet. Will try a late replacement.");
413  } else {
414  out.ReplaceAll("%nickname%", nickname);
415  }
416 
417  out.ReplaceAll("%in:name%", sampleName);
418 
419  std::stringstream ss(sampleName);
420  std::string item;
421  int field = 0;
422  while(std::getline(ss, item, '.')) {
423  std::stringstream sskey;
424  sskey << "%in:name[" << ++field << "]%";
425  out.ReplaceAll(sskey.str(), item);
426  }
427  while (out.Index("%in:") != -1) {
428  int i1 = out.Index("%in:");
429  int i2 = out.Index("%", i1+1);
430  TString metaName = out(i1+4, i2-i1-4);
431  out.ReplaceAll("%in:"+metaName+"%",
432  sampleMeta.castString(std::string(metaName.Data())));
433  }
434  out.ReplaceAll("/", "");
435  return out.Data();
436 }
437 
438 std::string outputFileNames(const EL::Job& job)
439 {
440  TList outputs;
441  for (EL::Job::outputIter out = job.outputBegin(),
442  end = job.outputEnd(); out != end; ++out) {
443  outputs.Add(out->Clone());
444  }
445  std::string out = "hist:hist-output.root";
446  TIter itr(&outputs);
447  TObject *obj = 0;
448  while ((obj = itr())) {
449  EL::OutputStream *os = dynamic_cast<EL::OutputStream*>(obj);
450  const std::string name = os->label() + ".root";
451  const std::string ds =
452  os->options()->castString(EL::OutputStream::optContainerSuffix);
453  out += "," + (ds.empty() ? name : ds + ":" + name);
454  }
455  return out;
456 }
457 
458 // Save algortihms and lists of inputs and outputs to a root file
459 static void saveJobDef(const std::string& fileName,
460  const EL::Job& job,
461  const SH::SampleHandler sh)
462 {
463  TFile file(fileName.c_str(), "RECREATE");
464  TList outputs;
465  for (EL::Job::outputIter o = job.outputBegin(); o !=job.outputEnd(); ++o)
466  outputs.Add(o->Clone());
467  file.WriteTObject(&job.jobConfig(), "jobConfig", "SingleKey");
468  file.WriteTObject(&outputs, "outputs", "SingleKey");
469  bool haveDefault = false;
470  for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
471  const SH::MetaObject& meta = *((*s)->meta());
472  file.WriteObject(&meta, meta.castString("sample_name").c_str());
473  if (!haveDefault)
474  {
475  file.WriteObject (&meta, "defaultMetaObject");
476  haveDefault = true;
477  }
478  }
479 }
480 
481 // Create a sample handler with grid locations of outputs with given label
// For every input sample a SH::SampleGrid of the same name is created with
//   nc_grid        = <nc_outDS of the input sample> + "_<outputLabel>.root/"
//   nc_grid_filter = "*<outputLabel>.root*"
// and added to the returned handler, which finally fetches from `in`.
// NOTE(review): the declaration of `out` is not visible in this listing
// (original line 485 is absent); presumably a local SH::SampleHandler - confirm.
482 static SH::SampleHandler outputSH(const SH::SampleHandler& in,
483  const std::string& outputLabel)
484 {
486  const std::string outputFile = "*" + outputLabel + ".root*";
487  const std::string outDSSuffix = '_' + outputLabel + ".root/";
488  for (SH::SampleHandler::iterator s = in.begin(); s != in.end(); ++s) {
489  SH::SampleGrid* outSample = new SH::SampleGrid((*s)->name());
490  const std::string outputDS = (*s)->meta()->castString("nc_outDS", "", SH::MetaObject::CAST_NOCAST_DEFAULT) + outDSSuffix;
491  outSample->meta()->setString("nc_grid", outputDS);
492  outSample->meta()->setString("nc_grid_filter", outputFile);
// NOTE(review): `outSample` is a raw `new`; presumably add() takes ownership - confirm.
493  out.add(outSample);
494  }
495  out.fetch(in);
496  return out;
497 }
498 
// Body of EL::PrunDriver::testInvariant() const according to the
// cross-reference index (PrunDriver.cxx:499).
// NOTE(review): the signature line itself is absent from this listing.
500 {
// The only invariant checked is that the object pointer is non-null.
501  RCU_INVARIANT(this != 0);
502 }
503 
// Body of the EL::PrunDriver default constructor according to the
// cross-reference index (PrunDriver.cxx:504).
// NOTE(review): the signature line itself is absent from this listing.
505 {
// Nothing to set up beyond checking the invariant of the new object.
506  RCU_NEW_INVARIANT(this);
507 }
508 
// Body of EL::PrunDriver::doManagerStep(Detail::ManagerData &data) const
// according to the cross-reference index (PrunDriver.cxx:510).
// NOTE(review): the signature line and both `case` labels of the switch
// (original lines 510, 516, 593) are absent from this listing; presumably
// Detail::ManagerStep::submitJob and Detail::ManagerStep::doRetrieve - confirm.
//
// First branch: prepare <submitDir>/elg (scripts, optional symbolic links,
// job definition), fill the prun meta-data of every sample, save the output
// sample handlers, then process all samples in state INIT and save the
// input sample handler.
// Second branch: delegate to doRetrieve().
// All other steps are ignored. Always returns ::StatusCode::SUCCESS unless
// ANA_CHECK bails out earlier.
511 {
512  using namespace msgEventLoop;
514  switch (data.step)
515  {
517  {
518  const std::string jobELGDir = data.submitDir + "/elg";
519  const std::string runShFile = jobELGDir + "/runjob.sh";
520  //const std::string runShOrig = "$ROOTCOREBIN/data/EventLoopGrid/runjob.sh";
521  const std::string mergeShFile = jobELGDir + "/elg_merge";
522  //const std::string mergeShOrig =
523  // "$ROOTCOREBIN/user_scripts/EventLoopGrid/elg_merge";
524  const std::string runShOrig = PathResolverFindCalibFile("EventLoopGrid/runjob.sh");
525  const std::string mergeShOrig = PathResolverFindCalibFile("EventLoopGrid/elg_merge");
526 
527  const std::string jobDefFile = jobELGDir + "/jobdef.root";
// Copy the run and merge scripts next to the job definition and make them executable.
528  gSystem->Exec(Form("mkdir -p %s", jobELGDir.c_str()));
529  gSystem->Exec(Form("cp %s %s", runShOrig.c_str(), runShFile.c_str()));
530  gSystem->Exec(Form("chmod +x %s", runShFile.c_str()));
531  gSystem->Exec(Form("cp %s %s", mergeShOrig.c_str(), mergeShFile.c_str()));
532  gSystem->Exec(Form("chmod +x %s", mergeShFile.c_str()));
533 
534  // create symbolic links for additional files/directories if any to ship to the grid
535  std::string listToShipToGrid = data.options.castString(EL::Job::optGridPrunShipAdditionalFilesOrDirs, "");
536  // parse the list of comma separated files and/or directories to ship to the grid
537  if (listToShipToGrid.size()){
538  ANA_MSG_INFO (
539  "Creating symbolic links for additional files or directories to be sent to grid.\n"
540  "For root or heavy files you should also add their name (not the full path) to EL::Job::optUserFiles.\n"
541  "Otherwise prun ignores those files."
542  );
543 
544  std::vector<std::string> vect_filesOrDirToShip;
545  for (auto&& part : std::views::split(listToShipToGrid, ',')) vect_filesOrDirToShip.emplace_back(part.begin(), part.end());
546  // Create symbolic links of files or directories to the submission directory
547  for (const std::string & fileOrDirToShip: vect_filesOrDirToShip){
548  ANA_MSG_INFO (("Creating symbolic link for: " +fileOrDirToShip).c_str());
// NOTE(review): the path is inserted into the shell command unquoted.
549  RCU::Shell::exec("ln -sf " + fileOrDirToShip + " " + jobELGDir);
550  }
551  ANA_MSG_INFO ("Finished creation of symbolic links");
552  }
553 
554  const SH::SampleHandler& sh = data.job->sampleHandler();
555 
// Fill in the per-sample prun settings; values already present on the
// sample are kept, then job options, then defaultOpts() fill the gaps.
556  for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
557  SH::MetaObject& meta = *(*s)->meta();
558  meta.fetchDefaults(data.options);
559  meta.fetchDefaults(defaultOpts());
560  meta.setString("nc_outputs", outputFileNames(*data.job));
561  std::string outputSampleName = meta.castString("nc_outputSampleName");
562  if (outputSampleName.empty()) {
563  outputSampleName = "user.%nickname%.%in:name%";
564  }
565  meta.setString("nc_outDS", formatOutputName(meta, outputSampleName));
566  meta.setString("nc_inDS", meta.castString("nc_grid", (*s)->name()));
567  meta.setString("nc_writeInputToTxt", "IN:input.txt");
568  meta.setString("nc_match", meta.castString("nc_grid_filter"));
569  const std::string execstr = "runjob.sh " + (*s)->name();
570  meta.setString("nc_exec", execstr);
571  meta.setString("nc_framework", "EventLoopGrid");
572  }
573 
574  saveJobDef(jobDefFile, *data.job, sh);
575 
// One output sample handler per output stream, plus one for the histograms.
576  for (EL::Job::outputIter out = data.job->outputBegin();
577  out != data.job->outputEnd(); ++out) {
578  SH::SampleHandler shOut = outputSH(sh, out->label());
579  shOut.save(data.submitDir + "/output-" + out->label());
580  }
581  SH::SampleHandler shHist = outputSH(sh, "hist-output");
582  shHist.save(data.submitDir + "/output-hist");
583 
// Submission runs from inside the elg directory; the previous working
// directory is restored when keepDir goes out of scope.
584  TmpCd keepDir(jobELGDir);
585 
586  processAllInState(sh, JobState::INIT, 0);
587 
588  sh.save(data.submitDir + "/input");
589  data.submitted = true;
590  }
591  break;
592 
594  {
595  ANA_CHECK (doRetrieve (data));
596  }
597  break;
598 
599  default:
600  (void) true; // safe to do nothing
601  }
602  return ::StatusCode::SUCCESS;
603 }
604 
// Body of EL::PrunDriver::doRetrieve(Detail::ManagerData &data) const
// according to the cross-reference index (PrunDriver.cxx:605).
// NOTE(review): the signature line and the declaration of `sh` (original
// lines 605 and 612) are absent from this listing; presumably
// `SH::SampleHandler sh;` - confirm.
//
// Advances every sample of the submission in data.submitDir by one round
// through the states INIT, RUN, DOWNLOAD and MERGE, saves the result,
// prints a per-sample summary and records in `data` whether all samples
// have reached FINISHED or FAILED.
606 {
607  RCU_READ_INVARIANT(this);
608  RCU_REQUIRE(not data.submitDir.empty());
609 
// Work relative to the submission directory for the rest of this function.
610  TmpCd tmpDir(data.submitDir);
611 
613  sh.load("input");
614  RCU_ASSERT(sh.size());
615 
// Thread counts for status polling and downloading are configurable;
// submission and merging always pass 0.
616  const size_t nRunThreads = options()->castDouble("nc_run_threads", 0);
617  const size_t nDlThreads = options()->castDouble("nc_download_threads", 0);
618  processAllInState(sh, JobState::INIT, 0);
619  processAllInState(sh, JobState::RUN, nRunThreads);
620  processAllInState(sh, JobState::DOWNLOAD, nDlThreads);
621  processAllInState(sh, JobState::MERGE, 0);
622 
623  sh.save("input");
624 
625  std::cout << std::endl;
626 
627  bool allDone = true;
628  for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
629  JobState::Enum state = sampleState(*s);
630  std::string details = (*s)->meta()->castString("nc_ELG_state_details", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
631  if (not details.empty()) { details = '(' + details + ')'; }
632 
633  std::cout << (*s)->name() << "\t";
// FINISHED is wrapped in the ANSI sequence 1;32, FAILED in 1;31; the other
// states are printed without escape sequences.
634  switch (state) {
635  case JobState::INIT:
636  case JobState::RUN:
637  case JobState::DOWNLOAD:
638  case JobState::MERGE:
639  std::cout << JobState::name[state] << "\t";
640  break;
641  case JobState::FINISHED:
642  std::cout << "\033[1;32m" << JobState::name[state] << "\033[0m\t";
643  break;
644  case JobState::FAILED:
645  std::cout << "\033[1;31m" << JobState::name[state] << "\033[0m\t";
646  break;
647  }
648  std::cout << details << std::endl;
649 
// A sample counts as done once it is FINISHED or FAILED.
650  allDone &= (state == JobState::FINISHED || state == JobState::FAILED);
651  }
652 
653  std::cout << std::endl;
654 
655  data.retrieved = true;
656  data.completed = allDone;
657  return ::StatusCode::SUCCESS;
658 }
659 
// Update and print the state of all samples of the submission stored in
// `location`: samples in state RUN are re-checked, the result is saved back
// to "input", and one line "<name>\t<state>\t(<details>)" is printed per
// sample.
// `location` must not be empty.
// NOTE(review): the declaration of `sh` (original line 664) is absent from
// this listing; presumably `SH::SampleHandler sh;` - confirm.
660 void EL::PrunDriver::status(const std::string& location)
661 {
662  RCU_REQUIRE(not location.empty());
// Work relative to `location` for the rest of this function.
663  TmpCd tmpDir(location);
665  sh.load("input");
666  RCU_ASSERT(sh.size());
667  processAllInState(sh, JobState::RUN, 0);
668  sh.save("input");
669  for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
670  JobState::Enum state = sampleState(*s);
671  std::string details = (*s)->meta()->castString("nc_ELG_state_details", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
672  if (not details.empty()) { details = '(' + details + ')'; }
673  std::cout << (*s)->name() << "\t" << JobState::name[state]
674  << "\t" << details << std::endl;
675  }
676 }
677 
// Manually set the state of one task of the submission stored in `location`.
//   location  submission directory; must not be empty
//   task      name of the sample to modify; must not be empty
//   state     new state label; must not be empty and must be one of the
//             JobState labels
// If `task` is unknown the available samples are printed and nothing is
// changed.
// NOTE(review): the declaration of `sh` (original line 686) is absent from
// this listing; presumably `SH::SampleHandler sh;` - confirm.
678 void EL::PrunDriver::setState(const std::string& location,
679  const std::string& task,
680  const std::string& state)
681 {
682  RCU_REQUIRE(not location.empty());
683  RCU_REQUIRE(not task.empty());
684  RCU_REQUIRE(not state.empty());
// Work relative to `location` for the rest of this function.
685  TmpCd tmpDir(location);
687  sh.load("input");
688  RCU_ASSERT(sh.size());
689  if (not sh.get(task)) {
690  std::cout << "Unknown task: " << task << std::endl;
691  std::cout << "Choose one of: " << std::endl;
692  sh.print();
693  return;
694  }
// Called only for validation: parse() asserts/throws on an unknown label,
// its result is not needed.
695  JobState::parse(state);
696  sh.get(task)->meta()->setString("nc_ELG_state", state);
697  sh.save("input");
698 }
LArG4FSStartPointFilter.part
part
Definition: LArG4FSStartPointFilter.py:21
mergePhysValFiles.pattern
pattern
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:25
HelloWorldOptions.job
job
Definition: HelloWorldOptions.py:18
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
EL::Driver::doManagerStep
virtual ::StatusCode doManagerStep(Detail::ManagerData &data) const
checkxAOD.ds
ds
Definition: Tools/PyUtils/bin/checkxAOD.py:260
SH::MetaObject::CAST_NOCAST_DEFAULT
@ CAST_NOCAST_DEFAULT
cast and return the default value if the input has the wrong type
Definition: MetaObject.h:78
SH::SampleHandler::iterator
std::vector< Sample * >::const_iterator iterator
the iterator to use
Definition: SampleHandler.h:475
INIT
#define INIT(__TYPE)
ExceptionMsg.h
get_generator_info.result
result
Definition: get_generator_info.py:21
offline_EventStorage_v5::FINISHED
@ FINISHED
Definition: v5_DataWriter.h:42
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
PowhegControl_ttHplus_NLO.ss
ss
Definition: PowhegControl_ttHplus_NLO.py:83
SH::MetaObject
A class that manages meta-data to be associated with an object.
Definition: MetaObject.h:56
EL::OutputStream
Definition: OutputStream.h:34
SH::rucioListDids
std::vector< RucioListDidsEntry > rucioListDids(const std::string &dataset)
run rucio-list-dids for the given dataset
Definition: GridTools.cxx:348
BeamSpot::mutex
std::mutex mutex
Definition: InDetBeamSpotVertex.cxx:18
Job.h
DoubleEventSelectorOverlayTest.nThreads
nThreads
Definition: DoubleEventSelectorOverlayTest.py:83
outputLabel
const std::string outputLabel
Definition: OverlapRemovalTester.cxx:69
parse
std::map< std::string, std::string > parse(const std::string &list)
Definition: egammaLayerRecalibTool.cxx:1113
RCU_REQUIRE
#define RCU_REQUIRE(x)
Definition: Assert.h:208
EL::PrunDriver::status
static void status(const std::string &location)
Definition: PrunDriver.cxx:660
OutputStream.h
python.AthDsoLogger.out
out
Definition: AthDsoLogger.py:70
ANA_MSG_ERROR
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:294
SH::SampleHandler::end
iterator end() const
the end iterator to use
SampleHandler.h
ANA_CHECK
#define ANA_CHECK(EXP)
check whether the given expression was successful
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:324
hadd.h
ShellExec.h
python.RatesEmulationExample.lock
lock
Definition: RatesEmulationExample.py:148
ReadOfcFromCool.field
field
Definition: ReadOfcFromCool.py:48
EL::PrunDriver::setState
static void setState(const std::string &location, const std::string &task, const std::string &state)
Definition: PrunDriver.cxx:678
Assert.h
MessageCheck.h
SUSY_SimplifiedModel_PostInclude.process
string process
Definition: SUSY_SimplifiedModel_PostInclude.py:43
submit
Definition: submit.py:1
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:92
SH::SampleHandler::save
void save(const std::string &directory) const
save the list of samples to the given directory
EL::PrunDriver::PrunDriver
PrunDriver()
Definition: PrunDriver.cxx:504
compareGeometries.outputFile
string outputFile
Definition: compareGeometries.py:25
PyPoolBrowser.item
item
Definition: PyPoolBrowser.py:129
WorkList
std::vector< WorkUnit > WorkList
Definition: PhysicsAnalysis/D3PDTools/EventLoopGrid/Root/pool.h:13
RCU::hadd
void hadd(const std::string &output_file, const std::vector< std::string > &input_files, unsigned max_files)
effects: perform the hadd functionality guarantee: basic failures: out of memory III failures: i/o er...
Definition: hadd.cxx:28
EL::PrunDriver
a Driver to submit jobs via prun
Definition: PrunDriver.h:23
MetaObject.h
FullCPAlgorithmsTest_eljob.sh
sh
Definition: FullCPAlgorithmsTest_eljob.py:114
details
Definition: IParticleWriter.h:21
FullCPAlgorithmsTest_eljob.sample
sample
Definition: FullCPAlgorithmsTest_eljob.py:116
lumiFormat.i
int i
Definition: lumiFormat.py:85
internal_poltrig::MERGE
@ MERGE
Definition: PolygonTriangulator.cxx:115
RCU::Shell
Definition: ShellExec.cxx:28
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ManagerData.h
Algorithm.h
EL::Job::optGridPrunShipAdditionalFilesOrDirs
static const std::string optGridPrunShipAdditionalFilesOrDirs
Enables shipping additional files in the tarball sent to the grid. Should be a list of comma separated p...
Definition: Job.h:488
mergePhysValFiles.origDir
origDir
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:23
ANA_MSG_INFO
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:290
GridTools.h
ANA_MSG_WARNING
#define ANA_MSG_WARNING(xmsg)
Macro printing warning messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:292
generateReferenceFile.files
files
Definition: generateReferenceFile.py:12
add-xsec-uncert-quadrature-N.label
label
Definition: add-xsec-uncert-quadrature-N.py:104
file
TFile * file
Definition: tile_monitor.h:29
SH::SampleGrid
This class implements a Sample located on the grid.
Definition: SampleGrid.h:44
SH::MetaObject::castString
std::string castString(const std::string &name, const std::string &def_val="", CastMode mode=CAST_ERROR_THROW) const
the meta-data string with the given name
python.AtlRunQueryLib.options
options
Definition: AtlRunQueryLib.py:378
EL::PrunDriver::doManagerStep
virtual ::StatusCode doManagerStep(Detail::ManagerData &data) const override
Definition: PrunDriver.cxx:510
EL::Detail::ManagerStep::doRetrieve
@ doRetrieve
call the actual doRetrieve method
cmd-l1calo-dq-file.datasets
datasets
Definition: cmd-l1calo-dq-file.py:30
SH::Sample::meta
MetaObject * meta()
the meta-information for this sample
RCU_INVARIANT
#define RCU_INVARIANT(x)
Definition: Assert.h:201
ReadFromCoolCompare.os
os
Definition: ReadFromCoolCompare.py:226
python.CreateTierZeroArgdict.outputs
outputs
Definition: CreateTierZeroArgdict.py:189
SH::Sample
a base class that manages a set of files belonging to a particular data set and the associated meta-d...
Definition: Sample.h:54
SH::MetaObject::setString
void setString(const std::string &name, const std::string &value)
set the meta-data string with the given name
beamspotman.dir
string dir
Definition: beamspotman.py:619
GetAllXsec.entry
list entry
Definition: GetAllXsec.py:132
PathResolver.h
python.ExitCodes.what
def what(code)
Definition: ExitCodes.py:73
LoadMacro
gROOT LoadMacro("../ISF_FastCaloSimParametrization/MeanAndRMS.h+")
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
PrunDriver.h
SH::SampleHandler::begin
iterator begin() const
the begin iterator to use
SH::MetaObject::fetchDefaults
void fetchDefaults(const MetaObject &source)
fetch the meta-data from the given sample not present in this sample.
SampleGrid.h
EL::OutputStream::optContainerSuffix
static const std::string optContainerSuffix
Definition: OutputStream.h:122
PathResolverFindCalibFile
std::string PathResolverFindCalibFile(const std::string &logical_file_name)
Definition: PathResolver.cxx:325
EL::PrunDriver::testInvariant
void testInvariant() const
Definition: PrunDriver.cxx:499
pool.h
Athena::Status
Status
Athena specific StatusCode values.
Definition: AthStatusCode.h:22
trigbs_mixBSevents.input
input
Definition: trigbs_mixBSevents.py:56
std::sort
void sort(typename std::reverse_iterator< DataModel_detail::iterator< DVL > > beg, typename std::reverse_iterator< DataModel_detail::iterator< DVL > > end, const Compare &comp)
Specialization of sort for DataVector/List.
Definition: DVL_algorithms.h:623
copySelective.target
string target
Definition: copySelective.py:36
RCU::Shell::exec
void exec(const std::string &cmd)
effects: execute the given command guarantee: strong failures: out of memory II failures: system fail...
Definition: ShellExec.cxx:29
python.SystemOfUnits.s
float s
Definition: SystemOfUnits.py:147
EL::Detail::ManagerData
an internal data structure for passing data between different manager objects and step
Definition: ManagerData.h:46
SH::SampleHandler
A class that manages a list of Sample objects.
Definition: SampleHandler.h:60
RCU_ASSERT0
#define RCU_ASSERT0(y)
Definition: Assert.h:226
merge.status
status
Definition: merge.py:16
ManagerStep.h
jobOptions.fileName
fileName
Definition: jobOptions.SuperChic_ALP2.py:39
EL::Job
Definition: Job.h:51
EL::PrunDriver::doRetrieve
::StatusCode doRetrieve(Detail::ManagerData &data) const
Definition: PrunDriver.cxx:605
python.PyAthena.obj
obj
Definition: PyAthena.py:132
outputFileNames
std::string outputFileNames(const EL::Job &job)
Definition: PrunDriver.cxx:438
RCU_ASSERT
#define RCU_ASSERT(x)
Definition: Assert.h:222
SH::rucioDownloadList
std::vector< RucioDownloadResult > rucioDownloadList(const std::string &location, const std::vector< std::string > &datasets)
run rucio-download with multiple datasets
Definition: GridTools.cxx:523
create_dcsc_inputs_sqlite.RUN
int RUN
Definition: create_dcsc_inputs_sqlite.py:45
RCU_READ_INVARIANT
#define RCU_READ_INVARIANT(x)
Definition: Assert.h:229
EL::Detail::ManagerStep::submitJob
@ submitJob
do the actual job submission
skel.keepDir
keepDir
Definition: skel.ABtoEVGEN.py:499
Sample.h
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
getEFTrackSample.sampleName
sampleName
Definition: getEFTrackSample.py:13
ClassImp
ClassImp(EL::PrunDriver) namespace
Definition: PrunDriver.cxx:48
merge
Definition: merge.py:1
RCU_NEW_INVARIANT
#define RCU_NEW_INVARIANT(x)
Definition: Assert.h:233