ATLAS Offline Software
Loading...
Searching...
No Matches
EL::Worker Class Referencefinal

#include <Worker.h>

Inheritance diagram for EL::Worker:
Collaboration diagram for EL::Worker:

Public Member Functions

void testInvariant () const
 effects: test the invariant of this object guarantee: no-fail
virtual ~Worker ()
 effects: standard destructor guarantee: no-fail
void addOutput (TObject *output_swallow) final override
 effects: add an object to the output.
void addOutputList (const std::string &name, TObject *output_swallow) override
 effects: add a given object to the output.
TObject * getOutputHist (const std::string &name) const final override
 get the output histogram with the given name
TFile * getOutputFile (const std::string &label) const override
 effects: get the output file that goes into the dataset with the given label.
TFile * getOutputFileNull (const std::string &label) const override
 effects: get the output file that goes into the dataset with the given label.
::StatusCode addTree (const TTree &tree, const std::string &stream) final override
 effects: adds a tree to an output file specified by the stream/label failures: Incorrect stream/label specified, called at the wrong time note: See getOutputFile for failure types...
TTree * getOutputTree (const std::string &name, const std::string &stream) const final override
 effects: get the tree that was added to an output file earlier failures: Tree doesn't exist
const SH::MetaObjectmetaData () const override
 description: the sample meta-data we are working on guarantee: no-fail invariant: metaData != 0 rationale: this can be used for accessing sample meta-data
TTree * tree () const override
 description: the tree we are running on guarantee: no-fail
Long64_t treeEntry () const override
 description: the entry in the tree we are reading guarantee: no-fail
TFile * inputFile () const override
 description: the file we are reading the current tree from guarantee: no-fail
bool hasInputEvents () const override
 flag whether the most recently opened input file has events or not
std::string inputFileName () const override
 the name of the file we are reading the current tree from, without the path component
TTree * triggerConfig () const override
 description: the trigger config tree from the input file, or NULL if we did not find it guarantee: strong failures: i/o errors
xAOD::EventxaodEvent () const override
 description: the xAOD event and store guarantee: strong failures: out of memory I failures: EventSvc not configured postcondition: result != 0
xAOD::TStorexaodStore () const override
EL::AlgorithmgetAlg (const std::string &name) const override
 effects: returns the algorithms with the given name or NULL if there is none guarantee: strong failures: out of memory II
void skipEvent () override
 effects: skip the current event, i.e.
virtual bool filterPassed () const noexcept final override
 whether the current algorithm passed its filter criterion for the current event
virtual void setFilterPassed (bool val_filterPassed) noexcept final override
 set the value of filterPassed
 Worker ()
 standard constructor
::StatusCode directExecute (const SH::Sample &sample, const Job &job, const std::string &location, const SH::MetaObject &options)
 run the job
::StatusCode batchExecute (unsigned job_id, const char *confFile)
 effects: do what is needed to execute the given job segment guarantee: basic failures: job specific
::StatusCode gridExecute (const std::string &sampleName, Long64_t SkipEvents, Long64_t nEventsPerJob)

Protected Member Functions

void setMetaData (const SH::MetaObject *val_metaData)
 set the metaData
void setOutputHist (const std::string &val_outputTarget)
 set the histogram output list
void setSegmentName (const std::string &val_segmentName)
 set the segment name
void setJobConfig (JobConfig &&jobConfig)
 set the JobConfig
::StatusCode initialize ()
 initialize the worker
::StatusCode processInputs ()
 process all the inputs
::StatusCode finalize ()
 finalize the worker
::StatusCode processEvents (EventRange &eventRange) override
 process the given event range
::StatusCode openInputFile (const std::string &inputFileUrl) override
 open the given input file without processing it
::StatusCode addOutputStream (const std::string &label, Detail::OutputStreamData output)
 effects: add another output file guarantee: strong failures: low level errors II failures: label already used
Long64_t inputFileNumEntries () const override
 the number of events in the input file
uint64_t eventsProcessed () const noexcept
 the number of events that have been processed

Private Types

enum  GridErrorCodes { EC_FAIL = 220 , EC_ABORT = 221 , EC_NOTFINISHED = 222 , EC_BADINPUT = 223 }
typedef std::map< std::string, TH1 * >::const_iterator OutputHistMapIter
 the output map
typedef std::map< std::pair< std::string, std::string >, TTree * >::const_iterator OutputTreeMapIter
 description: the list of output trees

Private Member Functions

void gridCreateJobSummary (uint64_t eventsProcessed)
void addOutput (std::unique_ptr< TObject > output)
 add the given output object to the histogram output stream

Static Private Member Functions

static bool fileOpenErrorFilter (int level, bool, const char *, const char *)
 Error handler for file opening.

Private Attributes

std::map< std::string, TH1 * > m_outputHistMap
std::map< std::pair< std::string, std::string >, TTree * > m_outputTreeMap
std::vector< std::unique_ptr< Detail::Module > > m_modules
 the list of modules we hold
bool m_firstInputFile {true}
 whether this is the first input file
