effects: fill the job object from the original job information, except for the actual samples submitted
guarantee: basic
failures: out of memory II
effects: fill the given sample object with the information from the SampleHandler object, except for the segments, which get filled separately
guarantee: basic
failures: out of memory II
effects: add the given sample with the given segments to the job
guarantee: basic
failures: out of memory II
failures: segment misconfiguration
requires: !segments.empty()
effects: generate the splits for the sample by file
guarantee: basic
failures: out of memory II
failures: i/o errors
requires: !sample.files.empty()
requires: numJobs == rint (numJobs)
postcondition: !segments.empty()
effects: generate the splits for the sample
guarantee: basic
failures: out of memory II
failures: i/o errors
requires: !sample.files.empty()
requires: numJobs == rint (numJobs)
postcondition: !segments.empty()
effects: generate the splits for the sample
guarantee: basic
failures: out of memory II
failures: i/o errors
requires: !sample.files.empty()
postcondition: !segments.empty()
effects: fill the job object from the original job information, including the actual samples submitted
guarantee: basic
failures: out of memory II
52 void fillJob (BatchJob& myjob,
const Job&
job,
66 void fillSample (BatchSample& mysample,
69 mysample.name =
sample.name();
70 mysample.meta = *
sample.meta();
71 mysample.meta.fetchDefaults (meta);
73 mysample.files =
sample.makeFileList ();
84 void addSample (BatchJob&
job, BatchSample
sample,
85 const std::vector<BatchSegment>& segments)
89 sample.begin_segments =
job.segments.size();
93 for (std::size_t iter = 0,
end = segments.size(); iter !=
end; ++ iter)
95 BatchSegment
segment = segments[iter];
99 std::ostringstream myname;
100 myname <<
sample.name <<
"-" << iter;
101 segment.fullName = myname.str();
104 std::ostringstream myname;
106 segment.segmentName = myname.str();
113 segment.end_file = segments[iter+1].begin_file;
114 segment.end_event = segments[iter+1].begin_event;
124 sample.end_segments =
job.segments.size();
139 void splitSampleByFile (
const BatchSample&
sample,
140 std::vector<BatchSegment>& segments,
147 for (std::size_t
index = 0,
end = numJobs;
167 void splitSampleByEvent (
const BatchSample&
sample,
168 std::vector<BatchSegment>& segments,
176 Long64_t eventsSum = 0;
183 if (numJobs > eventsSum)
186 =
sample.meta.castDouble (Job::optEventsPerWorker);
188 numJobs = ceil (
double (eventsSum) / eventsMax);
189 eventsMax = Long64_t (ceil (
double (eventsSum) / numJobs));
197 segment.begin_event += eventsMax;
216 std::vector<BatchSegment>& segments)
225 const double filesPerWorker
226 =
sample.meta.castDouble (Job::optFilesPerWorker, 1);
227 if (filesPerWorker < 1)
229 std::ostringstream
msg;
230 msg <<
"invalid number of files per worker: " << filesPerWorker;
233 double numJobs = ceil (
sample.files.size() / filesPerWorker);
243 splitSampleByEvent (
sample, segments, meta_nentries->
value, numJobs);
246 splitSampleByFile (
sample, segments, numJobs);
250 if (segments.empty())
255 segments.push_back (empty);
266 void fillFullJob (BatchJob& myjob,
const Job&
job,
267 const std::string& location,
270 fillJob (myjob,
job, location);
271 *myjob.job.options() = meta;
273 for (std::size_t sampleIndex = 0,
end =
job.sampleHandler().size();
274 sampleIndex !=
end; ++ sampleIndex)
276 BatchSample mysample;
277 fillSample (mysample, *
job.sampleHandler()[sampleIndex], meta);
279 std::vector<BatchSegment> subsegments;
281 myjob.njobs_old.push_back (subsegments.size());
283 addSample (myjob, mysample, subsegments);
291 testInvariant ()
const
307 doManagerStep (Detail::ManagerData&
data)
const
309 using namespace msgEventLoop;
313 case Detail::ManagerStep::fillOptions:
315 data.sharedFileSystem
316 =
data.options.castBool (Job::optBatchSharedFileSystem,
true);
320 case Detail::ManagerStep::createSubmitDir:
322 if (
data.sharedFileSystem)
323 data.batchWriteLocation =
data.submitDir;
325 data.batchWriteLocation =
".";
327 if (
data.sharedFileSystem)
328 data.batchSubmitLocation =
data.submitDir+
"/submit";
330 data.batchSubmitLocation =
".";
334 case Detail::ManagerStep::updateOutputLocation:
336 const std::string writeLocation=
data.batchWriteLocation;
337 for (Job::outputMIter
out =
data.job->outputBegin(),
340 if (
out->output() ==
nullptr)
343 (writeLocation +
"/fetch/data-" +
out->label() +
"/"));
349 case Detail::ManagerStep::batchCreateDirectories:
353 if (gSystem->MakeDirectory (
submitDir.c_str()) != 0)
356 return ::StatusCode::FAILURE;
358 const std::string runDir =
data.submitDir +
"/run";
359 if (gSystem->MakeDirectory (runDir.c_str()) != 0)
362 return ::StatusCode::FAILURE;
364 const std::string fetchDir =
data.submitDir +
"/fetch";
365 if (gSystem->MakeDirectory (fetchDir.c_str()) != 0)
368 return ::StatusCode::FAILURE;
370 const std::string statusDir =
data.submitDir +
"/status";
371 if (gSystem->MakeDirectory (statusDir.c_str()) != 0)
374 return ::StatusCode::FAILURE;
379 case Detail::ManagerStep::batchCreateJob:
381 data.batchJob = std::make_unique<BatchJob> ();
383 data.batchJob->location=
data.batchWriteLocation;
385 std::string
path =
data.submitDir +
"/submit/config.root";
386 std::unique_ptr<TFile>
file (TFile::Open (
path.c_str(),
"RECREATE"));
387 data.batchJob->Write (
"job");
390 std::ofstream
file ((
data.submitDir +
"/submit/segments").c_str());
391 for (std::size_t iter = 0,
end =
data.batchJob->segments.size();
392 iter !=
end; ++ iter)
394 file << iter <<
" " <<
data.batchJob->segments[iter].fullName <<
"\n";
400 case Detail::ManagerStep::batchScriptVar:
402 data.batchName =
"run";
404 data.batchJobId =
"EL_JOBID=$1\n";
408 case Detail::ManagerStep::batchMakeScript:
410 makeScript (
data,
data.batchJob->segments.size());
414 case Detail::ManagerStep::batchMakeIndices:
421 case Detail::ManagerStep::readConfigResubmit:
422 case Detail::ManagerStep::readConfigRetrieve:
426 std::unique_ptr<TFile>
file
427 {TFile::Open ((
data.submitDir +
"/submit/config.root").c_str(),
"READ")};
428 if (
file ==
nullptr ||
file->IsZombie())
431 return ::StatusCode::FAILURE;
433 data.batchJob.reset (
dynamic_cast<BatchJob*
>(
file->Get (
"job")));
434 if (
data.batchJob ==
nullptr)
437 return ::StatusCode::FAILURE;
443 case Detail::ManagerStep::batchJobStatusResubmit:
444 case Detail::ManagerStep::batchJobStatusRetrieve:
446 for (std::size_t
job = 0;
job !=
data.batchJob->segments.size(); ++
job)
448 std::ostringstream completedFile;
449 completedFile <<
data.submitDir <<
"/status/completed-" <<
job;
450 const bool hasCompleted =
451 (gSystem->AccessPathName (completedFile.str().c_str()) == 0);
453 std::ostringstream failFile;
454 failFile <<
data.submitDir <<
"/status/fail-" <<
job;
456 (gSystem->AccessPathName (failFile.str().c_str()) == 0);
463 return ::StatusCode::FAILURE;
465 data.batchJobSuccess.insert (
job);
469 data.batchJobFailure.insert (
job);
471 data.batchJobUnknown.insert (
job);
474 ANA_MSG_INFO (
"current job status: " <<
data.batchJobSuccess.size() <<
" success, " <<
data.batchJobFailure.size() <<
" failure, " <<
data.batchJobUnknown.size() <<
" running/unknown");
478 case Detail::ManagerStep::batchPreResubmit:
480 bool all_missing =
false;
481 if (
data.resubmitOption ==
"ALL_MISSING")
484 }
else if (!
data.resubmitOption.empty())
487 return ::StatusCode::FAILURE;
503 if (
data.batchJobIndices.empty())
506 data.nextStep = Detail::ManagerStep::final;
507 return ::StatusCode::SUCCESS;
519 data.options = *
data.batchJob->job.options();
523 case Detail::ManagerStep::doRetrieve:
525 bool merged = mergeHists (
data);
528 diskOutputSave (
data);
530 data.retrieved =
true;
531 data.completed = merged;
538 return ::StatusCode::SUCCESS;
543 std::string BatchDriver ::
544 defaultReleaseSetup (
const Detail::ManagerData&
data)
const
549 const std::string tarballName(
"AnalysisPackage.tar.gz");
551 std::ostringstream
file;
554 const char *WORKDIR_DIR =
getenv (
"WorkDir_DIR");
556 std::string CMAKE_DIR_str (
getenv (
"CMAKE_PREFIX_PATH") );
557 if (WORKDIR_DIR ==
nullptr){
560 if (CMAKE_DIR_str.find(
":") != std::string::npos){
562 CMAKE_DIR_str.erase( CMAKE_DIR_str.find(
":") , std::string::npos );
565 WORKDIR_DIR = CMAKE_DIR_str.data();
568 if(!
data.sharedFileSystem)
570 file <<
"mkdir -p build && tar -C build/ -xf " << tarballName <<
" || abortJob\n";
577 if(
getenv(
"AtlasSetupSite"))
file <<
"export AtlasSetupSite=" <<
getenv(
"AtlasSetupSite") <<
"\n";
579 if(
getenv(
"AtlasSetup"))
file <<
"export AtlasSetup=" <<
getenv(
"AtlasSetup") <<
"\n";
583 file <<
"export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase\n";
584 file <<
"source $ATLAS_LOCAL_ROOT_BASE/user/atlasLocalSetup.sh --quiet\n";
587 std::ostringstream defaultSetupCommand;
590 if(
getenv(
"AtlasProject")) defaultSetupCommand <<
"export AtlasProject=" <<
getenv(
"AtlasProject") <<
"\n";
592 if(
getenv(
"AtlasVersion")) defaultSetupCommand <<
"export AtlasVersion=" <<
getenv(
"AtlasVersion") <<
"\n";
594 if(
getenv(
"AtlasBuildStamp")) defaultSetupCommand <<
"export AtlasBuildStamp=" <<
getenv(
"AtlasBuildStamp") <<
"\n";
596 if(
getenv(
"AtlasBuildBranch")) defaultSetupCommand <<
"export AtlasBuildBranch=" <<
getenv(
"AtlasBuildBranch") <<
"\n";
598 if(
getenv(
"AtlasReleaseType")) defaultSetupCommand <<
"export AtlasReleaseType=" <<
getenv(
"AtlasReleaseType") <<
"\n";
599 defaultSetupCommand <<
"if [ \"${AtlasReleaseType}\" == \"stable\" ]; then\n";
600 defaultSetupCommand <<
" source ${AtlasSetup}/scripts/asetup.sh ${AtlasProject},${AtlasVersion} || abortJob\n";
601 defaultSetupCommand <<
"else\n";
602 defaultSetupCommand <<
" source ${AtlasSetup}/scripts/asetup.sh ${AtlasProject},${AtlasBuildBranch},${AtlasBuildStamp} || abortJob\n";
603 defaultSetupCommand <<
"fi\n";
604 defaultSetupCommand <<
"echo \"Using default setup command\"";
607 file <<
options()->castString(Job::optBatchSetupCommand, defaultSetupCommand.str()) <<
" || abortJob\n";
608 if(
data.sharedFileSystem)
file <<
"source " << WORKDIR_DIR <<
"/setup.sh || abortJob\n";
609 else file <<
"source build/setup.sh || abortJob\n";
612 if(!
data.sharedFileSystem)
614 std::ostringstream
cmd;
616 cmd <<
"cpack -D CPACK_INSTALL_PREFIX=. -G TGZ --config $TestArea/CPackConfig.cmake";
619 if (gSystem->Exec (
cmd.str().c_str()) != 0){
623 std::ostringstream mv_command;
624 mv_command <<
"mv WorkDir_" <<
std::getenv(
"WorkDir_VERSION") <<
"_" <<
std::getenv(
"WorkDir_PLATFORM") <<
".tar.gz " << tarballName;
625 if (gSystem->Exec (mv_command.str().c_str()) != 0){
626 RCU_THROW_MSG ((
"failed to execute: " + mv_command.str()).c_str());
636 makeScript (Detail::ManagerData&
data,
637 std::size_t
njobs)
const
642 bool multiFile = (
name.find (
"{JOBID}") != std::string::npos);
645 std::ostringstream
str;
651 file <<
"#!/bin/bash\n";
652 file <<
"echo starting batch job initialization\n";
654 file <<
"echo batch job user initialization finished\n";
655 if (multiFile)
file <<
"EL_JOBID=" <<
index <<
"\n\n";
656 else file <<
data.batchJobId <<
"\n";
658 file <<
"function abortJob {\n";
659 file <<
" echo \"abort EL_JOBID=${EL_JOBID}\"\n";
660 file <<
" touch \"" <<
data.batchWriteLocation <<
"/status/fail-$EL_JOBID\"\n";
661 file <<
" touch \"" <<
data.batchWriteLocation <<
"/status/done-$EL_JOBID\"\n";
666 file <<
"EL_JOBSEG=`grep \"^$EL_JOBID \" \"" <<
data.batchSubmitLocation <<
"/segments\" | awk ' { print $2 }'`\n";
667 file <<
"test \"$EL_JOBSEG\" != \"\" || abortJob\n";
668 file <<
"hostname\n";
671 file << shellInit <<
"\n";
673 if(!
data.sharedFileSystem)
675 file <<
"mkdir \"fetch\" || abortJob\n";
676 file <<
"mkdir \"status\" || abortJob\n";
680 if(
data.sharedFileSystem)
682 file <<
"test \"$TMPDIR\" == \"\" && TMPDIR=/tmp\n";
683 file <<
"RUNDIR=${TMPDIR}/EventLoop-Worker-$EL_JOBSEG-`date +%s`-$$\n";
684 file <<
"mkdir \"$RUNDIR\" || abortJob\n";
685 file <<
"cd \"$RUNDIR\" || abortJob\n";
688 if (!
data.batchSkipReleaseSetup)
691 file <<
"eventloop_batch_worker $EL_JOBID '" <<
data.batchSubmitLocation <<
"/config.root' || abortJob\n";
693 file <<
"test -f \"" <<
data.batchWriteLocation <<
"/status/completed-$EL_JOBID\" || "
694 <<
"touch \"" <<
data.batchWriteLocation <<
"/status/fail-$EL_JOBID\"\n";
695 file <<
"touch \"" <<
data.batchWriteLocation <<
"/status/done-$EL_JOBID\"\n";
696 if(
data.sharedFileSystem)
file <<
"cd .. && rm -rf \"$RUNDIR\"\n";
700 std::ostringstream
cmd;
702 if (gSystem->Exec (
cmd.str().c_str()) != 0)
711 mergeHists (Detail::ManagerData&
data)
713 using namespace msgEventLoop;
720 std::unique_ptr<SH::DiskOutput> origHistOutputMemory;
722 for (
auto iter =
data.job->outputBegin(),
end =
data.job->outputEnd();
723 iter !=
end; ++ iter)
725 if (iter->label() == Job::histogramStreamName)
726 origHistOutput = iter->output();
728 if (origHistOutput ==
nullptr)
730 origHistOutputMemory = std::make_unique<SH::DiskOutputLocal>
731 (
data.submitDir +
"/fetch/hist-");
732 origHistOutput = origHistOutputMemory.get();
741 for (std::size_t
sample = 0,
end =
data.batchJob->samples.size();
744 const BatchSample& mysample (
data.batchJob->samples[
sample]);
746 std::ostringstream
output;
748 if (gSystem->AccessPathName (
output.str().c_str()) != 0)
752 bool complete =
true;
753 std::vector<std::string>
input;
754 for (std::size_t
segment = mysample.begin_segments,
757 const BatchSegment& mysegment =
data.batchJob->segments[
segment];
760 (mysegment.sampleName, mysegment.segmentName,
".root");
773 else if (
data.batchJobSuccess.find(
segment)==
data.batchJobSuccess.end())
774 complete =
false,
result =
false;
781 for (Job::outputIter
out =
data.batchJob->job.outputBegin(),
787 if(gSystem->AccessPathName(
output.str().c_str()))
788 gSystem->mkdir(
output.str().c_str(),
true);
793 std::vector<std::string>
input;
794 for (std::size_t
segment = mysample.begin_segments,
797 const BatchSegment& mysegment =
data.batchJob->segments[
segment];
799 const std::string
infile =
800 data.submitDir +
"/fetch/data-" +
out->label() +
"/" + mysegment.fullName +
".root";