// fillJob: initialise the basic fields of a BatchJob from a Job.
// The only statement visible in this excerpt stores submitDir as the
// BatchJob's location; any other statements are elided.
52 void fillJob (BatchJob& myjob,
const Job& job,
53 const std::string& submitDir)
56 myjob.location = submitDir;
// fillSample: populate a BatchSample from a sample. It copies the sample's
// name and meta-data, fills missing meta-data entries from `meta` via
// fetchDefaults, and stores the sample's file list.
// NOTE(review): the declarations of `sample` and `meta` sit on elided
// parameter lines of this signature.
66 void fillSample (BatchSample& mysample,
69 mysample.name = sample.name();
70 mysample.meta = *sample.meta();
71 mysample.meta.fetchDefaults (
meta);
73 mysample.files = sample.makeFileList ();
// addSample: append `sample` and its `segments` to `job`.
// The sample records the half-open range [begin_segments, end_segments) of
// its segments inside job.segments. `sample` is taken by value because those
// two fields are set on the local copy before it is pushed into job.samples.
84 void addSample (BatchJob& job, BatchSample sample,
85 const std::vector<BatchSegment>& segments)
89 sample.begin_segments = job.segments.size();
93 for (std::size_t iter = 0, end = segments.size(); iter != end; ++ iter)
95 BatchSegment segment = segments[iter];
97 segment.sampleName = sample.name;
// Full name is "<sample name>-<segment index within the sample>".
99 std::ostringstream myname;
100 myname << sample.name <<
"-" << iter;
101 segment.fullName = myname.str();
// segmentName is built in a separate stream; its contents are elided here.
104 std::ostringstream myname;
106 segment.segmentName = myname.str();
// job.samples.size() is the index this sample will get when it is pushed
// at the end of the function; job_id is the segment's global position.
109 segment.sample = job.samples.size();
110 segment.job_id = job.segments.size();
// A segment ends where the next one begins. The guard selecting between this
// and the "last segment" case below is elided (presumably iter + 1 != end).
113 segment.end_file = segments[iter+1].begin_file;
114 segment.end_event = segments[iter+1].begin_event;
// The last segment runs to the end of the sample's file list.
117 segment.end_file = sample.files.size();
118 segment.end_event = 0;
// Begin must not come after end in (file, event) lexicographic order.
120 RCU_ASSERT (segment.begin_file < segment.end_file || (segment.begin_file == segment.end_file && segment.begin_event <= segment.end_event));
121 job.segments.push_back (segment);
124 sample.end_segments = job.segments.size();
125 job.samples.push_back (sample);
// splitSampleByFile: split a sample into numJobs segments on file boundaries.
// Segment `index` starts at file rint(index * files / numJobs); only
// begin_file is set here, and the end bounds are filled in later by
// addSample.
// NOTE(review): the declaration of numJobs is on an elided line. splitSample
// passes a double, which makes the division below floating-point. If numJobs
// were an integer type, the division would truncate and skew the split.
// TODO confirm.
139 void splitSampleByFile (
const BatchSample& sample,
140 std::vector<BatchSegment>& segments,
147 for (std::size_t
index = 0, end = numJobs;
150 BatchSegment segment;
151 segment.begin_file = rint (
index * (sample.files.size() / numJobs));
152 segments.push_back (segment);
// splitSampleByEvent: split a sample into segments of at most eventsMax
// events. eventsFile holds the event count of each file, in file order.
// Segments may start in the middle of a file; only begin_file and
// begin_event are set here.
167 void splitSampleByEvent (
const BatchSample& sample,
168 std::vector<BatchSegment>& segments,
169 const std::vector<Long64_t>& eventsFile,
// One event count is required per input file.
173 RCU_REQUIRE (sample.files.size() == eventsFile.size());
176 Long64_t eventsSum = 0;
177 for (std::vector<Long64_t>::const_iterator nevents = eventsFile.begin(),
178 end = eventsFile.end(); nevents != end; ++ nevents)
181 eventsSum += *nevents;
// The body of this check and the condition guarding the numJobs
// recomputation below are elided from this excerpt.
183 if (numJobs > eventsSum)
188 numJobs = ceil (
double (eventsSum) / eventsMax);
// Events per job, rounded up, so that numJobs segments cover all events.
// NOTE(review): if numJobs can be 0 here (e.g. eventsSum == 0), this divides
// by zero before the Long64_t conversion. Confirm the elided lines rule
// that out.
189 eventsMax = Long64_t (ceil (
double (eventsSum) / numJobs));
// Walk the files. Emit a segment every eventsMax events within the current
// file, then carry the overshoot past this file's end into the next file.
191 BatchSegment segment;
192 while (std::size_t (segment.begin_file) < eventsFile.size())
194 while (segment.begin_event < eventsFile[segment.begin_file])
196 segments.push_back (segment);
197 segment.begin_event += eventsMax;
199 segment.begin_event -= eventsFile[segment.begin_file];
200 ++ segment.begin_file;
// splitSample: decide how to split a sample into segments and do it.
// The number of jobs is ceil(files / filesPerWorker). The sample is split by
// event when per-file entry counts (meta_nentries) are used, and by file
// otherwise; the selecting condition is elided. If no segment results, one
// `empty` segment is added so the output is never empty.
215 void splitSample (
const BatchSample& sample,
216 std::vector<BatchSegment>& segments)
// filesPerWorker's source expression is elided; values below 1 are rejected.
225 const double filesPerWorker
227 if (filesPerWorker < 1)
229 std::ostringstream
msg;
230 msg <<
"invalid number of files per worker: " << filesPerWorker;
233 double numJobs = ceil (sample.files.size() / filesPerWorker);
243 splitSampleByEvent (sample, segments, meta_nentries->
value, numJobs);
246 splitSampleByFile (sample, segments, numJobs);
250 if (segments.empty())
255 segments.push_back (
empty);
// fillFullJob: build the complete BatchJob for `job`.
// It fills the basic fields via fillJob and replaces the job options with
// `meta` (declared on an elided parameter line). Then, for every sample in
// the job's sample handler, it fills a BatchSample, splits it into segments
// and appends both to myjob. njobs_old records the per-sample segment count
// in sample order.
266 void fillFullJob (BatchJob& myjob,
const Job& job,
267 const std::string& location,
270 fillJob (myjob, job, location);
271 *myjob.job.options() =
meta;
273 for (std::size_t sampleIndex = 0, end = job.sampleHandler().size();
274 sampleIndex != end; ++ sampleIndex)
276 BatchSample mysample;
277 fillSample (mysample, *job.sampleHandler()[sampleIndex],
meta);
279 std::vector<BatchSegment> subsegments;
280 splitSample (mysample, subsegments);
281 myjob.njobs_old.push_back (subsegments.size());
283 addSample (myjob, mysample, subsegments);
// testInvariant(): const member function. Only its signature is visible in
// this excerpt; the class qualifier and body are elided.
291 testInvariant ()
const
// doManagerStep: BatchDriver's handling of the EventLoop manager steps.
// This excerpt shows several separate phases; the step dispatch itself is
// elided:
// - choosing where jobs write and read from;
// - creating the submit-directory layout;
// - writing the BatchJob configuration and segment list;
// - generating the job scripts;
// - reading the configuration back;
// - polling per-segment status files;
// - selecting segments for resubmission;
// - merging the results.
306 ::StatusCode BatchDriver ::
307 doManagerStep (Detail::ManagerData&
data)
const
309 using namespace msgEventLoop;
315 data.sharedFileSystem
// On a shared file system, jobs write straight into submitDir and read their
// inputs from submitDir/submit. Otherwise both locations are "." (the job's
// own working directory).
322 if (
data.sharedFileSystem)
323 data.batchWriteLocation =
data.submitDir;
325 data.batchWriteLocation =
".";
327 if (
data.sharedFileSystem)
328 data.batchSubmitLocation =
data.submitDir+
"/submit";
330 data.batchSubmitLocation =
".";
// For each output stream without an explicit output location, a default is
// built under <writeLocation>/fetch/data-<label>/. The loop head's first
// line and the call receiving this path are elided.
336 const std::string writeLocation=
data.batchWriteLocation;
338 end =
data.job->outputEnd(); out != end; ++ out)
340 if (out->output() ==
nullptr)
343 (writeLocation +
"/fetch/data-" + out->label() +
"/"));
// Create the submit, run, fetch and status directories under submitDir.
// Any failure aborts with FAILURE; the error messages are elided.
352 const std::string submitDir =
data.submitDir +
"/submit";
353 if (gSystem->MakeDirectory (submitDir.c_str()) != 0)
356 return ::StatusCode::FAILURE;
358 const std::string runDir =
data.submitDir +
"/run";
359 if (gSystem->MakeDirectory (runDir.c_str()) != 0)
362 return ::StatusCode::FAILURE;
364 const std::string fetchDir =
data.submitDir +
"/fetch";
365 if (gSystem->MakeDirectory (fetchDir.c_str()) != 0)
368 return ::StatusCode::FAILURE;
370 const std::string statusDir =
data.submitDir +
"/status";
371 if (gSystem->MakeDirectory (statusDir.c_str()) != 0)
374 return ::StatusCode::FAILURE;
// Create the BatchJob and store it as object "job" in submit/config.root.
// Presumably Write() targets the freshly opened file because TFile::Open
// makes it ROOT's current directory. TODO confirm.
381 data.batchJob = std::make_unique<BatchJob> ();
383 data.batchJob->location=
data.batchWriteLocation;
385 std::string path =
data.submitDir +
"/submit/config.root";
386 std::unique_ptr<TFile>
file (TFile::Open (path.c_str(),
"RECREATE"));
387 data.batchJob->Write (
"job");
// submit/segments holds one "<index> <fullName>" line per segment. The job
// scripts grep this file for their own EL_JOBID.
390 std::ofstream
file ((
data.submitDir +
"/submit/segments").c_str());
391 for (std::size_t iter = 0, end =
data.batchJob->segments.size();
392 iter != end; ++ iter)
394 file << iter <<
" " <<
data.batchJob->segments[iter].fullName <<
"\n";
// Script name "run" with the job id taken from the script's first argument
// ($1); then write the script(s) for all segments.
402 data.batchName =
"run";
404 data.batchJobId =
"EL_JOBID=$1\n";
410 makeScript (
data,
data.batchJob->segments.size());
// Read the BatchJob back from submit/config.root. Each of these is a
// FAILURE: a missing or zombie file, or a "job" object that is absent or
// not a BatchJob (dynamic_cast yields nullptr).
426 std::unique_ptr<TFile>
file
427 {TFile::Open ((
data.submitDir +
"/submit/config.root").c_str(),
"READ")};
428 if (
file ==
nullptr ||
file->IsZombie())
431 return ::StatusCode::FAILURE;
433 data.batchJob.reset (
dynamic_cast<BatchJob*
>(
file->Get (
"job")));
434 if (
data.batchJob ==
nullptr)
437 return ::StatusCode::FAILURE;
// Classify every segment as success, failure or unknown from its
// status/completed-N and status/fail-N marker files. ROOT's AccessPathName
// returns false when the path exists, hence "== 0" means the marker is
// present. A segment carrying both markers is an error.
446 for (std::size_t job = 0; job !=
data.batchJob->segments.size(); ++ job)
448 std::ostringstream completedFile;
449 completedFile <<
data.submitDir <<
"/status/completed-" << job;
450 const bool hasCompleted =
451 (gSystem->AccessPathName (completedFile.str().c_str()) == 0);
453 std::ostringstream failFile;
454 failFile <<
data.submitDir <<
"/status/fail-" << job;
456 (gSystem->AccessPathName (failFile.str().c_str()) == 0);
462 ANA_MSG_ERROR (
"sub-job " << job <<
" reported both success and failure");
463 return ::StatusCode::FAILURE;
465 data.batchJobSuccess.insert (job);
469 data.batchJobFailure.insert (job);
471 data.batchJobUnknown.insert (job);
474 ANA_MSG_INFO (
"current job status: " <<
data.batchJobSuccess.size() <<
" success, " <<
data.batchJobFailure.size() <<
" failure, " <<
data.batchJobUnknown.size() <<
" running/unknown");
// Resubmission. The only recognised option is "ALL_MISSING" (its branch
// body is elided); any other non-empty option is a FAILURE. The selection
// loop queues either every segment not marked successful or only the failed
// ones; the branch choosing between them is elided (presumably all_missing).
480 bool all_missing =
false;
481 if (
data.resubmitOption ==
"ALL_MISSING")
484 }
else if (!
data.resubmitOption.empty())
487 return ::StatusCode::FAILURE;
490 for (std::size_t segment = 0; segment !=
data.batchJob->segments.size(); ++ segment)
494 if (
data.batchJobSuccess.find (segment) ==
data.batchJobSuccess.end())
495 data.batchJobIndices.push_back (segment);
498 if (
data.batchJobFailure.find (segment) !=
data.batchJobFailure.end())
499 data.batchJobIndices.push_back (segment);
// Nothing to resubmit.
503 if (
data.batchJobIndices.empty())
507 return ::StatusCode::SUCCESS;
// For each resubmitted segment, append the paths of its completed-, fail-
// and done- status files to `command`. The command prefix and its execution
// are elided; presumably the stale markers are removed. TODO confirm.
510 for (std::size_t segment :
data.batchJobIndices)
512 std::ostringstream command;
514 command <<
" " <<
data.submitDir <<
"/status/completed-" << segment;
515 command <<
" " <<
data.submitDir <<
"/status/fail-" << segment;
516 command <<
" " <<
data.submitDir <<
"/status/done-" << segment;
// Restore the job options from the stored BatchJob.
519 data.options = *
data.batchJob->job.options();
// Merge histogram outputs and save disk outputs. The job is marked
// retrieved, but completed only if mergeHists reported success.
525 bool merged = mergeHists (
data);
528 diskOutputSave (
data);
530 data.retrieved =
true;
531 data.completed = merged;
538 return ::StatusCode::SUCCESS;
// defaultReleaseSetup: build the shell snippet that sets up the software
// release inside a batch job. Visible steps:
// - locate the WorkDir install area, falling back to the first entry of
//   $CMAKE_PREFIX_PATH;
// - without a shared file system, unpack AnalysisPackage.tar.gz into build/;
// - export AtlasSetupSite/AtlasSetup when set in the submitting environment;
// - set up ATLASLocalRootBase;
// - source the work area's setup.sh.
// Without a shared file system it also packs the work area with cpack and
// renames the result to the tarball name. How `file` and defaultSetupCommand
// are finally used/returned is elided from this excerpt.
543 std::string BatchDriver ::
544 defaultReleaseSetup (
const Detail::ManagerData&
data)
const
549 const std::string tarballName(
"AnalysisPackage.tar.gz");
551 std::ostringstream
file;
554 const char *WORKDIR_DIR = getenv (
"WorkDir_DIR");
// NOTE(review): getenv returns nullptr when CMAKE_PREFIX_PATH is unset, and
// constructing std::string from a null pointer is undefined behaviour. This
// also runs when WorkDir_DIR is set. Consider guarding before constructing.
556 std::string CMAKE_DIR_str ( getenv (
"CMAKE_PREFIX_PATH") );
// Fallback: use the first ':'-separated entry of CMAKE_PREFIX_PATH.
// WORKDIR_DIR then points into CMAKE_DIR_str's buffer, so it is valid only
// while that string is alive and unmodified.
557 if (WORKDIR_DIR ==
nullptr){
558 msgEventLoop::ANA_MSG_INFO (
"Could not find environment variable $WorkDir_DIR");
560 if (CMAKE_DIR_str.find(
":") != std::string::npos){
562 CMAKE_DIR_str.erase( CMAKE_DIR_str.find(
":") , std::string::npos );
565 WORKDIR_DIR = CMAKE_DIR_str.data();
// Without a shared file system the job unpacks the shipped tarball locally.
568 if(!
data.sharedFileSystem)
570 file <<
"mkdir -p build && tar -C build/ -xf " << tarballName <<
" || abortJob\n";
577 if(getenv(
"AtlasSetupSite"))
file <<
"export AtlasSetupSite=" << getenv(
"AtlasSetupSite") <<
"\n";
579 if(getenv(
"AtlasSetup"))
file <<
"export AtlasSetup=" << getenv(
"AtlasSetup") <<
"\n";
583 file <<
"export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase\n";
584 file <<
"source $ATLAS_LOCAL_ROOT_BASE/user/atlasLocalSetup.sh --quiet\n";
// Default asetup command. It re-exports whichever Atlas* release variables
// are set here, then calls asetup with project,version for "stable"
// releases and project,branch,stamp otherwise.
587 std::ostringstream defaultSetupCommand;
590 if(getenv(
"AtlasProject")) defaultSetupCommand <<
"export AtlasProject=" << getenv(
"AtlasProject") <<
"\n";
592 if(getenv(
"AtlasVersion")) defaultSetupCommand <<
"export AtlasVersion=" << getenv(
"AtlasVersion") <<
"\n";
594 if(getenv(
"AtlasBuildStamp")) defaultSetupCommand <<
"export AtlasBuildStamp=" << getenv(
"AtlasBuildStamp") <<
"\n";
596 if(getenv(
"AtlasBuildBranch")) defaultSetupCommand <<
"export AtlasBuildBranch=" << getenv(
"AtlasBuildBranch") <<
"\n";
598 if(getenv(
"AtlasReleaseType")) defaultSetupCommand <<
"export AtlasReleaseType=" << getenv(
"AtlasReleaseType") <<
"\n";
599 defaultSetupCommand <<
"if [ \"${AtlasReleaseType}\" == \"stable\" ]; then\n";
600 defaultSetupCommand <<
" source ${AtlasSetup}/scripts/asetup.sh ${AtlasProject},${AtlasVersion} || abortJob\n";
601 defaultSetupCommand <<
"else\n";
602 defaultSetupCommand <<
" source ${AtlasSetup}/scripts/asetup.sh ${AtlasProject},${AtlasBuildBranch},${AtlasBuildStamp} || abortJob\n";
603 defaultSetupCommand <<
"fi\n";
604 defaultSetupCommand <<
"echo \"Using default setup command\"";
// Source the work area's setup: in place on a shared file system,
// otherwise from the unpacked build/ directory.
608 if(
data.sharedFileSystem)
file <<
"source " << WORKDIR_DIR <<
"/setup.sh || abortJob\n";
609 else file <<
"source build/setup.sh || abortJob\n";
// Without a shared file system, pack the work area with cpack (the handling
// of a cpack failure is elided) and rename the produced tarball.
612 if(!
data.sharedFileSystem)
614 std::ostringstream cmd;
616 cmd <<
"cpack -D CPACK_INSTALL_PREFIX=. -G TGZ --config $TestArea/CPackConfig.cmake";
619 if (gSystem->Exec (cmd.str().c_str()) != 0){
// NOTE(review): std::getenv returns nullptr for unset variables, and
// inserting a null const char* into an ostream is undefined behaviour.
// Consider checking WorkDir_VERSION / WorkDir_PLATFORM first.
623 std::ostringstream mv_command;
624 mv_command <<
"mv WorkDir_" << std::getenv(
"WorkDir_VERSION") <<
"_" << std::getenv(
"WorkDir_PLATFORM") <<
".tar.gz " << tarballName;
625 if (gSystem->Exec (mv_command.str().c_str()) != 0){
626 RCU_THROW_MSG ((
"failed to execute: " + mv_command.str()).c_str());
// makeScript: write the bash submission script(s) for the batch jobs.
// If data.batchName contains "{JOBID}", one script is written per job with
// EL_JOBID hard-coded. Otherwise a single script is written that sets
// EL_JOBID from data.batchJobId. The per-script file name (fileName) is
// computed on elided lines.
636 makeScript (Detail::ManagerData&
data,
637 std::size_t njobs)
const
641 std::string name =
data.batchName;
642 bool multiFile = (name.find (
"{JOBID}") != std::string::npos);
643 for (std::size_t
index = 0, end = multiFile ? njobs : 1;
index != end; ++
index)
645 std::ostringstream
str;
650 std::ofstream
file (fileName.c_str());
651 file <<
"#!/bin/bash\n";
652 file <<
"echo starting batch job initialization\n";
654 file <<
"echo batch job user initialization finished\n";
655 if (multiFile)
file <<
"EL_JOBID=" <<
index <<
"\n\n";
656 else file <<
data.batchJobId <<
"\n";
// abortJob marks this job as failed and done in the status directory; the
// rest of the shell function is elided.
658 file <<
"function abortJob {\n";
659 file <<
" echo \"abort EL_JOBID=${EL_JOBID}\"\n";
660 file <<
" touch \"" <<
data.batchWriteLocation <<
"/status/fail-$EL_JOBID\"\n";
661 file <<
" touch \"" <<
data.batchWriteLocation <<
"/status/done-$EL_JOBID\"\n";
// Look up this job's segment name (second column of the line starting with
// "<EL_JOBID> ") in the segments file. Abort if no line is found.
666 file <<
"EL_JOBSEG=`grep \"^$EL_JOBID \" \"" <<
data.batchSubmitLocation <<
"/segments\" | awk ' { print $2 }'`\n";
667 file <<
"test \"$EL_JOBSEG\" != \"\" || abortJob\n";
668 file <<
"hostname\n";
671 file << shellInit <<
"\n";
// Without a shared file system the job creates local fetch/ and status/
// directories in its working directory.
673 if(!
data.sharedFileSystem)
675 file <<
"mkdir \"fetch\" || abortJob\n";
676 file <<
"mkdir \"status\" || abortJob\n";
// With a shared file system the job runs in a fresh scratch directory
// under $TMPDIR (default /tmp), named after the segment, the current time
// and the shell PID. It is removed at the end of the script.
680 if(
data.sharedFileSystem)
682 file <<
"test \"$TMPDIR\" == \"\" && TMPDIR=/tmp\n";
683 file <<
"RUNDIR=${TMPDIR}/EventLoop-Worker-$EL_JOBSEG-`date +%s`-$$\n";
684 file <<
"mkdir \"$RUNDIR\" || abortJob\n";
685 file <<
"cd \"$RUNDIR\" || abortJob\n";
// Release setup is written unless batchSkipReleaseSetup is set (the
// statements writing it are elided).
688 if (!
data.batchSkipReleaseSetup)
// Run the worker on the stored configuration. If it did not leave a
// completed marker, mark the job as failed; always mark it done.
691 file <<
"eventloop_batch_worker $EL_JOBID '" <<
data.batchSubmitLocation <<
"/config.root' || abortJob\n";
693 file <<
"test -f \"" <<
data.batchWriteLocation <<
"/status/completed-$EL_JOBID\" || "
694 <<
"touch \"" <<
data.batchWriteLocation <<
"/status/fail-$EL_JOBID\"\n";
695 file <<
"touch \"" <<
data.batchWriteLocation <<
"/status/done-$EL_JOBID\"\n";
696 if(
data.sharedFileSystem)
file <<
"cd .. && rm -rf \"$RUNDIR\"\n";
// Make the script executable; the failure handling is elided.
700 std::ostringstream cmd;
701 cmd <<
"chmod +x " << fileName;
702 if (gSystem->Exec (cmd.str().c_str()) != 0)
711 mergeHists (Detail::ManagerData&
data)
713 using namespace msgEventLoop;
720 std::unique_ptr<SH::DiskOutput> origHistOutputMemory;
722 for (
auto iter =
data.job->outputBegin(), end =
data.job->outputEnd();
723 iter != end; ++ iter)
726 origHistOutput = iter->output();
728 if (origHistOutput ==
nullptr)
730 origHistOutputMemory = std::make_unique<SH::DiskOutputLocal>
731 (
data.submitDir +
"/fetch/hist-");
732 origHistOutput = origHistOutputMemory.get();
741 for (std::size_t sample = 0, end =
data.batchJob->samples.size();
742 sample != end; ++ sample)
744 const BatchSample& mysample (
data.batchJob->samples[sample]);
746 std::ostringstream output;
747 output <<
data.submitDir <<
"/hist-" <<
data.batchJob->samples[sample].name <<
".root";
748 if (gSystem->AccessPathName (output.str().c_str()) != 0)
752 bool complete =
true;
753 std::vector<std::string> input;
754 for (std::size_t segment = mysample.begin_segments,
755 end = mysample.end_segments; segment != end; ++ segment)
757 const BatchSegment& mysegment =
data.batchJob->segments[segment];
759 const std::string hist_file = origHistOutput->
targetURL
760 (mysegment.sampleName, mysegment.segmentName,
".root");
762 ANA_MSG_VERBOSE (
"merge segment " << segment <<
" completed=" << (
data.batchJobSuccess.find(segment)!=
data.batchJobSuccess.end()) <<
" fail=" << (
data.batchJobFailure.find(segment)!=
data.batchJobFailure.end()) <<
" unknown=" << (
data.batchJobUnknown.find(segment)!=
data.batchJobUnknown.end()));
764 input.push_back (hist_file);
766 if (
data.batchJobFailure.find(segment)!=
data.batchJobFailure.end())
768 std::ostringstream message;
769 message <<
"subjob " << segment <<
"/" << mysegment.fullName
773 else if (
data.batchJobSuccess.find(segment)==
data.batchJobSuccess.end())
774 complete =
false,
result =
false;
782 end =
data.batchJob->job.outputEnd(); out != end; ++ out)
785 output <<
data.submitDir <<
"/data-" << out->label();
787 if(gSystem->AccessPathName(output.str().c_str()))
788 gSystem->mkdir(output.str().c_str(),
true);
791 output <<
"/" <<
data.batchJob->samples[sample].name <<
".root";
793 std::vector<std::string> input;
794 for (std::size_t segment = mysample.begin_segments,
795 end = mysample.end_segments; segment != end; ++ segment)
797 const BatchSegment& mysegment =
data.batchJob->segments[segment];
799 const std::string infile =
800 data.submitDir +
"/fetch/data-" + out->label() +
"/" + mysegment.fullName +
".root";
802 input.push_back (infile);