ATLAS Offline Software
Loading...
Searching...
No Matches
BatchDriver.cxx File Reference
#include <EventLoop/BatchDriver.h>
#include <EventLoop/BatchJob.h>
#include <EventLoop/BatchSample.h>
#include <EventLoop/BatchSegment.h>
#include <EventLoop/ManagerData.h>
#include <EventLoop/ManagerStep.h>
#include <EventLoop/MessageCheck.h>
#include <EventLoop/OutputStream.h>
#include <RootCoreUtils/Assert.h>
#include <RootCoreUtils/ShellExec.h>
#include <RootCoreUtils/StringUtil.h>
#include <RootCoreUtils/ThrowMsg.h>
#include <RootCoreUtils/hadd.h>
#include <SampleHandler/DiskOutputLocal.h>
#include <SampleHandler/MetaFields.h>
#include <SampleHandler/MetaVector.h>
#include <SampleHandler/Sample.h>
#include <TFile.h>
#include <TSystem.h>
#include <cmath>
#include <cstdlib>
#include <fstream>
#include <memory>
#include <sstream>

Go to the source code of this file.

Functions

 ClassImp (EL::BatchDriver) namespace EL

Function Documentation

◆ ClassImp()

ClassImp ( EL::BatchDriver )
Author
Nils Krumnack

effects: fill the job object from the original job information, except for the actual samples submitted. guarantee: basic. failures: out of memory II.

effects: fill the given sample object with the information from the SampleHandler object, except for the segments, which get filled separately. guarantee: basic. failures: out of memory II.

effects: add the given sample with the given segments to the job. guarantee: basic. failures: out of memory II; segment misconfiguration. requires: !segments.empty().

effects: generate the splits for the sample by file. guarantee: basic. failures: out of memory II; i/o errors. requires: !sample.files.empty(); numJobs == rint (numJobs). postcondition: !segments.empty().

effects: generate the splits for the sample. guarantee: basic. failures: out of memory II; i/o errors. requires: !sample.files.empty(); numJobs == rint (numJobs). postcondition: !segments.empty().

effects: generate the splits for the sample. guarantee: basic. failures: out of memory II; i/o errors. requires: !sample.files.empty(). postcondition: !segments.empty().

effects: fill the job object from the original job information, including the actual samples submitted. guarantee: basic. failures: out of memory II.

Definition at line 42 of file BatchDriver.cxx.

45{
46 namespace
47 {
52 void fillJob (BatchJob& myjob, const Job& job,
53 const std::string& submitDir)
54 {
55 myjob.job = job;
56 myjob.location = submitDir;
57 }
58
59
60
66 void fillSample (BatchSample& mysample,
67 const SH::Sample& sample, const SH::MetaObject& meta)
68 {
69 mysample.name = sample.name();
70 mysample.meta = *sample.meta();
71 mysample.meta.fetchDefaults (meta);
72
73 mysample.files = sample.makeFileList ();
74 }
75
76
77
84 void addSample (BatchJob& job, BatchSample sample,
85 const std::vector<BatchSegment>& segments)
86 {
87 RCU_REQUIRE (!segments.empty());
88
89 sample.begin_segments = job.segments.size();
90
91 RCU_ASSERT (segments[0].begin_file == 0);
92 RCU_ASSERT (segments[0].begin_event == 0);
93 for (std::size_t iter = 0, end = segments.size(); iter != end; ++ iter)
94 {
95 BatchSegment segment = segments[iter];
96
97 segment.sampleName = sample.name;
98 {
99 std::ostringstream myname;
100 myname << sample.name << "-" << iter;
101 segment.fullName = myname.str();
102 }
103 {
104 std::ostringstream myname;
105 myname << iter;
106 segment.segmentName = myname.str();
107 }
108
109 segment.sample = job.samples.size();
110 segment.job_id = job.segments.size();
111 if (iter+1 < end)
112 {
113 segment.end_file = segments[iter+1].begin_file;
114 segment.end_event = segments[iter+1].begin_event;
115 } else
116 {
117 segment.end_file = sample.files.size();
118 segment.end_event = 0;
119 }
120 RCU_ASSERT (segment.begin_file < segment.end_file || (segment.begin_file == segment.end_file && segment.begin_event <= segment.end_event));
121 job.segments.push_back (segment);
122 }
123
124 sample.end_segments = job.segments.size();
125 job.samples.push_back (sample);
126 }
127
128
129
130
131
139 void splitSampleByFile (const BatchSample& sample,
140 std::vector<BatchSegment>& segments,
141 double numJobs)
142 {
143 // RCU_REQUIRE (!sample.files.empty());
144 RCU_REQUIRE (numJobs == rint (numJobs));
145 RCU_REQUIRE (numJobs <= sample.files.size());
146
147 for (std::size_t index = 0, end = numJobs;
148 index < end; ++ index)
149 {
150 BatchSegment segment;
151 segment.begin_file = rint (index * (sample.files.size() / numJobs));
152 segments.push_back (segment);
153 }
154 }
155
156
157
158
159
167 void splitSampleByEvent (const BatchSample& sample,
168 std::vector<BatchSegment>& segments,
169 const std::vector<Long64_t>& eventsFile,
170 double numJobs)
171 {
172 RCU_REQUIRE (!sample.files.empty());
173 RCU_REQUIRE (sample.files.size() == eventsFile.size());
174 RCU_REQUIRE (numJobs == rint (numJobs));
175
176 Long64_t eventsSum = 0;
177 for (std::vector<Long64_t>::const_iterator nevents = eventsFile.begin(),
178 end = eventsFile.end(); nevents != end; ++ nevents)
179 {
180 RCU_ASSERT_SOFT (*nevents >= 0);
181 eventsSum += *nevents;
182 }
183 if (numJobs > eventsSum)
184 numJobs = eventsSum;
185 Long64_t eventsMax
186 = sample.meta.castDouble (Job::optEventsPerWorker);
187 if (eventsMax > 0)
188 numJobs = ceil (double (eventsSum) / eventsMax);
189 eventsMax = Long64_t (ceil (double (eventsSum) / numJobs));
190
191 BatchSegment segment;
192 while (std::size_t (segment.begin_file) < eventsFile.size())
193 {
194 while (segment.begin_event < eventsFile[segment.begin_file])
195 {
196 segments.push_back (segment);
197 segment.begin_event += eventsMax;
198 }
199 segment.begin_event -= eventsFile[segment.begin_file];
200 ++ segment.begin_file;
201 }
202 RCU_ASSERT (segments.size() == numJobs);
203 }
204
205
206
207
208
// Decide how many jobs a sample is split into and create its segments.
//
// The number of jobs is ceil (number of files / filesPerWorker), where
// filesPerWorker is read from the sample meta-data
// (Job::optFilesPerWorker, default 1).  A value below 1 is rejected
// via RCU_THROW_MSG.  Depending on whether `meta` is non-null the
// sample is split by event (splitSampleByEvent) or by file
// (splitSampleByFile).  If neither produced a segment, a single
// default-constructed segment is added so that the postcondition
// !segments.empty() holds.
215 void splitSample (const BatchSample& sample,
216 std::vector<BatchSegment>& segments)
217 {
218 // RCU_REQUIRE (!sample.files.empty());
219
220 // rationale: I'm now calculating the number of jobs here, so
221 // that independent of whether we divide on a per-file or
222 // per-event basis we can get the same number of jobs. also,
223 // it will balance out things slightly if we only have a few
224 // files and multiple files per job.
225 const double filesPerWorker
226 = sample.meta.castDouble (Job::optFilesPerWorker, 1);
227 if (filesPerWorker < 1)
228 {
229 std::ostringstream msg;
230 msg << "invalid number of files per worker: " << filesPerWorker;
231 RCU_THROW_MSG (msg.str());
232 }
233 double numJobs = ceil (sample.files.size() / filesPerWorker);
234
// NOTE(review): this listing jumps from source line 235 to 237, so the
// initializer of `meta` is not visible here.  Presumably it looks up
// the per-file event counts in sample.meta (MetaFields.h is included)
// -- confirm against the original file.
235 const TObject *meta
237 if (meta)
238 {
// the meta-data object is expected to be a vector of Long64_t with one
// entry per file; both expectations are checked with soft assertions
239 const SH::MetaVector<Long64_t> *const meta_nentries
240 = dynamic_cast<const SH::MetaVector<Long64_t> *>(meta);
241 RCU_ASSERT_SOFT (meta == meta_nentries);
242 RCU_ASSERT_SOFT (meta_nentries->value.size() == sample.files.size());
243 splitSampleByEvent (sample, segments, meta_nentries->value, numJobs);
244 } else
245 {
246 splitSampleByFile (sample, segments, numJobs);
247 }
248
249
250 if (segments.empty())
251 {
252 // rationale: this isn't really the proper thing to do. if a
253 // sample is empty I should just run the job locally.
254 BatchSegment empty;
255 segments.push_back (empty);
256 }
257
258 RCU_PROVIDE (!segments.empty());
259 }
260
261
266 void fillFullJob (BatchJob& myjob, const Job& job,
267 const std::string& location,
268 const SH::MetaObject& meta)
269 {
270 fillJob (myjob, job, location);
271 *myjob.job.options() = meta;
272
273 for (std::size_t sampleIndex = 0, end = job.sampleHandler().size();
274 sampleIndex != end; ++ sampleIndex)
275 {
276 BatchSample mysample;
277 fillSample (mysample, *job.sampleHandler()[sampleIndex], meta);
278
279 std::vector<BatchSegment> subsegments;
280 splitSample (mysample, subsegments);
281 myjob.njobs_old.push_back (subsegments.size());
282
283 addSample (myjob, mysample, subsegments);
284 }
285 }
286 }
287
288
289
290 void BatchDriver ::
291 testInvariant () const
292 {
293 RCU_INVARIANT (this != 0);
294 }
295
296
297
298 BatchDriver ::
299 BatchDriver ()
300 {
301 RCU_NEW_INVARIANT (this);
302 }
303
304
305
// Perform the batch-driver part of one manager step.
//
// First runs the base-class implementation (Driver::doManagerStep) and
// returns early if that fails (ANA_CHECK).  Afterwards the switch
// handles the steps this driver cares about; all other steps are
// deliberately a no-op.  Returns ::StatusCode::FAILURE after reporting
// an error message, ::StatusCode::SUCCESS otherwise.
306 ::StatusCode BatchDriver ::
307 doManagerStep (Detail::ManagerData& data) const
308 {
309 using namespace msgEventLoop;
310 ANA_CHECK (Driver::doManagerStep (data));
311 switch (data.step)
312 {
// read the shared-file-system option (defaults to true)
313 case Detail::ManagerStep::fillOptions:
314 {
315 data.sharedFileSystem
316 = data.options.castBool (Job::optBatchSharedFileSystem, true);
317 }
318 break;
319
// choose where the worker writes its output and where it finds the
// submit files: inside the submit directory on a shared file system,
// the current directory otherwise
320 case Detail::ManagerStep::createSubmitDir:
321 {
322 if (data.sharedFileSystem) // Shared file-system, write to output
323 data.batchWriteLocation = data.submitDir;
324 else
325 data.batchWriteLocation = ".";
326
327 if (data.sharedFileSystem) // Shared file-system, use local path
328 data.batchSubmitLocation = data.submitDir+"/submit";
329 else
330 data.batchSubmitLocation = ".";
331 }
332 break;
333
// give every output stream without an explicit output object a local
// disk output below <writeLocation>/fetch/data-<label>/
334 case Detail::ManagerStep::updateOutputLocation:
335 {
336 const std::string writeLocation=data.batchWriteLocation;
337 for (Job::outputMIter out = data.job->outputBegin(),
338 end = data.job->outputEnd(); out != end; ++ out)
339 {
340 if (out->output() == nullptr)
341 {
342 out->output (new SH::DiskOutputLocal
343 (writeLocation + "/fetch/data-" + out->label() + "/"));
344 }
345 }
346 }
347 break;
348
// create the submit, run, fetch and status sub-directories of the
// submit directory; any failure aborts the step
349 case Detail::ManagerStep::batchCreateDirectories:
350 {
351 ANA_MSG_DEBUG ("submitting batch job in location " << data.submitDir);
352 const std::string submitDir = data.submitDir + "/submit";
353 if (gSystem->MakeDirectory (submitDir.c_str()) != 0)
354 {
355 ANA_MSG_ERROR ("failed to create directory " + submitDir);
356 return ::StatusCode::FAILURE;
357 }
358 const std::string runDir = data.submitDir + "/run";
359 if (gSystem->MakeDirectory (runDir.c_str()) != 0)
360 {
361 ANA_MSG_ERROR ("failed to create directory " + runDir);
362 return ::StatusCode::FAILURE;
363 }
364 const std::string fetchDir = data.submitDir + "/fetch";
365 if (gSystem->MakeDirectory (fetchDir.c_str()) != 0)
366 {
367 ANA_MSG_ERROR ("failed to create directory " + fetchDir);
368 return ::StatusCode::FAILURE;
369 }
370 const std::string statusDir = data.submitDir + "/status";
371 if (gSystem->MakeDirectory (statusDir.c_str()) != 0)
372 {
373 ANA_MSG_ERROR ("failed to create directory " + statusDir);
374 return ::StatusCode::FAILURE;
375 }
376 }
377 break;
378
// build the BatchJob description, then persist it twice: as object
// "job" in submit/config.root and as a text file submit/segments with
// one "<index> <fullName>" line per segment
379 case Detail::ManagerStep::batchCreateJob:
380 {
381 data.batchJob = std::make_unique<BatchJob> ();
382 fillFullJob (*data.batchJob, *data.job, data.submitDir, data.options);
// the location set by fillFullJob is replaced by the write location
383 data.batchJob->location=data.batchWriteLocation;
384 {
385 std::string path = data.submitDir + "/submit/config.root";
// NOTE(review): neither the result of TFile::Open nor of Write is
// checked here
386 std::unique_ptr<TFile> file (TFile::Open (path.c_str(), "RECREATE"));
387 data.batchJob->Write ("job");
388 }
389 {
390 std::ofstream file ((data.submitDir + "/submit/segments").c_str());
391 for (std::size_t iter = 0, end = data.batchJob->segments.size();
392 iter != end; ++ iter)
393 {
394 file << iter << " " << data.batchJob->segments[iter].fullName << "\n";
395 }
396 }
397 }
398 break;
399
// set the values used when writing the script: script name "run", no
// extra initialization, and the job id taken from the first script
// argument
400 case Detail::ManagerStep::batchScriptVar:
401 {
402 data.batchName = "run";
403 data.batchInit = "";
404 data.batchJobId = "EL_JOBID=$1\n";
405 }
406 break;
407
// write the worker script(s), one sub-job per segment
408 case Detail::ManagerStep::batchMakeScript:
409 {
410 makeScript (data, data.batchJob->segments.size());
411 }
412 break;
413
// on a fresh submission every segment index gets submitted
414 case Detail::ManagerStep::batchMakeIndices:
415 {
416 for (std::size_t index = 0; index != data.batchJob->segments.size(); ++ index)
417 data.batchJobIndices.push_back (index);
418 }
419 break;
420
// read the BatchJob back from submit/config.root; data.batchJob takes
// ownership of the object and data.job points at the job stored inside
421 case Detail::ManagerStep::readConfigResubmit:
422 case Detail::ManagerStep::readConfigRetrieve:
423 {
424 ANA_MSG_INFO ("retrieving batch job in location " << data.submitDir);
425
426 std::unique_ptr<TFile> file
427 {TFile::Open ((data.submitDir + "/submit/config.root").c_str(), "READ")};
428 if (file == nullptr || file->IsZombie())
429 {
430 ANA_MSG_ERROR ("failed to read config.root");
431 return ::StatusCode::FAILURE;
432 }
433 data.batchJob.reset (dynamic_cast<BatchJob*>(file->Get ("job")));
434 if (data.batchJob == nullptr)
435 {
436 ANA_MSG_ERROR ("failed to get job object from config.root");
437 return ::StatusCode::FAILURE;
438 }
439 data.job = &data.batchJob->job;
440 }
441 break;
442
// classify every sub-job by the marker files status/completed-<n> and
// status/fail-<n>; a return value of 0 from AccessPathName is taken to
// mean the marker exists.  having both markers is reported as an error.
443 case Detail::ManagerStep::batchJobStatusResubmit:
444 case Detail::ManagerStep::batchJobStatusRetrieve:
445 {
446 for (std::size_t job = 0; job != data.batchJob->segments.size(); ++ job)
447 {
448 std::ostringstream completedFile;
449 completedFile << data.submitDir << "/status/completed-" << job;
450 const bool hasCompleted =
451 (gSystem->AccessPathName (completedFile.str().c_str()) == 0);
452
453 std::ostringstream failFile;
454 failFile << data.submitDir << "/status/fail-" << job;
455 const bool hasFail =
456 (gSystem->AccessPathName (failFile.str().c_str()) == 0);
457
458 if (hasCompleted)
459 {
460 if (hasFail)
461 {
462 ANA_MSG_ERROR ("sub-job " << job << " reported both success and failure");
463 return ::StatusCode::FAILURE;
464 } else
465 data.batchJobSuccess.insert (job);
466 } else
467 {
468 if (hasFail)
469 data.batchJobFailure.insert (job);
470 else
471 data.batchJobUnknown.insert (job);
472 }
473 }
474 ANA_MSG_INFO ("current job status: " << data.batchJobSuccess.size() << " success, " << data.batchJobFailure.size() << " failure, " << data.batchJobUnknown.size() << " running/unknown");
475 }
476 break;
477
// select the sub-jobs to resubmit: with option "ALL_MISSING" everything
// that has not succeeded, with an empty option only the failed ones;
// any other option value is an error
478 case Detail::ManagerStep::batchPreResubmit:
479 {
480 bool all_missing = false;
481 if (data.resubmitOption == "ALL_MISSING")
482 {
483 all_missing = true;
484 } else if (!data.resubmitOption.empty())
485 {
486 ANA_MSG_ERROR ("unknown resubmit option " + data.resubmitOption);
487 return ::StatusCode::FAILURE;
488 }
489
490 for (std::size_t segment = 0; segment != data.batchJob->segments.size(); ++ segment)
491 {
492 if (all_missing)
493 {
494 if (data.batchJobSuccess.find (segment) == data.batchJobSuccess.end())
495 data.batchJobIndices.push_back (segment);
496 } else
497 {
498 if (data.batchJobFailure.find (segment) != data.batchJobFailure.end())
499 data.batchJobIndices.push_back (segment);
500 }
501 }
502
// nothing selected: jump straight to the final step
503 if (data.batchJobIndices.empty())
504 {
505 ANA_MSG_INFO ("found no jobs to resubmit");
506 data.nextStep = Detail::ManagerStep::final;
507 return ::StatusCode::SUCCESS;
508 }
509
// assemble a command removing the old status markers of each sub-job
// that gets resubmitted
510 for (std::size_t segment : data.batchJobIndices)
511 {
512 std::ostringstream command;
513 command << "rm -rf";
514 command << " " << data.submitDir << "/status/completed-" << segment;
515 command << " " << data.submitDir << "/status/fail-" << segment;
516 command << " " << data.submitDir << "/status/done-" << segment;
// NOTE(review): this listing skips source line 517, so the statement
// that runs the assembled command is not visible here.  Presumably it
// is executed through the shell helper from RootCoreUtils/ShellExec.h
// -- confirm against the original file.
518 }
// continue with the options that were stored with the batch job
519 data.options = *data.batchJob->job.options();
520 }
521 break;
522
// merge the histogram outputs; the disk outputs are only saved, and the
// job only counts as completed, if the merge covered every sample
523 case Detail::ManagerStep::doRetrieve:
524 {
525 bool merged = mergeHists (data);
526 if (merged)
527 {
528 diskOutputSave (data);
529 }
530 data.retrieved = true;
531 data.completed = merged;
532 }
533 break;
534
535 default:
536 (void) true; // safe to do nothing
537 }
538 return ::StatusCode::SUCCESS;
539 }
540
541
542
543 std::string BatchDriver ::
544 defaultReleaseSetup (const Detail::ManagerData& data) const
545 {
546 RCU_READ_INVARIANT (this);
547
548 // name of tarball being made (this needs to match CondorDriver.cxx)
549 const std::string tarballName("AnalysisPackage.tar.gz");
550
551 std::ostringstream file;
552
553 // <path of build dir>/x86_64-slc6-gcc62-opt (comes from CMake, we need this)
554 const char *WORKDIR_DIR = getenv ("WorkDir_DIR");
555 // As a backup, keep the CMAKE_PREFIX_PATH
556 std::string CMAKE_DIR_str ( getenv ("CMAKE_PREFIX_PATH") );
557 if (WORKDIR_DIR == nullptr){
558 msgEventLoop::ANA_MSG_INFO ("Could not find environment variable $WorkDir_DIR");
559 // Instead, build from the first path in CMAKE_PREFIX_PATH
560 if (CMAKE_DIR_str.find(":") != std::string::npos){
561 // Erase everything from the colon onwards
562 CMAKE_DIR_str.erase( CMAKE_DIR_str.find(":") , std::string::npos );
563 }
564 // Provide the remainder of the string to the workdir
565 WORKDIR_DIR = CMAKE_DIR_str.data();
566 }
567
568 if(!data.sharedFileSystem)
569 {
570 file << "mkdir -p build && tar -C build/ -xf " << tarballName << " || abortJob\n";
571 file << "\n";
572 }
573
574
575 file << "\n";
576 // /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/x86_64/AtlasSetup/.config/.asetup.site
577 if(getenv("AtlasSetupSite")) file << "export AtlasSetupSite=" << getenv("AtlasSetupSite") << "\n";
578 // /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/x86_64/AtlasSetup/V00-07-75/AtlasSetup
579 if(getenv("AtlasSetup")) file << "export AtlasSetup=" << getenv("AtlasSetup") << "\n";
580 // for now, needed because of errors like:
581 // /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/swConfig/asetup/asetupEpilog.sh: line 38:
582 // /swConfig/python/pythonFix-Linux.sh: No such file or directory
583 file << "export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase\n";
584 file << "source $ATLAS_LOCAL_ROOT_BASE/user/atlasLocalSetup.sh --quiet\n";
585
586 // default setup command
587 std::ostringstream defaultSetupCommand;
588 {
589 // AnalysisBase
590 if(getenv("AtlasProject")) defaultSetupCommand << "export AtlasProject=" << getenv("AtlasProject") << "\n";
591 // 21.2.3
592 if(getenv("AtlasVersion")) defaultSetupCommand << "export AtlasVersion=" << getenv("AtlasVersion") << "\n";
593 // 2017-08-16T2249 (only set if using a nightly release)
594 if(getenv("AtlasBuildStamp")) defaultSetupCommand << "export AtlasBuildStamp=" << getenv("AtlasBuildStamp") << "\n";
595 // 21.2
596 if(getenv("AtlasBuildBranch")) defaultSetupCommand << "export AtlasBuildBranch=" << getenv("AtlasBuildBranch") << "\n";
597 // stable vs nightly
598 if(getenv("AtlasReleaseType")) defaultSetupCommand << "export AtlasReleaseType=" << getenv("AtlasReleaseType") << "\n";
599 defaultSetupCommand << "if [ \"${AtlasReleaseType}\" == \"stable\" ]; then\n";
600 defaultSetupCommand << " source ${AtlasSetup}/scripts/asetup.sh ${AtlasProject},${AtlasVersion} || abortJob\n";
601 defaultSetupCommand << "else\n";
602 defaultSetupCommand << " source ${AtlasSetup}/scripts/asetup.sh ${AtlasProject},${AtlasBuildBranch},${AtlasBuildStamp} || abortJob\n";
603 defaultSetupCommand << "fi\n";
604 defaultSetupCommand << "echo \"Using default setup command\"";
605 }
606
607 file << options()->castString(Job::optBatchSetupCommand, defaultSetupCommand.str()) << " || abortJob\n";
608 if(data.sharedFileSystem) file << "source " << WORKDIR_DIR << "/setup.sh || abortJob\n";
609 else file << "source build/setup.sh || abortJob\n";
610 file << "\n";
611
612 if(!data.sharedFileSystem)
613 {
614 std::ostringstream cmd;
615 //cmd << "tar --dereference -C " << WORKDIR_DIR << " -czf " << tarballName << " .";
616 cmd << "cpack -D CPACK_INSTALL_PREFIX=. -G TGZ --config $TestArea/CPackConfig.cmake";
617
618 // suppress the output from the command
619 if (gSystem->Exec (cmd.str().c_str()) != 0){
620 RCU_THROW_MSG (("failed to execute: " + cmd.str()).c_str());
621 }
622
623 std::ostringstream mv_command;
624 mv_command << "mv WorkDir_" << std::getenv("WorkDir_VERSION") << "_" << std::getenv("WorkDir_PLATFORM") << ".tar.gz " << tarballName;
625 if (gSystem->Exec (mv_command.str().c_str()) != 0){
626 RCU_THROW_MSG (("failed to execute: " + mv_command.str()).c_str());
627 }
628 }
629
630 return file.str();
631 }
632
633
634
635 void BatchDriver ::
636 makeScript (Detail::ManagerData& data,
637 std::size_t njobs) const
638 {
639 RCU_READ_INVARIANT (this);
640
641 std::string name = data.batchName;
642 bool multiFile = (name.find ("{JOBID}") != std::string::npos);
643 for (std::size_t index = 0, end = multiFile ? njobs : 1; index != end; ++ index)
644 {
645 std::ostringstream str;
646 str << index;
647 const std::string fileName = data.submitDir + "/submit/" + RCU::substitute (name, "{JOBID}", str.str());
648
649 {
650 std::ofstream file (fileName.c_str());
651 file << "#!/bin/bash\n";
652 file << "echo starting batch job initialization\n";
653 file << RCU::substitute (data.batchInit, "{JOBID}", str.str()) << "\n";
654 file << "echo batch job user initialization finished\n";
655 if (multiFile) file << "EL_JOBID=" << index << "\n\n";
656 else file << data.batchJobId << "\n";
657
658 file << "function abortJob {\n";
659 file << " echo \"abort EL_JOBID=${EL_JOBID}\"\n";
660 file << " touch \"" << data.batchWriteLocation << "/status/fail-$EL_JOBID\"\n";
661 file << " touch \"" << data.batchWriteLocation << "/status/done-$EL_JOBID\"\n";
662 file << " exit 1\n";
663 file << "}\n\n";
664
665
666 file << "EL_JOBSEG=`grep \"^$EL_JOBID \" \"" << data.batchSubmitLocation << "/segments\" | awk ' { print $2 }'`\n";
667 file << "test \"$EL_JOBSEG\" != \"\" || abortJob\n";
668 file << "hostname\n";
669 file << "pwd\n";
670 file << "whoami\n";
671 file << shellInit << "\n";
672
673 if(!data.sharedFileSystem)
674 { // Create output transfer directories
675 file << "mkdir \"fetch\" || abortJob\n";
676 file << "mkdir \"status\" || abortJob\n";
677 file << "\n";
678 }
679
680 if(data.sharedFileSystem)
681 {
682 file << "test \"$TMPDIR\" == \"\" && TMPDIR=/tmp\n";
683 file << "RUNDIR=${TMPDIR}/EventLoop-Worker-$EL_JOBSEG-`date +%s`-$$\n";
684 file << "mkdir \"$RUNDIR\" || abortJob\n";
685 file << "cd \"$RUNDIR\" || abortJob\n";
686 }
687
688 if (!data.batchSkipReleaseSetup)
689 file << defaultReleaseSetup (data);
690
691 file << "eventloop_batch_worker $EL_JOBID '" << data.batchSubmitLocation << "/config.root' || abortJob\n";
692
693 file << "test -f \"" << data.batchWriteLocation << "/status/completed-$EL_JOBID\" || "
694 << "touch \"" << data.batchWriteLocation << "/status/fail-$EL_JOBID\"\n";
695 file << "touch \"" << data.batchWriteLocation << "/status/done-$EL_JOBID\"\n";
696 if(data.sharedFileSystem) file << "cd .. && rm -rf \"$RUNDIR\"\n";
697 }
698
699 {
700 std::ostringstream cmd;
701 cmd << "chmod +x " << fileName;
702 if (gSystem->Exec (cmd.str().c_str()) != 0)
703 RCU_THROW_MSG (("failed to execute: " + cmd.str()).c_str());
704 }
705 }
706 }
707
708
709
710 bool BatchDriver ::
711 mergeHists (Detail::ManagerData& data)
712 {
713 using namespace msgEventLoop;
714
715 // This picks up the DiskOutput object used to write out our
716 // histograms, we will then use that to locate the output files.
717 // this is not really the best way of doing this, but there are
718 // bigger rewrites of this code excepted, so I don't want to spend
719 // a lot of time on this now (06 Feb 19).
720 std::unique_ptr<SH::DiskOutput> origHistOutputMemory;
721 const SH::DiskOutput *origHistOutput = nullptr;
722 for (auto iter = data.job->outputBegin(), end = data.job->outputEnd();
723 iter != end; ++ iter)
724 {
725 if (iter->label() == Job::histogramStreamName)
726 origHistOutput = iter->output();
727 }
728 if (origHistOutput == nullptr)
729 {
730 origHistOutputMemory = std::make_unique<SH::DiskOutputLocal>
731 (data.submitDir + "/fetch/hist-");
732 origHistOutput = origHistOutputMemory.get();
733 }
734 RCU_ASSERT (origHistOutput != nullptr);
735
736 bool result = true;
737
738 ANA_MSG_DEBUG ("merging histograms in location " << data.submitDir);
739
740 RCU_ASSERT (data.batchJob->njobs_old.size() == data.batchJob->samples.size());
741 for (std::size_t sample = 0, end = data.batchJob->samples.size();
742 sample != end; ++ sample)
743 {
744 const BatchSample& mysample (data.batchJob->samples[sample]);
745
746 std::ostringstream output;
747 output << data.submitDir << "/hist-" << data.batchJob->samples[sample].name << ".root";
748 if (gSystem->AccessPathName (output.str().c_str()) != 0)
749 {
750 ANA_MSG_VERBOSE ("merge files for sample " << data.batchJob->samples[sample].name);
751
752 bool complete = true;
753 std::vector<std::string> input;
754 for (std::size_t segment = mysample.begin_segments,
755 end = mysample.end_segments; segment != end; ++ segment)
756 {
757 const BatchSegment& mysegment = data.batchJob->segments[segment];
758
759 const std::string hist_file = origHistOutput->targetURL
760 (mysegment.sampleName, mysegment.segmentName, ".root");
761
762 ANA_MSG_VERBOSE ("merge segment " << segment << " completed=" << (data.batchJobSuccess.find(segment)!=data.batchJobSuccess.end()) << " fail=" << (data.batchJobFailure.find(segment)!=data.batchJobFailure.end()) << " unknown=" << (data.batchJobUnknown.find(segment)!=data.batchJobUnknown.end()));
763
764 input.push_back (hist_file);
765
766 if (data.batchJobFailure.find(segment)!=data.batchJobFailure.end())
767 {
768 std::ostringstream message;
769 message << "subjob " << segment << "/" << mysegment.fullName
770 << " failed";
771 RCU_THROW_MSG (message.str());
772 }
773 else if (data.batchJobSuccess.find(segment)==data.batchJobSuccess.end())
774 complete = false, result = false;
775 }
776 if (complete)
777 {
778 RCU::hadd (output.str(), input);
779
780 // Merge output data directories
781 for (Job::outputIter out = data.batchJob->job.outputBegin(),
782 end = data.batchJob->job.outputEnd(); out != end; ++ out)
783 {
784 output.str("");
785 output << data.submitDir << "/data-" << out->label();
786
787 if(gSystem->AccessPathName(output.str().c_str()))
788 gSystem->mkdir(output.str().c_str(),true);
789
790
791 output << "/" << data.batchJob->samples[sample].name << ".root";
792
793 std::vector<std::string> input;
794 for (std::size_t segment = mysample.begin_segments,
795 end = mysample.end_segments; segment != end; ++ segment)
796 {
797 const BatchSegment& mysegment = data.batchJob->segments[segment];
798
799 const std::string infile =
800 data.submitDir + "/fetch/data-" + out->label() + "/" + mysegment.fullName + ".root";
801
802 input.push_back (infile);
803 }
804
805 RCU::hadd(output.str(), input);
806 }
807 }
808 }
809 }
810 return result;
811 }
812}
#define RCU_INVARIANT(x)
Definition Assert.h:201
#define RCU_ASSERT(x)
Definition Assert.h:222
#define RCU_NEW_INVARIANT(x)
Definition Assert.h:233
#define RCU_ASSERT_SOFT(x)
Definition Assert.h:167
#define RCU_PROVIDE(x)
Definition Assert.h:215
#define RCU_REQUIRE(x)
Definition Assert.h:208
#define RCU_READ_INVARIANT(x)
Definition Assert.h:229
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
#define ANA_MSG_VERBOSE(xmsg)
Macro printing verbose messages.
#define ANA_MSG_DEBUG(xmsg)
Macro printing debug messages.
#define ANA_CHECK(EXP)
check whether the given expression was successful
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
#define RCU_THROW_MSG(message)
Definition PrintMsg.h:58
static const Attributes_t empty
an implementation of DiskOutput for local disks
a class/interface representing an output location for files
Definition DiskOutput.h:46
std::string targetURL(const std::string &sampleName, const std::string &segmentName, const std::string &suffix) const
the final output location for the given segment
A class that manages meta-data to be associated with an object.
Definition MetaObject.h:56
This class defines a templatized version of the meta-data in vector form.
Definition MetaVector.h:28
std::vector< T > value
the value contained
Definition MetaVector.h:65
a base class that manages a set of files belonging to a particular data set and the associated meta-d...
Definition Sample.h:54
void exec(const std::string &cmd)
effects: execute the given command guarantee: strong failures: out of memory II failures: system fail...
Definition ShellExec.cxx:29
void hadd(const std::string &output_file, const std::vector< std::string > &input_files, unsigned max_files)
effects: perform the hadd functionality guarantee: basic failures: out of memory III failures: i/o er...
Definition hadd.cxx:28
std::string substitute(const std::string &str, const std::string &pattern, const std::string &with)
effects: substitute all occurrences of "pattern" with "with" in the string "str" returns: the substitu...
std::string getenv(const std::string &variableName)
get an environment variable
SampleHandler splitSample(Sample &sample, const Long64_t nevt)
effects: split the given sample into a set of samples, with each sample containing either exactly one...
path
python interpreter configuration --------------------------------------—
Definition athena.py:128
Definition index.py:1
output
Definition merge.py:16
-diff
str infile
Definition run.py:13
eventsFile
Events files.
static const std::string numEventsPerFile
the number of events in each file
Definition MetaFields.h:67
MsgStream & msg
Definition testRead.cxx:32
TFile * file