43 #include <boost/algorithm/string.hpp>
// Length of the name[] table below and the bound of the loop that searches it.
52 static const unsigned int NSTATES = 6;
// Printable label for each job state, indexed by the state's numeric value:
// the lookup loop that follows compares a string against name[i] and returns
// static_cast<Enum>(i) on a match, and the status printout streams
// JobState::name[state]. Order therefore has to track the enumerator order.
// NOTE(review): labels are not guaranteed to equal the enumerator spellings
// (the transition table uses JobState::RUN while the label here is "RUNNING").
54 static const char*
name[NSTATES] =
55 {
"INIT",
"RUNNING",
"DOWNLOAD",
"MERGE",
"FINISHED",
"FAILED" };
58 for (
unsigned int i = 0;
i != NSTATES; ++
i) {
59 if (
what ==
name[
i]) {
return static_cast<Enum
>(
i); }
62 throw std::runtime_error(
"PrunDriver.cxx: Failed to parse job state string");
// Status codes with explicitly pinned numeric values.
// Referenced further down as Status::DONE / Status::PENDING / Status::FAIL
// (the enclosing scope is not visible in this excerpt -- presumably `Status`).
// DONE is compared directly against the int returned by
// TPython::Eval("ELG_jediState(ELG_SAMPLE)"), so the numbers are presumably
// shared with that Python helper -- confirm before renumbering.
71 enum Enum { DONE=0, PENDING=1,
FAIL=2 };
74 struct TransitionRule {
75 JobState::Enum fromState;
77 JobState::Enum toState;
78 TransitionRule(JobState::Enum fromState,
80 JobState::Enum toState)
81 : fromState(fromState)
90 TmpCd(
const std::string &
dir)
93 gSystem->cd(
dir.c_str());
110 static JobState::Enum nextState(JobState::Enum state, Status::Enum
status)
114 static const TransitionRule TABLE[] =
119 TransitionRule(
JobState::RUN, Status::DONE, JobState::DOWNLOAD),
123 TransitionRule(JobState::DOWNLOAD, Status::PENDING, JobState::DOWNLOAD),
124 TransitionRule(JobState::DOWNLOAD,
Status::FAIL, JobState::FAILED),
129 static const unsigned int TABLE_SIZE =
sizeof(TABLE) /
sizeof(TABLE[0]);
130 for (
unsigned int i = 0;
i != TABLE_SIZE; ++
i) {
132 return TABLE[
i].toState;
136 throw std::logic_error(
"PrunDriver.cxx: Missing state transition rule");
144 o.
setString(
"nc_cmtConfig", gSystem->ExpandPathName(
"$AnalysisBase_PLATFORM"));
145 o.
setString(
"nc_useAthenaPackages",
"true");
146 const std::string mergestr =
"elg_merge jobdef.root %OUT %IN";
151 static bool downloadContainer(
const std::string&
name,
159 gSystem->Exec(Form(
"mkdir -p %s",
location.c_str()));
164 if (
entry.type ==
"CONTAINER" ||
entry.type ==
"DIDType.CONTAINER")
169 for (
const auto&
result : downloadResult)
171 if (
result.notDownloaded != 0)
184 std::cout <<
"Submitting " <<
sample->name() <<
"...\n";
186 static bool loaded =
false;
196 TPython::Bind(
dynamic_cast<TObject*
>(
sample),
"ELG_SAMPLE");
197 int ret = TPython::Eval(
"ELG_prun(ELG_SAMPLE)");
198 TPython::Bind(0,
"ELG_SAMPLE");
201 sample->meta()->setString(
"nc_ELG_state_details",
202 "problem submitting");
206 sample->meta()->setDouble(
"nc_jediTaskID",
ret);
216 static bool loaded =
false;
223 TPython::Bind(
dynamic_cast<TObject*
>(
sample),
"ELG_SAMPLE");
224 int ret = TPython::Eval(
"ELG_jediState(ELG_SAMPLE)");
225 TPython::Bind(0,
"ELG_SAMPLE");
227 if (
ret == Status::DONE)
return Status::DONE;
231 if (
ret != 90) {
sample->meta()->setString(
"nc_ELG_state_details",
"task status other than done/finished/failed/running"); }
234 sample->meta()->setString(
"nc_ELG_state_details",
235 "problem checking jedi task status");
238 return Status::PENDING;
247 std::lock_guard<std::mutex> lock(
mutex);
248 std::cout <<
"Downloading output from: "
249 <<
sample->name() <<
"..." << std::endl;
254 if (container[container.size()-1] ==
'/') {
255 container.resize(container.size() - 1);
257 container +=
"_hist/";
259 bool downloadOk = downloadContainer(container,
"elg/download/" + container);
261 if (not downloadOk) {
262 std::cerr <<
"Failed to download one or more files" << std::endl;
263 sample->meta()->setString(
"nc_ELG_state_details",
264 "error, check log for details");
265 return Status::PENDING;
277 if (container[container.size()-1] ==
'/') {
278 container.resize(container.size() - 1);
280 container +=
"_hist/";
281 const std::string
dir =
"elg/download/" + container;
283 const std::string
fileName =
"hist-output.root";
285 const std::string
target = Form(
"hist-%s.root",
sample->name().c_str());
287 const std::string findCmd(Form(
"find %s -name \"*.%s*\" | tr '\n' ' '",
289 std::istringstream
input(gSystem->GetFromPipe(findCmd.c_str()).Data());
290 std::vector<std::string>
files((std::istream_iterator<std::string>(
input)),
291 std::istream_iterator<std::string>());
296 if (not
files.size()) {
297 std::cerr <<
"Found no input files for merging! "
298 <<
"Requeueing sample for download..." << std::endl;
299 sample->meta()->setString(
"nc_ELG_state_details",
"retry, files were lost");
306 sample->meta()->setString(
"nc_ELG_state_details",
307 "error, check log for details");
308 gSystem->Exec(Form(
"rm -f %s",
target.c_str()));
309 return Status::PENDING;
312 for (
size_t i = 0;
i !=
files.size(); ++
i) {
313 gSystem->Exec(Form(
"rm %s",
files[
i].c_str()));
315 gSystem->Exec(Form(
"rmdir %s/*",
dir.c_str()));
316 gSystem->Exec(Form(
"rmdir %s",
dir.c_str()));
325 JobState::Enum state = sampleState(
sample);
327 sample->meta()->setString(
"nc_ELG_state_details",
"");
329 Status::Enum
status = Status::PENDING;
337 case JobState::DOWNLOAD:
344 case JobState::FAILED:
348 state = nextState(state,
status);
359 if (sampleState(*
s) == state) {
360 workList.push_back([
s]()->
void{ processTask(*
s); });
367 static std::string formatOutputName(
const SH::MetaObject& sampleMeta,
370 const std::string sampleName = sampleMeta.
castString(
"sample_name");
373 static const std::string nickname =
374 gSystem->GetFromPipe(Form(
"python -c \"%s\" 2>/dev/null",
375 "from pandatools import PsubUtils;"
376 "print(PsubUtils.getNickname());")).Data();
380 out.ReplaceAll(
"%nickname%", nickname);
381 out.ReplaceAll(
"%in:name%", sampleName);
383 std::stringstream
ss(sampleName);
386 while(std::getline(
ss,
item,
'.')) {
387 std::stringstream sskey;
388 sskey <<
"%in:name[" << ++
field <<
"]%";
389 out.ReplaceAll(sskey.str(),
item);
391 while (
out.Index(
"%in:") != -1) {
392 int i1 =
out.Index(
"%in:");
393 int i2 =
out.Index(
"%", i1+1);
394 TString metaName =
out(i1+4, i2-i1-4);
395 out.ReplaceAll(
"%in:"+metaName+
"%",
396 sampleMeta.
castString(std::string(metaName.Data())));
398 out.ReplaceAll(
"/",
"");
409 std::string
out =
"hist:hist-output.root";
412 while ((
obj = itr())) {
414 const std::string
name =
os->label() +
".root";
415 const std::string
ds =
423 static void saveJobDef(
const std::string&
fileName,
431 file.WriteTObject(&
job.jobConfig(),
"jobConfig",
"SingleKey");
432 file.WriteTObject(&
outputs,
"outputs",
"SingleKey");
433 bool haveDefault =
false;
439 file.WriteObject (&meta,
"defaultMetaObject");
451 const std::string outDSSuffix =
'_' +
outputLabel +
".root/";
476 using namespace msgEventLoop;
482 const std::string jobELGDir =
data.submitDir +
"/elg";
483 const std::string runShFile = jobELGDir +
"/runjob.sh";
485 const std::string mergeShFile = jobELGDir +
"/elg_merge";
491 const std::string jobDefFile = jobELGDir +
"/jobdef.root";
492 gSystem->Exec(Form(
"mkdir -p %s", jobELGDir.c_str()));
493 gSystem->Exec(Form(
"cp %s %s", runShOrig.c_str(), runShFile.c_str()));
494 gSystem->Exec(Form(
"chmod +x %s", runShFile.c_str()));
495 gSystem->Exec(Form(
"cp %s %s", mergeShOrig.c_str(), mergeShFile.c_str()));
496 gSystem->Exec(Form(
"chmod +x %s", mergeShFile.c_str()));
501 if (listToShipToGrid.size()){
503 "Creating symbolic links for additional files or directories to be sent to grid.\n"
504 "For root or heavy files you should also add their name (not the full path) to EL::Job::optUserFiles.\n"
505 "Otherwise prun ignores those files."
508 std::vector<std::string> vect_filesOrDirToShip;
510 boost::split(vect_filesOrDirToShip,listToShipToGrid,boost::is_any_of(
","));
513 for (
const std::string & fileOrDirToShip: vect_filesOrDirToShip){
514 ANA_MSG_INFO ((
"Creating symbolic link for: " +fileOrDirToShip).c_str());
527 std::string outputSampleName = meta.
castString(
"nc_outputSampleName");
528 if (outputSampleName.empty()) {
529 outputSampleName =
"user.%nickname%.%in:name%";
531 meta.
setString(
"nc_outDS", formatOutputName(meta, outputSampleName));
533 meta.
setString(
"nc_writeInputToTxt",
"IN:input.txt");
535 const std::string execstr =
"runjob.sh " + (*s)->name();
539 saveJobDef(jobDefFile, *
data.job,
sh);
544 shOut.
save(
data.submitDir +
"/output-" +
out->label());
547 shHist.
save(
data.submitDir +
"/output-hist");
553 sh.save(
data.submitDir +
"/input");
554 data.submitted =
true;
567 return ::StatusCode::SUCCESS;
575 TmpCd tmpDir(
data.submitDir);
581 const size_t nRunThreads =
options()->castDouble(
"nc_run_threads", 0);
582 const size_t nDlThreads =
options()->castDouble(
"nc_download_threads", 0);
585 processAllInState(
sh, JobState::DOWNLOAD, nDlThreads);
590 std::cout << std::endl;
594 JobState::Enum state = sampleState(*
s);
598 std::cout << (*s)->name() <<
"\t";
602 case JobState::DOWNLOAD:
607 std::cout <<
"\033[1;32m" <<
JobState::name[state] <<
"\033[0m\t";
609 case JobState::FAILED:
610 std::cout <<
"\033[1;31m" <<
JobState::name[state] <<
"\033[0m\t";
613 std::cout <<
details << std::endl;
618 std::cout << std::endl;
620 data.retrieved =
true;
621 data.completed = allDone;
622 return ::StatusCode::SUCCESS;
635 JobState::Enum state = sampleState(*
s);
639 <<
"\t" <<
details << std::endl;
644 const std::string& task,
645 const std::string& state)
654 if (not
sh.get(task)) {
655 std::cout <<
"Unknown task: " << task << std::endl;
656 std::cout <<
"Choose one of: " << std::endl;
661 sh.get(task)->meta()->setString(
"nc_ELG_state", state);