ATLAS Offline Software
Functions
LocalDriver.cxx File Reference
#include <EventLoop/LocalDriver.h>
#include <sstream>
#include <TSystem.h>
#include <AsgMessaging/StatusCode.h>
#include <EventLoop/Job.h>
#include <EventLoop/ManagerData.h>
#include <EventLoop/MessageCheck.h>
#include <RootCoreUtils/Assert.h>
#include <RootCoreUtils/ShellExec.h>
#include <RootCoreUtils/ThrowMsg.h>
#include <mutex>
#include <thread>

Go to the source code of this file.

Functions

 ClassImp (EL::LocalDriver) namespace EL
 

Function Documentation

◆ ClassImp()

ClassImp ( EL::LocalDriver  )
Author
Nils Krumnack

Definition at line 30 of file LocalDriver.cxx.

33 {
34  void LocalDriver ::
35  testInvariant () const
36  {
37  RCU_INVARIANT (this != 0);
38  }
39 
40 
41 
42  LocalDriver ::
43  LocalDriver ()
44  {
45  RCU_NEW_INVARIANT (this);
46  }
47 
48 
49 
50  ::StatusCode LocalDriver ::
51  doManagerStep (Detail::ManagerData& data) const
52  {
53  RCU_READ_INVARIANT (this);
54  using namespace msgEventLoop;
55  ANA_CHECK (BatchDriver::doManagerStep (data));
56  switch (data.step)
57  {
58  case Detail::ManagerStep::batchScriptVar:
59  {
60  data.batchSkipReleaseSetup = true;
61  }
62  break;
63 
64  case Detail::ManagerStep::submitJob:
65  case Detail::ManagerStep::doResubmit:
66  {
67  // safely ignoring: resubmit
68 
69  const std::string dockerImage {
70  data.options.castString(Job::optDockerImage)};
71  const std::string dockerOptions {
72  data.options.castString(Job::optDockerOptions)};
73  int numParallelProcs
74  = data.options.castDouble (Job::optNumParallelProcs, 1);
75  if (numParallelProcs < 0)
76  {
77  ANA_MSG_ERROR ("invalid number of parallel processes: " << numParallelProcs);
78  return StatusCode::FAILURE;
79  }
80 
81  std::ostringstream basedirName;
82  basedirName << data.submitDir << "/tmp";
83  if (!data.resubmit)
84  {
85  if (gSystem->MakeDirectory (basedirName.str().c_str()) != 0)
86  RCU_THROW_MSG ("failed to create directory " + basedirName.str());
87  }
88  auto submitSingle = [&] (std::size_t index) noexcept -> StatusCode
89  {
90  try
91  {
92  std::ostringstream dirName;
93  dirName << basedirName.str() << "/" << index;
94  if (gSystem->MakeDirectory (dirName.str().c_str()) != 0)
95  {
96  ANA_MSG_ERROR ("failed to create directory " + dirName.str());
97  return StatusCode::FAILURE;
98  }
99 
100  std::ostringstream cmd;
101  cmd << "cd " << dirName.str() << " && ";
102  if (!dockerImage.empty())
103  cmd << "docker run --rm -v " << RCU::Shell::quote (data.submitDir) << ":" << RCU::Shell::quote (data.submitDir) << " " << dockerOptions << " " << dockerImage << " ";
104  cmd << RCU::Shell::quote (data.submitDir) << "/submit/run " << index;
105  RCU::Shell::exec (cmd.str());
106  } catch (std::exception& e)
107  {
108  ANA_MSG_ERROR ("exception in job " << index << ": " << e.what());
109  return StatusCode::FAILURE;
110  }
111  return StatusCode::SUCCESS;
112  };
113  if (numParallelProcs == 1)
114  {
115  for (std::size_t index : data.batchJobIndices)
116  {
117  if (submitSingle (index).isFailure())
118  return StatusCode::FAILURE;
119  }
120  } else
121  {
122  if (numParallelProcs == 0)
123  numParallelProcs = std::thread::hardware_concurrency();
124  if (numParallelProcs > int (data.batchJobIndices.size()))
125  numParallelProcs = data.batchJobIndices.size();
126  std::vector<std::thread> threads;
128  auto indexIter = data.batchJobIndices.begin();
129  bool abort = false;
130  while (threads.size() < unsigned (numParallelProcs))
131  {
132  threads.emplace_back ([&] () noexcept
133  {
134  std::unique_lock<std::mutex> lock (mutex);
135  while (indexIter != data.batchJobIndices.end() && !abort)
136  {
137  auto myindex = *indexIter;
138  ++ indexIter;
139  lock.unlock();
140  if (submitSingle (myindex).isFailure())
141  {
142  abort = true;
143  return;
144  }
145  lock.lock ();
146  }
147  });
148  }
149  for (auto& thread : threads)
150  thread.join();
151  if (abort)
152  return StatusCode::FAILURE;
153  }
154  data.submitted = true;
155  }
156  break;
157 
158  default:
159  break;
160  }
161  return ::StatusCode::SUCCESS;
162  }
163 }
AllowedVariables::e
e
Definition: AsgElectronSelectorTool.cxx:37
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
Run3DQTestingDriver.threads
threads
Definition: Run3DQTestingDriver.py:34
index
Definition: index.py:1
BeamSpot::mutex
std::mutex mutex
Definition: InDetBeamSpotVertex.cxx:18
rerun_display.cmd
string cmd
Definition: rerun_display.py:67
ANA_MSG_ERROR
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:294
ANA_CHECK
#define ANA_CHECK(EXP)
check whether the given expression was successful
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:324
xAOD::unsigned
unsigned
Definition: RingSetConf_v1.cxx:662
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
calibdata.exception
exception
Definition: calibdata.py:496
RCU_INVARIANT
#define RCU_INVARIANT(x)
Definition: Assert.h:201
DeMoScan.index
string index
Definition: DeMoScan.py:364
RCU::Shell::exec
void exec(const std::string &cmd)
effects: execute the given command guarantee: strong failures: out of memory II failures: system fail...
Definition: ShellExec.cxx:29
RCU_THROW_MSG
#define RCU_THROW_MSG(message)
Definition: PrintMsg.h:58
RCU_READ_INVARIANT
#define RCU_READ_INVARIANT(x)
Definition: Assert.h:229
RCU::Shell::quote
std::string quote(const std::string &name)
effects: quote the given name to protect it from the shell returns: the quoted name guarantee: strong...
Definition: ShellExec.cxx:75
RCU_NEW_INVARIANT
#define RCU_NEW_INVARIANT(x)
Definition: Assert.h:233