/// Number of job states. Must equal the number of enumerators in Enum,
/// because it sizes name[] and bounds the search in parse().
static const unsigned int NSTATES = 6;

/// Processing stages of a sample. The numeric values are used directly
/// as indices into name[].
enum Enum {
  INIT = 0,
  RUN = 1,
  DOWNLOAD = 2,
  MERGE = 3,
  FINISHED = 4,
  FAILED = 5
};

/// Text label of each state, indexed by its Enum value.
/// Note that the label of RUN is "RUNNING", not "RUN".
static const char* name[NSTATES] = {
  "INIT",
  "RUNNING",
  "DOWNLOAD",
  "MERGE",
  "FINISHED",
  "FAILED"
};

/// Convert a state label back into its enumerator.
///
/// @param what  label to look up; compared exactly (case-sensitive)
///              against the entries of name[]
/// @return the state whose label equals @p what
/// @throws std::runtime_error if no label matches. The message names the
///         rejected label so that a mistyped state can be spotted
///         immediately.
Enum parse(const std::string& what)
{
  for (unsigned int i = 0; i != NSTATES; ++i) {
    if (what == name[i]) {
      return static_cast<Enum>(i);
    }
  }
  throw std::runtime_error(
    "PrunDriver.cxx: Failed to parse job state string: '" + what + "'");
}
/// Outcome of one processing step. The numeric values are spelled out
/// because integer results are compared against DONE and FAIL elsewhere
/// in this file.
enum Enum {
  DONE = 0,
  PENDING = 1,
  FAIL = 2
};
74 struct TransitionRule {
75 JobState::Enum fromState;
77 JobState::Enum toState;
78 TransitionRule(JobState::Enum fromState,
80 JobState::Enum toState)
81 : fromState(fromState)
89 const std::string origDir;
90 TmpCd(
const std::string & dir)
91 : origDir(gSystem->pwd())
93 gSystem->cd(dir.c_str());
97 gSystem->cd(origDir.c_str());
105 static const std::string defaultState = JobState::name[JobState::INIT];
107 return JobState::parse(
label);
110static JobState::Enum
nextState(JobState::Enum state, Status::Enum status)
114 static const TransitionRule TABLE[] =
116 TransitionRule(JobState::INIT, Status::DONE, JobState::RUN),
117 TransitionRule(JobState::INIT, Status::PENDING, JobState::INIT),
118 TransitionRule(JobState::INIT, Status::FAIL, JobState::FAILED),
119 TransitionRule(JobState::RUN, Status::DONE, JobState::DOWNLOAD),
120 TransitionRule(JobState::RUN, Status::PENDING, JobState::RUN),
121 TransitionRule(JobState::RUN, Status::FAIL, JobState::FAILED),
122 TransitionRule(JobState::DOWNLOAD, Status::DONE, JobState::MERGE),
123 TransitionRule(JobState::DOWNLOAD, Status::PENDING, JobState::DOWNLOAD),
124 TransitionRule(JobState::DOWNLOAD, Status::FAIL, JobState::FAILED),
125 TransitionRule(JobState::MERGE, Status::DONE, JobState::FINISHED),
126 TransitionRule(JobState::MERGE, Status::PENDING, JobState::MERGE),
127 TransitionRule(JobState::MERGE, Status::FAIL, JobState::DOWNLOAD)
129 static const unsigned int TABLE_SIZE =
sizeof(TABLE) /
sizeof(TABLE[0]);
130 for (
unsigned int i = 0; i != TABLE_SIZE; ++i) {
131 if (TABLE[i].fromState == state && TABLE[i].status == status) {
132 return TABLE[i].toState;
136 throw std::logic_error(
"PrunDriver.cxx: Missing state transition rule");
144 o.
setString(
"nc_cmtConfig", gSystem->ExpandPathName(
"$AnalysisBase_PLATFORM"));
145 o.
setString(
"nc_useAthenaPackages",
"true");
146 const std::string mergestr =
"elg_merge jobdef.root %OUT %IN";
152 const std::string& location)
159 gSystem->Exec(Form(
"mkdir -p %s", location.c_str()));
161 std::vector<std::string> datasets;
164 if (entry.type ==
"CONTAINER" || entry.type ==
"DIDType.CONTAINER")
165 datasets.push_back (entry.name);
169 for (
const auto&
result : downloadResult)
171 if (
result.notDownloaded != 0)
183 using namespace EL::msgEventLoop;
185 ANA_MSG_INFO(
"Submitting " << sample->name() <<
"..." );
187 static bool loaded =
false;
193 TPython::LoadMacro(path.c_str());
197 TPython::Bind(
dynamic_cast<TObject*
>(sample),
"ELG_SAMPLE");
198#if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
200 TPython::Exec(
"_anyresult = ROOT.std.make_any['int'](ELG_prun(ELG_SAMPLE))", &
result);
201 int ret = std::any_cast<int>(
result);
203 int ret = TPython::Eval(
"ELG_prun(ELG_SAMPLE)");
205 TPython::Bind(0,
"ELG_SAMPLE");
212 if (isFirstSample && ret == 1){
214 throw std::runtime_error(
"PrunDriver.cxx: aborting due to tarball creation issue");
218 sample->meta()->setString(
"nc_ELG_state_details",
219 "problem submitting");
223 sample->meta()->setDouble(
"nc_jediTaskID", ret);
236 static bool loaded =
false;
239 TPython::LoadMacro(path.c_str());
243 TPython::Bind(
dynamic_cast<TObject*
>(sample),
"ELG_SAMPLE");
244#if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
246 TPython::Exec(
"_anyresult = ROOT.std.make_any['int'](ELG_jediState(ELG_SAMPLE))", &
result);
247 int ret = std::any_cast<int>(
result);
249 int ret = TPython::Eval(
"ELG_jediState(ELG_SAMPLE)");
251 TPython::Bind(0,
"ELG_SAMPLE");
253 if (ret == Status::DONE)
return Status::DONE;
254 if (ret == Status::FAIL)
return Status::FAIL;
257 if (ret != 90) { sample->meta()->setString(
"nc_ELG_state_details",
"task status other than done/finished/failed/running"); }
260 sample->meta()->setString(
"nc_ELG_state_details",
261 "problem checking jedi task status");
264 return Status::PENDING;
272 static std::mutex
mutex;
273 std::lock_guard<std::mutex> lock(
mutex);
274 std::cout <<
"Downloading output from: "
275 << sample->name() <<
"..." << std::endl;
287 if (not downloadOk) {
288 std::cerr <<
"Failed to download one or more files" << std::endl;
289 sample->meta()->setString(
"nc_ELG_state_details",
290 "error, check log for details");
291 return Status::PENDING;
307 const std::string dir =
"elg/download/" +
container;
309 const std::string fileName =
"hist-output.root";
311 const std::string target = Form(
"hist-%s.root", sample->name().c_str());
313 const std::string findCmd(Form(
"find %s -name \"*.%s*\" | tr '\n' ' '",
314 dir.c_str(), fileName.c_str()));
315 std::istringstream input(gSystem->GetFromPipe(findCmd.c_str()).Data());
316 std::vector<std::string>
files((std::istream_iterator<std::string>(input)),
317 std::istream_iterator<std::string>());
322 if (not
files.size()) {
323 std::cerr <<
"Found no input files for merging! "
324 <<
"Requeueing sample for download..." << std::endl;
325 sample->meta()->setString(
"nc_ELG_state_details",
"retry, files were lost");
332 sample->meta()->setString(
"nc_ELG_state_details",
333 "error, check log for details");
334 gSystem->Exec(Form(
"rm -f %s", target.c_str()));
335 return Status::PENDING;
338 for (
size_t i = 0; i !=
files.size(); ++i) {
339 gSystem->Exec(Form(
"rm %s",
files[i].c_str()));
341 gSystem->Exec(Form(
"rmdir %s/*", dir.c_str()));
342 gSystem->Exec(Form(
"rmdir %s", dir.c_str()));
353 sample->meta()->setString(
"nc_ELG_state_details",
"");
355 Status::Enum status = Status::PENDING;
358 status =
submit(sample, isFirstSample);
363 case JobState::DOWNLOAD:
366 case JobState::MERGE:
367 status =
merge(sample);
369 case JobState::FINISHED:
370 case JobState::FAILED:
375 sample->meta()->setString(
"nc_ELG_state", JobState::name[state]);
379 const size_t nThreads)
385 bool isFirstSample =
true;
388 workList.push_back([s, isFirstSample]()->
void{
processTask(*s, isFirstSample); });
390 isFirstSample =
false;
397 const std::string & pattern)
399 const std::string sampleName = sampleMeta.
castString(
"sample_name");
401 using namespace EL::msgEventLoop;
403 static const std::string nickname =
404 gSystem->GetFromPipe(Form(
"python -c \"%s\" 2>/dev/null",
405 "from pandatools import PsubUtils;"
406 "print(PsubUtils.getNickname());")).Data();
408 TString out = pattern.c_str();
411 if (nickname.length()>20){
412 ANA_MSG_WARNING(
"No proxy available - cannot use nickname yet. Will try a late replacement.");
414 out.ReplaceAll(
"%nickname%", nickname);
417 out.ReplaceAll(
"%in:name%", sampleName);
419 std::stringstream
ss(sampleName);
422 while(std::getline(
ss, item,
'.')) {
423 std::stringstream sskey;
424 sskey <<
"%in:name[" << ++field <<
"]%";
425 out.ReplaceAll(sskey.str(), item);
427 while (out.Index(
"%in:") != -1) {
428 int i1 = out.Index(
"%in:");
429 int i2 = out.Index(
"%", i1+1);
430 TString metaName = out(i1+4, i2-i1-4);
431 out.ReplaceAll(
"%in:"+metaName+
"%",
432 sampleMeta.
castString(std::string(metaName.Data())));
434 out.ReplaceAll(
"/",
"");
442 end = job.outputEnd(); out != end; ++out) {
443 outputs.Add(out->Clone());
445 std::string out =
"hist:hist-output.root";
448 while ((obj = itr())) {
450 const std::string name = os->label() +
".root";
451 const std::string ds =
453 out +=
"," + (ds.empty() ? name : ds +
":" + name);
463 TFile
file(fileName.c_str(),
"RECREATE");
466 outputs.Add(o->Clone());
467 file.WriteTObject(&job.jobConfig(),
"jobConfig",
"SingleKey");
468 file.WriteTObject(&outputs,
"outputs",
"SingleKey");
469 bool haveDefault =
false;
472 file.WriteObject(&
meta,
meta.castString(
"sample_name").c_str());
475 file.WriteObject (&
meta,
"defaultMetaObject");
486 const std::string outputFile =
"*" +
outputLabel +
".root*";
487 const std::string outDSSuffix =
'_' +
outputLabel +
".root/";
512 using namespace msgEventLoop;
518 const std::string jobELGDir =
data.submitDir +
"/elg";
519 const std::string runShFile = jobELGDir +
"/runjob.sh";
521 const std::string mergeShFile = jobELGDir +
"/elg_merge";
527 const std::string jobDefFile = jobELGDir +
"/jobdef.root";
528 gSystem->Exec(Form(
"mkdir -p %s", jobELGDir.c_str()));
529 gSystem->Exec(Form(
"cp %s %s", runShOrig.c_str(), runShFile.c_str()));
530 gSystem->Exec(Form(
"chmod +x %s", runShFile.c_str()));
531 gSystem->Exec(Form(
"cp %s %s", mergeShOrig.c_str(), mergeShFile.c_str()));
532 gSystem->Exec(Form(
"chmod +x %s", mergeShFile.c_str()));
537 if (listToShipToGrid.size()){
539 "Creating symbolic links for additional files or directories to be sent to grid.\n"
540 "For root or heavy files you should also add their name (not the full path) to EL::Job::optUserFiles.\n"
541 "Otherwise prun ignores those files."
544 std::vector<std::string> vect_filesOrDirToShip;
545 for (
auto&& part : std::views::split(listToShipToGrid,
',')) vect_filesOrDirToShip.emplace_back(part.begin(), part.end());
547 for (
const std::string & fileOrDirToShip: vect_filesOrDirToShip){
548 ANA_MSG_INFO ((
"Creating symbolic link for: " +fileOrDirToShip).c_str());
561 std::string outputSampleName =
meta.castString(
"nc_outputSampleName");
562 if (outputSampleName.empty()) {
563 outputSampleName =
"user.%nickname%.%in:name%";
566 meta.setString(
"nc_inDS",
meta.castString(
"nc_grid", (*s)->name()));
567 meta.setString(
"nc_writeInputToTxt",
"IN:input.txt");
568 meta.setString(
"nc_match",
meta.castString(
"nc_grid_filter"));
569 const std::string execstr =
"runjob.sh " + (*s)->name();
570 meta.setString(
"nc_exec", execstr);
571 meta.setString(
"nc_framework",
"EventLoopGrid");
577 out !=
data.job->outputEnd(); ++out) {
579 shOut.
save(
data.submitDir +
"/output-" + out->label());
582 shHist.
save(
data.submitDir +
"/output-hist");
584 TmpCd keepDir(jobELGDir);
588 sh.save(
data.submitDir +
"/input");
589 data.submitted =
true;
602 return ::StatusCode::SUCCESS;
610 TmpCd tmpDir(
data.submitDir);
616 const size_t nRunThreads =
options()->castDouble(
"nc_run_threads", 0);
617 const size_t nDlThreads =
options()->castDouble(
"nc_download_threads", 0);
625 std::cout << std::endl;
633 std::cout << (*s)->name() <<
"\t";
637 case JobState::DOWNLOAD:
638 case JobState::MERGE:
639 std::cout << JobState::name[state] <<
"\t";
641 case JobState::FINISHED:
642 std::cout <<
"\033[1;32m" << JobState::name[state] <<
"\033[0m\t";
644 case JobState::FAILED:
645 std::cout <<
"\033[1;31m" << JobState::name[state] <<
"\033[0m\t";
648 std::cout <<
details << std::endl;
650 allDone &= (state == JobState::FINISHED || state == JobState::FAILED);
653 std::cout << std::endl;
655 data.retrieved =
true;
656 data.completed = allDone;
657 return ::StatusCode::SUCCESS;
663 TmpCd tmpDir(location);
673 std::cout << (*s)->name() <<
"\t" << JobState::name[state]
674 <<
"\t" <<
details << std::endl;
679 const std::string& task,
680 const std::string& state)
685 TmpCd tmpDir(location);
689 if (not
sh.get(task)) {
690 std::cout <<
"Unknown task: " << task << std::endl;
691 std::cout <<
"Choose one of: " << std::endl;
695 JobState::parse(state);
696 sh.get(task)->meta()->setString(
"nc_ELG_state", state);
#define RCU_NEW_INVARIANT(x)
#define RCU_READ_INVARIANT(x)
char data[hepevt_bytes_allocation_ATLAS]
const std::string outputLabel
std::string PathResolverFindCalibFile(const std::string &logical_file_name)
static SH::MetaObject defaultOpts()
static void processAllInState(const SH::SampleHandler &sh, JobState::Enum state, const size_t nThreads)
ClassImp(EL::PrunDriver) namespace
std::string outputFileNames(const EL::Job &job)
static bool downloadContainer(const std::string &name, const std::string &location)
static JobState::Enum nextState(JobState::Enum state, Status::Enum status)
static std::string formatOutputName(const SH::MetaObject &sampleMeta, const std::string &pattern)
static Status::Enum checkPandaTask(SH::Sample *const sample)
static void saveJobDef(const std::string &fileName, const EL::Job &job, const SH::SampleHandler sh)
static Status::Enum download(SH::Sample *const sample)
static JobState::Enum sampleState(SH::Sample *sample)
static SH::SampleHandler outputSH(const SH::SampleHandler &in, const std::string &outputLabel)
static void processTask(SH::Sample *const sample, const bool isFirstSample)
SH::MetaObject * options()
the list of options to jobs with this driver
virtual::StatusCode doManagerStep(Detail::ManagerData &data) const
const OutputStream * outputIter
static const std::string optGridPrunShipAdditionalFilesOrDirs
Enables shipping additional files in the tarball sent to the grid. Should be a list of comma separated p...
static const std::string optContainerSuffix
a Driver to submit jobs via prun
static void status(const std::string &location)
::StatusCode doRetrieve(Detail::ManagerData &data) const
static void setState(const std::string &location, const std::string &task, const std::string &state)
void testInvariant() const
This class implements a Sample located on the grid.
A class that manages a list of Sample objects.
void save(const std::string &directory) const
save the list of samples to the given directory
iterator begin() const
the begin iterator to use
std::vector< Sample * >::const_iterator iterator
the iterator to use
iterator end() const
the end iterator to use
a base class that manages a set of files belonging to a particular data set and the associated meta-d...
MetaObject * meta()
the meta-information for this sample
const std::string process
std::vector< std::string > files
file names and file pointers
std::string label(const std::string &format, int i)
@ doRetrieve
call the actual doRetrieve method
@ submitJob
do the actual job submission
::StatusCode StatusCode
StatusCode definition for legacy code.
void exec(const std::string &cmd)
effects: execute the given command guarantee: strong failures: out of memory II failures: system fail...
void hadd(const std::string &output_file, const std::vector< std::string > &input_files, unsigned max_files)
effects: perform the hadd functionality guarantee: basic failures: out of memory III failures: i/o er...
std::vector< RucioDownloadResult > rucioDownloadList(const std::string &location, const std::vector< std::string > &datasets)
run rucio-download with multiple datasets
std::vector< RucioListDidsEntry > rucioListDids(const std::string &dataset)
run rucio-list-dids for the given dataset
DataModel_detail::iterator< DVL > unique(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of unique for DataVector/List.
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.
an internal data structure for passing data between different manager objects and steps