effects: fill the job object from the original job information except for the actual samples submitted guarantee: basic failures: out of memory II
effects: fill the given sample object with the information from the SampleHandler object, except for the segments that get filled separately guarantee: basic failures: out of memory II
effects: add the given sample with the given segments to the job guarantee: basic failures: out of memory II failures: segment misconfiguration requires: !segments.empty()
effects: generate the splits for the sample by file guarantee: basic failures: out of memory II failures: i/o errors requires: !sample.files.empty() requires: numJobs == rint (numJobs) postcondition: !segments.empty()
effects: generate the splits for the sample guarantee: basic failures: out of memory II failures: i/o errors requires: !sample.files.empty() requires: numJobs == rint (numJobs) postcondition: !segments.empty()
effects: generate the splits for the sample guarantee: basic failures: out of memory II failures: i/o errors requires: !sample.files.empty() postcondition: !segments.empty()
effects: fill the job object from the original job information including the actual samples submitted guarantee: basic failures: out of memory II
45{
46 namespace
47 {
// Copies the job-level (non-sample) information from `job` into the flat
// BatchJob description `myjob` that gets written to the submit directory.
// NOTE(review): this is a doxygen-listing scrape — the embedded numbers
// (52, 53, ...) are original line numbers fused into the text, and the
// body lines (orig. 55-56) are missing entirely; do not compile as-is.
52 void fillJob (BatchJob& myjob, const Job& job,
53 const std::string& submitDir)
54 {
57 }
58
59
60
// Fills `mysample` (name, meta-data, file list) from a SampleHandler
// sample; the segments are filled separately by the caller.
// NOTE(review): extraction artifact — orig. line 67 (the remaining
// parameters; presumably the source sample and a meta object, given the
// uses of `sample` and `meta` below) is missing. TODO confirm signature.
66 void fillSample (BatchSample& mysample,
68 {
69 mysample.name =
sample.name();
// copy the sample's own meta-data, then overlay defaults from `meta`
70 mysample.meta = *
sample.meta();
71 mysample.meta.fetchDefaults (
meta);
72
// resolve the sample's file list once, up front
73 mysample.files =
sample.makeFileList ();
74 }
75
76
77
// Appends `sample` together with its pre-computed `segments` to the
// BatchJob, fixing up per-segment bookkeeping: owning-sample index,
// global job id, segment names, and the [begin,end) file/event ranges.
// NOTE(review): extraction artifact — orig. lines 100 and 105 (the
// ostringstream inserts that build fullName/segmentName) are missing.
84 void addSample (BatchJob& job, BatchSample sample,
85 const std::vector<BatchSegment>& segments)
86 {
88
// this sample's segments occupy [begin_segments, end_segments)
// within the job-wide segment list
89 sample.begin_segments =
job.segments.size();
90
93 for (std::size_t iter = 0, end = segments.size(); iter != end; ++ iter)
94 {
// work on a copy; the caller's segments stay untouched
95 BatchSegment segment = segments[
iter];
96
97 segment.sampleName =
sample.name;
98 {
99 std::ostringstream myname;
101 segment.fullName = myname.str();
102 }
103 {
104 std::ostringstream myname;
106 segment.segmentName = myname.str();
107 }
108
// index of the owning sample and the global (job-wide) segment id
109 segment.sample =
job.samples.size();
110 segment.job_id =
job.segments.size();
// each segment ends where the next one begins; the last one runs
// to the end of the sample's file list (end_event 0 = "whole file")
111 if (iter+1 < end)
112 {
113 segment.end_file = segments[
iter+1].begin_file;
114 segment.end_event = segments[
iter+1].begin_event;
115 } else
116 {
117 segment.end_file =
sample.files.size();
118 segment.end_event = 0;
119 }
// sanity check: ranges must be well ordered (guards against the
// "segment misconfiguration" failure advertised in the brief)
120 RCU_ASSERT (segment.begin_file < segment.end_file || (segment.begin_file == segment.end_file && segment.begin_event <= segment.end_event));
121 job.segments.push_back (segment);
122 }
123
124 sample.end_segments =
job.segments.size();
125 job.samples.push_back (sample);
126 }
127
128
129
130
131
// Splits the sample into roughly `numJobs` segments on file boundaries:
// segment i begins at file rint(i * nFiles/numJobs), distributing files
// as evenly as possible (requires numJobs == rint(numJobs), per brief).
// NOTE(review): extraction artifact — orig. lines 144-145 and 148 (the
// loop condition/increment) are missing from this listing.
139 void splitSampleByFile (const BatchSample& sample,
140 std::vector<BatchSegment>& segments,
141 double numJobs)
142 {
143
146
147 for (std::size_t
index = 0, end = numJobs;
149 {
150 BatchSegment segment;
// fractional stride, rounded to the nearest whole file index
151 segment.begin_file = rint (
index * (
sample.files.size() / numJobs));
152 segments.push_back (segment);
153 }
154 }
155
156
157
158
159
// Splits the sample into segments of (at most) eventsMax events, using
// the per-file event counts in `eventsFile`; when Job::optEventsPerWorker
// is set it overrides the requested number of jobs.
// NOTE(review): extraction artifact — orig. lines 180-181 (presumably
// the `eventsSum += *nevents` accumulation) and 202 are missing.
167 void splitSampleByEvent (const BatchSample& sample,
168 std::vector<BatchSegment>& segments,
169 const std::vector<Long64_t>& eventsFile,
170 double numJobs)
171 {
175
// total number of events in the sample (accumulated in the loop below)
176 Long64_t eventsSum = 0;
177 for (std::vector<Long64_t>::const_iterator nevents =
eventsFile.begin(),
178 end =
eventsFile.end(); nevents != end; ++ nevents)
179 {
182 }
// never create more jobs than there are events
183 if (numJobs > eventsSum)
184 numJobs = eventsSum;
185 Long64_t eventsMax
186 =
sample.meta.castDouble (Job::optEventsPerWorker);
// an explicit events-per-worker option takes precedence over numJobs
187 if (eventsMax > 0)
188 numJobs = ceil (double (eventsSum) / eventsMax);
189 eventsMax = Long64_t (ceil (double (eventsSum) / numJobs));
190
// walk the files, emitting a new segment every eventsMax events;
// begin_event is kept relative to the current begin_file
191 BatchSegment segment;
192 while (std::size_t (segment.begin_file) <
eventsFile.size())
193 {
194 while (segment.begin_event < eventsFile[segment.begin_file])
195 {
196 segments.push_back (segment);
197 segment.begin_event += eventsMax;
198 }
199 segment.begin_event -=
eventsFile[segment.begin_file];
200 ++ segment.begin_file;
201 }
203 }
204
205
206
207
208
// (continuation) Generates the splits for one sample — by event count
// when per-file event counts are available, otherwise by file — and
// guarantees at least one segment on return (postcondition in brief).
// NOTE(review): extraction artifact — the function header (orig. ~208-215,
// name `splitSample` inferred from the briefs at the top of this listing)
// and several statements (orig. 231 = the throw for the error message,
// 235-242 = the meta_nentries lookup/if, 254, 258) are missing.
216 std::vector<BatchSegment>& segments)
217 {
218
219
220
221
222
223
224
// number of files each worker should process; must be at least 1
225 const double filesPerWorker
226 =
sample.meta.castDouble (Job::optFilesPerWorker, 1);
227 if (filesPerWorker < 1)
228 {
229 std::ostringstream
msg;
230 msg <<
"invalid number of files per worker: " << filesPerWorker;
232 }
233 double numJobs = ceil (
sample.files.size() / filesPerWorker);
234
// prefer event-based splitting when event counts are known
238 {
243 splitSampleByEvent (sample, segments, meta_nentries->
value, numJobs);
244 } else
245 {
246 splitSampleByFile (sample, segments, numJobs);
247 }
248
249
// enforce !segments.empty(): give empty samples one (empty) segment
250 if (segments.empty())
251 {
252
253
255 segments.push_back (
empty);
256 }
257
259 }
260
261
// Fills `myjob` with the complete job description, including every
// sample from the job's SampleHandler together with its segment splits.
// NOTE(review): extraction artifact — orig. line 268 (a further
// parameter; presumably the meta object used below) and orig. 280 (the
// call that actually populates `subsegments`) are missing.
266 void fillFullJob (BatchJob& myjob, const Job& job,
267 const std::string& location,
269 {
270 fillJob (myjob, job, location);
271 *myjob.job.options() =
meta;
272
273 for (std::size_t sampleIndex = 0, end =
job.sampleHandler().size();
274 sampleIndex != end; ++ sampleIndex)
275 {
276 BatchSample mysample;
277 fillSample (mysample, *
job.sampleHandler()[sampleIndex],
meta);
278
279 std::vector<BatchSegment> subsegments;
// record the per-sample segment count for legacy bookkeeping
281 myjob.njobs_old.push_back (subsegments.size());
282
283 addSample (myjob, mysample, subsegments);
284 }
285 }
286 }
287
288
289
// Class-invariant check hook.
// NOTE(review): extraction artifact — the body (orig. 293, presumably an
// RCU invariant macro, going by the RCU_* usage elsewhere) is missing.
290 void BatchDriver ::
291 testInvariant () const
292 {
294 }
295
296
297
// Default constructor.
// NOTE(review): extraction artifact — the body (orig. 301, presumably
// RCU_NEW_INVARIANT) is missing from this listing.
298 BatchDriver ::
299 BatchDriver ()
300 {
302 }
303
304
305
// Drives one step of the batch-submission state machine, dispatching on
// the current manager step; returns FAILURE on unrecoverable errors,
// SUCCESS otherwise.
// NOTE(review): extraction artifact — the switch header itself
// (orig. 310-311) and many statements (e.g. orig. 342, 351-352, 355,
// 361, 367, 373, 382, 394, 403, 416-417, 424, 430, 436, 439, 505,
// 512-513) are missing; the embedded numbers are original line numbers
// fused into the text by the doxygen listing this was scraped from.
306 ::StatusCode BatchDriver ::
307 doManagerStep (Detail::ManagerData&
data)
const
308 {
309 using namespace msgEventLoop;
312 {
// cache whether submit node and workers share a file system
313 case Detail::ManagerStep::fillOptions:
314 {
315 data.sharedFileSystem
316 =
data.options.castBool (Job::optBatchSharedFileSystem,
true);
317 }
318 break;
319
// shared FS: work under submitDir; otherwise in the worker-local cwd
320 case Detail::ManagerStep::createSubmitDir:
321 {
322 if (
data.sharedFileSystem)
323 data.batchWriteLocation =
data.submitDir;
324 else
325 data.batchWriteLocation =
".";
326
327 if (
data.sharedFileSystem)
328 data.batchSubmitLocation =
data.submitDir+
"/submit";
329 else
330 data.batchSubmitLocation =
".";
331 }
332 break;
333
// give output streams without an explicit destination a default
// location under <writeLocation>/fetch/data-<label>/
// (orig. 342, the start of that assignment, is missing here)
334 case Detail::ManagerStep::updateOutputLocation:
335 {
336 const std::string writeLocation=
data.batchWriteLocation;
337 for (Job::outputMIter out =
data.job->outputBegin(),
338 end =
data.job->outputEnd(); out != end; ++ out)
339 {
340 if (
out->output() ==
nullptr)
341 {
343 (writeLocation +
"/fetch/data-" +
out->label() +
"/"));
344 }
345 }
346 }
347 break;
348
// create the submit/run/fetch/status directory skeleton; the error
// messages before each FAILURE (orig. 355/361/367/373) are missing
349 case Detail::ManagerStep::batchCreateDirectories:
350 {
353 if (gSystem->MakeDirectory (
submitDir.c_str()) != 0)
354 {
356 return ::StatusCode::FAILURE;
357 }
358 const std::string runDir =
data.submitDir +
"/run";
359 if (gSystem->MakeDirectory (runDir.c_str()) != 0)
360 {
362 return ::StatusCode::FAILURE;
363 }
364 const std::string fetchDir =
data.submitDir +
"/fetch";
365 if (gSystem->MakeDirectory (fetchDir.c_str()) != 0)
366 {
368 return ::StatusCode::FAILURE;
369 }
370 const std::string statusDir =
data.submitDir +
"/status";
371 if (gSystem->MakeDirectory (statusDir.c_str()) != 0)
372 {
374 return ::StatusCode::FAILURE;
375 }
376 }
377 break;
378
// build the BatchJob description, persist it to submit/config.root,
// and dump the segment list to submit/segments (one line per segment;
// the write loop body, orig. 394, is missing here)
379 case Detail::ManagerStep::batchCreateJob:
380 {
381 data.batchJob = std::make_unique<BatchJob> ();
383 data.batchJob->location=
data.batchWriteLocation;
384 {
385 std::string
path =
data.submitDir +
"/submit/config.root";
386 std::unique_ptr<TFile>
file (TFile::Open (
path.c_str(),
"RECREATE"));
387 data.batchJob->Write (
"job");
388 }
389 {
390 std::ofstream
file ((
data.submitDir +
"/submit/segments").c_str());
391 for (std::size_t iter = 0, end =
data.batchJob->segments.size();
392 iter != end; ++ iter)
393 {
395 }
396 }
397 }
398 break;
399
// base name of the run script plus the shell snippet that derives
// EL_JOBID from the script's first argument
400 case Detail::ManagerStep::batchScriptVar:
401 {
402 data.batchName =
"run";
404 data.batchJobId =
"EL_JOBID=$1\n";
405 }
406 break;
407
408 case Detail::ManagerStep::batchMakeScript:
409 {
410 makeScript (
data,
data.batchJob->segments.size());
411 }
412 break;
413
// body (orig. 416-417) missing from this listing
414 case Detail::ManagerStep::batchMakeIndices:
415 {
418 }
419 break;
420
// reload the BatchJob description written at submission time
421 case Detail::ManagerStep::readConfigResubmit:
422 case Detail::ManagerStep::readConfigRetrieve:
423 {
425
426 std::unique_ptr<TFile>
file
427 {TFile::Open ((
data.submitDir +
"/submit/config.root").c_str(),
"READ")};
428 if (
file ==
nullptr ||
file->IsZombie())
429 {
431 return ::StatusCode::FAILURE;
432 }
433 data.batchJob.reset (
dynamic_cast<BatchJob*
>(
file->Get (
"job")));
434 if (
data.batchJob ==
nullptr)
435 {
437 return ::StatusCode::FAILURE;
438 }
440 }
441 break;
442
// classify every sub-job from its status/completed-N and
// status/fail-N marker files (both present = inconsistent = error)
443 case Detail::ManagerStep::batchJobStatusResubmit:
444 case Detail::ManagerStep::batchJobStatusRetrieve:
445 {
446 for (std::size_t job = 0;
job !=
data.batchJob->segments.size(); ++
job)
447 {
448 std::ostringstream completedFile;
449 completedFile <<
data.submitDir <<
"/status/completed-" <<
job;
450 const bool hasCompleted =
451 (gSystem->AccessPathName (completedFile.str().c_str()) == 0);
452
453 std::ostringstream failFile;
454 failFile <<
data.submitDir <<
"/status/fail-" <<
job;
455 const bool hasFail =
456 (gSystem->AccessPathName (failFile.str().c_str()) == 0);
457
458 if (hasCompleted)
459 {
460 if (hasFail)
461 {
462 ANA_MSG_ERROR (
"sub-job " << job <<
" reported both success and failure");
463 return ::StatusCode::FAILURE;
464 } else
465 data.batchJobSuccess.insert (job);
466 } else
467 {
468 if (hasFail)
469 data.batchJobFailure.insert (job);
470 else
471 data.batchJobUnknown.insert (job);
472 }
473 }
474 ANA_MSG_INFO (
"current job status: " <<
data.batchJobSuccess.size() <<
" success, " <<
data.batchJobFailure.size() <<
" failure, " <<
data.batchJobUnknown.size() <<
" running/unknown");
475 }
476 break;
477
// select segments to resubmit: "ALL_MISSING" = everything not yet
// successful, default (empty option) = only reported failures; then
// clear their status marker files (the `command` stream declaration,
// orig. 512-513, and its execution are missing from this listing)
478 case Detail::ManagerStep::batchPreResubmit:
479 {
480 bool all_missing = false;
481 if (
data.resubmitOption ==
"ALL_MISSING")
482 {
483 all_missing = true;
484 }
else if (!
data.resubmitOption.empty())
485 {
487 return ::StatusCode::FAILURE;
488 }
489
490 for (std::size_t segment = 0; segment !=
data.batchJob->segments.size(); ++ segment)
491 {
492 if (all_missing)
493 {
494 if (
data.batchJobSuccess.find (segment) ==
data.batchJobSuccess.end())
495 data.batchJobIndices.push_back (segment);
496 } else
497 {
498 if (
data.batchJobFailure.find (segment) !=
data.batchJobFailure.end())
499 data.batchJobIndices.push_back (segment);
500 }
501 }
502
// nothing to resubmit: short-circuit straight to the final step
503 if (
data.batchJobIndices.empty())
504 {
506 data.nextStep = Detail::ManagerStep::final;
507 return ::StatusCode::SUCCESS;
508 }
509
510 for (std::size_t segment :
data.batchJobIndices)
511 {
514 command <<
" " <<
data.submitDir <<
"/status/completed-" << segment;
515 command <<
" " <<
data.submitDir <<
"/status/fail-" << segment;
516 command <<
" " <<
data.submitDir <<
"/status/done-" << segment;
518 }
// restore the options the job was originally submitted with
519 data.options = *
data.batchJob->job.options();
520 }
521 break;
522
// merge histograms; the job counts as completed only if every
// segment could be merged
523 case Detail::ManagerStep::doRetrieve:
524 {
525 bool merged = mergeHists (
data);
526 if (merged)
527 {
528 diskOutputSave (
data);
529 }
530 data.retrieved =
true;
531 data.completed = merged;
532 }
533 break;
534
535 default:
536 (void) true;
537 }
538 return ::StatusCode::SUCCESS;
539 }
540
541
542
// Builds the shell snippet that sets up the analysis release on a worker
// node (ATLASLocalRootBase + asetup + WorkDir setup.sh); for non-shared
// file systems it also creates, via cpack, the tarball that ships the
// built work directory to the workers.
// NOTE(review): extraction artifact — the final return statement
// (orig. ~630, presumably `return file.str();`) and a few other lines
// (orig. 546, 571, 575, 610, 620-621) are missing from this listing.
543 std::string BatchDriver ::
544 defaultReleaseSetup (
const Detail::ManagerData&
data)
const
545 {
547
548
549 const std::string tarballName("AnalysisPackage.tar.gz");
550
551 std::ostringstream
file;
552
553
// locate the built work directory; fall back to the first entry of
// CMAKE_PREFIX_PATH when $WorkDir_DIR is not set
554 const char *WORKDIR_DIR =
getenv (
"WorkDir_DIR");
555
556 std::string CMAKE_DIR_str ( getenv ("CMAKE_PREFIX_PATH") );
557 if (WORKDIR_DIR == nullptr){
558 msgEventLoop::ANA_MSG_INFO ("Could not find environment variable $WorkDir_DIR");
559
560 if (CMAKE_DIR_str.find(":") != std::string::npos){
561
562 CMAKE_DIR_str.erase( CMAKE_DIR_str.find(":") , std::string::npos );
563 }
564
565 WORKDIR_DIR = CMAKE_DIR_str.data();
566 }
567
// without a shared FS the worker unpacks the shipped tarball first
568 if(!
data.sharedFileSystem)
569 {
570 file <<
"mkdir -p build && tar -C build/ -xf " << tarballName <<
" || abortJob\n";
572 }
573
574
576
// propagate the submitter's asetup environment to the worker
577 if(
getenv(
"AtlasSetupSite"))
file <<
"export AtlasSetupSite=" <<
getenv(
"AtlasSetupSite") <<
"\n";
578
579 if(
getenv(
"AtlasSetup"))
file <<
"export AtlasSetup=" <<
getenv(
"AtlasSetup") <<
"\n";
580
581
582
583 file <<
"export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase\n";
584 file <<
"source $ATLAS_LOCAL_ROOT_BASE/user/atlasLocalSetup.sh --quiet\n";
585
586
// default setup command: re-run asetup with the submitter's release
// (stable releases by project,version; nightlies by branch,stamp);
// can be overridden via Job::optBatchSetupCommand below
587 std::ostringstream defaultSetupCommand;
588 {
589
590 if(
getenv(
"AtlasProject")) defaultSetupCommand <<
"export AtlasProject=" <<
getenv(
"AtlasProject") <<
"\n";
591
592 if(
getenv(
"AtlasVersion")) defaultSetupCommand <<
"export AtlasVersion=" <<
getenv(
"AtlasVersion") <<
"\n";
593
594 if(
getenv(
"AtlasBuildStamp")) defaultSetupCommand <<
"export AtlasBuildStamp=" <<
getenv(
"AtlasBuildStamp") <<
"\n";
595
596 if(
getenv(
"AtlasBuildBranch")) defaultSetupCommand <<
"export AtlasBuildBranch=" <<
getenv(
"AtlasBuildBranch") <<
"\n";
597
598 if(
getenv(
"AtlasReleaseType")) defaultSetupCommand <<
"export AtlasReleaseType=" <<
getenv(
"AtlasReleaseType") <<
"\n";
599 defaultSetupCommand << "if [ \"${AtlasReleaseType}\" == \"stable\" ]; then\n";
600 defaultSetupCommand << " source ${AtlasSetup}/scripts/asetup.sh ${AtlasProject},${AtlasVersion} || abortJob\n";
601 defaultSetupCommand << "else\n";
602 defaultSetupCommand << " source ${AtlasSetup}/scripts/asetup.sh ${AtlasProject},${AtlasBuildBranch},${AtlasBuildStamp} || abortJob\n";
603 defaultSetupCommand << "fi\n";
604 defaultSetupCommand << "echo \"Using default setup command\"";
605 }
606
607 file <<
options()->castString(Job::optBatchSetupCommand, defaultSetupCommand.str()) <<
" || abortJob\n";
// source the work-dir setup: directly on a shared FS, from the
// unpacked tarball otherwise
608 if(
data.sharedFileSystem)
file <<
"source " << WORKDIR_DIR <<
"/setup.sh || abortJob\n";
609 else file <<
"source build/setup.sh || abortJob\n";
611
// non-shared FS: build the tarball now (cpack), then rename it to the
// fixed name the generated script expects (the error handling for the
// cpack failure, orig. 620-621, is missing here)
612 if(!
data.sharedFileSystem)
613 {
614 std::ostringstream
cmd;
615
616 cmd <<
"cpack -D CPACK_INSTALL_PREFIX=. -G TGZ --config $TestArea/CPackConfig.cmake";
617
618
619 if (gSystem->Exec (
cmd.str().c_str()) != 0){
621 }
622
623 std::ostringstream mv_command;
624 mv_command << "mv WorkDir_" << std::getenv("WorkDir_VERSION") << "_" << std::getenv("WorkDir_PLATFORM") << ".tar.gz " << tarballName;
625 if (gSystem->Exec (mv_command.str().c_str()) != 0){
626 RCU_THROW_MSG ((
"failed to execute: " + mv_command.str()).c_str());
627 }
628 }
629
631 }
632
633
634
// Generates the worker run script(s): a bash script that sets EL_JOBID,
// defines abortJob (drops fail/done status markers), looks up its
// segment, sets up the release, runs eventloop_batch_worker, and writes
// the final status markers. With "{JOBID}" in the script name one script
// per sub-job is written; otherwise a single script takes the id as $1.
// NOTE(review): extraction artifact — the script-name construction
// (orig. 639-641), the per-index loop header (orig. 643/646-647), the
// ofstream `file` declaration (orig. 650), several emissions (orig. 653,
// 662-663, 669-670, 677, 689), and the chmod command text (orig. 701)
// are missing from this listing.
635 void BatchDriver ::
636 makeScript (Detail::ManagerData&
data,
637 std::size_t njobs) const
638 {
640
// "{JOBID}" in the name means one script file per sub-job
642 bool multiFile = (
name.find (
"{JOBID}") != std::string::npos);
644 {
645 std::ostringstream
str;
648
649 {
651 file <<
"#!/bin/bash\n";
652 file <<
"echo starting batch job initialization\n";
654 file <<
"echo batch job user initialization finished\n";
// per-script hard-coded id vs. id taken from the first argument
655 if (multiFile)
file <<
"EL_JOBID=" <<
index <<
"\n\n";
656 else file <<
data.batchJobId <<
"\n";
657
// abortJob: mark this sub-job failed and done, presumably followed
// by an exit (orig. 662-663 are missing here)
658 file <<
"function abortJob {\n";
659 file <<
" echo \"abort EL_JOBID=${EL_JOBID}\"\n";
660 file <<
" touch \"" <<
data.batchWriteLocation <<
"/status/fail-$EL_JOBID\"\n";
661 file <<
" touch \"" <<
data.batchWriteLocation <<
"/status/done-$EL_JOBID\"\n";
664
665
// map the numeric job id to its segment name via submit/segments
666 file <<
"EL_JOBSEG=`grep \"^$EL_JOBID \" \"" <<
data.batchSubmitLocation <<
"/segments\" | awk ' { print $2 }'`\n";
667 file <<
"test \"$EL_JOBSEG\" != \"\" || abortJob\n";
668 file <<
"hostname\n";
671 file << shellInit <<
"\n";
672
673 if(!
data.sharedFileSystem)
674 {
675 file <<
"mkdir \"fetch\" || abortJob\n";
676 file <<
"mkdir \"status\" || abortJob\n";
678 }
679
// shared FS: run inside a unique scratch directory under $TMPDIR
680 if(
data.sharedFileSystem)
681 {
682 file <<
"test \"$TMPDIR\" == \"\" && TMPDIR=/tmp\n";
683 file <<
"RUNDIR=${TMPDIR}/EventLoop-Worker-$EL_JOBSEG-`date +%s`-$$\n";
684 file <<
"mkdir \"$RUNDIR\" || abortJob\n";
685 file <<
"cd \"$RUNDIR\" || abortJob\n";
686 }
687
// emit the release-setup snippet unless explicitly skipped
// (the emission itself, orig. 689, is missing here)
688 if (!
data.batchSkipReleaseSetup)
690
691 file <<
"eventloop_batch_worker $EL_JOBID '" <<
data.batchSubmitLocation <<
"/config.root' || abortJob\n";
692
// a worker that never wrote completed-N counts as failed
693 file <<
"test -f \"" <<
data.batchWriteLocation <<
"/status/completed-$EL_JOBID\" || "
694 <<
"touch \"" <<
data.batchWriteLocation <<
"/status/fail-$EL_JOBID\"\n";
695 file <<
"touch \"" <<
data.batchWriteLocation <<
"/status/done-$EL_JOBID\"\n";
696 if(
data.sharedFileSystem)
file <<
"cd .. && rm -rf \"$RUNDIR\"\n";
697 }
698
// post-process the written script (command text, orig. 701, is
// missing — presumably a chmod to make it executable)
699 {
700 std::ostringstream
cmd;
702 if (gSystem->Exec (
cmd.str().c_str()) != 0)
704 }
705 }
706 }
707
708
709
// Merges the per-segment outputs of every sample: the histogram stream
// into the (possibly defaulted) histogram output location, and each data
// output stream into per-sample files. Returns whether every sample
// could be fully merged (false as soon as any segment is missing).
// NOTE(review): extraction artifact — the declarations of `result` and
// `origHistOutput`, the per-segment `hist_file` construction start
// (orig. 759), the actual hadd/merge calls (orig. ~778, 805), the
// warning emission (orig. 768/771), the per-stream `output` stream
// rebuild (orig. 784-785, 791), and the return statement (orig. 810)
// are all missing from this listing.
710 bool BatchDriver ::
711 mergeHists (Detail::ManagerData&
data)
712 {
713 using namespace msgEventLoop;
714
715
716
717
718
// when the job defines no histogram stream, fall back to a local
// disk output under <submitDir>/fetch/hist-
719
720 std::unique_ptr<SH::DiskOutput> origHistOutputMemory;
722 for (
auto iter =
data.job->outputBegin(), end =
data.job->outputEnd();
723 iter != end; ++ iter)
724 {
725 if (
iter->label() == Job::histogramStreamName)
726 origHistOutput =
iter->output();
727 }
728 if (origHistOutput == nullptr)
729 {
730 origHistOutputMemory = std::make_unique<SH::DiskOutputLocal>
731 (
data.submitDir +
"/fetch/hist-");
732 origHistOutput = origHistOutputMemory.get();
733 }
735
737
739
741 for (std::size_t sample = 0, end =
data.batchJob->samples.size();
742 sample != end; ++ sample)
743 {
744 const BatchSample& mysample (
data.batchJob->samples[sample]);
745
// skip samples whose merged output already exists
// (AccessPathName returns false (0) when the path EXISTS)
746 std::ostringstream
output;
748 if (gSystem->AccessPathName (
output.str().c_str()) != 0)
749 {
// collect per-segment histogram files; a sample is only merged
// when all of its segments completed successfully
751
752 bool complete = true;
753 std::vector<std::string>
input;
754 for (std::size_t segment = mysample.begin_segments,
755 end = mysample.end_segments; segment != end; ++ segment)
756 {
757 const BatchSegment& mysegment =
data.batchJob->segments[segment];
758
760 (mysegment.sampleName, mysegment.segmentName, ".root");
761
762 ANA_MSG_VERBOSE (
"merge segment " << segment <<
" completed=" << (
data.batchJobSuccess.find(segment)!=
data.batchJobSuccess.end()) <<
" fail=" << (
data.batchJobFailure.find(segment)!=
data.batchJobFailure.end()) <<
" unknown=" << (
data.batchJobUnknown.find(segment)!=
data.batchJobUnknown.end()));
763
764 input.push_back (hist_file);
765
// a reported failure is surfaced as a message; a segment that is
// neither failed nor successful just marks the sample incomplete
766 if (
data.batchJobFailure.find(segment)!=
data.batchJobFailure.end())
767 {
769 message <<
"subjob " << segment <<
"/" << mysegment.fullName
770 << " failed";
772 }
773 else if (
data.batchJobSuccess.find(segment)==
data.batchJobSuccess.end())
774 complete =
false,
result =
false;
775 }
// all segments present: merge histograms, then merge every data
// output stream into <submitDir>/fetch/data-<label>/<sample>.root
776 if (complete)
777 {
779
780
781 for (Job::outputIter out =
data.batchJob->job.outputBegin(),
782 end =
data.batchJob->job.outputEnd(); out != end; ++ out)
783 {
786
787 if(gSystem->AccessPathName(
output.str().c_str()))
788 gSystem->mkdir(
output.str().c_str(),
true);
789
790
792
793 std::vector<std::string>
input;
794 for (std::size_t segment = mysample.begin_segments,
795 end = mysample.end_segments; segment != end; ++ segment)
796 {
797 const BatchSegment& mysegment =
data.batchJob->segments[segment];
798
799 const std::string
infile =
800 data.submitDir +
"/fetch/data-" +
out->label() +
"/" + mysegment.fullName +
".root";
801
802 input.push_back (infile);
803 }
804
806 }
807 }
808 }
809 }
811 }
812}
#define RCU_NEW_INVARIANT(x)
#define RCU_ASSERT_SOFT(x)
#define RCU_READ_INVARIANT(x)
char data[hepevt_bytes_allocation_ATLAS]
#define RCU_THROW_MSG(message)
static const Attributes_t empty
an implementation of DiskOutput for local disks
a class/interface representing an output location for files
std::string targetURL(const std::string &sampleName, const std::string &segmentName, const std::string &suffix) const
the final output location for the given segment
a base class that manages a set of files belonging to a particular data set and the associated meta-d...
void exec(const std::string &cmd)
effects: execute the given command guarantee: strong failures: out of memory II failures: system fail...
void hadd(const std::string &output_file, const std::vector< std::string > &input_files, unsigned max_files)
effects: perform the hadd functionality guarantee: basic failures: out of memory III failures: i/o er...
std::string substitute(const std::string &str, const std::string &pattern, const std::string &with)
effects: substitute all occurences of "pattern" with "with" in the string "str" returns: the substitu...
std::string getenv(const std::string &variableName)
get an environment variable
SampleHandler splitSample(Sample &sample, const Long64_t nevt)
effects: split the given sample into a set of samples, with each sample containing either exactly one...
path
python interpreter configuration --------------------------------------—