ATLAS Offline Software
Functions
BatchDriver.cxx File Reference
#include <EventLoop/BatchDriver.h>
#include <EventLoop/BatchJob.h>
#include <EventLoop/BatchSample.h>
#include <EventLoop/BatchSegment.h>
#include <EventLoop/ManagerData.h>
#include <EventLoop/ManagerStep.h>
#include <EventLoop/MessageCheck.h>
#include <EventLoop/OutputStream.h>
#include <RootCoreUtils/Assert.h>
#include <RootCoreUtils/ShellExec.h>
#include <RootCoreUtils/StringUtil.h>
#include <RootCoreUtils/ThrowMsg.h>
#include <RootCoreUtils/hadd.h>
#include <SampleHandler/DiskOutputLocal.h>
#include <SampleHandler/MetaFields.h>
#include <SampleHandler/MetaVector.h>
#include <SampleHandler/Sample.h>
#include <TFile.h>
#include <TSystem.h>
#include <cmath>
#include <cstdlib>
#include <fstream>
#include <memory>
#include <sstream>

Go to the source code of this file.

Functions

 ClassImp (EL::BatchDriver) namespace EL
 

Function Documentation

◆ ClassImp()

ClassImp ( EL::BatchDriver  )
Author
Nils Krumnack

Effects: fill the job object from the original job information, except for the actual samples submitted. Guarantee: basic. Failures: out of memory II.

Effects: fill the given sample object with the information from the SampleHandler object, except for the segments, which get filled separately. Guarantee: basic. Failures: out of memory II.

Effects: add the given sample with the given segments to the job. Guarantee: basic. Failures: out of memory II; segment misconfiguration. Requires: !segments.empty().

Effects: generate the splits for the sample by file. Guarantee: basic. Failures: out of memory II; i/o errors. Requires: !sample.files.empty(); numJobs == rint (numJobs). Postcondition: !segments.empty().

Effects: generate the splits for the sample. Guarantee: basic. Failures: out of memory II; i/o errors. Requires: !sample.files.empty(); numJobs == rint (numJobs). Postcondition: !segments.empty().

Effects: generate the splits for the sample. Guarantee: basic. Failures: out of memory II; i/o errors. Requires: !sample.files.empty(). Postcondition: !segments.empty().

Effects: fill the job object from the original job information, including the actual samples submitted. Guarantee: basic. Failures: out of memory II.

Definition at line 42 of file BatchDriver.cxx.

