43 #include <boost/algorithm/string.hpp>
// Number of job states. It sizes the name[] label table below and bounds
// the loop that parses a state string, so it must equal the number of
// labels listed in that table.
52 static const unsigned int NSTATES = 6;
// Printable label for each job state, indexed by the state's integer value.
// The parse loop that follows compares an input string against each entry
// and returns static_cast<Enum>(index) on a match, and the status printout
// looks labels up via JobState::name[state]; the order of the entries
// therefore has to follow the numeric order of the state enumerators.
// NOTE(review): labels are free text and need not equal the enumerator
// identifiers (the code refers to JobState::RUN while the table holds
// "RUNNING" -- presumably the same state; confirm against the enum).
54 static const char*
name[NSTATES] =
55 {
"INIT",
"RUNNING",
"DOWNLOAD",
"MERGE",
"FINISHED",
"FAILED" };
58 for (
unsigned int i = 0;
i != NSTATES; ++
i) {
59 if (
what ==
name[
i]) {
return static_cast<Enum
>(
i); }
62 throw std::runtime_error(
"PrunDriver.cxx: Failed to parse job state string");
// Outcome of a single processing step, passed to nextState() together with
// the current job state to select the following state.
// The integer returned by the Python call ELG_jediState(ELG_SAMPLE) is
// compared directly against DONE and FAIL, so the explicit numeric values
// presumably mirror what that script returns -- confirm before renumbering.
71 enum Enum { DONE=0, PENDING=1, FAIL=2 };
74 struct TransitionRule {
75 JobState::Enum fromState;
77 JobState::Enum toState;
78 TransitionRule(JobState::Enum fromState,
80 JobState::Enum toState)
81 : fromState(fromState)
90 TmpCd(
const std::string &
dir)
93 gSystem->cd(
dir.c_str());
110 static JobState::Enum nextState(JobState::Enum state, Status::Enum
status)
114 static const TransitionRule TABLE[] =
119 TransitionRule(
JobState::RUN, Status::DONE, JobState::DOWNLOAD),
121 TransitionRule(
JobState::RUN, Status::FAIL, JobState::FAILED),
123 TransitionRule(JobState::DOWNLOAD, Status::PENDING, JobState::DOWNLOAD),
124 TransitionRule(JobState::DOWNLOAD, Status::FAIL, JobState::FAILED),
// Element count of TABLE, derived from the array itself so that it follows
// the table when rules are added or removed; used as the bound of the
// lookup loop below.
129 static const unsigned int TABLE_SIZE =
sizeof(TABLE) /
sizeof(TABLE[0]);
130 for (
unsigned int i = 0;
i != TABLE_SIZE; ++
i) {
132 return TABLE[
i].toState;
136 throw std::logic_error(
"PrunDriver.cxx: Missing state transition rule");
144 o.
setString(
"nc_cmtConfig", gSystem->ExpandPathName(
"$AnalysisBase_PLATFORM"));
145 o.
setString(
"nc_useAthenaPackages",
"true");
146 const std::string mergestr =
"elg_merge jobdef.root %OUT %IN";
151 static bool downloadContainer(
const std::string&
name,
152 const std::string& location)
159 gSystem->Exec(Form(
"mkdir -p %s", location.c_str()));
161 std::vector<std::string> datasets;
164 if (
entry.type ==
"CONTAINER" ||
entry.type ==
"DIDType.CONTAINER")
165 datasets.push_back (
entry.name);
169 for (
const auto&
result : downloadResult)
171 if (
result.notDownloaded != 0)
183 using namespace EL::msgEventLoop;
187 static bool loaded =
false;
197 TPython::Bind(
dynamic_cast<TObject*
>(
sample),
"ELG_SAMPLE");
198 #if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
200 TPython::Exec(
"_anyresult = ELG_prun(ELG_SAMPLE)", &
result);
201 int ret = std::any_cast<int>(
result);
203 int ret = TPython::Eval(
"ELG_prun(ELG_SAMPLE)");
205 TPython::Bind(0,
"ELG_SAMPLE");
208 sample->meta()->setString(
"nc_ELG_state_details",
209 "problem submitting");
213 sample->meta()->setDouble(
"nc_jediTaskID", ret);
226 static bool loaded =
false;
233 TPython::Bind(
dynamic_cast<TObject*
>(
sample),
"ELG_SAMPLE");
234 #if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
236 TPython::Exec(
"_anyresult = ELG_jediState(ELG_SAMPLE)", &
result);
237 int ret = std::any_cast<int>(
result);
239 int ret = TPython::Eval(
"ELG_jediState(ELG_SAMPLE)");
241 TPython::Bind(0,
"ELG_SAMPLE");
243 if (ret == Status::DONE)
return Status::DONE;
244 if (ret == Status::FAIL)
return Status::FAIL;
247 if (ret != 90) {
sample->meta()->setString(
"nc_ELG_state_details",
"task status other than done/finished/failed/running"); }
250 sample->meta()->setString(
"nc_ELG_state_details",
251 "problem checking jedi task status");
254 return Status::PENDING;
263 std::lock_guard<std::mutex> lock(
mutex);
264 std::cout <<
"Downloading output from: "
265 <<
sample->name() <<
"..." << std::endl;
270 if (container[container.size()-1] ==
'/') {
271 container.resize(container.size() - 1);
273 container +=
"_hist/";
275 bool downloadOk = downloadContainer(container,
"elg/download/" + container);
277 if (not downloadOk) {
278 std::cerr <<
"Failed to download one or more files" << std::endl;
279 sample->meta()->setString(
"nc_ELG_state_details",
280 "error, check log for details");
281 return Status::PENDING;
293 if (container[container.size()-1] ==
'/') {
294 container.resize(container.size() - 1);
296 container +=
"_hist/";
297 const std::string
dir =
"elg/download/" + container;
299 const std::string
fileName =
"hist-output.root";
301 const std::string
target = Form(
"hist-%s.root",
sample->name().c_str());
303 const std::string findCmd(Form(
"find %s -name \"*.%s*\" | tr '\n' ' '",
305 std::istringstream
input(gSystem->GetFromPipe(findCmd.c_str()).Data());
306 std::vector<std::string>
files((std::istream_iterator<std::string>(
input)),
307 std::istream_iterator<std::string>());
312 if (not
files.size()) {
313 std::cerr <<
"Found no input files for merging! "
314 <<
"Requeueing sample for download..." << std::endl;
315 sample->meta()->setString(
"nc_ELG_state_details",
"retry, files were lost");
322 sample->meta()->setString(
"nc_ELG_state_details",
323 "error, check log for details");
324 gSystem->Exec(Form(
"rm -f %s",
target.c_str()));
325 return Status::PENDING;
328 for (
size_t i = 0;
i !=
files.size(); ++
i) {
329 gSystem->Exec(Form(
"rm %s",
files[
i].c_str()));
331 gSystem->Exec(Form(
"rmdir %s/*",
dir.c_str()));
332 gSystem->Exec(Form(
"rmdir %s",
dir.c_str()));
341 JobState::Enum state = sampleState(
sample);
343 sample->meta()->setString(
"nc_ELG_state_details",
"");
345 Status::Enum
status = Status::PENDING;
353 case JobState::DOWNLOAD:
360 case JobState::FAILED:
364 state = nextState(state,
status);
375 if (sampleState(*
s) == state) {
376 workList.push_back([
s]()->
void{ processTask(*
s); });
383 static std::string formatOutputName(
const SH::MetaObject& sampleMeta,
386 const std::string sampleName = sampleMeta.
castString(
"sample_name");
388 using namespace EL::msgEventLoop;
390 static const std::string nickname =
391 gSystem->GetFromPipe(Form(
"python -c \"%s\" 2>/dev/null",
392 "from pandatools import PsubUtils;"
393 "print(PsubUtils.getNickname());")).Data();
398 if (nickname.length()>20){
399 ANA_MSG_WARNING(
"No proxy available - cannot use nickname yet. Will try a late replacement.");
401 out.ReplaceAll(
"%nickname%", nickname);
404 out.ReplaceAll(
"%in:name%", sampleName);
406 std::stringstream
ss(sampleName);
409 while(std::getline(
ss,
item,
'.')) {
410 std::stringstream sskey;
411 sskey <<
"%in:name[" << ++
field <<
"]%";
412 out.ReplaceAll(sskey.str(),
item);
414 while (
out.Index(
"%in:") != -1) {
415 int i1 =
out.Index(
"%in:");
416 int i2 =
out.Index(
"%", i1+1);
417 TString metaName =
out(i1+4, i2-i1-4);
418 out.ReplaceAll(
"%in:"+metaName+
"%",
419 sampleMeta.
castString(std::string(metaName.Data())));
421 out.ReplaceAll(
"/",
"");
432 std::string
out =
"hist:hist-output.root";
435 while ((
obj = itr())) {
437 const std::string
name =
os->label() +
".root";
438 const std::string
ds =
446 static void saveJobDef(
const std::string&
fileName,
454 file.WriteTObject(&
job.jobConfig(),
"jobConfig",
"SingleKey");
455 file.WriteTObject(&
outputs,
"outputs",
"SingleKey");
456 bool haveDefault =
false;
462 file.WriteObject (&meta,
"defaultMetaObject");
474 const std::string outDSSuffix =
'_' +
outputLabel +
".root/";
499 using namespace msgEventLoop;
505 const std::string jobELGDir =
data.submitDir +
"/elg";
506 const std::string runShFile = jobELGDir +
"/runjob.sh";
508 const std::string mergeShFile = jobELGDir +
"/elg_merge";
514 const std::string jobDefFile = jobELGDir +
"/jobdef.root";
515 gSystem->Exec(Form(
"mkdir -p %s", jobELGDir.c_str()));
516 gSystem->Exec(Form(
"cp %s %s", runShOrig.c_str(), runShFile.c_str()));
517 gSystem->Exec(Form(
"chmod +x %s", runShFile.c_str()));
518 gSystem->Exec(Form(
"cp %s %s", mergeShOrig.c_str(), mergeShFile.c_str()));
519 gSystem->Exec(Form(
"chmod +x %s", mergeShFile.c_str()));
524 if (listToShipToGrid.size()){
526 "Creating symbolic links for additional files or directories to be sent to grid.\n"
527 "For root or heavy files you should also add their name (not the full path) to EL::Job::optUserFiles.\n"
528 "Otherwise prun ignores those files."
531 std::vector<std::string> vect_filesOrDirToShip;
533 boost::split(vect_filesOrDirToShip,listToShipToGrid,boost::is_any_of(
","));
536 for (
const std::string & fileOrDirToShip: vect_filesOrDirToShip){
537 ANA_MSG_INFO ((
"Creating symbolic link for: " +fileOrDirToShip).c_str());
550 std::string outputSampleName = meta.
castString(
"nc_outputSampleName");
551 if (outputSampleName.empty()) {
552 outputSampleName =
"user.%nickname%.%in:name%";
554 meta.
setString(
"nc_outDS", formatOutputName(meta, outputSampleName));
556 meta.
setString(
"nc_writeInputToTxt",
"IN:input.txt");
558 const std::string execstr =
"runjob.sh " + (*s)->name();
560 meta.
setString(
"nc_framework",
"EventLoopGrid");
563 saveJobDef(jobDefFile, *
data.job,
sh);
568 shOut.
save(
data.submitDir +
"/output-" +
out->label());
571 shHist.
save(
data.submitDir +
"/output-hist");
577 sh.save(
data.submitDir +
"/input");
578 data.submitted =
true;
591 return ::StatusCode::SUCCESS;
599 TmpCd tmpDir(
data.submitDir);
605 const size_t nRunThreads =
options()->castDouble(
"nc_run_threads", 0);
606 const size_t nDlThreads =
options()->castDouble(
"nc_download_threads", 0);
609 processAllInState(
sh, JobState::DOWNLOAD, nDlThreads);
614 std::cout << std::endl;
618 JobState::Enum state = sampleState(*
s);
622 std::cout << (*s)->name() <<
"\t";
626 case JobState::DOWNLOAD:
631 std::cout <<
"\033[1;32m" <<
JobState::name[state] <<
"\033[0m\t";
633 case JobState::FAILED:
634 std::cout <<
"\033[1;31m" <<
JobState::name[state] <<
"\033[0m\t";
637 std::cout <<
details << std::endl;
642 std::cout << std::endl;
644 data.retrieved =
true;
645 data.completed = allDone;
646 return ::StatusCode::SUCCESS;
652 TmpCd tmpDir(location);
659 JobState::Enum state = sampleState(*
s);
663 <<
"\t" <<
details << std::endl;
668 const std::string& task,
669 const std::string& state)
674 TmpCd tmpDir(location);
678 if (not
sh.get(task)) {
679 std::cout <<
"Unknown task: " << task << std::endl;
680 std::cout <<
"Choose one of: " << std::endl;
685 sh.get(task)->meta()->setString(
"nc_ELG_state", state);