43 #include <boost/algorithm/string.hpp> 
   // Number of distinct job states; must match the number of labels in `name`.
   static constexpr unsigned int NSTATES = 6;

   // Printable label for each job state, indexed by the state's numeric value
   // (the parser casts a matching index straight back to the state enum).
   // The pointers themselves are const so the table cannot be re-pointed at
   // run time.
   static const char* const name[] =
     { "INIT", "RUNNING", "DOWNLOAD", "MERGE", "FINISHED", "FAILED" };

   // With an explicit array bound a forgotten label would silently become a
   // null pointer; deducing the bound and checking it here turns that mistake
   // into a compile-time error instead.
   static_assert(sizeof(name) / sizeof(name[0]) == NSTATES,
                 "JobState::name must provide exactly NSTATES labels");
 
   58       for (
unsigned int i = 0; 
i != NSTATES; ++
i) {
 
   59     if (
what == 
name[
i]) { 
return static_cast<Enum
>(
i); }
 
   62       throw std::runtime_error(
"PrunDriver.cxx: Failed to parse job state string"); 
 
   // Outcome of one processing step for a sample.
   // The numeric values are significant: the integer returned by the Python
   // ELG_jediState helper is compared directly against DONE and FAIL, so they
   // must not be renumbered.
   enum Enum {
     DONE    = 0,
     PENDING = 1,
     FAIL    = 2
   };
 
   74   struct TransitionRule {
 
   75     JobState::Enum fromState; 
 
   77     JobState::Enum toState; 
 
   78     TransitionRule(JobState::Enum fromState,
 
   80            JobState::Enum toState)
 
   81       : fromState(fromState)
 
   90     TmpCd(
const std::string & 
dir)
 
   93       gSystem->cd(
dir.c_str());
 
  110 static JobState::Enum nextState(JobState::Enum state, Status::Enum 
status)  
 
  114   static const TransitionRule TABLE[] =
 
  119       TransitionRule(
JobState::RUN,      Status::DONE,    JobState::DOWNLOAD),
 
  121       TransitionRule(
JobState::RUN,      Status::FAIL,    JobState::FAILED),
 
  123       TransitionRule(JobState::DOWNLOAD, Status::PENDING, JobState::DOWNLOAD),
 
  124       TransitionRule(JobState::DOWNLOAD, Status::FAIL,    JobState::FAILED),
 
  129   static const unsigned int TABLE_SIZE = 
sizeof(TABLE) / 
sizeof(TABLE[0]);
 
  130   for (
unsigned int i = 0; 
i != TABLE_SIZE; ++
i) {
 
  132       return TABLE[
i].toState;
 
  136   throw std::logic_error(
"PrunDriver.cxx: Missing state transition rule"); 
 
  144   o.
setString(
"nc_cmtConfig", gSystem->ExpandPathName(
"$AnalysisBase_PLATFORM"));
 
  145   o.
setString(
"nc_useAthenaPackages", 
"true");
 
  146   const std::string mergestr = 
"elg_merge jobdef.root %OUT %IN";
 
  151 static bool downloadContainer(
const std::string& 
name, 
 
  152                   const std::string& location)
 
  159     gSystem->Exec(Form(
"mkdir -p %s", location.c_str()));
 
  164       if (
entry.type == 
"CONTAINER" || 
entry.type == 
"DIDType.CONTAINER")
 
  169     for (
const auto& 
result : downloadResult)
 
  171       if (
result.notDownloaded != 0)
 
  183   using namespace EL::msgEventLoop;
 
  187   static bool loaded = 
false;
 
  197   TPython::Bind(
dynamic_cast<TObject*
>(
sample), 
"ELG_SAMPLE"); 
 
  198 #if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01) 
  200   TPython::Exec(
"_anyresult = ROOT.std.make_any['int'](ELG_prun(ELG_SAMPLE))", &
result);
 
  201   int ret = std::any_cast<int>(
result);
 
  203   int ret = TPython::Eval(
"ELG_prun(ELG_SAMPLE)");
 
  205   TPython::Bind(0, 
"ELG_SAMPLE");   
 
  208     sample->meta()->setString(
"nc_ELG_state_details", 
 
  209                               "problem submitting"); 
 
  213   sample->meta()->setDouble(
"nc_jediTaskID", ret);
 
  226   static bool loaded = 
false;
 
  233   TPython::Bind(
dynamic_cast<TObject*
>(
sample), 
"ELG_SAMPLE");
 
  234 #if ROOT_VERSION_CODE >= ROOT_VERSION(6,33,01) 
  236   TPython::Exec(
"_anyresult = ROOT.std.make_any['int'](ELG_jediState(ELG_SAMPLE))", &
result);
 
  237   int ret = std::any_cast<int>(
result);
 
  239   int ret =  TPython::Eval(
"ELG_jediState(ELG_SAMPLE)");
 
  241   TPython::Bind(0, 
"ELG_SAMPLE");
 
  243   if (ret == Status::DONE) 
return Status::DONE;
 
  244   if (ret == Status::FAIL) 
return Status::FAIL;
 
  247   if (ret != 90) { 
sample->meta()->setString(
"nc_ELG_state_details", 
"task status other than done/finished/failed/running"); }
 
  250     sample->meta()->setString(
"nc_ELG_state_details",
 
  251                               "problem checking jedi task status");
 
  254   return Status::PENDING;
 
  264     std::cout << 
"Downloading output from: "  
  265           << 
sample->name() << 
"..." << std::endl;
 
  270   if (container[container.size()-1] == 
'/') {
 
  271     container.resize(container.size() - 1); 
 
  273   container += 
"_hist/";
 
  275   bool downloadOk = downloadContainer(container, 
"elg/download/" + container);
 
  277   if (not downloadOk) {
 
  278     std::cerr << 
"Failed to download one or more files" << std::endl;
 
  279     sample->meta()->setString(
"nc_ELG_state_details", 
 
  280                               "error, check log for details"); 
 
  281     return Status::PENDING;
 
  293   if (container[container.size()-1] == 
'/') {
 
  294     container.resize(container.size() - 1); 
 
  296   container += 
"_hist/";
 
  297   const std::string 
dir = 
"elg/download/" + container;
 
  299   const std::string 
fileName = 
"hist-output.root";
 
  301   const std::string 
target = Form(
"hist-%s.root", 
sample->name().c_str());
 
  303   const std::string findCmd(Form(
"find %s -name \"*.%s*\" | tr '\n' ' '", 
 
  305   std::istringstream 
input(gSystem->GetFromPipe(findCmd.c_str()).Data()); 
 
  306   std::vector<std::string> 
files((std::istream_iterator<std::string>(
input)),
 
  307                  std::istream_iterator<std::string>());
 
  312   if (not 
files.size()) {
 
  313     std::cerr << 
"Found no input files for merging! " 
  314           << 
"Requeueing sample for download..." << std::endl;
 
  315     sample->meta()->setString(
"nc_ELG_state_details", 
"retry, files were lost"); 
 
  322     sample->meta()->setString(
"nc_ELG_state_details", 
 
  323                               "error, check log for details"); 
 
  324     gSystem->Exec(Form(
"rm -f %s", 
target.c_str()));
 
  325     return Status::PENDING;
 
  328   for (
size_t i = 0; 
i != 
files.size(); ++
i) {
 
  329     gSystem->Exec(Form(
"rm %s", 
files[
i].c_str()));
 
  331   gSystem->Exec(Form(
"rmdir %s/*", 
dir.c_str()));
 
  332   gSystem->Exec(Form(
"rmdir %s", 
dir.c_str()));
 
  341   JobState::Enum state = sampleState(
sample);
 
  343   sample->meta()->setString(
"nc_ELG_state_details", 
"");
 
  345   Status::Enum 
status = Status::PENDING;
 
  353   case JobState::DOWNLOAD: 
 
  360   case JobState::FAILED:
 
  364   state = nextState(state, 
status);
 
  375     if (sampleState(*
s) == state) {
 
  376       workList.push_back([
s]()->
void{ processTask(*
s); });
 
  383 static std::string formatOutputName(
const SH::MetaObject& sampleMeta,
 
  388   using namespace EL::msgEventLoop;
 
  390   static const std::string nickname = 
 
  391     gSystem->GetFromPipe(Form(
"python -c \"%s\" 2>/dev/null", 
 
  392                   "from pandatools import PsubUtils;" 
  393                   "print(PsubUtils.getNickname());")).Data();
 
  398   if (nickname.length()>20){
 
  399     ANA_MSG_WARNING( 
"No proxy available - cannot use nickname yet. Will try a late replacement.");
 
  401     out.ReplaceAll(
"%nickname%", nickname);
 
  409   while(std::getline(
ss, 
item, 
'.')) {
 
  410     std::stringstream sskey;
 
  411     sskey << 
"%in:name[" << ++
field << 
"]%";
 
  412     out.ReplaceAll(sskey.str(), 
item);
 
  414   while (
out.Index(
"%in:") != -1) {
 
  415     int i1 = 
out.Index(
"%in:");
 
  416     int i2 = 
out.Index(
"%", i1+1);
 
  417     TString metaName = 
out(i1+4, i2-i1-4);
 
  418     out.ReplaceAll(
"%in:"+metaName+
"%", 
 
  419            sampleMeta.
castString(std::string(metaName.Data())));  
 
  421   out.ReplaceAll(
"/", 
"");
 
  432   std::string 
out = 
"hist:hist-output.root";
 
  435   while ((
obj = itr())) {
 
  437     const std::string 
name = 
os->label() + 
".root";
 
  438     const std::string 
ds = 
 
  446 static void saveJobDef(
const std::string& 
fileName,
 
  454   file.WriteTObject(&
job.jobConfig(), 
"jobConfig", 
"SingleKey");        
 
  455   file.WriteTObject(&
outputs, 
"outputs", 
"SingleKey");        
 
  456   bool haveDefault = 
false;
 
  462       file.WriteObject (&meta, 
"defaultMetaObject");
 
  474   const std::string outDSSuffix = 
'_' + 
outputLabel + 
".root/"; 
 
  499   using namespace msgEventLoop;
 
  505       const std::string jobELGDir = 
data.submitDir + 
"/elg";
 
  506       const std::string runShFile = jobELGDir + 
"/runjob.sh";
 
  508       const std::string mergeShFile = jobELGDir + 
"/elg_merge";
 
  514       const std::string jobDefFile = jobELGDir + 
"/jobdef.root";
 
  515       gSystem->Exec(Form(
"mkdir -p %s", jobELGDir.c_str()));
 
  516       gSystem->Exec(Form(
"cp %s %s", runShOrig.c_str(), runShFile.c_str()));
 
  517       gSystem->Exec(Form(
"chmod +x %s", runShFile.c_str()));
 
  518       gSystem->Exec(Form(
"cp %s %s", mergeShOrig.c_str(), mergeShFile.c_str()));
 
  519       gSystem->Exec(Form(
"chmod +x %s", mergeShFile.c_str()));
 
  524       if (listToShipToGrid.size()){
 
  526           "Creating symbolic links for additional files or directories to be sent to grid.\n" 
  527           "For root or heavy files you should also add their name (not the full path) to EL::Job::optUserFiles.\n" 
  528           "Otherwise prun ignores those files." 
  531         std::vector<std::string> vect_filesOrDirToShip;
 
  533         boost::split(vect_filesOrDirToShip,listToShipToGrid,boost::is_any_of(
","));
 
  536         for (
const std::string & fileOrDirToShip: vect_filesOrDirToShip){
 
  537           ANA_MSG_INFO ((
"Creating symbolic link for: " +fileOrDirToShip).c_str());
 
  550         std::string outputSampleName = meta.
castString(
"nc_outputSampleName");
 
  551         if (outputSampleName.empty()) {
 
  552           outputSampleName = 
"user.%nickname%.%in:name%";
 
  554         meta.
setString(
"nc_outDS", formatOutputName(meta, outputSampleName));
 
  556         meta.
setString(
"nc_writeInputToTxt", 
"IN:input.txt");
 
  558         const std::string execstr = 
"runjob.sh " + (*s)->name();
 
  560         meta.
setString(
"nc_framework", 
"EventLoopGrid");
 
  563       saveJobDef(jobDefFile, *
data.job, 
sh);
 
  568         shOut.
save(
data.submitDir + 
"/output-" + 
out->label());
 
  571       shHist.
save(
data.submitDir + 
"/output-hist");
 
  577       sh.save(
data.submitDir + 
"/input");
 
  578       data.submitted = 
true;
 
  591   return ::StatusCode::SUCCESS;
 
  599   TmpCd tmpDir(
data.submitDir);
 
  605   const size_t nRunThreads = 
options()->castDouble(
"nc_run_threads", 0); 
 
  606   const size_t nDlThreads = 
options()->castDouble(
"nc_download_threads", 0); 
 
  609   processAllInState(
sh, JobState::DOWNLOAD, nDlThreads); 
 
  614   std::cout << std::endl;
 
  618     JobState::Enum state = sampleState(*
s);
 
  622     std::cout << (*s)->name() << 
"\t"; 
 
  626     case JobState::DOWNLOAD:
 
  631       std::cout << 
"\033[1;32m" << 
JobState::name[state] << 
"\033[0m\t";
 
  633     case JobState::FAILED:
 
  634       std::cout << 
"\033[1;31m" << 
JobState::name[state] << 
"\033[0m\t";
 
  637     std::cout << 
details << std::endl;
 
  642   std::cout << std::endl;
 
  644   data.retrieved = 
true;
 
  645   data.completed = allDone;
 
  646   return ::StatusCode::SUCCESS;
 
  652   TmpCd tmpDir(location);
 
  659     JobState::Enum state = sampleState(*
s);
 
  663           << 
"\t" << 
details << std::endl;
 
  668                   const std::string& task,
 
  669                   const std::string& state)
 
  674   TmpCd tmpDir(location);
 
  678   if (not 
sh.get(task)) {
 
  679     std::cout << 
"Unknown task: " << task << std::endl;
 
  680     std::cout << 
"Choose one of: " << std::endl;
 
  685   sh.get(task)->meta()->setString(
"nc_ELG_state", state);