ATLAS Offline Software
EL::PrunDriver Class Reference final

a Driver to submit jobs via prun

#include <PrunDriver.h>


Public Member Functions

 PrunDriver ()
void testInvariant () const
SH::MetaObject * options ()
 the list of options to jobs with this driver
const SH::MetaObject * options () const
std::string submit (const Job &job, const std::string &location) const
 submit the given job with the given output location and wait for it to finish
std::string submitOnly (const Job &job, const std::string &location) const
 submit the given job with the given output location and return immediately

Static Public Member Functions

static void status (const std::string &location)
static void setState (const std::string &location, const std::string &task, const std::string &state)
static void resubmit (const std::string &location, const std::string &option)
 resubmit all failed sub-jobs for the job in the given location
static bool retrieve (const std::string &location)
 retrieve all the output for the job in the given location
static bool wait (const std::string &location, unsigned time=60)
 retrieve all the output for the job in the given location and wait until it is finished completely.
static void updateLocation (const std::string &location)
 update the internal location of files, after moving the submission directory
static void mergedOutputSave (Detail::ManagerData &data)
 create and save a sample handler assuming we created all the merged files at the requested locations
static void diskOutputSave (Detail::ManagerData &data)
 make the output sample handler for the given job or stream from the information stored in the histogram files.

Protected Member Functions

virtual ::StatusCode doManagerStep (Detail::ManagerData &data) const override

Static Protected Attributes

static bool abortRetrieve
 this flag is set to true when the wait() function is running and a SIGINT is caught, meaning that control should be returned to the user as soon as possible.

Private Member Functions

::StatusCode doRetrieve (Detail::ManagerData &data) const
 ClassDef (EL::PrunDriver, 1)

Private Attributes

SH::MetaObject m_options
 members directly corresponding to accessors

Detailed Description

a Driver to submit jobs via prun

Definition at line 23 of file PrunDriver.h.

Constructor & Destructor Documentation

◆ PrunDriver()

EL::PrunDriver::PrunDriver ( )

Definition at line 504 of file PrunDriver.cxx.

{
  RCU_NEW_INVARIANT(this);
}
#define RCU_NEW_INVARIANT(x)
Definition Assert.h:233

Member Function Documentation

◆ ClassDef()

EL::PrunDriver::ClassDef ( EL::PrunDriver, 1 )
private

◆ diskOutputSave()

void EL::Driver::diskOutputSave ( Detail::ManagerData & data)
static inherited

make the output sample handler for the given job or stream from the information stored in the histogram files.

This is optional, but it is convenient for drivers that use (conventional) writers.

Guarantee
basic
Failures
out of memory II
i/o errors

◆ doManagerStep()

StatusCode EL::PrunDriver::doManagerStep ( Detail::ManagerData & data) const
override protected

Definition at line 509 of file PrunDriver.cxx.

{
  using namespace msgEventLoop;
  ANA_CHECK (Driver::doManagerStep (data)); // inferred; line missing from the listing
  switch (data.step)
  {
  case Detail::ManagerStep::submitJob: // inferred; line missing from the listing
    {
      const std::string jobELGDir = data.submitDir + "/elg";
      const std::string runShFile = jobELGDir + "/runjob.sh";
      //const std::string runShOrig = "$ROOTCOREBIN/data/EventLoopGrid/runjob.sh";
      const std::string mergeShFile = jobELGDir + "/elg_merge";
      //const std::string mergeShOrig =
      //  "$ROOTCOREBIN/user_scripts/EventLoopGrid/elg_merge";
      const std::string runShOrig = PathResolverFindCalibFile("EventLoopGrid/runjob.sh");
      const std::string mergeShOrig = PathResolverFindCalibFile("EventLoopGrid/elg_merge");

      const std::string jobDefFile = jobELGDir + "/jobdef.root";
      gSystem->Exec(Form("mkdir -p %s", jobELGDir.c_str()));
      gSystem->Exec(Form("cp %s %s", runShOrig.c_str(), runShFile.c_str()));
      gSystem->Exec(Form("chmod +x %s", runShFile.c_str()));
      gSystem->Exec(Form("cp %s %s", mergeShOrig.c_str(), mergeShFile.c_str()));
      gSystem->Exec(Form("chmod +x %s", mergeShFile.c_str()));

      // create symbolic links for additional files/directories, if any, to ship to the grid
      std::string listToShipToGrid = data.options.castString(EL::Job::optGridPrunShipAdditionalFilesOrDirs, "");
      // parse the list of comma separated files and/or directories to ship to the grid
      if (listToShipToGrid.size()){
        ANA_MSG_INFO ( // inferred; line missing from the listing
          "Creating symbolic links for additional files or directories to be sent to grid.\n"
          "For root or heavy files you should also add their name (not the full path) to EL::Job::optUserFiles.\n"
          "Otherwise prun ignores those files."
        );

        std::vector<std::string> vect_filesOrDirToShip;
        for (auto&& part : std::views::split(listToShipToGrid, ',')) vect_filesOrDirToShip.emplace_back(part.begin(), part.end());
        // Create symbolic links of files or directories to the submission directory
        for (const std::string & fileOrDirToShip: vect_filesOrDirToShip){
          ANA_MSG_INFO (("Creating symbolic link for: " +fileOrDirToShip).c_str());
          RCU::Shell::exec("ln -sf " + fileOrDirToShip + " " + jobELGDir);
        }
        ANA_MSG_INFO ("Finished creation of symbolic links");
      }

      const SH::SampleHandler& sh = data.job->sampleHandler();

      for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
        SH::MetaObject& meta = *(*s)->meta();
        meta.fetchDefaults(data.options);
        meta.fetchDefaults(defaultOpts());
        meta.setString("nc_outputs", outputFileNames(*data.job));
        std::string outputSampleName = meta.castString("nc_outputSampleName");
        if (outputSampleName.empty()) {
          outputSampleName = "user.%nickname%.%in:name%";
        }
        meta.setString("nc_outDS", formatOutputName(meta, outputSampleName));
        meta.setString("nc_inDS", meta.castString("nc_grid", (*s)->name()));
        meta.setString("nc_writeInputToTxt", "IN:input.txt");
        meta.setString("nc_match", meta.castString("nc_grid_filter"));
        const std::string execstr = "runjob.sh " + (*s)->name();
        meta.setString("nc_exec", execstr);
        meta.setString("nc_framework", "EventLoopGrid");
      }

      saveJobDef(jobDefFile, *data.job, sh);

      for (EL::Job::outputIter out = data.job->outputBegin();
           out != data.job->outputEnd(); ++out) {
        SH::SampleHandler shOut = outputSH(sh, out->label());
        shOut.save(data.submitDir + "/output-" + out->label());
      }
      SH::SampleHandler shHist = outputSH(sh, "hist-output");
      shHist.save(data.submitDir + "/output-hist");

      TmpCd keepDir(jobELGDir);

      processAllInState(sh, JobState::INIT, 0);

      sh.save(data.submitDir + "/input");
      data.submitted = true;
    }
    break;

  case Detail::ManagerStep::doRetrieve: // inferred; line missing from the listing
    {
      ANA_CHECK (doRetrieve (data)); // inferred; line missing from the listing
    }
    break;

  default:
    (void) true; // safe to do nothing
  }
  return ::StatusCode::SUCCESS;
}
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
#define ANA_CHECK(EXP)
check whether the given expression was successful
std::string PathResolverFindCalibFile(const std::string &logical_file_name)
std::string outputFileNames(const EL::Job &job)
virtual ::StatusCode doManagerStep(Detail::ManagerData &data) const
const OutputStream * outputIter
Definition Job.h:143
static const std::string optGridPrunShipAdditionalFilesOrDirs
Enables shipping additional files in the tarball sent to the grid. Should be a list of comma separated p...
Definition Job.h:494
::StatusCode doRetrieve(Detail::ManagerData &data) const
void fetchDefaults(const MetaObject &source)
fetch the meta-data from the given sample not present in this sample.
void setString(const std::string &name, const std::string &value)
set the meta-data string with the given name
std::string castString(const std::string &name, const std::string &def_val="", CastMode mode=CAST_ERROR_THROW) const
the meta-data string with the given name
void save(const std::string &directory) const
save the list of samples to the given directory
std::vector< Sample * >::const_iterator iterator
the iterator to use
@ doRetrieve
call the actual doRetrieve method
@ submitJob
do the actual job submission
Definition ManagerStep.h:92
void exec(const std::string &cmd)
effects: execute the given command; guarantee: strong; failures: out of memory II; failures: system fail...
Definition ShellExec.cxx:29
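
The value of optGridPrunShipAdditionalFilesOrDirs is split on commas before each entry is symlinked into the submission directory. The splitting step on its own can be sketched as below. This is a minimal standalone illustration, not the EventLoop code: splitCommaList is a hypothetical helper name, and it uses std::string::find instead of std::views::split so that it also builds without C++20 ranges.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper (not part of EventLoop): split a comma-separated
// list into its parts.  Empty input yields an empty vector, matching the
// size() guard in the doManagerStep listing.
std::vector<std::string> splitCommaList (const std::string& list)
{
  std::vector<std::string> parts;
  if (list.empty())
    return parts;
  std::string::size_type start = 0;
  while (true)
  {
    const std::string::size_type comma = list.find (',', start);
    if (comma == std::string::npos)
    {
      // last (or only) entry: everything after the previous comma
      parts.push_back (list.substr (start));
      break;
    }
    parts.push_back (list.substr (start, comma - start));
    start = comma + 1;
  }
  return parts;
}
```

Note that neither this sketch nor the loop in the listing trims whitespace or drops empty entries, so a value such as "a, b" would presumably produce a second entry with a leading space.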

◆ doRetrieve()

StatusCode EL::PrunDriver::doRetrieve ( Detail::ManagerData & data) const
private

Definition at line 605 of file PrunDriver.cxx.

{
  RCU_READ_INVARIANT(this);
  RCU_REQUIRE(not data.submitDir.empty());

  TmpCd tmpDir(data.submitDir);

  SH::SampleHandler sh;
  sh.load("input");
  RCU_ASSERT(sh.size());

  const size_t nRunThreads = options()->castDouble("nc_run_threads", 0);
  const size_t nDlThreads = options()->castDouble("nc_download_threads", 0);
  processAllInState(sh, JobState::INIT, 0);
  processAllInState(sh, JobState::RUN, nRunThreads);
  processAllInState(sh, JobState::DOWNLOAD, nDlThreads);
  processAllInState(sh, JobState::MERGE, 0);

  sh.save("input");

  std::cout << std::endl;

  bool allDone = true;
  for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
    JobState::Enum state = sampleState(*s);
    std::string details = (*s)->meta()->castString("nc_ELG_state_details", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
    if (not details.empty()) { details = '(' + details + ')'; }

    std::cout << (*s)->name() << "\t";
    switch (state) {
    case JobState::INIT:
    case JobState::RUN:
    case JobState::DOWNLOAD:
    case JobState::MERGE:
      std::cout << JobState::name[state] << "\t";
      break;
    case JobState::FINISHED:
      std::cout << "\033[1;32m" << JobState::name[state] << "\033[0m\t";
      break;
    case JobState::FAILED:
      std::cout << "\033[1;31m" << JobState::name[state] << "\033[0m\t";
      break;
    }
    std::cout << details << std::endl;

    allDone &= (state == JobState::FINISHED || state == JobState::FAILED);
  }

  std::cout << std::endl;

  data.retrieved = true;
  data.completed = allDone;
  return ::StatusCode::SUCCESS;
}
#define RCU_ASSERT(x)
Definition Assert.h:222
#define RCU_REQUIRE(x)
Definition Assert.h:208
#define RCU_READ_INVARIANT(x)
Definition Assert.h:229
SH::MetaObject * options()
the list of options to jobs with this driver
double castDouble(const std::string &name, double def_val=0, CastMode mode=CAST_ERROR_THROW) const
the meta-data double with the given name
@ CAST_NOCAST_DEFAULT
cast and return the default value if the input has the wrong type
Definition MetaObject.h:78
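
In the doRetrieve listing, data.completed is set from allDone, which is true only when every sample is in FINISHED or FAILED. The accumulation can be sketched in isolation as follows. The State enum is a stand-in for JobState::Enum (only the enumerator names are taken from the listing), and isTerminal and allDone are hypothetical helper names.

```cpp
#include <cassert>
#include <vector>

// Stand-in for JobState::Enum; only the enumerator names come from the listing.
enum class State { INIT, RUN, DOWNLOAD, MERGE, FINISHED, FAILED };

// A state is terminal when no further processing will happen for the sample.
bool isTerminal (State s)
{
  return s == State::FINISHED || s == State::FAILED;
}

// Mirrors the allDone accumulation: true only if every sample is terminal.
bool allDone (const std::vector<State>& states)
{
  bool done = true;
  for (State s : states)
    done &= isTerminal (s);
  return done;
}
```

One consequence worth noting: a sample in FAILED counts towards completion here, so "completed" in this listing means that nothing is left to process, not that every sample succeeded.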

◆ mergedOutputSave()

void EL::Driver::mergedOutputSave ( Detail::ManagerData & data)
static inherited

create and save a sample handler assuming we created all the merged files at the requested locations

This is optional, but it is convenient for drivers that want to keep their outputs locally.

Guarantee
basic
Failures
out of memory II
i/o errors

◆ options() [1/2]

SH::MetaObject * EL::Driver::options ( )
inherited

the list of options to jobs with this driver

Guarantee
no-fail
Postcondition
result != 0

◆ options() [2/2]

const SH::MetaObject * EL::Driver::options ( ) const
inherited

◆ resubmit()

void EL::Driver::resubmit ( const std::string & location,
const std::string & option )
static inherited

resubmit all failed sub-jobs for the job in the given location

Parameters
option driver-specific option string selecting which jobs to resubmit (and how)

Guarantee
basic, may partially resubmit
Failures
out of memory III
job resubmission errors
job can't be read
job was made with different driver

◆ retrieve()

bool EL::Driver::retrieve ( const std::string & location)
static inherited

retrieve all the output for the job in the given location

While job failures will cause this method to fail, you can typically retry it multiple times if you can use partial results.

Returns
whether the job completed successfully
Guarantee
basic, may partially retrieve
Failures
out of memory III
job failures
job can't be read
job was made with different driver

◆ setState()

void EL::PrunDriver::setState ( const std::string & location,
const std::string & task,
const std::string & state )
static

Definition at line 678 of file PrunDriver.cxx.

{
  RCU_REQUIRE(not location.empty());
  RCU_REQUIRE(not task.empty());
  RCU_REQUIRE(not state.empty());
  TmpCd tmpDir(location);
  SH::SampleHandler sh;
  sh.load("input");
  RCU_ASSERT(sh.size());
  if (not sh.get(task)) {
    std::cout << "Unknown task: " << task << std::endl;
    std::cout << "Choose one of: " << std::endl;
    sh.print();
    return;
  }
  JobState::parse(state);
  sh.get(task)->meta()->setString("nc_ELG_state", state);
  sh.save("input");
}
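
The listing calls JobState::parse(state) and discards the result before storing the string, which suggests the call is there to reject unknown state names. The body of JobState::parse is not shown on this page, so the following is only a sketch of that validate-then-store pattern under two assumptions: that parsing fails by throwing, and that the state names are spelled like the enumerators. parseState and stateNames are hypothetical names.

```cpp
#include <array>
#include <cassert>
#include <stdexcept>
#include <string>

// Assumed state names; the real strings live in JobState::name, not shown here.
const std::array<std::string, 6> stateNames =
  {"INIT", "RUN", "DOWNLOAD", "MERGE", "FINISHED", "FAILED"};

// Hypothetical validator: returns the index of the state, throws if unknown.
std::size_t parseState (const std::string& text)
{
  for (std::size_t i = 0; i != stateNames.size(); ++i)
    if (stateNames[i] == text)
      return i;
  throw std::invalid_argument ("unknown job state: " + text);
}
```

Calling the validator before writing the metadata means a misspelled state never reaches the saved sample handler.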

◆ status()

void EL::PrunDriver::status ( const std::string & location)
static

Definition at line 660 of file PrunDriver.cxx.

{
  RCU_REQUIRE(not location.empty());
  TmpCd tmpDir(location);
  SH::SampleHandler sh;
  sh.load("input");
  RCU_ASSERT(sh.size());
  processAllInState(sh, JobState::RUN, 0);
  sh.save("input");
  for (SH::SampleHandler::iterator s = sh.begin(); s != sh.end(); ++s) {
    JobState::Enum state = sampleState(*s);
    std::string details = (*s)->meta()->castString("nc_ELG_state_details", "", SH::MetaObject::CAST_NOCAST_DEFAULT);
    if (not details.empty()) { details = '(' + details + ')'; }
    std::cout << (*s)->name() << "\t" << JobState::name[state]
              << "\t" << details << std::endl;
  }
}

◆ submit()

std::string EL::Driver::submit ( const Job & job,
const std::string & location ) const
inherited

submit the given job with the given output location and wait for it to finish

This is mostly for small jobs and backward compatibility. For longer jobs use submitOnly instead.

Returns
The actual location of the submit directory, if the job was configured to generate a unique directory.
Guarantee
basic, may partially submit
Failures
out of memory II
can't create directory at location
submission errors

◆ submitOnly()

std::string EL::Driver::submitOnly ( const Job & job,
const std::string & location ) const
inherited

submit the given job with the given output location and return immediately

This method allows you to submit jobs to your local batch system, log out and at a later point log back in again.

Returns
The actual location of the submit directory, if the job was configured to generate a unique directory.
Guarantee
basic, may partially submit
Failures
out of memory II
can't create directory at location
submission errors
Warning
not all drivers support this. Some will do all their work in the submit function.
Warning
you normally need to call wait() or retrieve() before you can use the output.

◆ testInvariant()

void EL::PrunDriver::testInvariant ( ) const

Definition at line 499 of file PrunDriver.cxx.

{
  RCU_INVARIANT(this != 0);
}
#define RCU_INVARIANT(x)
Definition Assert.h:201

◆ updateLocation()

void EL::Driver::updateLocation ( const std::string & location)
static inherited

update the internal location of files, after moving the submission directory

Guarantee
basic, may update partially
Failures
out of memory II
Warning
only move the submission directory after all your jobs are finished, or the results will be unpredictable

◆ wait()

bool EL::Driver::wait ( const std::string & location,
unsigned time = 60 )
static inherited

retrieve all the output for the job in the given location and wait until it is finished completely.

This polls the output every time seconds, where time is the second argument.

While job failures will cause this method to fail, you can typically retry it multiple times if you can use partial results.

Typically sleeping for 60 seconds is an appropriate interval, but if it doesn't work for you, you can change it here.

Guarantee
basic, may partially retrieve
Failures
out of memory III
job failures
job can't be read
job was made with different driver
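
The behaviour described for wait() amounts to repeated retrieval with a sleep in between. A generic version of such a loop might look like the sketch below. It does not show how Driver::wait is implemented: pollUntilDone and retrieveOnce are hypothetical names, and the abort flag is a std::atomic<bool> chosen for this illustration.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical polling loop: call retrieveOnce until it reports completion
// or the abort flag is set, sleeping `seconds` between attempts.
// Returns true if the job completed, false if polling was aborted.
bool pollUntilDone (const std::function<bool()>& retrieveOnce,
                    unsigned seconds,
                    const std::atomic<bool>& abort)
{
  while (!abort.load())
  {
    if (retrieveOnce())
      return true;
    std::this_thread::sleep_for (std::chrono::seconds (seconds));
  }
  return false;
}
```

A longer interval reduces the load from repeated status queries at the cost of noticing completion later, which is the trade-off behind the adjustable time argument.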

Member Data Documentation

◆ abortRetrieve

bool EL::Driver::abortRetrieve
static protected inherited

this flag is set to true when the wait() function is running and a SIGINT is caught, meaning that control should be returned to the user as soon as possible.

drivers can use it to abort long running operations in doRetrieve before completion

Definition at line 212 of file Driver.h.
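
The pattern described here, a flag set on SIGINT and checked by long-running code, can be sketched on its own as follows. This is an illustration under assumptions, not the Driver implementation: abortRequested, onSigint and processItems are hypothetical names, and the flag uses volatile std::sig_atomic_t (the type the C++ standard allows a signal handler to write) rather than the static bool documented above.

```cpp
#include <cassert>
#include <csignal>

// Stand-in for the static abortRetrieve flag.
volatile std::sig_atomic_t abortRequested = 0;

// Handler: only record the request; the long-running loop checks the flag.
extern "C" void onSigint (int)
{
  abortRequested = 1;
}

// Process items until done or until SIGINT was seen; returns items processed.
int processItems (int nItems)
{
  int processed = 0;
  for (int i = 0; i < nItems && !abortRequested; ++i)
    ++processed;
  return processed;
}
```

The handler would be installed with std::signal(SIGINT, onSigint) before the loop starts. Keeping the handler down to a single flag write leaves the decision about where it is safe to stop to the loop itself.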

◆ m_options

SH::MetaObject EL::Driver::m_options
private inherited

members directly corresponding to accessors

Definition at line 233 of file Driver.h.


The documentation for this class was generated from the following files: