43#include <TObjString.h>
56 StatusCode make_module (std::unique_ptr<Detail::Module>& module, asg::AsgComponentConfig config)
58 using namespace msgEventLoop;
60 ANA_CHECK (
config.makeComponentExpert (module,
"new %1% (\"%2%\")",
false,
"ELModule."));
62 return StatusCode::SUCCESS;
69 testInvariant ()
const
72 for (std::size_t iter = 0, end =
m_algs.size(); iter != end; ++ iter)
89 addOutput (TObject *output_swallow)
91 std::unique_ptr<TObject> output (output_swallow);
97 ModuleData::addOutput (std::move (output));
103 addOutputList (
const std::string& name, TObject *output_swallow)
105 std::unique_ptr<TObject> output (output_swallow);
111 std::unique_ptr<TList> list (
new TList);
112 list->SetName (name.c_str());
113 list->Add (output.release());
120 getOutputHist (
const std::string& name)
const
132 getOutputFile (
const std::string&
label)
const
144 getOutputFileNull (
const std::string&
label)
const
150 return iter->second->file();
156 addTree(
const TTree&
tree,
const std::string& stream )
158 using namespace msgEventLoop;
161 auto outputIter =
m_outputs.find (stream);
164 ANA_MSG_ERROR (
"No output file with stream name \"" + stream +
166 return ::StatusCode::FAILURE;
169 outputIter->second->addClone (
tree);
172 return ::StatusCode::SUCCESS;
178 getOutputTree(
const std::string& name,
const std::string& stream )
const
180 using namespace msgEventLoop;
183 auto outputIter =
m_outputs.find (stream);
186 RCU_THROW_MSG (
"No output file with stream name \"" + stream
190 TTree *
result = outputIter->second->getOutputTree( name );
192 RCU_THROW_MSG (
"No tree with name \"" + name +
"\" in stream \"" +
236 std::string Worker ::
237 inputFileName ()
const
240 std::string path =
inputFile()->GetName();
241 auto split = path.rfind (
'/');
242 if (
split != std::string::npos)
243 return path.substr (
split + 1);
251 triggerConfig ()
const
254 return dynamic_cast<TTree*
>(
inputFile()->Get(
"physicsMeta/TrigConfTree"));
284 getAlg (
const std::string& name)
const
289 if (alg->hasName (name))
290 return alg.m_algorithm->getLegacyAlg();
307 filterPassed ()
const noexcept
316 setFilterPassed (
bool val_filterPassed)
noexcept
346 setOutputHist (
const std::string& val_outputTarget)
356 setSegmentName (
const std::string& val_segmentName)
369 for (std::unique_ptr<IAlgorithmWrapper>& alg : jobConfig.extractAlgorithms())
371 m_algs.push_back (std::move (alg));
380 using namespace msgEventLoop;
388 m_moduleConfig.emplace_back (
"EL::Detail::MemoryMonitorModule/EarlyMemoryMonitorModule");
391 m_moduleConfig.emplace_back (
"EL::Detail::TreeCacheModule/TreeCacheModule");
398 m_moduleConfig.emplace_back (
"EL::Detail::TEventModule/TEventModule");
406 if (!factoryPreload.empty())
408 m_moduleConfig.emplace_back (
"EL::Detail::FactoryPreloadModule/FactoryPreloadModule");
411 m_moduleConfig.emplace_back (
"EL::Detail::LeakCheckModule/LeakCheckModule");
417 m_moduleConfig.emplace_back (
"EL::Detail::StopwatchModule/StopwatchModule");
419 m_moduleConfig.emplace_back (
"EL::Detail::GridReportingModule/GridReportingModule");
421 m_moduleConfig.emplace_back (
"EL::Detail::AlgorithmTimerModule/AlgorithmTimerModule");
423 m_moduleConfig.emplace_back (
"EL::Detail::AlgorithmMemoryModule/AlgorithmMemoryModule");
424 m_moduleConfig.emplace_back (
"EL::Detail::FileExecutedModule/FileExecutedModule");
425 m_moduleConfig.emplace_back (
"EL::Detail::EventCountModule/EventCountModule");
426 m_moduleConfig.emplace_back (
"EL::Detail::WorkerConfigModule/WorkerConfigModule");
427 m_moduleConfig.emplace_back (
"EL::Detail::AlgorithmStateModule/AlgorithmStateModule");
428 m_moduleConfig.emplace_back (
"EL::Detail::PostClosedOutputsModule/PostClosedOutputsModule");
430 m_moduleConfig.emplace_back (
"EL::Detail::MemoryMonitorModule/LateMemoryMonitorModule");
434 std::unique_ptr<Detail::Module> module;
436 m_modules.push_back (std::move (module));
450 std::istringstream iss (aliases);
452 while (std::getline (iss, alias,
','))
454 auto pos = alias.find (
'=');
455 if (pos == std::string::npos)
458 return ::StatusCode::FAILURE;
460 auto aliasName = alias.substr (0, pos);
461 auto realName = alias.substr (pos + 1);
462 auto realOutput =
m_outputs.find (realName);
465 ANA_MSG_ERROR (
"output stream " << realName <<
" not found for alias " << aliasName);
466 return ::StatusCode::FAILURE;
468 auto [aliasOutput, success] =
m_outputs.emplace (aliasName, realOutput->second);
471 ANA_MSG_ERROR (
"output stream " << aliasName <<
" already exists, can't make alias");
472 return ::StatusCode::FAILURE;
478 (
"EventLoop_JobStats",
"EventLoop job statistics");
481 ANA_MSG_INFO (
"calling firstInitialize on all modules");
483 ANA_CHECK (module->firstInitialize (*
this));
484 ANA_MSG_INFO (
"calling preFileInitialize on all modules");
486 ANA_CHECK (module->preFileInitialize (*
this));
488 return ::StatusCode::SUCCESS;
496 using namespace msgEventLoop;
501 ANA_CHECK (module->processInputs (*
this, *
this));
502 return ::StatusCode::SUCCESS;
510 using namespace msgEventLoop;
517 return StatusCode::FAILURE;
527 output.second->saveOutput ();
528 output.second->close ();
529 std::string path = output.second->finalFileName ();
531 addOutputList (
"EventLoop_OutputStream_" + output.first,
new TObjString (path.c_str()));
535 ANA_CHECK (module->postFinalize (*
this));
536 if (
m_jobStats->GetListOfBranches()->GetEntries() > 0)
541 return ::StatusCode::FAILURE;
543 ModuleData::addOutput (std::move (
m_jobStats));
552 ANA_CHECK (module->postFileClose(*
this));
555 return ::StatusCode::SUCCESS;
563 using namespace msgEventLoop;
575 return ::StatusCode::FAILURE;
583 return ::StatusCode::FAILURE;
591 ANA_CHECK (module->onInitialize (*
this));
599 ANA_CHECK (module->onNewInputFile (*
this));
605 ANA_CHECK (module->onFileExecute (*
this));
617 if (module->onExecute (*this).isFailure())
620 return ::StatusCode::FAILURE;
627 ANA_CHECK (module->postFirstEvent (*
this));
633 return ::StatusCode::SUCCESS;
639 fileOpenErrorFilter(
int level,
bool ,
const char* s1,
const char * s2)
642 if (strstr (s2,
"no streamer or dictionary") !=
nullptr) {
647 if( level > kWarning ) {
649 std::string
msg =
"ROOT error detected in Worker.cxx: ";
653 throw std::runtime_error(
msg);
664 openInputFile (
const std::string& inputFileUrl)
666 using namespace msgEventLoop;
674 return ::StatusCode::SUCCESS;
681 ANA_CHECK (module->onCloseInputFile (*
this));
683 ANA_CHECK (module->postCloseInputFile (*
this));
691 if (inputFileUrl.empty())
692 return ::StatusCode::SUCCESS;
707 module->reportInputFailure (*this);
708 return ::StatusCode::FAILURE;
714 module->reportInputFailure (*this);
715 return ::StatusCode::FAILURE;
719 const std::string treeName
721 tree =
dynamic_cast<TTree*
>(
inputFile->Get (treeName.c_str()));
724 ANA_MSG_INFO (
"tree " << treeName <<
" not found in input file: " << inputFileUrl);
725 ANA_MSG_INFO (
"treating this like a tree with no events");
734 return ::StatusCode::SUCCESS;
740 addOutputStream (
const std::string&
label,
743 using namespace msgEventLoop;
749 return ::StatusCode::FAILURE;
751 if (
data.file() ==
nullptr)
753 ANA_MSG_ERROR (
"output stream does not have a file attached");
754 return ::StatusCode::FAILURE;
756 if (
data.mainStreamName().empty())
758 m_outputs.insert (std::make_pair (
label, std::make_shared<Detail::OutputStreamData>(std::move (
data))));
759 return ::StatusCode::SUCCESS;
765 inputFileNumEntries ()
const
779 eventsProcessed ()
const noexcept
791 using namespace msgEventLoop;
795 meta.fetchDefaults (options);
806 end = job.outputEnd(); out != end; ++ out)
809 out->output()->makeWriter (sample->name(),
"",
".root")};
814 m_moduleConfig.emplace_back (
"EL::Detail::DirectInputModule/DirectInputModule");
827 return ::StatusCode::SUCCESS;
833 batchExecute (
unsigned job_id,
const char *confFile)
835 using namespace msgEventLoop;
840 std::unique_ptr<TFile>
file (TFile::Open (confFile,
"READ"));
841 if (
file.get() ==
nullptr ||
file->IsZombie())
844 return ::StatusCode::FAILURE;
847 std::unique_ptr<BatchJob> job (
dynamic_cast<BatchJob*
>(
file->Get (
"job")));
849 if (job.get() ==
nullptr)
852 return ::StatusCode::FAILURE;
855 if (job_id >= job->segments.size())
857 ANA_MSG_ERROR (
"invalid job-id " << job_id <<
", max is " << job->segments.size());
858 return ::StatusCode::FAILURE;
865 gSystem->Exec (
"pwd");
866 gSystem->MakeDirectory (
"output");
875 end = job->job.outputEnd(); out != end; ++ out)
883 m_moduleConfig.emplace_back (
"EL::Detail::BatchInputModule/BatchInputModule");
894 std::ostringstream job_name;
896 std::ofstream completed ((job->location +
"/status/completed-" + job_name.str()).c_str());
897 return ::StatusCode::SUCCESS;
901 return ::StatusCode::FAILURE;
908 gridExecute (
const std::string& sampleName, Long64_t SkipEvents, Long64_t nEventsPerJob)
910 using namespace msgEventLoop;
913 ANA_MSG_INFO (
"Running with ROOT version " << gROOT->GetVersion()
914 <<
" (" << gROOT->GetVersionDate() <<
")");
920 std::unique_ptr<JobConfig> jobConfig;
923 std::unique_ptr<TFile> f (TFile::Open(
"jobdef.root"));
924 if (f ==
nullptr || f->IsZombie()) {
926 return ::StatusCode::FAILURE;
934 return ::StatusCode::FAILURE;
937 jobConfig.reset (
dynamic_cast<JobConfig*
>(f->Get(
"jobConfig")));
938 if (jobConfig ==
nullptr)
941 return ::StatusCode::FAILURE;
945 std::unique_ptr<TList> outs ((TList*)f->Get(
"outputs"));
949 return ::StatusCode::FAILURE;
952 TIter itr(outs.get());
954 while ((obj = itr())) {
960 ANA_MSG_ERROR (
"Encountered unexpected entry in list of outputs");
961 return ::StatusCode::FAILURE;
969 const std::string location =
".";
979 TIter itr(&bigOutputs);
981 while ((obj = itr())) {
986 return ::StatusCode::FAILURE;
990 location +
"/" + os->label() +
".root",
"RECREATE"};
999 std::vector<std::string> fileList;
1000 std::ifstream infile(
"input.txt");
1003 if (!getline(infile, sLine))
break;
1004 std::istringstream ssLine(sLine);
1007 if (!getline(ssLine, sFile,
','))
break;
1008 fileList.push_back(sFile);
1011 if (fileList.size() == 0) {
1016 m_moduleConfig.emplace_back (
"EL::Detail::DirectInputModule/DirectInputModule");
1019 if (nEventsPerJob != -1)
1021 if (SkipEvents != 0)
1035 return ::StatusCode::SUCCESS;
#define RCU_DESTROY_INVARIANT(x)
#define RCU_CHANGE_INVARIANT(x)
#define RCU_NEW_INVARIANT(x)
#define RCU_REQUIRE_SOFT(x)
#define RCU_READ_INVARIANT(x)
char data[hepevt_bytes_allocation_ATLAS]
#define RCU_THROW_MSG(message)
Run an MT piece of code with an alternate ROOT error handler.
all data needed to manage a given output stream
the job configuration that is independent of driver and dataset
static const std::string optPrintPerFileStats
description: the option to turn on printing of i/o statistics at the end of each file; rationale: whil...
static const std::string optMemFailOnLeak
Failure behaviour of the code when a "significant memory leak" is found.
static const std::string optMaxEvents
description: the name of the option used for setting the maximum number of events to process per samp...
static const std::string optGridReporting
whether to use grid reporting even when not running on the grid
static const std::string optAlgorithmTimer
a boolean flag for whether to add a timer for the algorithms
static const std::string optMemResidentIncreaseLimit
The minimal resident memory increase necessary to trigger an error.
static const std::string optXAODPerfStats
description: the name of the option for turning on XAODPerfStats.
const OutputStream * outputIter
static const std::string optXAODSummaryReport
the option to turn on/off the xAOD summary reporting at the end of the job
static const std::string optXaodAccessMode
description: the option to select the access mode for xAODs.
static const std::string optCacheLearnEntries
description: this option allows to configure the number of tree entries used for learning cache behav...
static const std::string optCacheSize
description: this option allows to configure the TTreeCache size for this job.
static const std::string optAlgorithmMemoryMonitor
a boolean flag for whether to add a memory monitor for the algorithms
static const std::string optXAODInput
the option to select whether our input is xAODs
static const std::string optMemResidentPerEventIncreaseLimit
The minimal per-event resident memory increase for triggering an error.
static const std::string optMemVirtualIncreaseLimit
The minimal virtual memory increase necessary to trigger an error.
static const std::string optMemVirtualPerEventIncreaseLimit
The minimal per-event virtual memory increase for triggering an error.
static const std::string optSkipEvents
description: the name of the option used for skipping a certain number of events in the beginning rat...
static const std::string optStreamAliases
an option for stream aliases
static const std::string optFactoryPreload
a boolean flag for whether to perform a component factory preload
static const std::string optOtherMetaDataTreeNamePattern
Pattern for other MetaData tree name in input xAODs. Can be useful for augmented file reading or exclu...
static const std::string histogramStreamName
the name of the histogram output stream
Long64_t treeEntry() const override
description: the entry in the tree we are reading; guarantee: no-fail
std::string inputFileName() const override
the name of the file we are reading the current tree from, without the path component
void addOutputList(const std::string &name, TObject *output_swallow) override
effects: add a given object to the output.
::StatusCode addTree(const TTree &tree, const std::string &stream) final override
effects: adds a tree to an output file specified by the stream/label; failures: Incorrect stream/label...
TFile * inputFile() const override
description: the file we are reading the current tree from; guarantee: no-fail
TTree * getOutputTree(const std::string &name, const std::string &stream) const final override
effects: get the tree that was added to an output file earlier; failures: Tree doesn't exist
static bool fileOpenErrorFilter(int level, bool, const char *, const char *)
Error handler for file opening.
void setOutputHist(const std::string &val_outputTarget)
set the histogram output list
std::vector< asg::AsgComponentConfig > m_moduleConfig
the module configurations we use
::StatusCode addOutputStream(const std::string &label, Detail::OutputStreamData output)
effects: add another output file; guarantee: strong; failures: low level errors II; failures: label alre...
const SH::MetaObject * metaData() const override
description: the sample meta-data we are working on; guarantee: no-fail; invariant: metaData !...
std::string m_segmentName
the name of the segment we are processing
::StatusCode finalize()
finalize the worker
void setJobConfig(JobConfig &&jobConfig)
set the JobConfig
::StatusCode openInputFile(const std::string &inputFileUrl) override
open the given input file without processing it
uint64_t eventsProcessed() const noexcept
the number of events that have been processed
bool m_newInputFile
whether this is a new input file
TTree * tree() const override
description: the tree we are running on; guarantee: no-fail
void setMetaData(const SH::MetaObject *val_metaData)
set the metaData
bool m_algorithmsInitialized
whether the algorithms are initialized
std::vector< std::unique_ptr< Detail::Module > > m_modules
the list of modules we hold
void addOutput(TObject *output_swallow) final override
effects: add an object to the output.
::StatusCode initialize()
initialize the worker
TFile * getOutputFileNull(const std::string &label) const override
effects: get the output file that goes into the dataset with the given label.
void setSegmentName(const std::string &val_segmentName)
set the segment name
Long64_t inputFileNumEntries() const override
the number of events in the input file
::StatusCode processInputs()
process all the inputs
bool m_firstEvent
whether we are still to process the first event
std::string m_outputTarget
the target file to which we will write the histogram output
Run an MT piece of code with an alternate ROOT error handler.
A smart pointer class that holds a single Sample object.
Tool for accessing xAOD files outside of Athena.
A relatively simple transient store for objects created in analysis.
std::vector< std::string > split(const std::string &s, const std::string &t=":")
std::string label(const std::string &format, int i)
void report_exception(std::exception_ptr eptr)
print out the currently evaluated exception
This module defines the arguments passed from the BATCH driver to the BATCH worker.
::StatusCode StatusCode
StatusCode definition for legacy code.
bool SetDirectory(TObject *object, TDirectory *directory)
effects: set the directory this object is associated with; returns: whether the object type actively k...
std::unique_ptr< TFile > openFile(const std::string &name, const MetaObject &options)
open a file with the given options
UInt_t job_id
description: the job id of this segment
std::string segmentName
the name/id to use for this segment (not including the sample name)
std::string fullName
the name/id to use for this segment (including the sample name)
UInt_t sample
description: the index of the sample we are using
std::string sampleName
the name of the sample for this segment
std::string m_inputFileUrl
the input file url of the currently opened file
const SH::MetaObject * m_metaData
the meta-data we use
Worker * m_worker
the worker (to pass on to the algorithms)
TTree * m_inputTree
the (main) tree in the input file
uint64_t m_inputTreeEntry
the entry in the input tree we are currently looking at
BatchJob * m_batchJob
the BatchJob configuration (if used)
xAOD::TStore * m_tstore
the TStore structure, if we use one
bool m_skipEvent
whether we are skipping the current event
OutputStreamData * m_histOutput
the histogram output stream
std::unique_ptr< TTree > m_jobStats
Tree saving per-job statistics information.
xAOD::TEvent * m_tevent
the TEvent structure, if we use one
std::unique_ptr< TFile > m_inputFile
the input file pointer of the currently opened file
std::map< std::string, std::shared_ptr< Detail::OutputStreamData > > m_outputs
the list of output files
std::vector< Detail::AlgorithmData > m_algs
the list of algorithms
uint64_t m_eventsProcessed
the number of events that have been processed
a range of events in a given file
std::string m_url
the location of the file
static constexpr Long64_t eof
the special value to indicate that the range includes all events until the end of the file
Long64_t m_beginEvent
the first event to process
Long64_t m_endEvent
the event past the last event, or eof