ATLAS Offline Software
LocalDriver.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 
7 
8 //
9 // includes
10 //
11 
12 #include <EventLoop/LocalDriver.h>
13 
14 #include <sstream>
15 #include <TSystem.h>
17 #include <EventLoop/Job.h>
18 #include <EventLoop/ManagerData.h>
19 #include <EventLoop/MessageCheck.h>
20 #include <RootCoreUtils/Assert.h>
22 #include <RootCoreUtils/ThrowMsg.h>
23 #include <mutex>
24 #include <thread>
25 
26 //
27 // method implementations
28 //
29 
31 
32 namespace EL
33 {
34  void LocalDriver ::
35  testInvariant () const
36  {
37  RCU_INVARIANT (this != 0);
38  }
39 
40 
41 
43  LocalDriver ()
44  {
45  RCU_NEW_INVARIANT (this);
46  }
47 
48 
49 
51  doManagerStep (Detail::ManagerData& data) const
52  {
53  RCU_READ_INVARIANT (this);
54  using namespace msgEventLoop;
56  switch (data.step)
57  {
59  {
60  data.batchSkipReleaseSetup = true;
61  }
62  break;
63 
66  {
67  // safely ignoring: resubmit
68 
69  const std::string dockerImage {
70  data.options.castString(Job::optDockerImage)};
71  const std::string dockerOptions {
72  data.options.castString(Job::optDockerOptions)};
73  int numParallelProcs
74  = data.options.castDouble (Job::optNumParallelProcs, 1);
75  if (numParallelProcs < 0)
76  {
77  ANA_MSG_ERROR ("invalid number of parallel processes: " << numParallelProcs);
78  return StatusCode::FAILURE;
79  }
80 
81  std::ostringstream basedirName;
82  basedirName << data.submitDir << "/tmp";
83  if (!data.resubmit)
84  {
85  if (gSystem->MakeDirectory (basedirName.str().c_str()) != 0)
86  RCU_THROW_MSG ("failed to create directory " + basedirName.str());
87  }
88  auto submitSingle = [&] (std::size_t index) noexcept -> StatusCode
89  {
90  try
91  {
92  std::ostringstream dirName;
93  dirName << basedirName.str() << "/" << index;
94  if (gSystem->MakeDirectory (dirName.str().c_str()) != 0)
95  {
96  ANA_MSG_ERROR ("failed to create directory " + dirName.str());
97  return StatusCode::FAILURE;
98  }
99 
100  std::ostringstream cmd;
101  cmd << "cd " << dirName.str() << " && ";
102  if (!dockerImage.empty())
103  cmd << "docker run --rm -v " << RCU::Shell::quote (data.submitDir) << ":" << RCU::Shell::quote (data.submitDir) << " " << dockerOptions << " " << dockerImage << " ";
104  cmd << RCU::Shell::quote (data.submitDir) << "/submit/run " << index;
105  RCU::Shell::exec (cmd.str());
106  } catch (std::exception& e)
107  {
108  ANA_MSG_ERROR ("exception in job " << index << ": " << e.what());
109  return StatusCode::FAILURE;
110  }
111  return StatusCode::SUCCESS;
112  };
113  if (numParallelProcs == 1)
114  {
115  for (std::size_t index : data.batchJobIndices)
116  {
117  if (submitSingle (index).isFailure())
118  return StatusCode::FAILURE;
119  }
120  } else
121  {
122  if (numParallelProcs == 0)
123  numParallelProcs = std::thread::hardware_concurrency();
124  if (numParallelProcs > int (data.batchJobIndices.size()))
125  numParallelProcs = data.batchJobIndices.size();
126  std::vector<std::thread> threads;
128  auto indexIter = data.batchJobIndices.begin();
129  bool abort = false;
130  while (threads.size() < unsigned (numParallelProcs))
131  {
132  threads.emplace_back ([&] () noexcept
133  {
134  std::unique_lock<std::mutex> lock (mutex);
135  while (indexIter != data.batchJobIndices.end() && !abort)
136  {
137  auto myindex = *indexIter;
138  ++ indexIter;
139  lock.unlock();
140  if (submitSingle (myindex).isFailure())
141  {
142  abort = true;
143  return;
144  }
145  lock.lock ();
146  }
147  });
148  }
149  for (auto& thread : threads)
150  thread.join();
151  if (abort)
152  return StatusCode::FAILURE;
153  }
154  data.submitted = true;
155  }
156  break;
157 
158  default:
159  break;
160  }
161  return ::StatusCode::SUCCESS;
162  }
163 }
EL::LocalDriver::doManagerStep
virtual ::StatusCode doManagerStep(Detail::ManagerData &data) const override
AllowedVariables::e
e
Definition: AsgElectronSelectorTool.cxx:37
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
EL::LocalDriver::LocalDriver
LocalDriver()
effects: standard default constructor; guarantee: strong; failures: low level errors I
EL::Detail::ManagerStep::batchScriptVar
@ batchScriptVar
create the variables needed for the batch-run script
EL::Job::optDockerImage
static const std::string optDockerImage
this is the name of the docker image, when using docker with a supported batch driver
Definition: Job.h:525
Run3DQTestingDriver.threads
threads
Definition: Run3DQTestingDriver.py:34
EL::BatchDriver::doManagerStep
virtual ::StatusCode doManagerStep(Detail::ManagerData &data) const override
index
Definition: index.py:1
BeamSpot::mutex
std::mutex mutex
Definition: InDetBeamSpotVertex.cxx:18
rerun_display.cmd
string cmd
Definition: rerun_display.py:67
Job.h
ANA_MSG_ERROR
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:294
ANA_CHECK
#define ANA_CHECK(EXP)
check whether the given expression was successful
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:324
ShellExec.h
Assert.h
MessageCheck.h
xAOD::unsigned
unsigned
Definition: RingSetConf_v1.cxx:662
ClassImp
ClassImp(EL::LocalDriver) namespace EL
Definition: LocalDriver.cxx:30
EL::Detail::ManagerStep::doResubmit
@ doResubmit
call the actual doResubmit method
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ManagerData.h
calibdata.exception
exception
Definition: calibdata.py:496
EL
This module defines the arguments passed from the BATCH driver to the BATCH worker.
Definition: AlgorithmWorkerData.h:24
EL::Job::optNumParallelProcs
static const std::string optNumParallelProcs
the option to specify the number of parallel jobs in LocalDriver (0 = number of hardware cores) (defa...
Definition: Job.h:443
EL::Job::optDockerOptions
static const std::string optDockerOptions
any extra options we may want to pass to docker
Definition: Job.h:528
RCU_INVARIANT
#define RCU_INVARIANT(x)
Definition: Assert.h:201
StatusCode.h
ThrowMsg.h
EL::LocalDriver
a Driver for running batch jobs locally for testing purposes
Definition: LocalDriver.h:26
DeMoScan.index
string index
Definition: DeMoScan.py:364
EL::LocalDriver::testInvariant
void testInvariant() const
effects: test the invariant of this object; guarantee: no-fail
RCU::Shell::exec
void exec(const std::string &cmd)
effects: execute the given command guarantee: strong failures: out of memory II failures: system fail...
Definition: ShellExec.cxx:29
RCU_THROW_MSG
#define RCU_THROW_MSG(message)
Definition: PrintMsg.h:58
LocalDriver.h
RCU_READ_INVARIANT
#define RCU_READ_INVARIANT(x)
Definition: Assert.h:229
RCU::Shell::quote
std::string quote(const std::string &name)
effects: quote the given name to protect it from the shell returns: the quoted name guarantee: strong...
Definition: ShellExec.cxx:75
EL::Detail::ManagerStep::submitJob
@ submitJob
do the actual job submission
RCU_NEW_INVARIANT
#define RCU_NEW_INVARIANT(x)
Definition: Assert.h:233