bool m_newInputFile {false}
 whether this is a new input file (i.e.
std::string m_outputTarget
 the target file to which we will write the histogram output
std::string m_segmentName
 the name of the segment we are processing
bool m_algorithmsInitialized {false}
 whether the algorithms are initialized
bool m_firstEvent {true}
 whether we are still to process the first event
std::vector< asg::AsgComponentConfigm_moduleConfig
 the module configurations we use
std::vector< Detail::AlgorithmDatam_algs
 the list of algorithms
std::string m_inputFileUrl
 the input file url of the currently opened file
std::unique_ptr< TFile > m_inputFile
 the input file pointer of the currently opened filed
TTree * m_inputTree {nullptr}
 the (main) tree in the input file
uint64_t m_inputEntry {0}
 the entry in the input tree we are currently looking at
bool m_hasInputEvents = false
 flag whether the most recently opened input file has events or not
bool m_skipEvent = false
 whether we are skipping the current event
const SH::MetaObjectm_metaData {nullptr}
 the meta-data we use
uint64_t m_eventsProcessed {0}
 the number of events that have been processed
OutputStreamData * m_histOutput {nullptr}
 the histogram output stream
std::unique_ptr< TTree > m_jobStats
 Tree saving per-job statistics information.
xAOD::Eventm_event {nullptr}
 the Event object, if we use one
xAOD::TStorem_tstore {nullptr}
 the TStore structure, if we use one
asg::SgEventm_evtStore {nullptr}
 the SgEvent structure, if we use one
Workerm_worker {nullptr}
 the worker (to pass on to the algorithms)
std::map< std::string, std::shared_ptr< Detail::OutputStreamData > > m_outputs
 the list of output files
BatchJobm_batchJob = nullptr
 the BatchJob configuration (if used)

Detailed Description

Definition at line 25 of file Worker.h.

Member Typedef Documentation

◆ OutputHistMapIter

typedef std::map<std::string,TH1*>::const_iterator EL::Worker::OutputHistMapIter
private

the output map

Definition at line 408 of file Worker.h.

◆ OutputTreeMapIter

typedef std::map<std::pair<std::string,std::string>,TTree*>::const_iterator EL::Worker::OutputTreeMapIter
private

description: the list of output trees

Definition at line 415 of file Worker.h.

Member Enumeration Documentation

◆ GridErrorCodes

Enumerator
EC_FAIL 
EC_ABORT 
EC_NOTFINISHED 
EC_BADINPUT 

Definition at line 256 of file Worker.h.

256 {
257 EC_FAIL = 220,
258 EC_ABORT = 221,
259 EC_NOTFINISHED = 222,
260 EC_BADINPUT = 223
261 };
@ EC_NOTFINISHED
Definition Worker.h:259
@ EC_BADINPUT
Definition Worker.h:260

Constructor & Destructor Documentation

◆ ~Worker()

EL::Worker::~Worker ( )
virtual

effects: standard destructor guarantee: no-fail

Definition at line 80 of file Worker.cxx.

82 {
84 }
#define RCU_DESTROY_INVARIANT(x)
Definition Assert.h:230

◆ Worker()

EL::Worker::Worker ( )

standard constructor

Guarantee
strong
Failures
out of memory I

Definition at line 331 of file Worker.cxx.

333 {
334 m_worker = this;
335
336 RCU_NEW_INVARIANT (this);
337 }
#define RCU_NEW_INVARIANT(x)
Definition Assert.h:228
Worker * m_worker
the worker (to pass on to the algorithms)
Definition ModuleData.h:110

Member Function Documentation

◆ addOutput() [1/2]

void EL::Detail::ModuleData::addOutput ( std::unique_ptr< TObject > output)
inherited

add the given output object to the histogram output stream

Guarantee
basic
Failures
out of memory II

Definition at line 33 of file ModuleData.cxx.

35 {
36 RCU_ASSERT (m_histOutput != nullptr);
37 m_histOutput->addOutput (std::move (output));
38 }
#define RCU_ASSERT(x)
Definition Assert.h:217
OutputStreamData * m_histOutput
the histogram output stream
Definition ModuleData.h:95

◆ addOutput() [2/2]

void EL::Worker::addOutput ( TObject * output_swallow)
finaloverridevirtual

effects: add an object to the output.

the worker takes over ownership of the object, but the caller may maintain a reference to it guarantee: basic, argument is always swallowed failures: out of memory I requires: output_swallow != 0 warning: so far I placed no requirements on the output objects. I may do that at a later stage though, possibly breaking existing code.

Implements EL::IHistogramWorker.

Definition at line 88 of file Worker.cxx.

90 {
91 std::unique_ptr<TObject> output (output_swallow);
92
94 RCU_REQUIRE_SOFT (output_swallow != 0);
95
96 RCU::SetDirectory (output_swallow, 0);
97 ModuleData::addOutput (std::move (output));
98 }
#define RCU_CHANGE_INVARIANT(x)
Definition Assert.h:226
#define RCU_REQUIRE_SOFT(x)
Definition Assert.h:148
bool SetDirectory(TObject *object, TDirectory *directory)
effects: set the directory this object is associated with returns: whether the object type actively k...
Definition RootUtils.cxx:25
output
Definition merge.py:16

◆ addOutputList()

void EL::Worker::addOutputList ( const std::string & name,
TObject * output_swallow )
overridevirtual

effects: add a given object to the output.

instead of trying to merge the outputs from multiple jobs the output file will contain a TList of the given name with the objects from all the output. guarantee: basic, argument is always swallowed failures: out of memory II requires: output_swallow != 0 rationale: This was primarily build to allow the GRL output to be collected from EventLoop jobs, but it can be used for any kind of output that can not or should not be merged.

Implements EL::IWorker.

Definition at line 102 of file Worker.cxx.

104 {
105 std::unique_ptr<TObject> output (output_swallow);
106
108 RCU_REQUIRE_SOFT (output_swallow != 0);
109
110 RCU::SetDirectory (output_swallow, 0);
111 std::unique_ptr<TList> list (new TList);
112 list->SetName (name.c_str());
113 list->Add (output.release());
114 addOutput (list.release());
115 }
void addOutput(TObject *output_swallow) final override
effects: add an object to the output.
Definition Worker.cxx:89
list(name, path='/')
Definition histSizes.py:38

◆ addOutputStream()

StatusCode EL::Worker::addOutputStream ( const std::string & label,
Detail::OutputStreamData output )
protected

effects: add another output file guarantee: strong failures: low level errors II failures: label already used

Definition at line 767 of file Worker.cxx.

770 {
771 using namespace msgEventLoop;
773
774 if (m_outputs.find (label) != m_outputs.end())
775 {
776 ANA_MSG_ERROR ("output file already defined for label: " + label);
777 return ::StatusCode::FAILURE;
778 }
779 if (data.file() == nullptr)
780 {
781 ANA_MSG_ERROR ("output stream does not have a file attached");
782 return ::StatusCode::FAILURE;
783 }
784 if (data.mainStreamName().empty())
785 data.setMainStreamName (label);
786 m_outputs.insert (std::make_pair (label, std::make_shared<Detail::OutputStreamData>(std::move (data))));
787 return ::StatusCode::SUCCESS;
788 }
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
std::string label(const std::string &format, int i)
Definition label.h:19
std::map< std::string, std::shared_ptr< Detail::OutputStreamData > > m_outputs
the list of output files
Definition ModuleData.h:113

◆ addTree()

StatusCode EL::Worker::addTree ( const TTree & tree,
const std::string & stream )
finaloverridevirtual

effects: adds a tree to an output file specified by the stream/label failures: Incorrect stream/label specified, called at the wrong time note: See getOutputFile for failure types...

Implements EL::ITreeWorker.

Definition at line 155 of file Worker.cxx.

157 {
158 using namespace msgEventLoop;
159 RCU_READ_INVARIANT( this );
160
161 auto outputIter = m_outputs.find (stream);
162 if (outputIter == m_outputs.end())
163 {
164 ANA_MSG_ERROR ( "No output file with stream name \"" + stream +
165 "\" found" );
166 return ::StatusCode::FAILURE;
167 }
168
169 outputIter->second->addClone (tree);
170
171 // Return gracefully:
172 return ::StatusCode::SUCCESS;
173 }
#define RCU_READ_INVARIANT(x)
Definition Assert.h:224
TTree * tree() const override
description: the tree we are running on guarantee: no-fail
Definition Worker.cxx:210

◆ batchExecute()

StatusCode EL::Worker::batchExecute ( unsigned job_id,
const char * confFile )

effects: do what is needed to execute the given job segment guarantee: basic failures: job specific

Definition at line 863 of file Worker.cxx.

865 {
866 using namespace msgEventLoop;
868
869 try
870 {
871 std::unique_ptr<TFile> file (TFile::Open (confFile, "READ"));
872 if (file.get() == nullptr || file->IsZombie())
873 {
874 ANA_MSG_ERROR ("failed to open file: " << confFile);
875 return ::StatusCode::FAILURE;
876 }
877
878 std::unique_ptr<BatchJob> job (dynamic_cast<BatchJob*>(file->Get ("job")));
879 m_batchJob = job.get();
880 if (job.get() == nullptr)
881 {
882 ANA_MSG_ERROR ("failed to retrieve BatchJob object");
883 return ::StatusCode::FAILURE;
884 }
885
886 if (job_id >= job->segments.size())
887 {
888 ANA_MSG_ERROR ("invalid job-id " << job_id << ", max is " << job->segments.size());
889 return ::StatusCode::FAILURE;
890 }
891 BatchSegment *segment = &job->segments[job_id];
892 RCU_ASSERT (segment->job_id == job_id);
893 RCU_ASSERT (segment->sample < job->samples.size());
894 BatchSample *sample = &job->samples[segment->sample];
895
896 gSystem->Exec ("pwd");
897 gSystem->MakeDirectory ("output");
898
899 setMetaData (&sample->meta);
900 setOutputHist (job->location + "/fetch");
901 setSegmentName (segment->fullName);
902
903 setJobConfig (JobConfig (job->job.jobConfig()));
904
905 for (Job::outputIter out = job->job.outputBegin(),
906 end = job->job.outputEnd(); out != end; ++ out)
907 {
908 Detail::OutputStreamData data {
909 out->output()->makeWriter (segment->sampleName, segment->segmentName, ".root")};
910 ANA_CHECK (addOutputStream (out->label(), std::move (data)));
911 }
912
913 {
914 m_moduleConfig.emplace_back ("EL::Detail::BatchInputModule/BatchInputModule");
915 ANA_CHECK (m_moduleConfig.back().setProperty ("jobId", job_id));
917 if (maxEvents != -1)
918 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", maxEvents));
919 }
920
923 ANA_CHECK (finalize ());
924
925 std::ostringstream job_name;
926 job_name << job_id;
927 std::ofstream completed ((job->location + "/status/completed-" + job_name.str()).c_str());
928 return ::StatusCode::SUCCESS;
929 } catch (...)
930 {
931 Detail::report_exception (std::current_exception());
932 return ::StatusCode::FAILURE;
933 }
934 }
#define ANA_CHECK(EXP)
check whether the given expression was successful
static const std::string optMaxEvents
description: the name of the option used for setting the maximum number of events to process per samp...
Definition Job.h:221
const OutputStream * outputIter
Definition Job.h:139
void setOutputHist(const std::string &val_outputTarget)
set the histogram output list
Definition Worker.cxx:353
std::vector< asg::AsgComponentConfig > m_moduleConfig
the module configurations we use
Definition Worker.h:457
::StatusCode addOutputStream(const std::string &label, Detail::OutputStreamData output)
effects: add another output file guarantee: strong failures: low level errors II failures: label alre...
Definition Worker.cxx:768
const SH::MetaObject * metaData() const override
description: the sample meta-data we are working on guarantee: no-fail invariant: metaData !...
Definition Worker.cxx:201
::StatusCode finalize()
finalize the worker
Definition Worker.cxx:514
void setJobConfig(JobConfig &&jobConfig)
set the JobConfig
Definition Worker.cxx:373
void setMetaData(const SH::MetaObject *val_metaData)
set the metaData
Definition Worker.cxx:342
::StatusCode initialize()
initialize the worker
Definition Worker.cxx:385
void setSegmentName(const std::string &val_segmentName)
set the segment name
Definition Worker.cxx:363
::StatusCode processInputs()
process all the inputs
Definition Worker.cxx:499
double castDouble(const std::string &name, double def_val=0, CastMode mode=CAST_ERROR_THROW) const
the meta-data double with the given name
void report_exception(std::exception_ptr eptr)
print out the currently evaluated exception
BatchJob * m_batchJob
the BatchJob configuration (if used)
Definition ModuleData.h:116
TFile * file

◆ directExecute()

StatusCode EL::Worker::directExecute ( const SH::Sample & sample,
const Job & job,
const std::string & location,
const SH::MetaObject & options )

run the job

Guarantee
basic

Definition at line 818 of file Worker.cxx.

821 {
822 using namespace msgEventLoop;
824
825 SH::MetaObject meta (*sample.meta());
826 meta.fetchDefaults (options);
827
828 setMetaData (&meta);
829 setOutputHist (location);
830 setSegmentName (sample.name());
831
832 ANA_MSG_INFO ("Running sample: " << sample.name());
833
834 setJobConfig (JobConfig (job.jobConfig()));
835
836 for (Job::outputIter out = job.outputBegin(),
837 end = job.outputEnd(); out != end; ++ out)
838 {
839 Detail::OutputStreamData data {
840 out->output()->makeWriter (sample.name(), "", ".root")};
841 ANA_CHECK (addOutputStream (out->label(), std::move (data)));
842 }
843
844 {
845 m_moduleConfig.emplace_back ("EL::Detail::DirectInputModule/DirectInputModule");
846 ANA_CHECK (m_moduleConfig.back().setProperty ("fileList", sample.makeFileList()));
848 if (maxEvents != -1)
849 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", maxEvents));
851 if (skipEvents != 0)
852 ANA_CHECK (m_moduleConfig.back().setProperty ("skipEvents", skipEvents));
853 }
854
857 ANA_CHECK (finalize ());
858 return ::StatusCode::SUCCESS;
859 }
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
static const std::string optSkipEvents
description: the name of the option used for skipping a certain number of events in the beginning rat...
Definition Job.h:229

◆ eventsProcessed()

uint64_t EL::Worker::eventsProcessed ( ) const
protectednoexcept

the number of events that have been processed

Guarantee
no-fail

Definition at line 809 of file Worker.cxx.

811 {
812 RCU_READ_INVARIANT (this);
813 return m_eventsProcessed;
814 }
uint64_t m_eventsProcessed
the number of events that have been processed
Definition ModuleData.h:92

◆ fileOpenErrorFilter()

bool EL::Worker::fileOpenErrorFilter ( int level,
bool ,
const char * s1,
const char * s2 )
staticprivate

Error handler for file opening.

Definition at line 644 of file Worker.cxx.

646 {
647 // Don't fail on missing dictionary messages.
648 if (strstr (s2, "no streamer or dictionary") != nullptr) {
649 return true;
650 }
651
652 // For messages above warning level (SysError, Error, Fatal)
653 if( level > kWarning ) {
654 // We won't output further; ROOT should have already put something in the log file
655 std::string msg = "ROOT error detected in Worker.cxx: ";
656 msg += s1;
657 msg += " ";
658 msg += s2;
659 throw std::runtime_error(msg);
660
661 // No need for further error handling
662 return false;
663 }
664
665 // Pass to the default error handlers
666 return true;
667 }
MsgStream & msg
Definition testRead.cxx:32

◆ filterPassed()

bool EL::Worker::filterPassed ( ) const
finaloverridevirtualnoexcept

whether the current algorithm passed its filter criterion for the current event

Guarantee
no-fail

Implements EL::IFilterWorker.

Definition at line 313 of file Worker.cxx.

315 {
316 RCU_READ_INVARIANT (this);
317 return !m_skipEvent;
318 }
bool m_skipEvent
whether we are skipping the current event
Definition ModuleData.h:86

◆ finalize()

StatusCode EL::Worker::finalize ( )
protected

finalize the worker

This method ought to be called after all events have been processed. It is meant to ensure that the job is ended properly and all outputs are written out and files are closed.

Guarantee
basic
Failures
finalization failures

Definition at line 513 of file Worker.cxx.

515 {
516 using namespace msgEventLoop;
517
519
520 if (m_algorithmsInitialized == false)
521 {
522 ANA_MSG_ERROR ("algorithms never got initialized");
523 return StatusCode::FAILURE;
524 }
525
527 for (auto& module : m_modules)
528 ANA_CHECK (module->onFinalize (*this));
529 for (auto& output : m_outputs)
530 {
531 if (output.first != Job::histogramStreamName && output.second->mainStreamName() == output.first)
532 {
533 output.second->saveOutput ();
534 output.second->close ();
535 std::string path = output.second->finalFileName ();
536 if (!path.empty())
537 addOutputList ("EventLoop_OutputStream_" + output.first, new TObjString (path.c_str()));
538 }
539 }
540 for (auto& module : m_modules)
541 ANA_CHECK (module->postFinalize (*this));
542 if (m_jobStats->GetListOfBranches()->GetEntries() > 0)
543 {
544 if (m_jobStats->Fill() <= 0)
545 {
546 ANA_MSG_ERROR ("failed to fill the job statistics tree");
547 return ::StatusCode::FAILURE;
548 }
549 ModuleData::addOutput (std::move (m_jobStats));
550 }
551 m_histOutput->saveOutput ();
552 for (auto& module : m_modules)
553 ANA_CHECK (module->onWorkerEnd (*this));
554 m_histOutput->saveOutput ();
555 m_histOutput->close ();
556
557 for (auto& module : m_modules){
558 ANA_CHECK (module->postFileClose(*this));
559 }
560 ANA_MSG_INFO ("worker finished successfully");
561 return ::StatusCode::SUCCESS;
562 }
static const std::string histogramStreamName
the name of the histogram output stream
Definition Job.h:597
void addOutputList(const std::string &name, TObject *output_swallow) override
effects: add a given object to the output.
Definition Worker.cxx:103
::StatusCode openInputFile(const std::string &inputFileUrl) override
open the given input file without processing it
Definition Worker.cxx:670
bool m_algorithmsInitialized
whether the algorithms are initialized
Definition Worker.h:447
std::vector< std::unique_ptr< Detail::Module > > m_modules
the list of modules we hold
Definition Worker.h:421
path
python interpreter configuration --------------------------------------—
Definition athena.py:130
std::unique_ptr< TTree > m_jobStats
Tree saving per-job statistics information.
Definition ModuleData.h:98

◆ getAlg()

Algorithm * EL::Worker::getAlg ( const std::string & name) const
overridevirtual

effects: returns the algorithms with the given name or NULL if there is none guarantee: strong failures: out of memory II

Implements EL::IWorker.

Definition at line 290 of file Worker.cxx.

292 {
293 RCU_READ_INVARIANT (this);
294 for (auto& alg : m_algs)
295 {
296 if (alg->hasName (name))
297 return alg.m_algorithm->getLegacyAlg();
298 }
299 return 0;
300 }
std::vector< Detail::AlgorithmData > m_algs
the list of algorithms
Definition ModuleData.h:67

◆ getOutputFile()

TFile * EL::Worker::getOutputFile ( const std::string & label) const
overridevirtual

effects: get the output file that goes into the dataset with the given label.

this dataset has to be registered before the job is submitted. typically that happens in the doSetupJob method. guarantee: strong failures: no dataset with the given label postcondition: result != 0 note: the default value for the argument corresponds to the default label value in the OutputInfo class.

Implements EL::IWorker.

Definition at line 131 of file Worker.cxx.

133 {
134 RCU_READ_INVARIANT (this);
135 TFile *result = getOutputFileNull (label);
136 if (result == 0)
137 RCU_THROW_MSG ("no output dataset defined with label: " + label);
138 return result;
139 }
#define RCU_THROW_MSG(message)
Definition PrintMsg.h:53
TFile * getOutputFileNull(const std::string &label) const override
effects: get the output file that goes into the dataset with the given label.
Definition Worker.cxx:144

◆ getOutputFileNull()

TFile * EL::Worker::getOutputFileNull ( const std::string & label) const
overridevirtual

effects: get the output file that goes into the dataset with the given label.

this dataset has to be registered before the job is submitted. typically that happens in the doSetupJob method. guarantee: strong failures: internal errors note: the default value for the argument corresponds to the default label value in the OutputInfo class. rationale: this method was added to support optional output files, i.e.: the algorithm leaves it to the user whether or not the output file gets configured. then the algorithm just checks whether the output file is there. if so it fills it, otherwise it ignores it.

Implements EL::IWorker.

Definition at line 143 of file Worker.cxx.

145 {
146 RCU_READ_INVARIANT (this);
147 auto iter = m_outputs.find (label);
148 if (iter == m_outputs.end())
149 return 0;
150 return iter->second->file();
151 }

◆ getOutputHist()

TObject * EL::Worker::getOutputHist ( const std::string & name) const
finaloverridevirtual

get the output histogram with the given name

This is mostly meant, so that I can emulate the Athena histogram mechanism.

Guarantee
strong
Failures
object not found
Postcondition
result != 0

Implements EL::IHistogramWorker.

Definition at line 119 of file Worker.cxx.

121 {
122 RCU_READ_INVARIANT (this);
123
124 TObject *result = m_histOutput->getOutputHist (name);
125 if (result == nullptr) RCU_THROW_MSG ("unknown output histogram: " + name);
126 return result;
127 }

◆ getOutputTree()

TTree * EL::Worker::getOutputTree ( const std::string & name,
const std::string & stream ) const
finaloverridevirtual

effects: get the tree that was added to an output file earlier failures: Tree doesn't exist

Implements EL::ITreeWorker.

Definition at line 177 of file Worker.cxx.

179 {
180 using namespace msgEventLoop;
181 RCU_READ_INVARIANT( this );
182
183 auto outputIter = m_outputs.find (stream);
184 if (outputIter == m_outputs.end())
185 {
186 RCU_THROW_MSG ( "No output file with stream name \"" + stream
187 + "\" found" );
188 }
189
190 TTree *result = outputIter->second->getOutputTree( name );
191 if( result == nullptr ) {
192 RCU_THROW_MSG ( "No tree with name \"" + name + "\" in stream \"" +
193 stream + "\"" );
194 }
195 return result;
196 }

◆ gridCreateJobSummary()

void EL::Worker::gridCreateJobSummary ( uint64_t eventsProcessed)
private

◆ gridExecute()

StatusCode EL::Worker::gridExecute ( const std::string & sampleName,
Long64_t SkipEvents,
Long64_t nEventsPerJob )

Definition at line 938 of file Worker.cxx.

940 {
941 using namespace msgEventLoop;
943
944 ANA_MSG_INFO ("Running with ROOT version " << gROOT->GetVersion()
945 << " (" << gROOT->GetVersionDate() << ")");
946
947 ANA_MSG_INFO ("Loading EventLoop grid job");
948
949
950 TList bigOutputs;
951 std::unique_ptr<JobConfig> jobConfig;
952 SH::MetaObject *mo = 0;
953
954 std::unique_ptr<TFile> f (TFile::Open("jobdef.root"));
955 if (f == nullptr || f->IsZombie()) {
956 ANA_MSG_ERROR ("Could not read jobdef");
957 return ::StatusCode::FAILURE;
958 }
959
960 mo = dynamic_cast<SH::MetaObject*>(f->Get(sampleName.c_str()));
961 if (!mo)
962 mo = dynamic_cast<SH::MetaObject*>(f->Get("defaultMetaObject"));
963 if (!mo) {
964 ANA_MSG_ERROR ("Could not read in sample meta object");
965 return ::StatusCode::FAILURE;
966 }
967
968 jobConfig.reset (dynamic_cast<JobConfig*>(f->Get("jobConfig")));
969 if (jobConfig == nullptr)
970 {
971 ANA_MSG_ERROR ("failed to read jobConfig object");
972 return ::StatusCode::FAILURE;
973 }
974
975 {
976 std::unique_ptr<TList> outs ((TList*)f->Get("outputs"));
977 if (outs == nullptr)
978 {
979 ANA_MSG_ERROR ("Could not read list of outputs");
980 return ::StatusCode::FAILURE;
981 }
982
983 TIter itr(outs.get());
984 TObject *obj = 0;
985 while ((obj = itr())) {
986 EL::OutputStream * out = dynamic_cast<EL::OutputStream*>(obj);
987 if (out) {
988 bigOutputs.Add(out);
989 }
990 else {
991 ANA_MSG_ERROR ("Encountered unexpected entry in list of outputs");
992 return ::StatusCode::FAILURE;
993 }
994 }
995 }
996
997 f->Close();
998 f.reset ();
999
1000 const std::string location = ".";
1001
1002 mo->setBool (Job::optGridReporting, true);
1003 setMetaData (mo);
1004 setOutputHist (location);
1005 setSegmentName ("output");
1006
1007 ANA_MSG_INFO ("Starting EventLoop Grid worker");
1008
1009 {//Create and register the "big" output files with base class
1010 TIter itr(&bigOutputs);
1011 TObject *obj = 0;
1012 while ((obj = itr())) {
1013 EL::OutputStream *os = dynamic_cast<EL::OutputStream*>(obj);
1014 if (os == nullptr)
1015 {
1016 ANA_MSG_ERROR ("Bad input");
1017 return ::StatusCode::FAILURE;
1018 }
1019 {
1020 Detail::OutputStreamData data {
1021 location + "/" + os->label() + ".root", "RECREATE"};
1022 ANA_CHECK (addOutputStream (os->label(), std::move (data)));
1023 }
1024 }
1025 }
1026
1027 setJobConfig (std::move (*jobConfig));
1028
1029 {
1030 std::vector<std::string> fileList;
1031 std::ifstream infile("input.txt");
1032 while (infile) {
1033 std::string sLine;
1034 if (!getline(infile, sLine)) break;
1035 std::istringstream ssLine(sLine);
1036 while (ssLine) {
1037 std::string sFile;
1038 if (!getline(ssLine, sFile, ',')) break;
1039 fileList.push_back(sFile);
1040 }
1041 }
1042 if (fileList.size() == 0) {
1043 ANA_MSG_ERROR ("no input files provided");
1044 //User was expecting input after all.
1045 gSystem->Exit(EC_BADINPUT);
1046 }
1047 m_moduleConfig.emplace_back ("EL::Detail::DirectInputModule/DirectInputModule");
1048 ANA_CHECK (m_moduleConfig.back().setProperty ("fileList", fileList));
1049
1050 if (nEventsPerJob != -1)
1051 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", nEventsPerJob));
1052 if (SkipEvents != 0)
1053 ANA_CHECK (m_moduleConfig.back().setProperty ("skipEvents", SkipEvents));
1054 }
1055
1058 ANA_CHECK (finalize ());
1059
1060 int nEvents = eventsProcessed();
1061 ANA_MSG_INFO ("Loop finished.");
1062 ANA_MSG_INFO ("Read/processed " << nEvents << " events.");
1063
1064 ANA_MSG_INFO ("EventLoop Grid worker finished");
1065 ANA_MSG_INFO ("Saving output");
1066 return ::StatusCode::SUCCESS;
1067 }
static const std::string optGridReporting
whether to use grid reporting even when not running on the grid
Definition Job.h:489
uint64_t eventsProcessed() const noexcept
the number of events that have been processed
Definition Worker.cxx:810
void setBool(const std::string &name, bool value)
set the meta-data boolean with the given name
const int nEvents

◆ hasInputEvents()

bool EL::Worker::hasInputEvents ( ) const
overridevirtual

flag whether the most recently opened input file has events or not

Guarantee no-fail

Implements EL::IWorker.

Definition at line 235 of file Worker.cxx.

237 {
238 // no invariant used
239 return m_hasInputEvents;
240 }
bool m_hasInputEvents
flag whether the most recently opened input file has events or not
Definition ModuleData.h:83

◆ initialize()

StatusCode EL::Worker::initialize ( )
protected

initialize the worker

This method ought to be called after the options on the worker are set and before any events are processed. It is meant to make sure everything is ready and set up for event processing.

Guarantee
basic
Failures
initialization failures

Definition at line 384 of file Worker.cxx.

386 {
387 using namespace msgEventLoop;
389
390 const bool xAODInput = m_metaData->castBool (Job::optXAODInput, false);
391
392 ANA_MSG_INFO ("xAODInput = " << xAODInput);
393
394 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
395 m_moduleConfig.emplace_back ("EL::Detail::MemoryMonitorModule/EarlyMemoryMonitorModule");
396 if (auto cacheSize = metaData()->castDouble (Job::optCacheSize, 0); cacheSize > 0)
397 {
398 m_moduleConfig.emplace_back ("EL::Detail::TreeCacheModule/TreeCacheModule");
399 ANA_CHECK (m_moduleConfig.back().setProperty ("cacheSize", Long64_t (cacheSize)));
400 ANA_CHECK (m_moduleConfig.back().setProperty ("cacheLearnEntries", Long64_t (metaData()->castInteger (Job::optCacheLearnEntries, 0))));
401 ANA_CHECK (m_moduleConfig.back().setProperty ("printPerFileStats", metaData()->castBool (Job::optPrintPerFileStats, false)));
402 }
403 if (xAODInput)
404 {
405 m_moduleConfig.emplace_back ("EL::Detail::EventModule/EventModule");
406 if (metaData()->castDouble (Job::optXAODSummaryReport, 1) == 0)
407 ANA_CHECK (m_moduleConfig.back().setProperty ("summaryReport", false));
408 ANA_CHECK (m_moduleConfig.back().setProperty ("useStats", metaData()->castBool (Job::optXAODPerfStats, false)));
409 }
410 auto factoryPreload = metaData()->castString (Job::optFactoryPreload, "");
411 if (!factoryPreload.empty())
412 {
413 m_moduleConfig.emplace_back ("EL::Detail::FactoryPreloadModule/FactoryPreloadModule");
414 ANA_CHECK (m_moduleConfig.back().setProperty ("preloader", factoryPreload));
415 }
416 m_moduleConfig.emplace_back ("EL::Detail::LeakCheckModule/LeakCheckModule");
417 ANA_CHECK (m_moduleConfig.back().setProperty ("failOnLeak", metaData()->castBool (Job::optMemFailOnLeak, false)));
418 ANA_CHECK (m_moduleConfig.back().setProperty ("absResidentLimit", metaData()->castInteger (Job::optMemResidentIncreaseLimit, 10000)));
419 ANA_CHECK (m_moduleConfig.back().setProperty ("absVirtualLimit", metaData()->castInteger (Job::optMemVirtualIncreaseLimit, 0)));
420 ANA_CHECK (m_moduleConfig.back().setProperty ("perEvResidentLimit", metaData()->castInteger (Job::optMemResidentPerEventIncreaseLimit, 10)));
421 ANA_CHECK (m_moduleConfig.back().setProperty ("perEvVirtualLimit", metaData()->castInteger (Job::optMemVirtualPerEventIncreaseLimit, 0)));
422 m_moduleConfig.emplace_back ("EL::Detail::StopwatchModule/StopwatchModule");
423 if (metaData()->castBool (Job::optGridReporting, false))
424 m_moduleConfig.emplace_back ("EL::Detail::GridReportingModule/GridReportingModule");
425 if (metaData()->castBool (Job::optAlgorithmTimer, false))
426 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmTimerModule/AlgorithmTimerModule");
427 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
428 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmMemoryModule/AlgorithmMemoryModule");
429 m_moduleConfig.emplace_back ("EL::Detail::FileExecutedModule/FileExecutedModule");
430 m_moduleConfig.emplace_back ("EL::Detail::EventCountModule/EventCountModule");
431 m_moduleConfig.emplace_back ("EL::Detail::WorkerConfigModule/WorkerConfigModule");
432 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmStateModule/AlgorithmStateModule");
433 m_moduleConfig.emplace_back ("EL::Detail::PostClosedOutputsModule/PostClosedOutputsModule");
434 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
435 m_moduleConfig.emplace_back ("EL::Detail::MemoryMonitorModule/LateMemoryMonitorModule");
436
437 for (const auto& config : m_moduleConfig)
438 {
439 std::unique_ptr<Detail::Module> module;
440 ANA_CHECK (make_module (module, config));
441 m_modules.push_back (std::move (module));
442 }
443
444 if (m_outputs.find (Job::histogramStreamName) == m_outputs.end())
445 {
446 Detail::OutputStreamData data {
447 m_outputTarget + "/hist-" + m_segmentName + ".root", "RECREATE"};
449 }
452 if (auto aliases = metaData()->castString (Job::optStreamAliases, ""); !aliases.empty())
453 {
454 // the format of aliases is "alias1=realname1,alias2=realname2"
455 std::istringstream iss (aliases);
456 std::string alias;
457 while (std::getline (iss, alias, ','))
458 {
459 auto pos = alias.find ('=');
460 if (pos == std::string::npos)
461 {
462 ANA_MSG_ERROR ("Invalid alias format: " << alias);
463 return ::StatusCode::FAILURE;
464 }
465 auto aliasName = alias.substr (0, pos);
466 auto realName = alias.substr (pos + 1);
467 auto realOutput = m_outputs.find (realName);
468 if (realOutput == m_outputs.end())
469 {
470 ANA_MSG_ERROR ("output stream " << realName << " not found for alias " << aliasName);
471 return ::StatusCode::FAILURE;
472 }
473 auto [aliasOutput, success] = m_outputs.emplace (aliasName, realOutput->second);
474 if (!success)
475 {
476 ANA_MSG_ERROR ("output stream " << aliasName << " already exists, can't make alias");
477 return ::StatusCode::FAILURE;
478 }
479 }
480 }
481
482 m_jobStats = std::make_unique<TTree>
483 ("EventLoop_JobStats", "EventLoop job statistics");
484 m_jobStats->SetDirectory (nullptr);
485
486 ANA_MSG_INFO ("calling firstInitialize on all modules");
487 for (auto& module : m_modules)
488 ANA_CHECK (module->firstInitialize (*this));
489 ANA_MSG_INFO ("calling preFileInitialize on all modules");
490 for (auto& module : m_modules)
491 ANA_CHECK (module->preFileInitialize (*this));
492
493 return ::StatusCode::SUCCESS;
494 }
static const std::string optPrintPerFileStats
description: the option to turn on printing of i/o statistics at the end of each file rationale: whil...
Definition Job.h:404
static const std::string optMemFailOnLeak
Failure behaviour of the code when a "significant memory leak" is found.
Definition Job.h:586
static const std::string optAlgorithmTimer
a boolean flag for whether to add a timer for the algorithms
Definition Job.h:201
static const std::string optMemResidentIncreaseLimit
The minimal resident memory increase necessary to trigger an error.
Definition Job.h:564
static const std::string optXAODPerfStats
description: the name of the option for turning on XAODPerfStats.
Definition Job.h:352
static const std::string optXAODSummaryReport
the option to turn on/off the xAOD summary reporting at the end of the job
Definition Job.h:394
static const std::string optCacheLearnEntries
description: this option allows to configure the number of tree entries used for learning cache behav...
Definition Job.h:323
static const std::string optCacheSize
description: this option allows to configure the TTreeCache size for this job.
Definition Job.h:308
static const std::string optAlgorithmMemoryMonitor
a boolean flag for whether to add a memory monitor for the algorithms
Definition Job.h:206
static const std::string optXAODInput
the option to select whether our input is xAODs
Definition Job.h:389
static const std::string optMemResidentPerEventIncreaseLimit
The minimal per-event resident memory increase for triggering an error.
Definition Job.h:546
static const std::string optMemVirtualIncreaseLimit
The minimal virtual memory increase necessary to trigger an error.
Definition Job.h:572
static const std::string optMemVirtualPerEventIncreaseLimit
The minimal per-event virtual memory increase for triggering an error.
Definition Job.h:556
static const std::string optStreamAliases
an option for stream aliases
Definition Job.h:213
static const std::string optFactoryPreload
a boolean flag for whether to perform a component factory preload
Definition Job.h:210
std::string m_segmentName
the name of the segment we are processing
Definition Worker.h:442
std::string m_outputTarget
the target file to which we will write the histogram output
Definition Worker.h:437
std::string castString(const std::string &name, const std::string &def_val="", CastMode mode=CAST_ERROR_THROW) const
the meta-data string with the given name
const SH::MetaObject * m_metaData
the meta-data we use
Definition ModuleData.h:89

◆ inputFile()

TFile * EL::Worker::inputFile ( ) const
overridevirtual

description: the file we are reading the current tree from guarantee: no-fail

Implements EL::IWorker.

Definition at line 227 of file Worker.cxx.

229 {
230 RCU_READ_INVARIANT (this);
231 return m_inputFile.get();
232 }
std::unique_ptr< TFile > m_inputFile
the input file pointer of the currently opened filed
Definition ModuleData.h:73

◆ inputFileName()

std::string EL::Worker::inputFileName ( ) const
overridevirtual

the name of the file we are reading the current tree from, without the path component

Guarantee
no-fail

Implements EL::IWorker.

Definition at line 243 of file Worker.cxx.

245 {
246 // no invariant used
247 std::string path = inputFile()->GetName();
248 auto split = path.rfind ('/');
249 if (split != std::string::npos)
250 return path.substr (split + 1);
251 else
252 return path;
253 }
TFile * inputFile() const override
description: the file we are reading the current tree from guarantee: no-fail
Definition Worker.cxx:228
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:179

◆ inputFileNumEntries()

Long64_t EL::Worker::inputFileNumEntries ( ) const
overrideprotectedvirtual

the number of events in the input file

Guarantee
no-fail
Precondition
inputFile() != 0

Implements EL::Detail::IInputModuleActions.

Definition at line 792 of file Worker.cxx.

794 {
795 RCU_READ_INVARIANT (this);
796 RCU_REQUIRE (inputFile() != 0);
797
798 if (m_event) {
799 return m_event->getEntries();
800 }
801 else if (m_inputTree != 0)
802 return m_inputTree->GetEntries();
803 else
804 return 0;
805 }
#define RCU_REQUIRE(x)
Definition Assert.h:203
TTree * m_inputTree
the (main) tree in the input file
Definition ModuleData.h:76
xAOD::Event * m_event
the Event object, if we use one
Definition ModuleData.h:101

◆ metaData()

const SH::MetaObject * EL::Worker::metaData ( ) const
overridevirtual

description: the sample meta-data we are working on guarantee: no-fail invariant: metaData != 0 rationale: this can be used for accessing sample meta-data

Implements EL::IWorker.

Definition at line 200 of file Worker.cxx.

202 {
203 RCU_READ_INVARIANT (this);
204 return m_metaData;
205 }

◆ openInputFile()

StatusCode EL::Worker::openInputFile ( const std::string & inputFileUrl)
overrideprotectedvirtual

open the given input file without processing it

This is mostly to allow the driver to query the number of events in the input file without processing it, usually to determine the range of events to process.

Guarantee
basic
Failures
file can't be opened

Implements EL::Detail::IInputModuleActions.

Definition at line 669 of file Worker.cxx.

671 {
672 using namespace msgEventLoop;
673
674 // Enable custom error handling in a nice way
675 RootUtils::WithRootErrorHandler my_handler( fileOpenErrorFilter );
676
678
679 if (m_inputFileUrl == inputFileUrl)
680 return ::StatusCode::SUCCESS;
681
682 if (!m_inputFileUrl.empty())
683 {
684 if (m_newInputFile == false)
685 {
686 for (auto& module : m_modules)
687 ANA_CHECK (module->onCloseInputFile (*this));
688 for (auto& module : m_modules)
689 ANA_CHECK (module->postCloseInputFile (*this));
690 }
691 m_newInputFile = false;
692 m_hasInputEvents = false;
693 m_inputTree = nullptr;
694 m_inputFile.reset ();
695 m_inputFileUrl.clear ();
696 }
697
698 if (inputFileUrl.empty())
699 return ::StatusCode::SUCCESS;
700
701 ANA_MSG_INFO ("Opening file " << inputFileUrl);
702 std::unique_ptr<TFile> inputFile;
703 try
704 {
705 inputFile = SH::openFile (inputFileUrl, *metaData());
706 } catch (...)
707 {
708 Detail::report_exception (std::current_exception());
709 }
710 if (inputFile.get() == 0)
711 {
712 ANA_MSG_ERROR ("failed to open file " << inputFileUrl);
713 for (auto& module : m_modules)
714 module->reportInputFailure (*this);
715 return ::StatusCode::FAILURE;
716 }
717 if (inputFile->IsZombie())
718 {
719 ANA_MSG_ERROR ("input file is a zombie: " << inputFileUrl);
720 for (auto& module : m_modules)
721 module->reportInputFailure (*this);
722 return ::StatusCode::FAILURE;
723 }
724
725 // Direct TTree access
726 TTree *tree = 0;
727 const std::string treeName
729 tree = dynamic_cast<TTree*>(inputFile->Get (treeName.c_str()));
730 if (tree == nullptr)
731 {
732 ANA_MSG_INFO ("tree " << treeName << " not found in input file: " << inputFileUrl);
733 ANA_MSG_INFO ("treating this like a tree with no events");
734 }
735 else {
736 m_hasInputEvents = (tree->GetEntries() > 0);
737 }
738
739 m_newInputFile = true;
741 m_inputEntry = 0;
742 m_inputFile = std::move (inputFile);
743 m_inputFileUrl = std::move (inputFileUrl);
744
745 // onFirstInputFile to setup Event object
747 {
748 for (auto& module : m_modules)
749 ANA_CHECK (module->onFirstInputFile (*this));
750 m_firstInputFile = false;
751 }
752 else {
753 for (auto& module : m_modules)
754 ANA_CHECK (module->onNextInputFile (*this));
755 }
756
757 // Check if we have input events - done above for TTree
758 if (m_inputTree == nullptr) {
759 if (m_event) m_hasInputEvents = (m_event->getEntries() > 0);
760 }
761
762 return ::StatusCode::SUCCESS;
763 }
static bool fileOpenErrorFilter(int level, bool, const char *, const char *)
Error handler for file opening.
Definition Worker.cxx:645
bool m_firstInputFile
whether this is the first input file
Definition Worker.h:426
bool m_newInputFile
whether this is a new input file (i.e.
Definition Worker.h:432
std::unique_ptr< TFile > openFile(const std::string &name, const MetaObject &options)
open a file with the given options
std::string m_inputFileUrl
the input file url of the currently opened file
Definition ModuleData.h:70
uint64_t m_inputEntry
the entry in the input tree we are currently looking at
Definition ModuleData.h:80
static const std::string treeName_default
the default value of treeName
Definition MetaFields.h:47
static const std::string treeName
the name of the tree in the sample
Definition MetaFields.h:44

◆ processEvents()

StatusCode EL::Worker::processEvents ( EventRange & eventRange)
overrideprotectedvirtual

process the given event range

This will update eventRange if the end is set to eof

Guarantee
basic
Failures
file can't be opened
event range exceeds length of file
processing failures

Implements EL::Detail::IInputModuleActions.

Definition at line 566 of file Worker.cxx.

568 {
569 using namespace msgEventLoop;
570
572 RCU_REQUIRE (!eventRange.m_url.empty());
573 RCU_REQUIRE (eventRange.m_beginEvent >= 0);
574 RCU_REQUIRE (eventRange.m_endEvent == EventRange::eof || eventRange.m_endEvent >= eventRange.m_beginEvent);
575
576 ANA_CHECK (openInputFile (eventRange.m_url));
577
578 if (eventRange.m_beginEvent > inputFileNumEntries())
579 {
580 ANA_MSG_ERROR ("first event (" << eventRange.m_beginEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
581 return ::StatusCode::FAILURE;
582 }
583 if (eventRange.m_endEvent == EventRange::eof)
584 {
585 eventRange.m_endEvent = inputFileNumEntries();
586 } else if (eventRange.m_endEvent > inputFileNumEntries())
587 {
588 ANA_MSG_ERROR ("end event (" << eventRange.m_endEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
589 return ::StatusCode::FAILURE;
590 }
591
592 m_inputEntry = eventRange.m_beginEvent;
593
594 if (m_algorithmsInitialized == false)
595 {
596 for (auto& module : m_modules)
597 ANA_CHECK (module->onInitialize (*this));
599 }
600
601 if (m_newInputFile)
602 {
603 m_newInputFile = false;
604 for (auto& module : m_modules)
605 ANA_CHECK (module->onNewInputFile (*this));
606 }
607
608 if (eventRange.m_beginEvent == 0)
609 {
610 for (auto& module : m_modules)
611 ANA_CHECK (module->onFileExecute (*this));
612 }
613
614 ANA_MSG_INFO ("Processing events " << eventRange.m_beginEvent << "-" << eventRange.m_endEvent << " in file " << eventRange.m_url);
615
616 for (uint64_t event = eventRange.m_beginEvent;
617 event != uint64_t (eventRange.m_endEvent);
618 ++ event)
619 {
621 for (auto& module : m_modules)
622 {
623 if (module->onExecute (*this).isFailure())
624 {
625 ANA_MSG_ERROR ("processing event " << treeEntry() << " on file " << inputFileName());
626 return ::StatusCode::FAILURE;
627 }
628 }
629 if (m_firstEvent)
630 {
631 m_firstEvent = false;
632 for (auto& module : m_modules)
633 ANA_CHECK (module->postFirstEvent (*this));
634 }
636 if (m_eventsProcessed % 10000 == 0)
637 ANA_MSG_INFO ("Processed " << m_eventsProcessed << " events");
638 }
639 return ::StatusCode::SUCCESS;
640 }
Long64_t treeEntry() const override
description: the entry in the tree we are reading guarantee: no-fail
Definition Worker.cxx:219
std::string inputFileName() const override
the name of the file we are reading the current tree from, without the path component
Definition Worker.cxx:244
Long64_t inputFileNumEntries() const override
the number of events in the input file
Definition Worker.cxx:793
bool m_firstEvent
whether we are still to process the first event
Definition Worker.h:452
static constexpr Long64_t eof
the special value to indicate that the range includes all events until the end of the file
Definition EventRange.h:34

◆ processInputs()

StatusCode EL::Worker::processInputs ( )
protected

process all the inputs

This method ought to be called after initialize and before finalize. It will rely on the defined modules to steer it to the files and events it ought to process.

Definition at line 498 of file Worker.cxx.

500 {
501 using namespace msgEventLoop;
502
504
505 for (auto& module : m_modules)
506 ANA_CHECK (module->processInputs (*this, *this));
507
508 return ::StatusCode::SUCCESS;
509 }

◆ setFilterPassed()

void EL::Worker::setFilterPassed ( bool val_filterPassed)
finaloverridevirtualnoexcept

set the value of filterPassed

Guarantee
no-fail

Implements EL::IFilterWorker.

Definition at line 322 of file Worker.cxx.

324 {
326 m_skipEvent = !val_filterPassed;
327 }

◆ setJobConfig()

void EL::Worker::setJobConfig ( JobConfig && jobConfig)
protected

set the JobConfig

This takes care of adding the algorithms, etc. (only algorithms for now, 03 Feb 17).

Note the rvalue calling convention here: Algorithms are objects that get modified, so if you use them more than once you need to copy/clone them. However, in practice no driver should need that (though many do for now, 03 Feb 17), as drivers generally stream the JobConfig in for one-time use.

Guarantee
basic
Failures
job configuration/streaming errors

Definition at line 372 of file Worker.cxx.

374 {
376 for (auto& alg : jobConfig.extractAlgorithms())
377 {
378 m_algs.push_back (std::move (alg));
379 }
380 }

◆ setMetaData()

void EL::Worker::setMetaData ( const SH::MetaObject * val_metaData)
protected

set the metaData

Guarantee
no-fail

Definition at line 341 of file Worker.cxx.

343 {
345 RCU_REQUIRE (val_metaData != 0);
346
347 m_metaData = val_metaData;
348 }

◆ setOutputHist()

void EL::Worker::setOutputHist ( const std::string & val_outputTarget)
protected

set the histogram output list

Guarantee
no-fail

Definition at line 352 of file Worker.cxx.

354 {
356
357 m_outputTarget = val_outputTarget;
358 }

◆ setSegmentName()

void EL::Worker::setSegmentName ( const std::string & val_segmentName)
protected

set the segment name

Guarantee
no-fail

Definition at line 362 of file Worker.cxx.

364 {
366
367 m_segmentName = val_segmentName;
368 }

◆ skipEvent()

void EL::Worker::skipEvent ( )
overridevirtual

effects: skip the current event, i.e.

skip the rest of the algorithms for this event guarantee: no-fail rationale: if you have an analysis strategy in which you divide work into multiple algorithms you may want to have dedicated algorithms for event selection that then skip later algorithms that fill histograms

Implements EL::IWorker.

Definition at line 304 of file Worker.cxx.

306 {
308 m_skipEvent = true;
309 }

◆ testInvariant()

void EL::Worker::testInvariant ( ) const

effects: test the invariant of this object guarantee: no-fail

Definition at line 68 of file Worker.cxx.

70 {
71 RCU_INVARIANT (this != nullptr);
72 for (std::size_t iter = 0, end = m_algs.size(); iter != end; ++ iter)
73 {
74 RCU_INVARIANT (m_algs[iter].m_algorithm != nullptr);
75 }
76 }
#define RCU_INVARIANT(x)
Definition Assert.h:196

◆ tree()

TTree * EL::Worker::tree ( ) const
overridevirtual

description: the tree we are running on guarantee: no-fail

Implements EL::IWorker.

Definition at line 209 of file Worker.cxx.

211 {
212 RCU_READ_INVARIANT (this);
213 return m_inputTree;
214 }

◆ treeEntry()

Long64_t EL::Worker::treeEntry ( ) const
overridevirtual

description: the entry in the tree we are reading guarantee: no-fail

Implements EL::IWorker.

Definition at line 218 of file Worker.cxx.

220 {
221 RCU_READ_INVARIANT (this);
222 return m_inputEntry;
223 }

◆ triggerConfig()

TTree * EL::Worker::triggerConfig ( ) const
overridevirtual

description: the trigger config tree from the input file, or NULL if we did not find it guarantee: strong failures: i/o errors

Implements EL::IWorker.

Definition at line 257 of file Worker.cxx.

259 {
260 RCU_READ_INVARIANT (this);
261 return dynamic_cast<TTree*>(inputFile()->Get("physicsMeta/TrigConfTree"));
262 }

◆ xaodEvent()

xAOD::Event * EL::Worker::xaodEvent ( ) const
overridevirtual

description: the xAOD event and store guarantee: strong failures: out of memory I failures: EventSvc not configured postcondition: result != 0

Implements EL::IWorker.

Definition at line 266 of file Worker.cxx.

268 {
269 RCU_READ_INVARIANT (this);
270
271 if (m_event == nullptr)
272 RCU_THROW_MSG ("Job not configured for xAOD support");
273 return m_event;
274 }

◆ xaodStore()

xAOD::TStore * EL::Worker::xaodStore ( ) const
overridevirtual

Implements EL::IWorker.

Definition at line 278 of file Worker.cxx.

280 {
281 RCU_READ_INVARIANT (this);
282
283 if (m_tstore == nullptr)
284 RCU_THROW_MSG ("Job not configured for xAOD support");
285 return m_tstore;
286 }
xAOD::TStore * m_tstore
the TStore structure, if we use one
Definition ModuleData.h:104

Member Data Documentation

◆ m_algorithmsInitialized

bool EL::Worker::m_algorithmsInitialized {false}
private

whether the algorithms are initialized

Definition at line 447 of file Worker.h.

447{false};

◆ m_algs

std::vector<Detail::AlgorithmData> EL::Detail::ModuleData::m_algs
inherited

the list of algorithms

Definition at line 67 of file ModuleData.h.

◆ m_batchJob

BatchJob* EL::Detail::ModuleData::m_batchJob = nullptr
inherited

the BatchJob configuration (if used)

Definition at line 116 of file ModuleData.h.

◆ m_event

xAOD::Event* EL::Detail::ModuleData::m_event {nullptr}
inherited

the Event object, if we use one

Definition at line 101 of file ModuleData.h.

101{nullptr};

◆ m_eventsProcessed

uint64_t EL::Detail::ModuleData::m_eventsProcessed {0}
inherited

the number of events that have been processed

Definition at line 92 of file ModuleData.h.

92{0};

◆ m_evtStore

asg::SgEvent* EL::Detail::ModuleData::m_evtStore {nullptr}
inherited

the SgEvent structure, if we use one

Definition at line 107 of file ModuleData.h.

107{nullptr};

◆ m_firstEvent

bool EL::Worker::m_firstEvent {true}
private

whether we are still to process the first event

Definition at line 452 of file Worker.h.

452{true};

◆ m_firstInputFile

bool EL::Worker::m_firstInputFile {true}
private

whether this is the first input file

Definition at line 426 of file Worker.h.

426{true};

◆ m_hasInputEvents

bool EL::Detail::ModuleData::m_hasInputEvents = false
inherited

flag whether the most recently opened input file has events or not

Definition at line 83 of file ModuleData.h.

◆ m_histOutput

OutputStreamData* EL::Detail::ModuleData::m_histOutput {nullptr}
inherited

the histogram output stream

Definition at line 95 of file ModuleData.h.

95{nullptr};

◆ m_inputEntry

uint64_t EL::Detail::ModuleData::m_inputEntry {0}
inherited

the entry in the input tree we are currently looking at

Definition at line 80 of file ModuleData.h.

80{0};

◆ m_inputFile

std::unique_ptr<TFile> EL::Detail::ModuleData::m_inputFile
inherited

the input file pointer of the currently opened filed

Definition at line 73 of file ModuleData.h.

◆ m_inputFileUrl

std::string EL::Detail::ModuleData::m_inputFileUrl
inherited

the input file url of the currently opened file

Definition at line 70 of file ModuleData.h.

◆ m_inputTree

TTree* EL::Detail::ModuleData::m_inputTree {nullptr}
inherited

the (main) tree in the input file

Definition at line 76 of file ModuleData.h.

76{nullptr};

◆ m_jobStats

std::unique_ptr<TTree> EL::Detail::ModuleData::m_jobStats
inherited

Tree saving per-job statistics information.

Definition at line 98 of file ModuleData.h.

◆ m_metaData

const SH::MetaObject* EL::Detail::ModuleData::m_metaData {nullptr}
inherited

the meta-data we use

Definition at line 89 of file ModuleData.h.

89{nullptr};

◆ m_moduleConfig

std::vector<asg::AsgComponentConfig> EL::Worker::m_moduleConfig
private

the module configurations we use

Definition at line 457 of file Worker.h.

◆ m_modules

std::vector<std::unique_ptr<Detail::Module> > EL::Worker::m_modules
private

the list of modules we hold

Definition at line 421 of file Worker.h.

◆ m_newInputFile

bool EL::Worker::m_newInputFile {false}
private

whether this is a new input file (i.e.

one that has not yet been connected to the algorithms)

Definition at line 432 of file Worker.h.

432{false};

◆ m_outputHistMap

std::map<std::string,TH1*> EL::Worker::m_outputHistMap
private

Definition at line 409 of file Worker.h.

◆ m_outputs

std::map<std::string,std::shared_ptr<Detail::OutputStreamData> > EL::Detail::ModuleData::m_outputs
inherited

the list of output files

Definition at line 113 of file ModuleData.h.

◆ m_outputTarget

std::string EL::Worker::m_outputTarget
private

the target file to which we will write the histogram output

Definition at line 437 of file Worker.h.

◆ m_outputTreeMap

std::map<std::pair<std::string,std::string>,TTree*> EL::Worker::m_outputTreeMap
private

Definition at line 416 of file Worker.h.

◆ m_segmentName

std::string EL::Worker::m_segmentName
private

the name of the segment we are processing

Definition at line 442 of file Worker.h.

◆ m_skipEvent

bool EL::Detail::ModuleData::m_skipEvent = false
inherited

whether we are skipping the current event

Definition at line 86 of file ModuleData.h.

◆ m_tstore

xAOD::TStore* EL::Detail::ModuleData::m_tstore {nullptr}
inherited

the TStore structure, if we use one

Definition at line 104 of file ModuleData.h.

104{nullptr};

◆ m_worker

Worker* EL::Detail::ModuleData::m_worker {nullptr}
inherited

the worker (to pass on to the algorithms)

Definition at line 110 of file ModuleData.h.

110{nullptr};

The documentation for this class was generated from the following files: