/// Number of job states known to the driver.
static const unsigned int NSTATES = 6;

/// Printable name of each job state, indexed by the numeric state value
/// (the lookup loop converts a matching index straight to the state enum,
/// and the status printout indexes this table with the state).
///
/// The entries are const so the table cannot be modified by accident, and the
/// array bound is deduced from the initializer and checked against NSTATES:
/// a missing or surplus name is a compile error instead of a null entry that
/// the string comparison in the lookup loop would dereference.
static const char* const name[] = {
  "INIT",
  "RUNNING",
  "DOWNLOAD",
  "MERGE",
  "FINISHED",
  "FAILED"
};
static_assert(sizeof(name) / sizeof(name[0]) == NSTATES,
              "job state name table must have exactly NSTATES entries");
// Linear scan of the state-name table: the first entry that compares equal
// to `what` yields its index, converted to the state enum.
58 for (
unsigned int i = 0;
i != NSTATES; ++
i) {
59 if (
what ==
name[
i]) {
return static_cast<Enum
>(
i); }
// Presumably reached only when the loop found no match (the loop's closing
// brace is not visible in this excerpt): an unknown state string is an error.
62 throw std::runtime_error(
"PrunDriver.cxx: Failed to parse job state string");
/// Result of a processing step, as used by the state-transition rules.
/// The numeric values matter: DONE and FAIL are compared directly with the
/// integer returned by the embedded Python status query, so they must not be
/// renumbered.
enum Enum {
  DONE    = 0,
  PENDING = 1,
  FAIL    = 2
};
74 struct TransitionRule {
75 JobState::Enum fromState;
77 JobState::Enum toState;
78 TransitionRule(JobState::Enum fromState,
80 JobState::Enum toState)
81 : fromState(fromState)
90 TmpCd(
const std::string &
dir)
93 gSystem->cd(
dir.c_str());
/// Returns the job state that follows `state` after a processing step ended
/// with `status`, by consulting a static table of transition rules.
/// Throws std::logic_error when no rule supplies a target state (the test
/// that selects the matching rule is not visible in this excerpt).
110 static JobState::Enum nextState(JobState::Enum state, Status::Enum
status)
// Each rule is written as (current state, step status, next state).
114 static const TransitionRule TABLE[] =
119 TransitionRule(
JobState::RUN, Status::DONE, JobState::DOWNLOAD),
121 TransitionRule(
JobState::RUN, Status::FAIL, JobState::FAILED),
123 TransitionRule(JobState::DOWNLOAD, Status::PENDING, JobState::DOWNLOAD),
124 TransitionRule(JobState::DOWNLOAD, Status::FAIL, JobState::FAILED),
// Element count is derived from the array itself, so rules can be added or
// removed without touching the loop bound.
129 static const unsigned int TABLE_SIZE =
sizeof(TABLE) /
sizeof(TABLE[0]);
130 for (
unsigned int i = 0;
i != TABLE_SIZE; ++
i) {
132 return TABLE[
i].toState;
// Presumably reached only when no rule matched: a missing rule is treated as
// a programming error rather than a runtime condition.
136 throw std::logic_error(
"PrunDriver.cxx: Missing state transition rule");
144 o.
setString(
"nc_cmtConfig", gSystem->ExpandPathName(
"$AnalysisBase_PLATFORM"));
145 o.
setString(
"nc_useAthenaPackages",
"true");
146 const std::string mergestr =
"elg_merge jobdef.root %OUT %IN";
/// Downloads the dataset container `name` into the directory `location`.
/// Returns a bool that the caller stores as "downloadOk"; presumably true
/// means every file was fetched (the return statements are not visible in
/// this excerpt).
151 static bool downloadContainer(
const std::string&
name,
152 const std::string& location)
// Make sure the destination directory exists (mkdir -p also creates parents).
159 gSystem->Exec(Form(
"mkdir -p %s", location.c_str()));
// Two spellings of the container type are accepted.
164 if (
entry.type ==
"CONTAINER" ||
entry.type ==
"DIDType.CONTAINER")
// Inspect every download result for files that were not downloaded.
169 for (
const auto&
result : downloadResult)
171 if (
result.notDownloaded != 0)
// Submission of one sample through the Python helper ELG_prun.
183 using namespace EL::msgEventLoop;
// One-time flag; presumably guards the loading of the Python helper (the code
// that tests and sets it is not visible in this excerpt).
187 static bool loaded =
false;
// Expose the sample to the embedded Python interpreter as ELG_SAMPLE.
197 TPython::Bind(
dynamic_cast<TObject*
>(
sample),
"ELG_SAMPLE");
// From ROOT 6.33.01 on, the call goes through TPython::Exec and the integer
// result is extracted with std::any_cast; the TPython::Eval form below is
// presumably the branch for older ROOT (the #else/#endif are not visible).
198 #if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
200 TPython::Exec(
"_anyresult = ROOT.std.make_any['int'](ELG_prun(ELG_SAMPLE))", &
result);
201 int ret = std::any_cast<int>(
result);
203 int ret = TPython::Eval(
"ELG_prun(ELG_SAMPLE)");
// Rebind the Python name to 0 so the interpreter no longer refers to the sample.
205 TPython::Bind(0,
"ELG_SAMPLE");
// A return value of 1 on the first sample aborts the whole submission; the
// message attributes this to a tarball creation problem.
212 if (isFirstSample && ret == 1){
214 throw std::runtime_error(
"PrunDriver.cxx: aborting due to tarball creation issue");
// Record a human-readable reason in the sample metadata (the condition that
// selects this branch is not visible here).
218 sample->meta()->setString(
"nc_ELG_state_details",
219 "problem submitting");
// The value returned by ELG_prun is stored as the JEDI task ID.
223 sample->meta()->setDouble(
"nc_jediTaskID", ret);
// Status query for one sample through the Python helper ELG_jediState.
// One-time flag; presumably guards the loading of the Python helper (the code
// that tests and sets it is not visible in this excerpt).
236 static bool loaded =
false;
// Expose the sample to the embedded Python interpreter as ELG_SAMPLE.
243 TPython::Bind(
dynamic_cast<TObject*
>(
sample),
"ELG_SAMPLE");
// From ROOT 6.33.01 on, the call goes through TPython::Exec and the integer
// result is extracted with std::any_cast; the TPython::Eval form below is
// presumably the branch for older ROOT (the #else/#endif are not visible).
244 #if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01)
246 TPython::Exec(
"_anyresult = ROOT.std.make_any['int'](ELG_jediState(ELG_SAMPLE))", &
result);
247 int ret = std::any_cast<int>(
result);
249 int ret = TPython::Eval(
"ELG_jediState(ELG_SAMPLE)");
// Rebind the Python name to 0 so the interpreter no longer refers to the sample.
251 TPython::Bind(0,
"ELG_SAMPLE");
// The Python return code is compared directly with the Status enumerators,
// so it shares their numeric values for DONE and FAIL.
253 if (ret == Status::DONE)
return Status::DONE;
254 if (ret == Status::FAIL)
return Status::FAIL;
// Any code other than 90 gets an explanatory note; judging by the message,
// 90 presumably stands for "running" -- TODO confirm against the Python side.
257 if (ret != 90) {
sample->meta()->setString(
"nc_ELG_state_details",
"task status other than done/finished/failed/running"); }
// Note recorded when the check itself had a problem (the condition that
// selects this branch is not visible here).
260 sample->meta()->setString(
"nc_ELG_state_details",
261 "problem checking jedi task status");
// Everything that is neither DONE nor FAIL is reported as still pending.
264 return Status::PENDING;
// Download step for one sample: fetch its "_hist" output container.
274 std::cout <<
"Downloading output from: "
275 <<
sample->name() <<
"..." << std::endl;
// Drop a trailing '/' before appending the "_hist/" suffix.
// NOTE(review): indexing size()-1 assumes `container` is non-empty -- confirm.
280 if (container[container.size()-1] ==
'/') {
281 container.resize(container.size() - 1);
283 container +=
"_hist/";
// Files are placed under elg/download/<container>.
285 bool downloadOk = downloadContainer(container,
"elg/download/" + container);
// On failure the sample is reported as PENDING rather than FAIL, with a note
// in its metadata; presumably this lets the download be attempted again.
287 if (not downloadOk) {
288 std::cerr <<
"Failed to download one or more files" << std::endl;
289 sample->meta()->setString(
"nc_ELG_state_details",
290 "error, check log for details");
291 return Status::PENDING;
// Merge step for one sample: collect the downloaded histogram files, merge
// them into hist-<sample>.root and clean up the download area.
// Drop a trailing '/' before appending the "_hist/" suffix.
// NOTE(review): indexing size()-1 assumes `container` is non-empty -- confirm.
303 if (container[container.size()-1] ==
'/') {
304 container.resize(container.size() - 1);
306 container +=
"_hist/";
// Directory the download step wrote into.
307 const std::string
dir =
"elg/download/" + container;
309 const std::string
fileName =
"hist-output.root";
// Name of the merged output file for this sample.
311 const std::string
target = Form(
"hist-%s.root",
sample->name().c_str());
// Shell pipeline listing the matching files on one line: `tr` turns the
// newlines printed by `find` into blanks. The Form arguments are on a line
// not visible in this excerpt.
313 const std::string findCmd(Form(
"find %s -name \"*.%s*\" | tr '\n' ' '",
315 std::istringstream
input(gSystem->GetFromPipe(findCmd.c_str()).Data());
// Split the whitespace-separated command output into individual paths.
316 std::vector<std::string>
files((std::istream_iterator<std::string>(
input)),
317 std::istream_iterator<std::string>());
// No files found: note that the sample needs to be downloaded again.
322 if (not
files.size()) {
323 std::cerr <<
"Found no input files for merging! "
324 <<
"Requeueing sample for download..." << std::endl;
325 sample->meta()->setString(
"nc_ELG_state_details",
"retry, files were lost");
// Error path (its condition is not visible here): record the note, delete
// the merge target and report PENDING.
// NOTE(review): paths are interpolated into shell commands unquoted --
// confirm they cannot contain blanks or shell metacharacters.
332 sample->meta()->setString(
"nc_ELG_state_details",
333 "error, check log for details");
334 gSystem->Exec(Form(
"rm -f %s",
target.c_str()));
335 return Status::PENDING;
// Clean-up: delete every listed input file, then the sub-directories and the
// download directory itself (rmdir only removes directories that are empty).
338 for (
size_t i = 0;
i !=
files.size(); ++
i) {
339 gSystem->Exec(Form(
"rm %s",
files[
i].c_str()));
341 gSystem->Exec(Form(
"rmdir %s/*",
dir.c_str()));
342 gSystem->Exec(Form(
"rmdir %s",
dir.c_str()));
351 JobState::Enum state = sampleState(
sample);
353 sample->meta()->setString(
"nc_ELG_state_details",
"");
355 Status::Enum
status = Status::PENDING;
363 case JobState::DOWNLOAD:
370 case JobState::FAILED:
374 state = nextState(state,
status);
385 bool isFirstSample =
true;
387 if (sampleState(*
s) == state) {
388 workList.push_back([
s, isFirstSample]()->
void{ processTask(*
s, isFirstSample); });
390 isFirstSample =
false;
/// Expands the placeholders of an output-name pattern and returns the result.
/// Handled placeholders: %nickname%, the numbered %in:name[N]% fields, and
/// %in:<key>% which is looked up in `sampleMeta`. All '/' characters are
/// removed at the end. (The remaining parameters and the return statement are
/// not visible in this excerpt.)
396 static std::string formatOutputName(
const SH::MetaObject& sampleMeta,
401 using namespace EL::msgEventLoop;
// Function-local static: the nickname is obtained once by running a short
// Python snippet through the shell, with stderr discarded.
403 static const std::string nickname =
404 gSystem->GetFromPipe(Form(
"python -c \"%s\" 2>/dev/null",
405 "from pandatools import PsubUtils;"
406 "print(PsubUtils.getNickname());")).Data();
// A result longer than 20 characters is taken to mean that no proxy was
// available; the placeholder is then left for a later replacement, per the
// warning text.
411 if (nickname.length()>20){
412 ANA_MSG_WARNING(
"No proxy available - cannot use nickname yet. Will try a late replacement.");
414 out.ReplaceAll(
"%nickname%", nickname);
// Numbered fields: each '.'-separated item read from `ss` replaces the
// placeholder %in:name[N]%, with N incremented before each use.
422 while(std::getline(
ss,
item,
'.')) {
423 std::stringstream sskey;
424 sskey <<
"%in:name[" << ++
field <<
"]%";
425 out.ReplaceAll(sskey.str(),
item);
// Remaining %in:<key>% placeholders: i1 is the start of "%in:", i2 the
// closing '%', and the text between them is the metadata key.
// NOTE(review): if the closing '%' is missing, i2 would be -1 and the
// substring length negative -- confirm patterns are always well-formed.
427 while (
out.Index(
"%in:") != -1) {
428 int i1 =
out.Index(
"%in:");
429 int i2 =
out.Index(
"%", i1+1);
430 TString metaName =
out(i1+4, i2-i1-4);
431 out.ReplaceAll(
"%in:"+metaName+
"%",
432 sampleMeta.
castString(std::string(metaName.Data())));
// Remove every '/' from the expanded name.
434 out.ReplaceAll(
"/",
"");
445 std::string
out =
"hist:hist-output.root";
448 while ((
obj = itr())) {
450 const std::string
name =
os->label() +
".root";
451 const std::string
ds =
459 static void saveJobDef(
const std::string&
fileName,
467 file.WriteTObject(&
job.jobConfig(),
"jobConfig",
"SingleKey");
468 file.WriteTObject(&
outputs,
"outputs",
"SingleKey");
469 bool haveDefault =
false;
475 file.WriteObject (&meta,
"defaultMetaObject");
487 const std::string outDSSuffix =
'_' +
outputLabel +
".root/";
512 using namespace msgEventLoop;
518 const std::string jobELGDir =
data.submitDir +
"/elg";
519 const std::string runShFile = jobELGDir +
"/runjob.sh";
521 const std::string mergeShFile = jobELGDir +
"/elg_merge";
527 const std::string jobDefFile = jobELGDir +
"/jobdef.root";
528 gSystem->Exec(Form(
"mkdir -p %s", jobELGDir.c_str()));
529 gSystem->Exec(Form(
"cp %s %s", runShOrig.c_str(), runShFile.c_str()));
530 gSystem->Exec(Form(
"chmod +x %s", runShFile.c_str()));
531 gSystem->Exec(Form(
"cp %s %s", mergeShOrig.c_str(), mergeShFile.c_str()));
532 gSystem->Exec(Form(
"chmod +x %s", mergeShFile.c_str()));
537 if (listToShipToGrid.size()){
539 "Creating symbolic links for additional files or directories to be sent to grid.\n"
540 "For root or heavy files you should also add their name (not the full path) to EL::Job::optUserFiles.\n"
541 "Otherwise prun ignores those files."
544 std::vector<std::string> vect_filesOrDirToShip;
547 for (
const std::string & fileOrDirToShip: vect_filesOrDirToShip){
548 ANA_MSG_INFO ((
"Creating symbolic link for: " +fileOrDirToShip).c_str());
561 std::string outputSampleName = meta.
castString(
"nc_outputSampleName");
562 if (outputSampleName.empty()) {
563 outputSampleName =
"user.%nickname%.%in:name%";
565 meta.
setString(
"nc_outDS", formatOutputName(meta, outputSampleName));
567 meta.
setString(
"nc_writeInputToTxt",
"IN:input.txt");
569 const std::string execstr =
"runjob.sh " + (*s)->name();
571 meta.
setString(
"nc_framework",
"EventLoopGrid");
574 saveJobDef(jobDefFile, *
data.job,
sh);
579 shOut.
save(
data.submitDir +
"/output-" +
out->label());
582 shHist.
save(
data.submitDir +
"/output-hist");
588 sh.save(
data.submitDir +
"/input");
589 data.submitted =
true;
602 return ::StatusCode::SUCCESS;
// Retrieve step: advance samples through their states and print a status
// table. Constructs a TmpCd on the submission directory first.
610 TmpCd tmpDir(
data.submitDir);
// Thread counts come from the job options, with 0 passed as the second
// argument (presumably the default); the double value is converted to size_t.
616 const size_t nRunThreads =
options()->castDouble(
"nc_run_threads", 0);
617 const size_t nDlThreads =
options()->castDouble(
"nc_download_threads", 0);
// Process all samples currently in the DOWNLOAD state.
620 processAllInState(
sh, JobState::DOWNLOAD, nDlThreads);
625 std::cout << std::endl;
// Per-sample report: name, state name, details.
629 JobState::Enum state = sampleState(*
s);
633 std::cout << (*s)->name() <<
"\t";
637 case JobState::DOWNLOAD:
// "\033[1;32m" ... "\033[0m" are ANSI escapes: bold green, then reset.
642 std::cout <<
"\033[1;32m" <<
JobState::name[state] <<
"\033[0m\t";
644 case JobState::FAILED:
// Failed samples use "\033[1;31m" (bold red) instead.
645 std::cout <<
"\033[1;31m" <<
JobState::name[state] <<
"\033[0m\t";
648 std::cout <<
details << std::endl;
653 std::cout << std::endl;
// Mark the retrieve step as performed; `completed` mirrors the allDone flag
// (its computation is not visible in this excerpt).
655 data.retrieved =
true;
656 data.completed = allDone;
657 return ::StatusCode::SUCCESS;
663 TmpCd tmpDir(location);
670 JobState::Enum state = sampleState(*
s);
674 <<
"\t" <<
details << std::endl;
679 const std::string& task,
680 const std::string& state)
685 TmpCd tmpDir(location);
689 if (not
sh.get(task)) {
690 std::cout <<
"Unknown task: " << task << std::endl;
691 std::cout <<
"Choose one of: " << std::endl;
696 sh.get(task)->meta()->setString(
"nc_ELG_state", state);