45 {
46  namespace
47  {
52  void fillJob (BatchJob& myjob, const Job& job,
53  const std::string& submitDir)
54  {
55  myjob.job = job;
56  myjob.location = submitDir;
57  }
58 
59 
60 
66  void fillSample (BatchSample& mysample,
67  const SH::Sample& sample, const SH::MetaObject& meta)
68  {
69  mysample.name = sample.name();
70  mysample.meta = *sample.meta();
71  mysample.meta.fetchDefaults (meta);
72 
73  mysample.files = sample.makeFileList ();
74  }
75 
76 
77 
84  void addSample (BatchJob& job, BatchSample sample,
85  const std::vector<BatchSegment>& segments)
86  {
87  RCU_REQUIRE (!segments.empty());
88 
89  sample.begin_segments = job.segments.size();
90 
91  RCU_ASSERT (segments[0].begin_file == 0);
92  RCU_ASSERT (segments[0].begin_event == 0);
93  for (std::size_t iter = 0, end = segments.size(); iter != end; ++ iter)
94  {
95  BatchSegment segment = segments[iter];
96 
97  segment.sampleName = sample.name;
98  {
99  std::ostringstream myname;
100  myname << sample.name << "-" << iter;
101  segment.fullName = myname.str();
102  }
103  {
104  std::ostringstream myname;
105  myname << iter;
106  segment.segmentName = myname.str();
107  }
108 
109  segment.sample = job.samples.size();
110  segment.job_id = job.segments.size();
111  if (iter+1 < end)
112  {
113  segment.end_file = segments[iter+1].begin_file;
114  segment.end_event = segments[iter+1].begin_event;
115  } else
116  {
117  segment.end_file = sample.files.size();
118  segment.end_event = 0;
119  }
120  RCU_ASSERT (segment.begin_file < segment.end_file || (segment.begin_file == segment.end_file && segment.begin_event <= segment.end_event));
121  job.segments.push_back (segment);
122  }
123 
124  sample.end_segments = job.segments.size();
125  job.samples.push_back (sample);
126  }
127 
128 
129 
130 
131 
139  void splitSampleByFile (const BatchSample& sample,
140  std::vector<BatchSegment>& segments,
141  double numJobs)
142  {
143  // RCU_REQUIRE (!sample.files.empty());
144  RCU_REQUIRE (numJobs == rint (numJobs));
145  RCU_REQUIRE (numJobs <= sample.files.size());
146 
147  for (std::size_t index = 0, end = numJobs;
148  index < end; ++ index)
149  {
150  BatchSegment segment;
151  segment.begin_file = rint (index * (sample.files.size() / numJobs));
152  segments.push_back (segment);
153  }
154  }
155 
156 
157 
158 
159 
167  void splitSampleByEvent (const BatchSample& sample,
168  std::vector<BatchSegment>& segments,
169  const std::vector<Long64_t>& eventsFile,
170  double numJobs)
171  {
172  RCU_REQUIRE (!sample.files.empty());
173  RCU_REQUIRE (sample.files.size() == eventsFile.size());
174  RCU_REQUIRE (numJobs == rint (numJobs));
175 
176  Long64_t eventsSum = 0;
177  for (std::vector<Long64_t>::const_iterator nevents = eventsFile.begin(),
178  end = eventsFile.end(); nevents != end; ++ nevents)
179  {
180  RCU_ASSERT_SOFT (*nevents >= 0);
181  eventsSum += *nevents;
182  }
183  if (numJobs > eventsSum)
184  numJobs = eventsSum;
185  Long64_t eventsMax
186  = sample.meta.castDouble (Job::optEventsPerWorker);
187  if (eventsMax > 0)
188  numJobs = ceil (double (eventsSum) / eventsMax);
189  eventsMax = Long64_t (ceil (double (eventsSum) / numJobs));
190 
191  BatchSegment segment;
192  while (std::size_t (segment.begin_file) < eventsFile.size())
193  {
194  while (segment.begin_event < eventsFile[segment.begin_file])
195  {
196  segments.push_back (segment);
197  segment.begin_event += eventsMax;
198  }
199  segment.begin_event -= eventsFile[segment.begin_file];
200  ++ segment.begin_file;
201  }
202  RCU_ASSERT (segments.size() == numJobs);
203  }
204 
205 
206 
207 
208 
215  void splitSample (const BatchSample& sample,
216  std::vector<BatchSegment>& segments)
217  {
218  // RCU_REQUIRE (!sample.files.empty());
219 
220  // rationale: I'm now calculating the number of jobs here, so
221  // that independent of whether we divide on a per-file or
222  // per-event basis we can get the same number of jobs. also,
223  // it will balance out things slightly if we only have a few
224  // files and multiple files per job.
225  const double filesPerWorker
226  = sample.meta.castDouble (Job::optFilesPerWorker, 1);
227  if (filesPerWorker < 1)
228  {
229  std::ostringstream msg;
230  msg << "invalid number of files per worker: " << filesPerWorker;
231  RCU_THROW_MSG (msg.str());
232  }
233  double numJobs = ceil (sample.files.size() / filesPerWorker);
234 
235  const TObject *meta
237  if (meta)
238  {
239  const SH::MetaVector<Long64_t> *const meta_nentries
240  = dynamic_cast<const SH::MetaVector<Long64_t> *>(meta);
241  RCU_ASSERT_SOFT (meta == meta_nentries);
242  RCU_ASSERT_SOFT (meta_nentries->value.size() == sample.files.size());
243  splitSampleByEvent (sample, segments, meta_nentries->value, numJobs);
244  } else
245  {
246  splitSampleByFile (sample, segments, numJobs);
247  }
248 
249 
250  if (segments.empty())
251  {
252  // rationale: this isn't really the proper thing to do. if a
253  // sample is empty I should just run the job locally.
254  BatchSegment empty;
255  segments.push_back (empty);
256  }
257 
258  RCU_PROVIDE (!segments.empty());
259  }
260 
261 
266  void fillFullJob (BatchJob& myjob, const Job& job,
267  const std::string& location,
268  const SH::MetaObject& meta)
269  {
270  fillJob (myjob, job, location);
271  *myjob.job.options() = meta;
272 
273  for (std::size_t sampleIndex = 0, end = job.sampleHandler().size();
274  sampleIndex != end; ++ sampleIndex)
275  {
276  BatchSample mysample;
277  fillSample (mysample, *job.sampleHandler()[sampleIndex], meta);
278 
279  std::vector<BatchSegment> subsegments;
280  splitSample (mysample, subsegments);
281  myjob.njobs_old.push_back (subsegments.size());
282 
283  addSample (myjob, mysample, subsegments);
284  }
285  }
286  }
287 
288 
289 
290  void BatchDriver ::
291  testInvariant () const
292  {
293  RCU_INVARIANT (this != 0);
294  }
295 
296 
297 
298  BatchDriver ::
299  BatchDriver ()
300  {
301  RCU_NEW_INVARIANT (this);
302  }
303 
304 
305 
306  ::StatusCode BatchDriver ::
307  doManagerStep (Detail::ManagerData& data) const
308  {
309  using namespace msgEventLoop;
310  ANA_CHECK (Driver::doManagerStep (data));
311  switch (data.step)
312  {
313  case Detail::ManagerStep::fillOptions:
314  {
315  data.sharedFileSystem
316  = data.options.castBool (Job::optBatchSharedFileSystem, true);
317  }
318  break;
319 
320  case Detail::ManagerStep::createSubmitDir:
321  {
322  if (data.sharedFileSystem) // Shared file-system, write to output
323  data.batchWriteLocation = data.submitDir;
324  else
325  data.batchWriteLocation = ".";
326 
327  if (data.sharedFileSystem) // Shared file-system, use local path
328  data.batchSubmitLocation = data.submitDir+"/submit";
329  else
330  data.batchSubmitLocation = ".";
331  }
332  break;
333 
334  case Detail::ManagerStep::updateOutputLocation:
335  {
336  const std::string writeLocation=data.batchWriteLocation;
337  for (Job::outputMIter out = data.job->outputBegin(),
338  end = data.job->outputEnd(); out != end; ++ out)
339  {
340  if (out->output() == nullptr)
341  {
342  out->output (new SH::DiskOutputLocal
343  (writeLocation + "/fetch/data-" + out->label() + "/"));
344  }
345  }
346  }
347  break;
348 
349  case Detail::ManagerStep::batchCreateDirectories:
350  {
351  ANA_MSG_DEBUG ("submitting batch job in location " << data.submitDir);
352  const std::string submitDir = data.submitDir + "/submit";
353  if (gSystem->MakeDirectory (submitDir.c_str()) != 0)
354  {
355  ANA_MSG_ERROR ("failed to create directory " + submitDir);
356  return ::StatusCode::FAILURE;
357  }
358  const std::string runDir = data.submitDir + "/run";
359  if (gSystem->MakeDirectory (runDir.c_str()) != 0)
360  {
361  ANA_MSG_ERROR ("failed to create directory " + runDir);
362  return ::StatusCode::FAILURE;
363  }
364  const std::string fetchDir = data.submitDir + "/fetch";
365  if (gSystem->MakeDirectory (fetchDir.c_str()) != 0)
366  {
367  ANA_MSG_ERROR ("failed to create directory " + fetchDir);
368  return ::StatusCode::FAILURE;
369  }
370  const std::string statusDir = data.submitDir + "/status";
371  if (gSystem->MakeDirectory (statusDir.c_str()) != 0)
372  {
373  ANA_MSG_ERROR ("failed to create directory " + statusDir);
374  return ::StatusCode::FAILURE;
375  }
376  }
377  break;
378 
379  case Detail::ManagerStep::batchCreateJob:
380  {
381  data.batchJob = std::make_unique<BatchJob> ();
382  fillFullJob (*data.batchJob, *data.job, data.submitDir, data.options);
383  data.batchJob->location=data.batchWriteLocation;
384  {
385  std::string path = data.submitDir + "/submit/config.root";
386  std::unique_ptr<TFile> file (TFile::Open (path.c_str(), "RECREATE"));
387  data.batchJob->Write ("job");
388  }
389  {
390  std::ofstream file ((data.submitDir + "/submit/segments").c_str());
391  for (std::size_t iter = 0, end = data.batchJob->segments.size();
392  iter != end; ++ iter)
393  {
394  file << iter << " " << data.batchJob->segments[iter].fullName << "\n";
395  }
396  }
397  }
398  break;
399 
400  case Detail::ManagerStep::batchScriptVar:
401  {
402  data.batchName = "run";
403  data.batchInit = "";
404  data.batchJobId = "EL_JOBID=$1\n";
405  }
406  break;
407 
408  case Detail::ManagerStep::batchMakeScript:
409  {
410  makeScript (data, data.batchJob->segments.size());
411  }
412  break;
413 
414  case Detail::ManagerStep::batchMakeIndices:
415  {
416  for (std::size_t index = 0; index != data.batchJob->segments.size(); ++ index)
417  data.batchJobIndices.push_back (index);
418  }
419  break;
420 
421  case Detail::ManagerStep::readConfigResubmit:
422  case Detail::ManagerStep::readConfigRetrieve:
423  {
424  ANA_MSG_INFO ("retrieving batch job in location " << data.submitDir);
425 
426  std::unique_ptr<TFile> file
427  {TFile::Open ((data.submitDir + "/submit/config.root").c_str(), "READ")};
428  if (file == nullptr || file->IsZombie())
429  {
430  ANA_MSG_ERROR ("failed to read config.root");
431  return ::StatusCode::FAILURE;
432  }
433  data.batchJob.reset (dynamic_cast<BatchJob*>(file->Get ("job")));
434  if (data.batchJob == nullptr)
435  {
436  ANA_MSG_ERROR ("failed to get job object from config.root");
437  return ::StatusCode::FAILURE;
438  }
439  data.job = &data.batchJob->job;
440  }
441  break;
442 
443  case Detail::ManagerStep::batchJobStatusResubmit:
444  case Detail::ManagerStep::batchJobStatusRetrieve:
445  {
446  for (std::size_t job = 0; job != data.batchJob->segments.size(); ++ job)
447  {
448  std::ostringstream completedFile;
449  completedFile << data.submitDir << "/status/completed-" << job;
450  const bool hasCompleted =
451  (gSystem->AccessPathName (completedFile.str().c_str()) == 0);
452 
453  std::ostringstream failFile;
454  failFile << data.submitDir << "/status/fail-" << job;
455  const bool hasFail =
456  (gSystem->AccessPathName (failFile.str().c_str()) == 0);
457 
458  if (hasCompleted)
459  {
460  if (hasFail)
461  {
462  ANA_MSG_ERROR ("sub-job " << job << " reported both success and failure");
463  return ::StatusCode::FAILURE;
464  } else
465  data.batchJobSuccess.insert (job);
466  } else
467  {
468  if (hasFail)
469  data.batchJobFailure.insert (job);
470  else
471  data.batchJobUnknown.insert (job);
472  }
473  }
474  ANA_MSG_INFO ("current job status: " << data.batchJobSuccess.size() << " success, " << data.batchJobFailure.size() << " failure, " << data.batchJobUnknown.size() << " running/unknown");
475  }
476  break;
477 
478  case Detail::ManagerStep::batchPreResubmit:
479  {
480  bool all_missing = false;
481  if (data.resubmitOption == "ALL_MISSING")
482  {
483  all_missing = true;
484  } else if (!data.resubmitOption.empty())
485  {
486  ANA_MSG_ERROR ("unknown resubmit option " + data.resubmitOption);
487  return ::StatusCode::FAILURE;
488  }
489 
490  for (std::size_t segment = 0; segment != data.batchJob->segments.size(); ++ segment)
491  {
492  if (all_missing)
493  {
494  if (data.batchJobSuccess.find (segment) == data.batchJobSuccess.end())
495  data.batchJobIndices.push_back (segment);
496  } else
497  {
498  if (data.batchJobFailure.find (segment) != data.batchJobFailure.end())
499  data.batchJobIndices.push_back (segment);
500  }
501  }
502 
503  if (data.batchJobIndices.empty())
504  {
505  ANA_MSG_INFO ("found no jobs to resubmit");
506  data.nextStep = Detail::ManagerStep::final;
507  return ::StatusCode::SUCCESS;
508  }
509 
510  for (std::size_t segment : data.batchJobIndices)
511  {
512  std::ostringstream command;
513  command << "rm -rf";
514  command << " " << data.submitDir << "/status/completed-" << segment;
515  command << " " << data.submitDir << "/status/fail-" << segment;
516  command << " " << data.submitDir << "/status/done-" << segment;
517  RCU::Shell::exec (command.str());
518  }
519  data.options = *data.batchJob->job.options();
520  }
521  break;
522 
523  case Detail::ManagerStep::doRetrieve:
524  {
525  bool merged = mergeHists (data);
526  if (merged)
527  {
528  diskOutputSave (data);
529  }
530  data.retrieved = true;
531  data.completed = merged;
532  }
533  break;
534 
535  default:
536  (void) true; // safe to do nothing
537  }
538  return ::StatusCode::SUCCESS;
539  }
540 
541 
542 
543  std::string BatchDriver ::
544  defaultReleaseSetup (const Detail::ManagerData& data) const
545  {
546  RCU_READ_INVARIANT (this);
547 
548  // name of tarball being made (this needs to match CondorDriver.cxx)
549  const std::string tarballName("AnalysisPackage.tar.gz");
550 
551  std::ostringstream file;
552 
553  // <path of build dir>/x86_64-slc6-gcc62-opt (comes from CMake, we need this)
554  const char *WORKDIR_DIR = getenv ("WorkDir_DIR");
555  // As a backup, keep the CMAKE_PREFIX_PATH
556  std::string CMAKE_DIR_str ( getenv ("CMAKE_PREFIX_PATH") );
557  if (WORKDIR_DIR == nullptr){
558  msgEventLoop::ANA_MSG_INFO ("Could not find environment variable $WorkDir_DIR");
559  // Instead, build from the first path in CMAKE_PREFIX_PATH
560  if (CMAKE_DIR_str.find(":") != std::string::npos){
561  // Erase everything from the colon onwards
562  CMAKE_DIR_str.erase( CMAKE_DIR_str.find(":") , std::string::npos );
563  }
564  // Provide the remainder of the string to the workdir
565  WORKDIR_DIR = CMAKE_DIR_str.data();
566  }
567 
568  if(!data.sharedFileSystem)
569  {
570  file << "mkdir -p build && tar -C build/ -xf " << tarballName << " || abortJob\n";
571  file << "\n";
572  }
573 
574 
575  file << "\n";
576  // /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/x86_64/AtlasSetup/.config/.asetup.site
577  if(getenv("AtlasSetupSite")) file << "export AtlasSetupSite=" << getenv("AtlasSetupSite") << "\n";
578  // /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/x86_64/AtlasSetup/V00-07-75/AtlasSetup
579  if(getenv("AtlasSetup")) file << "export AtlasSetup=" << getenv("AtlasSetup") << "\n";
580  // for now, needed because of errors like:
581  // /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/swConfig/asetup/asetupEpilog.sh: line 38:
582  // /swConfig/python/pythonFix-Linux.sh: No such file or directory
583  file << "export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase\n";
584  file << "source $ATLAS_LOCAL_ROOT_BASE/user/atlasLocalSetup.sh --quiet\n";
585 
586  // default setup command
587  std::ostringstream defaultSetupCommand;
588  {
589  // AnalysisBase
590  if(getenv("AtlasProject")) defaultSetupCommand << "export AtlasProject=" << getenv("AtlasProject") << "\n";
591  // 21.2.3
592  if(getenv("AtlasVersion")) defaultSetupCommand << "export AtlasVersion=" << getenv("AtlasVersion") << "\n";
593  // 2017-08-16T2249 (only set if using a nightly release)
594  if(getenv("AtlasBuildStamp")) defaultSetupCommand << "export AtlasBuildStamp=" << getenv("AtlasBuildStamp") << "\n";
595  // 21.2
596  if(getenv("AtlasBuildBranch")) defaultSetupCommand << "export AtlasBuildBranch=" << getenv("AtlasBuildBranch") << "\n";
597  // stable vs nightly
598  if(getenv("AtlasReleaseType")) defaultSetupCommand << "export AtlasReleaseType=" << getenv("AtlasReleaseType") << "\n";
599  defaultSetupCommand << "if [ \"${AtlasReleaseType}\" == \"stable\" ]; then\n";
600  defaultSetupCommand << " source ${AtlasSetup}/scripts/asetup.sh ${AtlasProject},${AtlasVersion} || abortJob\n";
601  defaultSetupCommand << "else\n";
602  defaultSetupCommand << " source ${AtlasSetup}/scripts/asetup.sh ${AtlasProject},${AtlasBuildBranch},${AtlasBuildStamp} || abortJob\n";
603  defaultSetupCommand << "fi\n";
604  defaultSetupCommand << "echo \"Using default setup command\"";
605  }
606 
607  file << options()->castString(Job::optBatchSetupCommand, defaultSetupCommand.str()) << " || abortJob\n";
608  if(data.sharedFileSystem) file << "source " << WORKDIR_DIR << "/setup.sh || abortJob\n";
609  else file << "source build/setup.sh || abortJob\n";
610  file << "\n";
611 
612  if(!data.sharedFileSystem)
613  {
614  std::ostringstream cmd;
615  //cmd << "tar --dereference -C " << WORKDIR_DIR << " -czf " << tarballName << " .";
616  cmd << "cpack -D CPACK_INSTALL_PREFIX=. -G TGZ --config $TestArea/CPackConfig.cmake";
617 
618  // suppress the output from the command
619  if (gSystem->Exec (cmd.str().c_str()) != 0){
620  RCU_THROW_MSG (("failed to execute: " + cmd.str()).c_str());
621  }
622 
623  std::ostringstream mv_command;
624  mv_command << "mv WorkDir_" << std::getenv("WorkDir_VERSION") << "_" << std::getenv("WorkDir_PLATFORM") << ".tar.gz " << tarballName;
625  if (gSystem->Exec (mv_command.str().c_str()) != 0){
626  RCU_THROW_MSG (("failed to execute: " + mv_command.str()).c_str());
627  }
628  }
629 
630  return file.str();
631  }
632 
633 
634 
635  void BatchDriver ::
636  makeScript (Detail::ManagerData& data,
637  std::size_t njobs) const
638  {
639  RCU_READ_INVARIANT (this);
640 
641  std::string name = data.batchName;
642  bool multiFile = (name.find ("{JOBID}") != std::string::npos);
643  for (std::size_t index = 0, end = multiFile ? njobs : 1; index != end; ++ index)
644  {
645  std::ostringstream str;
646  str << index;
647  const std::string fileName = data.submitDir + "/submit/" + RCU::substitute (name, "{JOBID}", str.str());
648 
649  {
650  std::ofstream file (fileName.c_str());
651  file << "#!/bin/bash\n";
652  file << "echo starting batch job initialization\n";
653  file << RCU::substitute (data.batchInit, "{JOBID}", str.str()) << "\n";
654  file << "echo batch job user initialization finished\n";
655  if (multiFile) file << "EL_JOBID=" << index << "\n\n";
656  else file << data.batchJobId << "\n";
657 
658  file << "function abortJob {\n";
659  file << " echo \"abort EL_JOBID=${EL_JOBID}\"\n";
660  file << " touch \"" << data.batchWriteLocation << "/status/fail-$EL_JOBID\"\n";
661  file << " touch \"" << data.batchWriteLocation << "/status/done-$EL_JOBID\"\n";
662  file << " exit 1\n";
663  file << "}\n\n";
664 
665 
666  file << "EL_JOBSEG=`grep \"^$EL_JOBID \" \"" << data.batchSubmitLocation << "/segments\" | awk ' { print $2 }'`\n";
667  file << "test \"$EL_JOBSEG\" != \"\" || abortJob\n";
668  file << "hostname\n";
669  file << "pwd\n";
670  file << "whoami\n";
671  file << shellInit << "\n";
672 
673  if(!data.sharedFileSystem)
674  { // Create output transfer directories
675  file << "mkdir \"fetch\" || abortJob\n";
676  file << "mkdir \"status\" || abortJob\n";
677  file << "\n";
678  }
679 
680  if(data.sharedFileSystem)
681  {
682  file << "test \"$TMPDIR\" == \"\" && TMPDIR=/tmp\n";
683  file << "RUNDIR=${TMPDIR}/EventLoop-Worker-$EL_JOBSEG-`date +%s`-$$\n";
684  file << "mkdir \"$RUNDIR\" || abortJob\n";
685  file << "cd \"$RUNDIR\" || abortJob\n";
686  }
687 
688  if (!data.batchSkipReleaseSetup)
689  file << defaultReleaseSetup (data);
690 
691  file << "eventloop_batch_worker $EL_JOBID '" << data.batchSubmitLocation << "/config.root' || abortJob\n";
692 
693  file << "test -f \"" << data.batchWriteLocation << "/status/completed-$EL_JOBID\" || "
694  << "touch \"" << data.batchWriteLocation << "/status/fail-$EL_JOBID\"\n";
695  file << "touch \"" << data.batchWriteLocation << "/status/done-$EL_JOBID\"\n";
696  if(data.sharedFileSystem) file << "cd .. && rm -rf \"$RUNDIR\"\n";
697  }
698 
699  {
700  std::ostringstream cmd;
701  cmd << "chmod +x " << fileName;
702  if (gSystem->Exec (cmd.str().c_str()) != 0)
703  RCU_THROW_MSG (("failed to execute: " + cmd.str()).c_str());
704  }
705  }
706  }
707 
708 
709 
710  bool BatchDriver ::
711  mergeHists (Detail::ManagerData& data)
712  {
713  using namespace msgEventLoop;
714 
715  // This picks up the DiskOutput object used to write out our
716  // histograms, we will then use that to locate the output files.
717  // this is not really the best way of doing this, but there are
718  // bigger rewrites of this code excepted, so I don't want to spend
719  // a lot of time on this now (06 Feb 19).
720  std::unique_ptr<SH::DiskOutput> origHistOutputMemory;
721  const SH::DiskOutput *origHistOutput = nullptr;
722  for (auto iter = data.job->outputBegin(), end = data.job->outputEnd();
723  iter != end; ++ iter)
724  {
725  if (iter->label() == Job::histogramStreamName)
726  origHistOutput = iter->output();
727  }
728  if (origHistOutput == nullptr)
729  {
730  origHistOutputMemory = std::make_unique<SH::DiskOutputLocal>
731  (data.submitDir + "/fetch/hist-");
732  origHistOutput = origHistOutputMemory.get();
733  }
734  RCU_ASSERT (origHistOutput != nullptr);
735 
736  bool result = true;
737 
738  ANA_MSG_DEBUG ("merging histograms in location " << data.submitDir);
739 
740  RCU_ASSERT (data.batchJob->njobs_old.size() == data.batchJob->samples.size());
741  for (std::size_t sample = 0, end = data.batchJob->samples.size();
742  sample != end; ++ sample)
743  {
744  const BatchSample& mysample (data.batchJob->samples[sample]);
745 
746  std::ostringstream output;
747  output << data.submitDir << "/hist-" << data.batchJob->samples[sample].name << ".root";
748  if (gSystem->AccessPathName (output.str().c_str()) != 0)
749  {
750  ANA_MSG_VERBOSE ("merge files for sample " << data.batchJob->samples[sample].name);
751 
752  bool complete = true;
753  std::vector<std::string> input;
754  for (std::size_t segment = mysample.begin_segments,
755  end = mysample.end_segments; segment != end; ++ segment)
756  {
757  const BatchSegment& mysegment = data.batchJob->segments[segment];
758 
759  const std::string hist_file = origHistOutput->targetURL
760  (mysegment.sampleName, mysegment.segmentName, ".root");
761 
762  ANA_MSG_VERBOSE ("merge segment " << segment << " completed=" << (data.batchJobSuccess.find(segment)!=data.batchJobSuccess.end()) << " fail=" << (data.batchJobFailure.find(segment)!=data.batchJobFailure.end()) << " unknown=" << (data.batchJobUnknown.find(segment)!=data.batchJobUnknown.end()));
763 
764  input.push_back (hist_file);
765 
766  if (data.batchJobFailure.find(segment)!=data.batchJobFailure.end())
767  {
768  std::ostringstream message;
769  message << "subjob " << segment << "/" << mysegment.fullName
770  << " failed";
771  RCU_THROW_MSG (message.str());
772  }
773  else if (data.batchJobSuccess.find(segment)==data.batchJobSuccess.end())
774  complete = false, result = false;
775  }
776  if (complete)
777  {
778  RCU::hadd (output.str(), input);
779 
780  // Merge output data directories
781  for (Job::outputIter out = data.batchJob->job.outputBegin(),
782  end = data.batchJob->job.outputEnd(); out != end; ++ out)
783  {
784  output.str("");
785  output << data.submitDir << "/data-" << out->label();
786 
787  if(gSystem->AccessPathName(output.str().c_str()))
788  gSystem->mkdir(output.str().c_str(),true);
789 
790 
791  output << "/" << data.batchJob->samples[sample].name << ".root";
792 
793  std::vector<std::string> input;
794  for (std::size_t segment = mysample.begin_segments,
795  end = mysample.end_segments; segment != end; ++ segment)
796  {
797  const BatchSegment& mysegment = data.batchJob->segments[segment];
798 
799  const std::string infile =
800  data.submitDir + "/fetch/data-" + out->label() + "/" + mysegment.fullName + ".root";
801 
802  input.push_back (infile);
803  }
804 
805  RCU::hadd(output.str(), input);
806  }
807  }
808  }
809  }
810  return result;
811  }
812 }
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
get_generator_info.result
result
Definition: get_generator_info.py:21
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
run.infile
string infile
Definition: run.py:13
LHEonly.njobs
njobs
Definition: LHEonly.py:14
SH::MetaObject
A class that manages meta-data to be associated with an object.
Definition: MetaObject.h:56
index
Definition: index.py:1
SH::DiskOutputLocal
an implementation of DiskOutput for local disks
Definition: DiskOutputLocal.h:23
rerun_display.cmd
string cmd
Definition: rerun_display.py:67
RCU_REQUIRE
#define RCU_REQUIRE(x)
Definition: Assert.h:208
python.AthDsoLogger.out
out
Definition: AthDsoLogger.py:71
SH::MetaVector::value
std::vector< T > value
the value contained
Definition: MetaVector.h:65
ANA_MSG_ERROR
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:294
ANA_CHECK
#define ANA_CHECK(EXP)
check whether the given expression was successful
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:324
SH::MetaVector
This class defines a templatized version of the meta-data in vector form.
Definition: D3PDTools/SampleHandler/SampleHandler/Global.h:42
empty
bool empty(TH1 *h)
Definition: computils.cxx:295
ReweightUtils.message
message
Definition: ReweightUtils.py:15
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:93
RCU::hadd
void hadd(const std::string &output_file, const std::vector< std::string > &input_files, unsigned max_files)
effects: perform the hadd functionality guarantee: basic failures: out of memory III failures: i/o er...
Definition: hadd.cxx:28
RCU_PROVIDE
#define RCU_PROVIDE(x)
Definition: Assert.h:215
SH::DiskOutput
a class/interface representing an output location for files
Definition: DiskOutput.h:46
SH::DiskOutput::targetURL
std::string targetURL(const std::string &sampleName, const std::string &segmentName, const std::string &suffix) const
the final output location for the given segment
FortranAlgorithmOptions.fileName
fileName
Definition: FortranAlgorithmOptions.py:13
FullCPAlgorithmsTest_eljob.sample
sample
Definition: FullCPAlgorithmsTest_eljob.py:113
RCU_ASSERT_SOFT
#define RCU_ASSERT_SOFT(x)
Definition: Assert.h:167
FullCPAlgorithmsTest_eljob.submitDir
submitDir
Definition: FullCPAlgorithmsTest_eljob.py:160
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
SH::MetaFields::numEventsPerFile
static const std::string numEventsPerFile
the number of events in each file
Definition: MetaFields.h:67
ANA_MSG_INFO
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:290
PlotPulseshapeFromCool.input
input
Definition: PlotPulseshapeFromCool.py:106
file
TFile * file
Definition: tile_monitor.h:29
python.AtlRunQueryLib.options
options
Definition: AtlRunQueryLib.py:379
RCU_INVARIANT
#define RCU_INVARIANT(x)
Definition: Assert.h:201
SH::Sample
a base class that manages a set of files belonging to a particular data set and the associated meta-d...
Definition: Sample.h:54
merge.output
output
Definition: merge.py:17
ANA_MSG_VERBOSE
#define ANA_MSG_VERBOSE(xmsg)
Macro printing verbose messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:286
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
RCU::substitute
std::string substitute(const std::string &str, const std::string &pattern, const std::string &with)
effects: substitute all occurences of "pattern" with "with" in the string "str" returns: the substitu...
Definition: StringUtil.cxx:24
DeMoScan.index
string index
Definition: DeMoScan.py:364
SCT_ConditionsAlgorithms::CoveritySafe::getenv
std::string getenv(const std::string &variableName)
get an environment variable
Definition: SCT_ConditionsUtilities.cxx:17
python.CaloScaleNoiseConfig.str
str
Definition: CaloScaleNoiseConfig.py:78
plotmaker.hist_file
hist_file
Definition: plotmaker.py:129
RCU::Shell::exec
void exec(const std::string &cmd)
effects: execute the given command guarantee: strong failures: out of memory II failures: system fail...
Definition: ShellExec.cxx:29
str
Definition: BTagTrackIpAccessor.cxx:11
LArG4GenerateShowerLib.nevents
nevents
Definition: LArG4GenerateShowerLib.py:19
RCU_THROW_MSG
#define RCU_THROW_MSG(message)
Definition: PrintMsg.h:58
SH::splitSample
SampleHandler splitSample(Sample &sample, const Long64_t nevt)
effects: split the given sample into a set of samples, with each sample containing either exactly one...
Definition: ToolsSplit.cxx:79
test_interactive_athena.job
job
Definition: test_interactive_athena.py:6
skel.eventsFile
eventsFile
Events files.
Definition: skel.ABtoEVGEN.py:458
RCU_ASSERT
#define RCU_ASSERT(x)
Definition: Assert.h:222
RCU_READ_INVARIANT
#define RCU_READ_INVARIANT(x)
Definition: Assert.h:229
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
get_generator_info.command
string command
Definition: get_generator_info.py:38
NSWL1::PadTriggerAdapter::segment
Muon::NSW_PadTriggerSegment segment(const NSWL1::PadTrigger &data)
Definition: PadTriggerAdapter.cxx:5
ANA_MSG_DEBUG
#define ANA_MSG_DEBUG(xmsg)
Macro printing debug messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:288
RCU_NEW_INVARIANT
#define RCU_NEW_INVARIANT(x)
Definition: Assert.h:233