43#include <TObjString.h>
56 StatusCode make_module (std::unique_ptr<Detail::Module>& module, asg::AsgComponentConfig config)
58 using namespace msgEventLoop;
60 ANA_CHECK (
config.makeComponentExpert (module,
"new %1% (\"%2%\")",
false,
"ELModule."));
62 return StatusCode::SUCCESS;
69 testInvariant ()
const
72 for (std::size_t iter = 0, end =
m_algs.size(); iter != end; ++ iter)
89 addOutput (TObject *output_swallow)
91 std::unique_ptr<TObject> output (output_swallow);
97 ModuleData::addOutput (std::move (output));
103 addOutputList (
const std::string& name, TObject *output_swallow)
105 std::unique_ptr<TObject> output (output_swallow);
111 std::unique_ptr<TList> list (
new TList);
112 list->SetName (name.c_str());
113 list->Add (output.release());
120 getOutputHist (
const std::string& name)
const
125 if (result ==
nullptr)
RCU_THROW_MSG (
"unknown output histogram: " + name);
132 getOutputFile (
const std::string&
label)
const
144 getOutputFileNull (
const std::string&
label)
const
150 return iter->second->file();
156 addTree(
const TTree&
tree,
const std::string& stream )
158 using namespace msgEventLoop;
161 auto outputIter =
m_outputs.find (stream);
164 ANA_MSG_ERROR (
"No output file with stream name \"" + stream +
166 return ::StatusCode::FAILURE;
169 outputIter->second->addClone (
tree);
172 return ::StatusCode::SUCCESS;
178 getOutputTree(
const std::string& name,
const std::string& stream )
const
180 using namespace msgEventLoop;
183 auto outputIter =
m_outputs.find (stream);
186 RCU_THROW_MSG (
"No output file with stream name \"" + stream
190 TTree *result = outputIter->second->getOutputTree( name );
191 if( result ==
nullptr ) {
192 RCU_THROW_MSG (
"No tree with name \"" + name +
"\" in stream \"" +
236 hasInputEvents ()
const
243 std::string Worker ::
244 inputFileName ()
const
247 std::string path =
inputFile()->GetName();
248 auto split = path.rfind (
'/');
249 if (
split != std::string::npos)
250 return path.substr (
split + 1);
258 triggerConfig ()
const
261 return dynamic_cast<TTree*
>(
inputFile()->Get(
"physicsMeta/TrigConfTree"));
291 getAlg (
const std::string& name)
const
296 if (alg->hasName (name))
297 return alg.m_algorithm->getLegacyAlg();
314 filterPassed ()
const noexcept
323 setFilterPassed (
bool val_filterPassed)
noexcept
353 setOutputHist (
const std::string& val_outputTarget)
363 setSegmentName (
const std::string& val_segmentName)
376 for (
auto& alg : jobConfig.extractAlgorithms())
378 m_algs.push_back (std::move (alg));
387 using namespace msgEventLoop;
395 m_moduleConfig.emplace_back (
"EL::Detail::MemoryMonitorModule/EarlyMemoryMonitorModule");
398 m_moduleConfig.emplace_back (
"EL::Detail::TreeCacheModule/TreeCacheModule");
405 m_moduleConfig.emplace_back (
"EL::Detail::EventModule/EventModule");
411 if (!factoryPreload.empty())
413 m_moduleConfig.emplace_back (
"EL::Detail::FactoryPreloadModule/FactoryPreloadModule");
416 m_moduleConfig.emplace_back (
"EL::Detail::LeakCheckModule/LeakCheckModule");
422 m_moduleConfig.emplace_back (
"EL::Detail::StopwatchModule/StopwatchModule");
424 m_moduleConfig.emplace_back (
"EL::Detail::GridReportingModule/GridReportingModule");
426 m_moduleConfig.emplace_back (
"EL::Detail::AlgorithmTimerModule/AlgorithmTimerModule");
428 m_moduleConfig.emplace_back (
"EL::Detail::AlgorithmMemoryModule/AlgorithmMemoryModule");
429 m_moduleConfig.emplace_back (
"EL::Detail::FileExecutedModule/FileExecutedModule");
430 m_moduleConfig.emplace_back (
"EL::Detail::EventCountModule/EventCountModule");
431 m_moduleConfig.emplace_back (
"EL::Detail::WorkerConfigModule/WorkerConfigModule");
432 m_moduleConfig.emplace_back (
"EL::Detail::AlgorithmStateModule/AlgorithmStateModule");
433 m_moduleConfig.emplace_back (
"EL::Detail::PostClosedOutputsModule/PostClosedOutputsModule");
435 m_moduleConfig.emplace_back (
"EL::Detail::MemoryMonitorModule/LateMemoryMonitorModule");
439 std::unique_ptr<Detail::Module> module;
440 ANA_CHECK (make_module (module, config));
441 m_modules.push_back (std::move (module));
455 std::istringstream iss (aliases);
457 while (std::getline (iss, alias,
','))
459 auto pos = alias.find (
'=');
460 if (pos == std::string::npos)
463 return ::StatusCode::FAILURE;
465 auto aliasName = alias.substr (0, pos);
466 auto realName = alias.substr (pos + 1);
467 auto realOutput =
m_outputs.find (realName);
470 ANA_MSG_ERROR (
"output stream " << realName <<
" not found for alias " << aliasName);
471 return ::StatusCode::FAILURE;
473 auto [aliasOutput, success] =
m_outputs.emplace (aliasName, realOutput->second);
476 ANA_MSG_ERROR (
"output stream " << aliasName <<
" already exists, can't make alias");
477 return ::StatusCode::FAILURE;
483 (
"EventLoop_JobStats",
"EventLoop job statistics");
486 ANA_MSG_INFO (
"calling firstInitialize on all modules");
488 ANA_CHECK (module->firstInitialize (*
this));
489 ANA_MSG_INFO (
"calling preFileInitialize on all modules");
491 ANA_CHECK (module->preFileInitialize (*
this));
493 return ::StatusCode::SUCCESS;
501 using namespace msgEventLoop;
506 ANA_CHECK (module->processInputs (*
this, *
this));
508 return ::StatusCode::SUCCESS;
516 using namespace msgEventLoop;
523 return StatusCode::FAILURE;
533 output.second->saveOutput ();
534 output.second->close ();
535 std::string path = output.second->finalFileName ();
537 addOutputList (
"EventLoop_OutputStream_" + output.first,
new TObjString (path.c_str()));
541 ANA_CHECK (module->postFinalize (*
this));
542 if (
m_jobStats->GetListOfBranches()->GetEntries() > 0)
547 return ::StatusCode::FAILURE;
549 ModuleData::addOutput (std::move (
m_jobStats));
558 ANA_CHECK (module->postFileClose(*
this));
561 return ::StatusCode::SUCCESS;
569 using namespace msgEventLoop;
581 return ::StatusCode::FAILURE;
589 return ::StatusCode::FAILURE;
597 ANA_CHECK (module->onInitialize (*
this));
605 ANA_CHECK (module->onNewInputFile (*
this));
611 ANA_CHECK (module->onFileExecute (*
this));
623 if (module->onExecute (*this).isFailure())
626 return ::StatusCode::FAILURE;
633 ANA_CHECK (module->postFirstEvent (*
this));
639 return ::StatusCode::SUCCESS;
645 fileOpenErrorFilter(
int level,
bool ,
const char* s1,
const char * s2)
648 if (strstr (s2,
"no streamer or dictionary") !=
nullptr) {
653 if( level > kWarning ) {
655 std::string
msg =
"ROOT error detected in Worker.cxx: ";
659 throw std::runtime_error(
msg);
670 openInputFile (
const std::string& inputFileUrl)
672 using namespace msgEventLoop;
680 return ::StatusCode::SUCCESS;
687 ANA_CHECK (module->onCloseInputFile (*
this));
689 ANA_CHECK (module->postCloseInputFile (*
this));
698 if (inputFileUrl.empty())
699 return ::StatusCode::SUCCESS;
714 module->reportInputFailure (*this);
715 return ::StatusCode::FAILURE;
721 module->reportInputFailure (*this);
722 return ::StatusCode::FAILURE;
727 const std::string treeName
729 tree =
dynamic_cast<TTree*
>(
inputFile->Get (treeName.c_str()));
732 ANA_MSG_INFO (
"tree " << treeName <<
" not found in input file: " << inputFileUrl);
733 ANA_MSG_INFO (
"treating this like a tree with no events");
749 ANA_CHECK (module->onFirstInputFile (*
this));
754 ANA_CHECK (module->onNextInputFile (*
this));
762 return ::StatusCode::SUCCESS;
768 addOutputStream (
const std::string&
label,
771 using namespace msgEventLoop;
777 return ::StatusCode::FAILURE;
779 if (data.file() ==
nullptr)
781 ANA_MSG_ERROR (
"output stream does not have a file attached");
782 return ::StatusCode::FAILURE;
784 if (data.mainStreamName().empty())
785 data.setMainStreamName (
label);
786 m_outputs.insert (std::make_pair (
label, std::make_shared<Detail::OutputStreamData>(std::move (data))));
787 return ::StatusCode::SUCCESS;
793 inputFileNumEntries ()
const
810 eventsProcessed ()
const noexcept
822 using namespace msgEventLoop;
826 meta.fetchDefaults (options);
837 end = job.outputEnd(); out != end; ++ out)
840 out->output()->makeWriter (sample.name(),
"",
".root")};
845 m_moduleConfig.emplace_back (
"EL::Detail::DirectInputModule/DirectInputModule");
858 return ::StatusCode::SUCCESS;
864 batchExecute (
unsigned job_id,
const char *confFile)
866 using namespace msgEventLoop;
871 std::unique_ptr<TFile>
file (TFile::Open (confFile,
"READ"));
872 if (
file.get() ==
nullptr ||
file->IsZombie())
875 return ::StatusCode::FAILURE;
878 std::unique_ptr<BatchJob> job (
dynamic_cast<BatchJob*
>(
file->Get (
"job")));
880 if (job.get() ==
nullptr)
883 return ::StatusCode::FAILURE;
886 if (job_id >= job->segments.size())
888 ANA_MSG_ERROR (
"invalid job-id " << job_id <<
", max is " << job->segments.size());
889 return ::StatusCode::FAILURE;
896 gSystem->Exec (
"pwd");
897 gSystem->MakeDirectory (
"output");
906 end = job->job.outputEnd(); out != end; ++ out)
914 m_moduleConfig.emplace_back (
"EL::Detail::BatchInputModule/BatchInputModule");
925 std::ostringstream job_name;
927 std::ofstream completed ((job->location +
"/status/completed-" + job_name.str()).c_str());
928 return ::StatusCode::SUCCESS;
932 return ::StatusCode::FAILURE;
939 gridExecute (
const std::string& sampleName, Long64_t SkipEvents, Long64_t nEventsPerJob)
941 using namespace msgEventLoop;
944 ANA_MSG_INFO (
"Running with ROOT version " << gROOT->GetVersion()
945 <<
" (" << gROOT->GetVersionDate() <<
")");
951 std::unique_ptr<JobConfig> jobConfig;
954 std::unique_ptr<TFile> f (TFile::Open(
"jobdef.root"));
955 if (f ==
nullptr || f->IsZombie()) {
957 return ::StatusCode::FAILURE;
965 return ::StatusCode::FAILURE;
968 jobConfig.reset (
dynamic_cast<JobConfig*
>(f->Get(
"jobConfig")));
969 if (jobConfig ==
nullptr)
972 return ::StatusCode::FAILURE;
976 std::unique_ptr<TList> outs ((TList*)f->Get(
"outputs"));
980 return ::StatusCode::FAILURE;
983 TIter itr(outs.get());
985 while ((obj = itr())) {
991 ANA_MSG_ERROR (
"Encountered unexpected entry in list of outputs");
992 return ::StatusCode::FAILURE;
1000 const std::string location =
".";
1010 TIter itr(&bigOutputs);
1012 while ((obj = itr())) {
1017 return ::StatusCode::FAILURE;
1021 location +
"/" + os->label() +
".root",
"RECREATE"};
1030 std::vector<std::string> fileList;
1031 std::ifstream infile(
"input.txt");
1034 if (!getline(infile, sLine))
break;
1035 std::istringstream ssLine(sLine);
1038 if (!getline(ssLine, sFile,
','))
break;
1039 fileList.push_back(sFile);
1042 if (fileList.size() == 0) {
1047 m_moduleConfig.emplace_back (
"EL::Detail::DirectInputModule/DirectInputModule");
1050 if (nEventsPerJob != -1)
1052 if (SkipEvents != 0)
1066 return ::StatusCode::SUCCESS;
#define RCU_DESTROY_INVARIANT(x)
#define RCU_CHANGE_INVARIANT(x)
#define RCU_NEW_INVARIANT(x)
#define RCU_REQUIRE_SOFT(x)
#define RCU_READ_INVARIANT(x)
#define RCU_THROW_MSG(message)
Run a MT piece of code with an alternate root error handler.
all data needed to manage a given output stream
the job configuration that is independent of driver and dataset
static const std::string optPrintPerFileStats
description: the option to turn on printing of i/o statistics at the end of each file rationale: whil...
static const std::string optMemFailOnLeak
Failure behaviour of the code when a "significant memory leak" is found.
static const std::string optMaxEvents
description: the name of the option used for setting the maximum number of events to process per samp...
static const std::string optGridReporting
whether to use grid reporting even when not running on the grid
static const std::string optAlgorithmTimer
a boolean flag for whether to add a timer for the algorithms
static const std::string optMemResidentIncreaseLimit
The minimal resident memory increase necessary to trigger an error.
static const std::string optXAODPerfStats
description: the name of the option for turning on XAODPerfStats.
const OutputStream * outputIter
static const std::string optXAODSummaryReport
the option to turn on/off the xAOD summary reporting at the end of the job
static const std::string optCacheLearnEntries
description: this option allows to configure the number of tree entries used for learning cache behav...
static const std::string optCacheSize
description: this option allows to configure the TTreeCache size for this job.
static const std::string optAlgorithmMemoryMonitor
a boolean flag for whether to add a memory monitor for the algorithms
static const std::string optXAODInput
the option to select whether our input is xAODs
static const std::string optMemResidentPerEventIncreaseLimit
The minimal per-event resident memory increase for triggering an error.
static const std::string optMemVirtualIncreaseLimit
The minimal virtual memory increase necessary to trigger an error.
static const std::string optMemVirtualPerEventIncreaseLimit
The minimal per-event virtual memory increase for triggering an error.
static const std::string optSkipEvents
description: the name of the option used for skipping a certain number of events in the beginning rat...
static const std::string optStreamAliases
an option for stream aliases
static const std::string optFactoryPreload
a boolean flag for whether to perform a component factory preload
static const std::string histogramStreamName
the name of the histogram output stream
Long64_t treeEntry() const override
description: the entry in the tree we are reading guarantee: no-fail
std::string inputFileName() const override
the name of the file we are reading the current tree from, without the path component
void addOutputList(const std::string &name, TObject *output_swallow) override
effects: add a given object to the output.
::StatusCode addTree(const TTree &tree, const std::string &stream) final override
effects: adds a tree to an output file specified by the stream/label failures: Incorrect stream/label...
TFile * inputFile() const override
description: the file we are reading the current tree from guarantee: no-fail
TTree * getOutputTree(const std::string &name, const std::string &stream) const final override
effects: get the tree that was added to an output file earlier failures: Tree doesn't exist
static bool fileOpenErrorFilter(int level, bool, const char *, const char *)
Error handler for file opening.
void setOutputHist(const std::string &val_outputTarget)
set the histogram output list
std::vector< asg::AsgComponentConfig > m_moduleConfig
the module configurations we use
::StatusCode addOutputStream(const std::string &label, Detail::OutputStreamData output)
effects: add another output file guarantee: strong failures: low level errors II failures: label alre...
const SH::MetaObject * metaData() const override
description: the sample meta-data we are working on guarantee: no-fail invariant: metaData !...
std::string m_segmentName
the name of the segment we are processing
::StatusCode finalize()
finalize the worker
bool m_firstInputFile
whether this is the first input file
void setJobConfig(JobConfig &&jobConfig)
set the JobConfig
::StatusCode openInputFile(const std::string &inputFileUrl) override
open the given input file without processing it
uint64_t eventsProcessed() const noexcept
the number of events that have been processed
bool m_newInputFile
whether this is a new input file (i.e.
TTree * tree() const override
description: the tree we are running on guarantee: no-fail
void setMetaData(const SH::MetaObject *val_metaData)
set the metaData
bool m_algorithmsInitialized
whether the algorithms are initialized
std::vector< std::unique_ptr< Detail::Module > > m_modules
the list of modules we hold
void addOutput(TObject *output_swallow) final override
effects: add an object to the output.
::StatusCode initialize()
initialize the worker
TFile * getOutputFileNull(const std::string &label) const override
effects: get the output file that goes into the dataset with the given label.
void setSegmentName(const std::string &val_segmentName)
set the segment name
Long64_t inputFileNumEntries() const override
the number of events in the input file
::StatusCode processInputs()
process all the inputs
bool m_firstEvent
whether we are still to process the first event
std::string m_outputTarget
the target file to which we will write the histogram output
Run a MT piece of code with an alternate root error handler.
a base class that manages a set of files belonging to a particular data set and the associated meta-d...
Base class for the event (xAOD::TEvent and xAOD::REvent) classes.
A relatively simple transient store for objects created in analysis.
std::vector< std::string > split(const std::string &s, const std::string &t=":")
std::string label(const std::string &format, int i)
void report_exception(std::exception_ptr eptr)
print out the currently evaluated exception
This module defines the arguments passed from the BATCH driver to the BATCH worker.
::StatusCode StatusCode
StatusCode definition for legacy code.
bool SetDirectory(TObject *object, TDirectory *directory)
effects: set the directory this object is associated with returns: whether the object type actively k...
std::unique_ptr< TFile > openFile(const std::string &name, const MetaObject &options)
open a file with the given options
UInt_t job_id
description: the job id of this segment
std::string segmentName
the name/id to use for this segment (not including the sample name)
std::string fullName
the name/id to use for this segment (including the sample name)
UInt_t sample
description: the index of the sample we are using
std::string sampleName
the name of the sample for this segment
std::string m_inputFileUrl
the input file url of the currently opened file
const SH::MetaObject * m_metaData
the meta-data we use
Worker * m_worker
the worker (to pass on to the algorithms)
TTree * m_inputTree
the (main) tree in the input file
uint64_t m_inputEntry
the entry in the input tree we are currently looking at
BatchJob * m_batchJob
the BatchJob configuration (if used)
xAOD::TStore * m_tstore
the TStore structure, if we use one
bool m_skipEvent
whether we are skipping the current event
OutputStreamData * m_histOutput
the histogram output stream
std::unique_ptr< TTree > m_jobStats
Tree saving per-job statistics information.
bool m_hasInputEvents
flag whether the most recently opened input file has events or not
std::unique_ptr< TFile > m_inputFile
the input file pointer of the currently opened filed
std::map< std::string, std::shared_ptr< Detail::OutputStreamData > > m_outputs
the list of output files
std::vector< Detail::AlgorithmData > m_algs
the list of algorithms
uint64_t m_eventsProcessed
the number of events that have been processed
xAOD::Event * m_event
the Event object, if we use one
a range of events in a given file
std::string m_url
the location of the file
static constexpr Long64_t eof
the special value to indicate that the range includes all events until the end of the file
Long64_t m_beginEvent
the first event to process
Long64_t m_endEvent
the event past the last event, or eof