35 testInvariant ()
const
50 ::StatusCode LocalDriver ::
51 doManagerStep (Detail::ManagerData&
data)
const
54 using namespace msgEventLoop;
60 data.batchSkipReleaseSetup =
true;
69 const std::string dockerImage {
71 const std::string dockerOptions {
75 if (numParallelProcs < 0)
77 ANA_MSG_ERROR (
"invalid number of parallel processes: " << numParallelProcs);
78 return StatusCode::FAILURE;
81 std::ostringstream basedirName;
82 basedirName <<
data.submitDir <<
"/tmp";
85 if (gSystem->MakeDirectory (basedirName.str().c_str()) != 0)
86 RCU_THROW_MSG (
"failed to create directory " + basedirName.str());
88 auto submitSingle = [&] (std::size_t
index)
noexcept -> StatusCode
92 std::ostringstream dirName;
93 dirName << basedirName.str() <<
"/" <<
index;
94 if (gSystem->MakeDirectory (dirName.str().c_str()) != 0)
96 ANA_MSG_ERROR (
"failed to create directory " + dirName.str());
97 return StatusCode::FAILURE;
100 std::ostringstream cmd;
101 cmd <<
"cd " << dirName.str() <<
" && ";
102 if (!dockerImage.empty())
106 }
catch (std::exception& e)
109 return StatusCode::FAILURE;
111 return StatusCode::SUCCESS;
113 if (numParallelProcs == 1)
115 for (std::size_t
index :
data.batchJobIndices)
117 if (submitSingle (
index).isFailure())
118 return StatusCode::FAILURE;
122 if (numParallelProcs == 0)
123 numParallelProcs = std::thread::hardware_concurrency();
124 if (numParallelProcs >
int (
data.batchJobIndices.size()))
125 numParallelProcs =
data.batchJobIndices.size();
126 std::vector<std::thread> threads;
128 auto indexIter =
data.batchJobIndices.begin();
130 while (threads.size() < unsigned (numParallelProcs))
132 threads.emplace_back ([&] ()
noexcept
134 std::unique_lock<std::mutex> lock (
mutex);
135 while (indexIter !=
data.batchJobIndices.end() && !abort)
137 auto myindex = *indexIter;
140 if (submitSingle (myindex).isFailure())
149 for (
auto& thread : threads)
152 return StatusCode::FAILURE;
154 data.submitted =
true;
161 return ::StatusCode::SUCCESS;
#define RCU_NEW_INVARIANT(x)
#define RCU_READ_INVARIANT(x)
char data[hepevt_bytes_allocation_ATLAS]
ClassImp(EL::LocalDriver) namespace EL
#define RCU_THROW_MSG(message)
virtual::StatusCode doManagerStep(Detail::ManagerData &data) const override
static const std::string optDockerOptions
any extra options we may want to pass to docker
static const std::string optNumParallelProcs
the option to specify the number of parallel jobs in LocalDriver (0 = number of hardware cores) (defa...
static const std::string optDockerImage
this is the name of the docker image, when using docker with a supported batch driver
a Driver for running batch jobs locally for testing purposes
@ doResubmit
call the actual doResubmit method
@ submitJob
do the actual job submission
@ batchScriptVar
create the variables needed for the batch-run script
This module defines the arguments passed from the BATCH driver to the BATCH worker.
void exec(const std::string &cmd)
effects: execute the given command; guarantee: strong; failures: out of memory II; failures: system fail...
std::string quote(const std::string &name)
effects: quote the given name to protect it from the shell; returns: the quoted name; guarantee: strong...