ATLAS Offline Software
Loading...
Searching...
No Matches
LocalDriver.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7
8//
9// includes
10//
11
13
14#include <sstream>
15#include <TSystem.h>
17#include <EventLoop/Job.h>
23#include <mutex>
24#include <thread>
25
26//
27// method implementations
28//
29
31
32namespace EL
33{
34 void LocalDriver ::
35 testInvariant () const
36 {
37 RCU_INVARIANT (this != 0);
38 }
39
40
41
42 LocalDriver ::
43 LocalDriver ()
44 {
45 RCU_NEW_INVARIANT (this);
46 }
47
48
49
50 ::StatusCode LocalDriver ::
51 doManagerStep (Detail::ManagerData& data) const
52 {
53 RCU_READ_INVARIANT (this);
54 using namespace msgEventLoop;
56 switch (data.step)
57 {
59 {
60 data.batchSkipReleaseSetup = true;
61 }
62 break;
63
66 {
67 // safely ignoring: resubmit
68
69 const std::string dockerImage {
70 data.options.castString(Job::optDockerImage)};
71 const std::string dockerOptions {
72 data.options.castString(Job::optDockerOptions)};
73 int numParallelProcs
74 = data.options.castDouble (Job::optNumParallelProcs, 1);
75 if (numParallelProcs < 0)
76 {
77 ANA_MSG_ERROR ("invalid number of parallel processes: " << numParallelProcs);
78 return StatusCode::FAILURE;
79 }
80
81 std::ostringstream basedirName;
82 basedirName << data.submitDir << "/tmp";
83 if (!data.resubmit)
84 {
85 if (gSystem->MakeDirectory (basedirName.str().c_str()) != 0)
86 RCU_THROW_MSG ("failed to create directory " + basedirName.str());
87 }
88 auto submitSingle = [&] (std::size_t index) noexcept -> StatusCode
89 {
90 try
91 {
92 std::ostringstream dirName;
93 dirName << basedirName.str() << "/" << index;
94 if (gSystem->MakeDirectory (dirName.str().c_str()) != 0)
95 {
96 ANA_MSG_ERROR ("failed to create directory " + dirName.str());
97 return StatusCode::FAILURE;
98 }
99
100 std::ostringstream cmd;
101 cmd << "cd " << dirName.str() << " && ";
102 if (!dockerImage.empty())
103 cmd << "docker run --rm -v " << RCU::Shell::quote (data.submitDir) << ":" << RCU::Shell::quote (data.submitDir) << " " << dockerOptions << " " << dockerImage << " ";
104 cmd << RCU::Shell::quote (data.submitDir) << "/submit/run " << index;
105 RCU::Shell::exec (cmd.str());
106 } catch (std::exception& e)
107 {
108 ANA_MSG_ERROR ("exception in job " << index << ": " << e.what());
109 return StatusCode::FAILURE;
110 }
111 return StatusCode::SUCCESS;
112 };
113 if (numParallelProcs == 1)
114 {
115 for (std::size_t index : data.batchJobIndices)
116 {
117 if (submitSingle (index).isFailure())
118 return StatusCode::FAILURE;
119 }
120 } else
121 {
122 if (numParallelProcs == 0)
123 numParallelProcs = std::thread::hardware_concurrency();
124 if (numParallelProcs > int (data.batchJobIndices.size()))
125 numParallelProcs = data.batchJobIndices.size();
126 std::vector<std::thread> threads;
127 std::mutex mutex;
128 auto indexIter = data.batchJobIndices.begin();
129 bool abort = false;
130 while (threads.size() < unsigned (numParallelProcs))
131 {
132 threads.emplace_back ([&] () noexcept
133 {
134 std::unique_lock<std::mutex> lock (mutex);
135 while (indexIter != data.batchJobIndices.end() && !abort)
136 {
137 auto myindex = *indexIter;
138 ++ indexIter;
139 lock.unlock();
140 if (submitSingle (myindex).isFailure())
141 {
142 abort = true;
143 return;
144 }
145 lock.lock ();
146 }
147 });
148 }
149 for (auto& thread : threads)
150 thread.join();
151 if (abort)
152 return StatusCode::FAILURE;
153 }
154 data.submitted = true;
155 }
156 break;
157
158 default:
159 break;
160 }
161 return ::StatusCode::SUCCESS;
162 }
163}
#define RCU_INVARIANT(x)
Definition Assert.h:201
#define RCU_NEW_INVARIANT(x)
Definition Assert.h:233
#define RCU_READ_INVARIANT(x)
Definition Assert.h:229
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
#define ANA_CHECK(EXP)
check whether the given expression was successful
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
ClassImp(EL::LocalDriver) namespace EL
#define RCU_THROW_MSG(message)
Definition PrintMsg.h:58
virtual::StatusCode doManagerStep(Detail::ManagerData &data) const override
static const std::string optDockerOptions
any extra options we may want to pass to docker
Definition Job.h:544
static const std::string optNumParallelProcs
the option to specify the number of parallel jobs in LocalDriver (0 = number of hardware cores) (default: 1)
Definition Job.h:452
static const std::string optDockerImage
this is the name of the docker image, when using docker with a supported batch driver
Definition Job.h:541
a Driver for running batch jobs locally for testing purposes
Definition LocalDriver.h:26
STL class.
@ doResubmit
call the actual doResubmit method
@ submitJob
do the actual job submission
Definition ManagerStep.h:92
@ batchScriptVar
create the variables needed for the batch-run script
Definition ManagerStep.h:83
This module defines the arguments passed from the BATCH driver to the BATCH worker.
void exec(const std::string &cmd)
effects: execute the given command guarantee: strong failures: out of memory II failures: system fail...
Definition ShellExec.cxx:29
std::string quote(const std::string &name)
effects: quote the given name to protect it from the shell returns: the quoted name guarantee: strong...
Definition ShellExec.cxx:75
Definition index.py:1