ATLAS Offline Software
Worker.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 
7 
8 //
9 // includes
10 //
11 
12 #include <EventLoop/Worker.h>
13 
19 #include <EventLoop/BatchJob.h>
20 #include <EventLoop/BatchSample.h>
21 #include <EventLoop/BatchSegment.h>
23 #include <EventLoop/Driver.h>
25 #include <EventLoop/EventRange.h>
29 #include <EventLoop/Job.h>
32 #include <EventLoop/MessageCheck.h>
33 #include <EventLoop/OutputStream.h>
35 #include <EventLoop/StatusCode.h>
38 #include <EventLoop/TEventModule.h>
40 #include <RootCoreUtils/Assert.h>
42 #include <RootCoreUtils/ThrowMsg.h>
48 #include <SampleHandler/Sample.h>
51 #include <TFile.h>
52 #include <TH1.h>
53 #include <TROOT.h>
54 #include <TSystem.h>
55 #include <TTree.h>
56 #include <TObjString.h>
57 #include <fstream>
58 #include <memory>
59 #include <exception>
60 
61 //
62 // method implementations
63 //
64 
65 namespace EL
66 {
67  void Worker ::
68  testInvariant () const
69  {
70  RCU_INVARIANT (this != nullptr);
71  for (std::size_t iter = 0, end = m_algs.size(); iter != end; ++ iter)
72  {
73  RCU_INVARIANT (m_algs[iter].m_algorithm != nullptr);
74  }
75  }
76 
77 
78 
// Destructor.  Performs no explicit cleanup of its own; it only runs the
// RCU_DESTROY_INVARIANT check on this object.
80  ~Worker ()
81  {
82  RCU_DESTROY_INVARIANT (this);
83  }
84 
85 
86 
87  void Worker ::
88  addOutput (TObject *output_swallow)
89  {
90  std::unique_ptr<TObject> output (output_swallow);
91 
92  RCU_CHANGE_INVARIANT (this);
93  RCU_REQUIRE_SOFT (output_swallow != 0);
94 
95  RCU::SetDirectory (output_swallow, 0);
96  ModuleData::addOutput (std::move (output));
97  }
98 
99 
100 
// Register `output_swallow` as job output, wrapped inside a new TList
// whose name is `name`.  Ownership of `output_swallow` is taken at once by
// the local unique_ptr, so the object is released if a precondition check
// fails by throwing.
// NOTE(review): SetOwner() is never called on the TList here, so whether
// the wrapped object gets deleted together with the list depends on
// downstream handling — confirm.
102  addOutputList (const std::string& name, TObject *output_swallow)
103  {
104  std::unique_ptr<TObject> output (output_swallow);
105 
106  RCU_CHANGE_INVARIANT (this);
107  RCU_REQUIRE_SOFT (output_swallow != 0);
108 
109  RCU::SetDirectory (output_swallow, 0);
110  std::unique_ptr<TList> list (new TList);
111  list->SetName (name.c_str());
112  list->Add (output.release());
// addOutput() wraps its argument in a unique_ptr, i.e. takes ownership
113  addOutput (list.release());
114  }
115 
116 
117 
118  TObject *Worker ::
119  getOutputHist (const std::string& name) const
120  {
121  RCU_READ_INVARIANT (this);
122 
123  TObject *result = m_histOutput->getOutputHist (name);
124  if (result == nullptr) RCU_THROW_MSG ("unknown output histogram: " + name);
125  return result;
126  }
127 
128 
129 
130  TFile *Worker ::
131  getOutputFile (const std::string& label) const
132  {
133  RCU_READ_INVARIANT (this);
134  TFile *result = getOutputFileNull (label);
135  if (result == 0)
136  RCU_THROW_MSG ("no output dataset defined with label: " + label);
137  return result;
138  }
139 
140 
141 
142  TFile *Worker ::
143  getOutputFileNull (const std::string& label) const
144  {
145  RCU_READ_INVARIANT (this);
146  auto iter = m_outputs.find (label);
147  if (iter == m_outputs.end())
148  return 0;
149  return iter->second.file();
150  }
151 
152 
153 
// Hand `tree` to the output stream registered under `stream` via
// OutputStreamData::addClone (presumably storing a clone of the tree).
// Returns FAILURE, after logging an error, if no such stream exists.
// NOTE(review): the stream is modified, yet only RCU_READ_INVARIANT is
// taken — confirm this is intended.
155  addTree( const TTree& tree, const std::string& stream )
156  {
157  using namespace msgEventLoop;
158  RCU_READ_INVARIANT( this );
159 
160  auto outputIter = m_outputs.find (stream);
161  if (outputIter == m_outputs.end())
162  {
163  ANA_MSG_ERROR ( "No output file with stream name \"" + stream +
164  "\" found" );
165  return ::StatusCode::FAILURE;
166  }
167 
168  outputIter->second.addClone (tree);
169 
170  // Return gracefully:
171  return ::StatusCode::SUCCESS;
172  }
173 
174 
175 
176  TTree *Worker::
177  getOutputTree( const std::string& name, const std::string& stream ) const
178  {
179  using namespace msgEventLoop;
180  RCU_READ_INVARIANT( this );
181 
182  auto outputIter = m_outputs.find (stream);
183  if (outputIter == m_outputs.end())
184  {
185  RCU_THROW_MSG ( "No output file with stream name \"" + stream
186  + "\" found" );
187  }
188 
189  TTree *result = outputIter->second.getOutputTree( name );
190  if( result == nullptr ) {
191  RCU_THROW_MSG ( "No tree with name \"" + name + "\" in stream \"" +
192  stream + "\"" );
193  }
194  return result;
195  }
196 
197 
198 
// Accessor for the sample meta-data pointer installed by setMetaData().
// Non-owning: the pointee's lifetime is managed by whoever passed it in.
200  metaData () const
201  {
202  RCU_READ_INVARIANT (this);
203  return m_metaData;
204  }
205 
206 
207 
208  TTree *Worker ::
209  tree () const
210  {
211  RCU_READ_INVARIANT (this);
212  return m_inputTree;
213  }
214 
215 
216 
217  Long64_t Worker ::
218  treeEntry () const
219  {
220  RCU_READ_INVARIANT (this);
221  return m_inputTreeEntry;
222  }
223 
224 
225 
226  TFile *Worker ::
227  inputFile () const
228  {
229  RCU_READ_INVARIANT (this);
230  return m_inputFile.get();
231  }
232 
233 
234 
235  std::string Worker ::
236  inputFileName () const
237  {
238  // no invariant used
239  std::string path = inputFile()->GetName();
240  auto split = path.rfind ('/');
241  if (split != std::string::npos)
242  return path.substr (split + 1);
243  else
244  return path;
245  }
246 
247 
248 
249  TTree *Worker ::
250  triggerConfig () const
251  {
252  RCU_READ_INVARIANT (this);
253  return dynamic_cast<TTree*>(inputFile()->Get("physicsMeta/TrigConfTree"));
254  }
255 
256 
257 
// Return the xAOD event object (m_tevent).  Throws via RCU_THROW_MSG when
// it is null, i.e. the job was not configured for xAOD support
// (presumably it is set up by the TEventModule that initialize() adds for
// xAOD input — confirm).
259  xaodEvent () const
260  {
261  RCU_READ_INVARIANT (this);
262 
263  if (m_tevent == nullptr)
264  RCU_THROW_MSG ("Job not configured for xAOD support");
265  return m_tevent;
266  }
267 
268 
269 
// Return the xAOD transient store (m_tstore).  Throws via RCU_THROW_MSG
// when it is null, i.e. the job was not configured for xAOD support.
271  xaodStore () const
272  {
273  RCU_READ_INVARIANT (this);
274 
275  if (m_tstore == nullptr)
276  RCU_THROW_MSG ("Job not configured for xAOD support");
277  return m_tstore;
278  }
279 
280 
281 
// Return the legacy algorithm of the first entry in m_algs that reports
// hasName(name), or a null pointer (0) if no entry matches.
// NOTE(review): the name test goes through `alg->` while the result comes
// from `alg.m_algorithm->`; presumably both reach the same wrapper — confirm.
// A match whose getLegacyAlg() yields null is returned as null as well.
283  getAlg (const std::string& name) const
284  {
285  RCU_READ_INVARIANT (this);
286  for (auto& alg : m_algs)
287  {
288  if (alg->hasName (name))
289  return alg.m_algorithm->getLegacyAlg();
290  }
291  return 0;
292  }
293 
294 
295 
// Mark the current event as skipped by setting m_skipEvent; afterwards
// filterPassed() reports false.
297  skipEvent ()
298  {
299  RCU_CHANGE_INVARIANT (this);
300  m_skipEvent = true;
301  }
302 
303 
304 
// Whether the current event passed the filter: the inverse of m_skipEvent,
// which is set by skipEvent() / setFilterPassed().
306  filterPassed () const noexcept
307  {
308  RCU_READ_INVARIANT (this);
309  return !m_skipEvent;
310  }
311 
312 
313 
// Set the filter decision for the current event; stored inverted in
// m_skipEvent, so filterPassed() returns `val_filterPassed`.
315  setFilterPassed (bool val_filterPassed) noexcept
316  {
317  RCU_CHANGE_INVARIANT (this);
318  m_skipEvent = !val_filterPassed;
319  }
320 
321 
322 
// Default constructor.  Points m_worker at this object (presumably the
// back-reference that modules use to reach the worker — confirm), then
// establishes the class invariant.
324  Worker ()
325  {
326  m_worker = this;
327 
328  RCU_NEW_INVARIANT (this);
329  }
330 
331 
332 
// Install the sample meta-data used by this worker.  Only the pointer is
// stored (no copy, no ownership), so the caller must keep the object alive
// for as long as the worker uses it.  Note: directExecute passes the
// address of a local object.
334  setMetaData (const SH::MetaObject *val_metaData)
335  {
336  RCU_CHANGE_INVARIANT (this);
337  RCU_REQUIRE (val_metaData != 0);
338 
339  m_metaData = val_metaData;
340  }
341 
342 
343 
// Set m_outputTarget, the location that initialize() prefixes to the
// histogram output file name ("<target>/hist-<segment>.root").
345  setOutputHist (const std::string& val_outputTarget)
346  {
347  RCU_CHANGE_INVARIANT (this);
348 
349  m_outputTarget = val_outputTarget;
350  }
351 
352 
353 
// Set m_segmentName, which initialize() uses to build the histogram output
// file name ("<target>/hist-<segment>.root").
355  setSegmentName (const std::string& val_segmentName)
356  {
357  RCU_CHANGE_INVARIANT (this);
358 
359  m_segmentName = val_segmentName;
360  }
361 
362 
363 
// Move every algorithm wrapper extracted from `jobConfig` onto the end of
// m_algs; entries already present in m_algs are kept.
366  {
367  RCU_CHANGE_INVARIANT (this);
368  for (std::unique_ptr<IAlgorithmWrapper>& alg : jobConfig.extractAlgorithms())
369  {
370  m_algs.push_back (std::move (alg));
371  }
372  }
373 
374 
375 
// Set up the worker before any input is processed:
//  1. append the standard modules to m_modules, in a fixed order and
//     partly depending on meta-data options (modules added earlier through
//     addModule(), e.g. the input module of the *Execute functions, stay
//     in front of them);
//  2. create the histogram output stream if none was registered yet;
//  3. create the detached job-statistics tree;
//  4. call firstInitialize and then preFileInitialize on every module.
// Any module failure is propagated through ANA_CHECK.
377  initialize ()
378  {
379  using namespace msgEventLoop;
380  RCU_CHANGE_INVARIANT (this);
381 
382  const bool xAODInput = m_metaData->castBool (Job::optXAODInput, false);
383 
384  ANA_MSG_INFO ("xAODInput = " << xAODInput);
// the memory monitor appears both first ("Early") and last ("Late"),
// presumably to bracket the other modules — confirm
385  if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
386  m_modules.push_back (std::make_unique<Detail::MemoryMonitorModule> ("EarlyMemoryMonitorModule"));
387  if (xAODInput)
388  m_modules.push_back (std::make_unique<Detail::TEventModule> ("TEventModule"));
389  auto factoryPreload = metaData()->castString (Job::optFactoryPreload, "");
390  if (!factoryPreload.empty())
391  {
392  auto module = std::make_unique<Detail::FactoryPreloadModule> ("FactoryPreloadModule");
393  module->preloader = factoryPreload;
394  m_modules.push_back (std::move (module));
395  }
396  m_modules.push_back (std::make_unique<Detail::LeakCheckModule> ("LeakCheckModule"));
397  m_modules.push_back (std::make_unique<Detail::StopwatchModule> ("StopwatchModule"));
398  if (metaData()->castBool (Job::optGridReporting, false))
399  m_modules.push_back (std::make_unique<Detail::GridReportingModule>("GridReportingModule"));
400  if (metaData()->castBool (Job::optAlgorithmTimer, false))
401  m_modules.push_back (std::make_unique<Detail::AlgorithmTimerModule> ("AlgorithmTimerModule"));
402  if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
403  m_modules.push_back (std::make_unique<Detail::AlgorithmMemoryModule> ("AlgorithmMemoryModule"));
404  m_modules.push_back (std::make_unique<Detail::FileExecutedModule> ("FileExecutedModule"));
405  m_modules.push_back (std::make_unique<Detail::EventCountModule> ("EventCountModule"));
406  m_modules.push_back (std::make_unique<Detail::WorkerConfigModule> ("WorkerConfigModule"));
407  m_modules.push_back (std::make_unique<Detail::AlgorithmStateModule> ("AlgorithmStateModule"));
408  m_modules.push_back (std::make_unique<Detail::PostClosedOutputsModule> ("PostClosedOutputsModule"));
409  if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
410  m_modules.push_back (std::make_unique<Detail::MemoryMonitorModule> ("LateMemoryMonitorModule"));
411 
// create the histogram stream "<m_outputTarget>/hist-<m_segmentName>.root"
// unless a stream with the histogram stream name already exists
412  if (m_outputs.find (Job::histogramStreamName) == m_outputs.end())
413  {
415  m_outputTarget + "/hist-" + m_segmentName + ".root", "RECREATE"};
417  }
420 
// the statistics tree is detached from any ROOT directory; finalize()
// hands it to the output only if branches were added to it
421  m_jobStats = std::make_unique<TTree>
422  ("EventLoop_JobStats", "EventLoop job statistics");
423  m_jobStats->SetDirectory (nullptr);
424 
425  ANA_MSG_INFO ("calling firstInitialize on all modules");
426  for (auto& module : m_modules)
427  ANA_CHECK (module->firstInitialize (*this));
428  ANA_MSG_INFO ("calling preFileInitialize on all modules");
429  for (auto& module : m_modules)
430  ANA_CHECK (module->preFileInitialize (*this));
431 
432  return ::StatusCode::SUCCESS;
433  }
434 
435 
436 
// Give every module, in list order, the chance to process the inputs by
// calling its processInputs() with this worker for both arguments.  The
// first module failure is propagated through ANA_CHECK.
439  {
440  using namespace msgEventLoop;
441 
442  RCU_CHANGE_INVARIANT (this);
443 
444  for (auto& module : m_modules)
445  ANA_CHECK (module->processInputs (*this, *this));
446  return ::StatusCode::SUCCESS;
447  }
448 
449 
450 
// Finish the job after all events have been processed:
//  - fail if the algorithms were never initialized;
//  - close the current input file (openInputFile("") fires the close hooks);
//  - run onFinalize on all modules;
//  - save and close every non-histogram output stream, recording each
//    non-empty final file name as an "EventLoop_OutputStream_<label>" list;
//  - run postFinalize, store the job-statistics tree (only if it has
//    branches), run onWorkerEnd, close the histogram stream, and finally
//    run postFileClose.
// NOTE(review): m_jobStats is moved out below, so a second call would
// dereference a null pointer in the branch-count check — single use only.
452  finalize ()
453  {
454  using namespace msgEventLoop;
455 
456  RCU_CHANGE_INVARIANT (this);
457 
458  if (m_algorithmsInitialized == false)
459  {
460  ANA_MSG_ERROR ("algorithms never got initialized");
461  return StatusCode::FAILURE;
462  }
463 
464  ANA_CHECK (openInputFile (""));
465  for (auto& module : m_modules)
466  ANA_CHECK (module->onFinalize (*this));
467  for (auto& output : m_outputs)
468  {
469  if (output.first != Job::histogramStreamName)
470  {
471  output.second.saveOutput ();
472  output.second.close ();
473  std::string path = output.second.finalFileName ();
474  if (!path.empty())
475  addOutputList ("EventLoop_OutputStream_" + output.first, new TObjString (path.c_str()));
476  }
477  }
478  for (auto& module : m_modules)
479  ANA_CHECK (module->postFinalize (*this));
480  if (m_jobStats->GetListOfBranches()->GetEntries() > 0)
481  {
482  if (m_jobStats->Fill() <= 0)
483  {
484  ANA_MSG_ERROR ("failed to fill the job statistics tree");
485  return ::StatusCode::FAILURE;
486  }
487  ModuleData::addOutput (std::move (m_jobStats));
488  }
490  for (auto& module : m_modules)
491  ANA_CHECK (module->onWorkerEnd (*this));
493  m_histOutput->close ();
494 
495  for (auto& module : m_modules){
496  ANA_CHECK (module->postFileClose(*this));
497  }
498  ANA_MSG_INFO ("worker finished successfully");
499  return ::StatusCode::SUCCESS;
500  }
501 
502 
503 
// Process the half-open entry range [m_beginEvent, m_endEvent) of the file
// at eventRange.m_url:
//  - open the file (a no-op if it is already the open file);
//  - validate the range against the file's entry count, and replace
//    EventRange::eof with that count (written back into eventRange);
//  - run onInitialize once, onNewInputFile for a freshly opened file, and
//    onFileExecute when the range starts at entry 0;
//  - call onExecute on every module for each entry.
// Returns FAILURE if the range lies outside the file or any module fails.
505  processEvents (EventRange& eventRange)
506  {
507  using namespace msgEventLoop;
508 
509  RCU_CHANGE_INVARIANT (this);
510  RCU_REQUIRE (!eventRange.m_url.empty());
511  RCU_REQUIRE (eventRange.m_beginEvent >= 0);
512  RCU_REQUIRE (eventRange.m_endEvent == EventRange::eof || eventRange.m_endEvent >= eventRange.m_beginEvent);
513 
514  ANA_CHECK (openInputFile (eventRange.m_url));
515 
// begin == number of entries is allowed and yields an empty range
516  if (eventRange.m_beginEvent > inputFileNumEntries())
517  {
518  ANA_MSG_ERROR ("first event (" << eventRange.m_beginEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
519  return ::StatusCode::FAILURE;
520  }
521  if (eventRange.m_endEvent == EventRange::eof)
522  {
523  eventRange.m_endEvent = inputFileNumEntries();
524  } else if (eventRange.m_endEvent > inputFileNumEntries())
525  {
526  ANA_MSG_ERROR ("end event (" << eventRange.m_endEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
527  return ::StatusCode::FAILURE;
528  }
529 
530  m_inputTreeEntry = eventRange.m_beginEvent;
531 
// one-time module initialization; m_algorithmsInitialized is presumably
// set to true right after this loop — confirm
532  if (m_algorithmsInitialized == false)
533  {
534  for (auto& module : m_modules)
535  ANA_CHECK (module->onInitialize (*this));
537  }
538 
539  if (m_newInputFile)
540  {
541  m_newInputFile = false;
542  for (auto& module : m_modules)
543  ANA_CHECK (module->onNewInputFile (*this));
544  }
545 
// only the range starting at entry 0 reports the file as executed, so a
// file split across several ranges is reported once
546  if (eventRange.m_beginEvent == 0)
547  {
548  for (auto& module : m_modules)
549  ANA_CHECK (module->onFileExecute (*this));
550  }
551 
552  ANA_MSG_INFO ("Processing events " << eventRange.m_beginEvent << "-" << eventRange.m_endEvent << " in file " << eventRange.m_url);
553 
// the != comparison relies on the checks above guaranteeing begin <= end;
// the current entry (m_inputTreeEntry, reported by treeEntry()) is
// presumably advanced per event — confirm
554  for (uint64_t event = eventRange.m_beginEvent;
555  event != uint64_t (eventRange.m_endEvent);
556  ++ event)
557  {
559  for (auto& module : m_modules)
560  {
561  if (module->onExecute (*this).isFailure())
562  {
563  ANA_MSG_ERROR ("processing event " << treeEntry() << " on file " << inputFileName());
564  return ::StatusCode::FAILURE;
565  }
566  }
567  if (m_firstEvent)
568  {
569  m_firstEvent = false;
570  for (auto& module : m_modules)
571  ANA_CHECK (module->postFirstEvent (*this));
572  }
573  m_eventsProcessed += 1;
574  if (m_eventsProcessed % 10000 == 0)
575  ANA_MSG_INFO ("Processed " << m_eventsProcessed << " events");
576  }
577  return ::StatusCode::SUCCESS;
578  }
579 
580 
581 
// ROOT message filter used around file opening.  The parameters are
// presumably (level, abort flag, location, message) as in ROOT's error
// handler signature — confirm.  Messages above kWarning cause a
// std::runtime_error to be thrown; all others are passed on (return true).
583  fileOpenErrorFilter(int level, bool, const char*, const char *)
584  {
585  // For messages above warning level (SysError, Error, Fatal)
586  if( level > kWarning ) {
587  // We won't output further; ROOT should have already put something in the log file
588  throw std::runtime_error("ROOT error detected");
589 
590  // No need for further error handling
// NOTE(review): unreachable, the throw above always leaves the function
591  return false;
592  }
593 
594  // Pass to the default error handlers
595  return true;
596  }
597 
// Make `inputFileUrl` the current input file without processing it.
//  - The same URL as the currently open file is a no-op.
//  - Any previously open file is closed first.  The module close hooks run
//    only if that file had already been handed to the modules
//    (m_newInputFile == false).
//  - An empty URL just closes the current file.
// Failure to open, or a zombie file, reports the input failure to every
// module and returns FAILURE.  A file without the configured tree is
// accepted and treated as having no events.
599  openInputFile (const std::string& inputFileUrl)
600  {
601  using namespace msgEventLoop;
602 
603  // Enable custom error handling in a nice way
605 
606  RCU_CHANGE_INVARIANT (this);
607 
608  if (m_inputFileUrl == inputFileUrl)
609  return ::StatusCode::SUCCESS;
610 
611  if (!m_inputFileUrl.empty())
612  {
613  if (m_newInputFile == false)
614  {
615  for (auto& module : m_modules)
616  ANA_CHECK (module->onCloseInputFile (*this));
617  for (auto& module : m_modules)
618  ANA_CHECK (module->postCloseInputFile (*this));
619  }
620  m_newInputFile = false;
621  m_inputTree = nullptr;
622  m_inputFile.reset ();
623  m_inputFileUrl.clear ();
624  }
625 
626  if (inputFileUrl.empty())
627  return ::StatusCode::SUCCESS;
628 
629  ANA_MSG_INFO ("Opening file " << inputFileUrl);
// note: this local shadows the inputFile() member function
630  std::unique_ptr<TFile> inputFile;
631  try
632  {
633  inputFile = SH::openFile (inputFileUrl, *metaData());
634  } catch (...)
635  {
// exceptions are reported and then handled like a null file below
636  Detail::report_exception (std::current_exception());
637  }
638  if (inputFile.get() == 0)
639  {
640  ANA_MSG_ERROR ("failed to open file " << inputFileUrl);
641  for (auto& module : m_modules)
642  module->reportInputFailure (*this);
643  return ::StatusCode::FAILURE;
644  }
645  if (inputFile->IsZombie())
646  {
647  ANA_MSG_ERROR ("input file is a zombie: " << inputFileUrl);
648  for (auto& module : m_modules)
649  module->reportInputFailure (*this);
650  return ::StatusCode::FAILURE;
651  }
652 
653  TTree *tree = 0;
654  const std::string treeName
656  tree = dynamic_cast<TTree*>(inputFile->Get (treeName.c_str()));
657  if (tree == nullptr)
658  {
659  ANA_MSG_INFO ("tree " << treeName << " not found in input file: " << inputFileUrl);
660  ANA_MSG_INFO ("treating this like a tree with no events");
661  }
662 
663  m_newInputFile = true;
664  m_inputTree = tree;
665  m_inputTreeEntry = 0;
666  m_inputFile = std::move (inputFile);
// NOTE(review): inputFileUrl is a const reference, so this std::move
// actually copies the string
667  m_inputFileUrl = std::move (inputFileUrl);
668 
669  return ::StatusCode::SUCCESS;
670  }
671 
672 
673 
// Register the output stream `data` under `label`.  Returns FAILURE
// (with an error message) if the label is already in use or if `data` has
// no file attached; otherwise `data` is moved into m_outputs.
675  addOutputStream (const std::string& label,
677  {
678  using namespace msgEventLoop;
679  RCU_CHANGE_INVARIANT (this);
680 
681  if (m_outputs.find (label) != m_outputs.end())
682  {
683  ANA_MSG_ERROR ("output file already defined for label: " + label);
684  return ::StatusCode::FAILURE;
685  }
686  if (data.file() == nullptr)
687  {
688  ANA_MSG_ERROR ("output stream does not have a file attached");
689  return ::StatusCode::FAILURE;
690  }
691  m_outputs.insert (std::make_pair (label, std::move (data)));
692  return ::StatusCode::SUCCESS;
693  }
694 
695 
696 
697  Long64_t Worker ::
698  inputFileNumEntries () const
699  {
700  RCU_READ_INVARIANT (this);
701  RCU_REQUIRE (inputFile() != 0);
702 
703  if (m_inputTree != 0)
704  return m_inputTree->GetEntries();
705  else
706  return 0;
707  }
708 
709 
710 
// Number of events processed so far; processEvents() increments the
// counter once per event, after all modules executed it.
712  eventsProcessed () const noexcept
713  {
714  RCU_READ_INVARIANT (this);
715  return m_eventsProcessed;
716  }
717 
718 
719 
// Append a (non-null) module to m_modules, taking ownership.  The loops in
// this file call modules in list order, so modules added before
// initialize() run ahead of the standard modules it appends.
721  addModule (std::unique_ptr<Detail::Module> module)
722  {
723  RCU_CHANGE_INVARIANT (this);
724  RCU_REQUIRE (module != nullptr);
725  m_modules.push_back (std::move (module));
726  }
727 
728 
729 
// Run `job` on `sample` in-process.
//  1. Merge `options` into a copy of the sample meta-data.
//  2. Register one output stream per job output.
//  3. Add a DirectInputModule fed from the sample's file list, honouring
//     Job::optMaxEvents and the skip count.
//  4. Run initialize(), the event processing, and finalize().
// `location` is presumably used on a line not shown here (as the histogram
// output target) — confirm.
// NOTE(review): `meta` is a local whose address is installed by
// setMetaData(), so m_metaData dangles once this function returns.
731  directExecute (const SH::SamplePtr& sample, const Job& job,
732  const std::string& location, const SH::MetaObject& options)
733  {
734  using namespace msgEventLoop;
735  RCU_CHANGE_INVARIANT (this);
736 
737  SH::MetaObject meta (*sample->meta());
738  meta.fetchDefaults (options);
739 
740  setMetaData (&meta);
742  setSegmentName (sample->name());
743 
744  ANA_MSG_INFO ("Running sample: " << sample->name());
745 
746  setJobConfig (JobConfig (job.jobConfig()));
747 
748  for (Job::outputIter out = job.outputBegin(),
749  end = job.outputEnd(); out != end; ++ out)
750  {
752  out->output()->makeWriter (sample->name(), "", ".root")};
753  ANA_CHECK (addOutputStream (out->label(), std::move (data)));
754  }
755 
756  {
757  auto module = std::make_unique<Detail::DirectInputModule> ("DirectInputModule");
758  module->fileList = sample->makeFileList();
759  Long64_t maxEvents = metaData()->castDouble (Job::optMaxEvents, -1);
760  if (maxEvents != -1)
761  module->maxEvents = maxEvents;
763  if (skipEvents != 0)
764  module->skipEvents = skipEvents;
765  addModule (std::move (module));
766  }
767 
768  ANA_CHECK (initialize ());
770  ANA_CHECK (finalize ());
771  return ::StatusCode::SUCCESS;
772  }
773 
774 
775 
// Entry point of a batch worker.
//  1. Load the BatchJob from `confFile` and select segment `job_id`.
//  2. Configure the meta-data, histogram target, segment name, algorithms
//     and output streams.
//  3. Add a BatchInputModule for that segment and run the job.
//  4. On success, create an empty marker file
//     "<location>/status/completed-<job_id>".
// Any exception is reported and turned into FAILURE.
// NOTE(review): the meta-data pointer installed by setMetaData() points
// into `job`, which is destroyed when this function returns.
777  batchExecute (unsigned job_id, const char *confFile)
778  {
779  using namespace msgEventLoop;
780  RCU_CHANGE_INVARIANT (this);
781 
782  try
783  {
784  std::unique_ptr<TFile> file (TFile::Open (confFile, "READ"));
785  if (file.get() == nullptr || file->IsZombie())
786  {
787  ANA_MSG_ERROR ("failed to open file: " << confFile);
788  return ::StatusCode::FAILURE;
789  }
790 
791  std::unique_ptr<BatchJob> job (dynamic_cast<BatchJob*>(file->Get ("job")));
792  if (job.get() == nullptr)
793  {
794  ANA_MSG_ERROR ("failed to retrieve BatchJob object");
795  return ::StatusCode::FAILURE;
796  }
797 
// NOTE(review): the message reports segments.size(), which is one past
// the largest valid job id
798  if (job_id >= job->segments.size())
799  {
800  ANA_MSG_ERROR ("invalid job-id " << job_id << ", max is " << job->segments.size());
801  return ::StatusCode::FAILURE;
802  }
803  BatchSegment *segment = &job->segments[job_id];
804  RCU_ASSERT (segment->job_id == job_id);
805  RCU_ASSERT (segment->sample < job->samples.size());
806  BatchSample *sample = &job->samples[segment->sample];
807 
808  gSystem->Exec ("pwd");
809  gSystem->MakeDirectory ("output");
810 
811  setMetaData (&sample->meta);
812  setOutputHist (job->location + "/fetch");
813  setSegmentName (segment->fullName);
814 
815  setJobConfig (JobConfig (job->job.jobConfig()));
816 
817  for (Job::outputIter out = job->job.outputBegin(),
818  end = job->job.outputEnd(); out != end; ++ out)
819  {
821  out->output()->makeWriter (segment->sampleName, segment->segmentName, ".root")};
822  ANA_CHECK (addOutputStream (out->label(), std::move (data)));
823  }
824 
825  {
826  auto module = std::make_unique<Detail::BatchInputModule> ("BatchInputModule");
827  Long64_t maxEvents = metaData()->castDouble (Job::optMaxEvents, -1);
828  if (maxEvents != -1)
829  module->maxEvents = maxEvents;
830  module->sample = sample;
831  module->segment = segment;
832  addModule (std::move (module));
833  }
834 
835  ANA_CHECK (initialize ());
837  ANA_CHECK (finalize ());
838 
// constructing the ofstream creates the (empty) completion marker file;
// whether the file could be created is not checked
839  std::ostringstream job_name;
840  job_name << job_id;
841  std::ofstream completed ((job->location + "/status/completed-" + job_name.str()).c_str());
842  return ::StatusCode::SUCCESS;
843  } catch (...)
844  {
845  Detail::report_exception (std::current_exception());
846  return ::StatusCode::FAILURE;
847  }
848  }
849 
850 
851 
// Entry point of a grid worker.
//  1. Read "jobdef.root": the meta-data for `sampleName` (falling back to
//     "defaultMetaObject"), the JobConfig, and the list of output streams.
//  2. Force grid reporting on and register one "<label>.root" output per
//     stream in the current directory.
//  3. Read the comma/line separated input file list from "input.txt".
//  4. Run the job with the optional event limit (nEventsPerJob) and skip
//     count (SkipEvents).
// The process exits with EC_BADINPUT if "input.txt" lists no files.
853  gridExecute (const std::string& sampleName, Long64_t SkipEvents, Long64_t nEventsPerJob)
854  {
855  using namespace msgEventLoop;
856  RCU_CHANGE_INVARIANT (this);
857 
858  ANA_MSG_INFO ("Running with ROOT version " << gROOT->GetVersion()
859  << " (" << gROOT->GetVersionDate() << ")");
860 
861  ANA_MSG_INFO ("Loading EventLoop grid job");
862 
863 
// bigOutputs only collects pointers (no SetOwner call is made on it)
864  TList bigOutputs;
865  std::unique_ptr<JobConfig> jobConfig;
// `mo` is never deleted here; it has to outlive the worker because
// setMetaData() below stores only the pointer
866  SH::MetaObject *mo = 0;
867 
868  std::unique_ptr<TFile> f (TFile::Open("jobdef.root"));
869  if (f == nullptr || f->IsZombie()) {
870  ANA_MSG_ERROR ("Could not read jobdef");
871  return ::StatusCode::FAILURE;
872  }
873 
874  mo = dynamic_cast<SH::MetaObject*>(f->Get(sampleName.c_str()));
875  if (!mo)
876  mo = dynamic_cast<SH::MetaObject*>(f->Get("defaultMetaObject"));
877  if (!mo) {
878  ANA_MSG_ERROR ("Could not read in sample meta object");
879  return ::StatusCode::FAILURE;
880  }
881 
882  jobConfig.reset (dynamic_cast<JobConfig*>(f->Get("jobConfig")));
883  if (jobConfig == nullptr)
884  {
885  ANA_MSG_ERROR ("failed to read jobConfig object");
886  return ::StatusCode::FAILURE;
887  }
888 
// NOTE(review): bigOutputs keeps raw pointers to entries of `outs`, which
// is deleted at the end of this scope; this is only safe if `outs` does
// not own its elements — confirm
889  {
890  std::unique_ptr<TList> outs ((TList*)f->Get("outputs"));
891  if (outs == nullptr)
892  {
893  ANA_MSG_ERROR ("Could not read list of outputs");
894  return ::StatusCode::FAILURE;
895  }
896 
897  TIter itr(outs.get());
898  TObject *obj = 0;
899  while ((obj = itr())) {
900  EL::OutputStream * out = dynamic_cast<EL::OutputStream*>(obj);
901  if (out) {
902  bigOutputs.Add(out);
903  }
904  else {
905  ANA_MSG_ERROR ("Encountered unexpected entry in list of outputs");
906  return ::StatusCode::FAILURE;
907  }
908  }
909  }
910 
911  f->Close();
912  f.reset ();
913 
914  const std::string location = ".";
915 
// forcing optGridReporting makes initialize() add the GridReportingModule
916  mo->setBool (Job::optGridReporting, true);
917  setMetaData (mo);
919  setSegmentName ("output");
920 
921  ANA_MSG_INFO ("Starting EventLoop Grid worker");
922 
923  {//Create and register the "big" output files with base class
924  TIter itr(&bigOutputs);
925  TObject *obj = 0;
926  while ((obj = itr())) {
927  EL::OutputStream *os = dynamic_cast<EL::OutputStream*>(obj);
928  if (os == nullptr)
929  {
930  ANA_MSG_ERROR ("Bad input");
931  return ::StatusCode::FAILURE;
932  }
933  {
935  location + "/" + os->label() + ".root", "RECREATE"};
936  ANA_CHECK (addOutputStream (os->label(), std::move (data)));
937  }
938  }
939  }
940 
941  setJobConfig (std::move (*jobConfig));
942 
943  {
944  auto module = std::make_unique<Detail::DirectInputModule> ("DirectInputModule");
// every line of input.txt may hold several comma-separated file names
945  std::ifstream infile("input.txt");
946  while (infile) {
947  std::string sLine;
948  if (!getline(infile, sLine)) break;
949  std::istringstream ssLine(sLine);
950  while (ssLine) {
951  std::string sFile;
952  if (!getline(ssLine, sFile, ',')) break;
953  module->fileList.push_back(sFile);
954  }
955  }
956  if (module->fileList.size() == 0) {
957  ANA_MSG_ERROR ("no input files provided");
958  //User was expecting input after all.
959  gSystem->Exit(EC_BADINPUT);
960  }
961 
962  if (nEventsPerJob != -1)
963  module->maxEvents = nEventsPerJob;
964  if (SkipEvents != 0)
965  module->skipEvents = SkipEvents;
966  addModule (std::move (module));
967  }
968 
969  ANA_CHECK (initialize());
971  ANA_CHECK (finalize ());
972 
// NOTE(review): eventsProcessed() is uint64_t; storing it in an int can
// truncate very large counts
973  int nEvents = eventsProcessed();
974  ANA_MSG_INFO ("Loop finished.");
975  ANA_MSG_INFO ("Read/processed " << nEvents << " events.");
976 
977  ANA_MSG_INFO ("EventLoop Grid worker finished");
978  ANA_MSG_INFO ("Saving output");
979  return ::StatusCode::SUCCESS;
980  }
981 }
EL::Worker::finalize
::StatusCode finalize()
finalize the worker
Definition: Worker.cxx:452
EL::Job::optMaxEvents
static const std::string optMaxEvents
description: the name of the option used for setting the maximum number of events to process per samp...
Definition: Job.h:222
EL::Worker::m_newInputFile
bool m_newInputFile
whether this is a new input file (i.e.
Definition: Worker.h:429
EL::Worker::addOutput
void addOutput(TObject *output_swallow) final override
effects: add an object to the output.
Definition: Worker.cxx:88
EL::Worker::m_firstEvent
bool m_firstEvent
whether we are still to process the first event
Definition: Worker.h:449
EL::Worker::m_modules
std::vector< std::unique_ptr< Detail::Module > > m_modules
the list of modules we hold
Definition: Worker.h:423
EL::Detail::OutputStreamData
all data needed to manage a given output stream
Definition: OutputStreamData.h:30
EL::Worker::m_outputTarget
std::string m_outputTarget
the target file to which we will write the histogram output
Definition: Worker.h:434
BatchInputModule.h
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
python.CaloRecoConfig.f
f
Definition: CaloRecoConfig.py:127
EL::Worker::filterPassed
virtual bool filterPassed() const noexcept final override
whether the current algorithm passed its filter criterion for the current event
Definition: Worker.cxx:306
EL::Worker::getOutputFileNull
TFile * getOutputFileNull(const std::string &label) const override
effects: get the output file that goes into the dataset with the given label.
Definition: Worker.cxx:143
EL::Detail::OutputStreamData::getOutputHist
TObject * getOutputHist(const std::string &name) const noexcept
get the output histogram with the given name, or nullptr if there is no histogram with such a name
Definition: OutputStreamData.cxx:293
SGout2dot.alg
alg
Definition: SGout2dot.py:243
EL::Detail::ModuleData::m_inputTree
TTree * m_inputTree
the (main) tree in the input file
Definition: ModuleData.h:75
get_generator_info.result
result
Definition: get_generator_info.py:21
Driver.h
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:126
run.infile
string infile
Definition: run.py:13
EL::Worker::eventsProcessed
uint64_t eventsProcessed() const noexcept
the number of events that have been processed
Definition: Worker.cxx:712
EventCountModule.h
EL::Worker::directExecute
::StatusCode directExecute(const SH::SamplePtr &sample, const Job &job, const std::string &location, const SH::MetaObject &options)
run the job
Definition: Worker.cxx:731
SH::MetaObject
A class that manages meta-data to be associated with an object.
Definition: MetaObject.h:56
GridReportingModule.h
RootUtils.h
EL::BatchSample
Definition: BatchSample.h:32
EL::Worker::inputFileName
std::string inputFileName() const override
the name of the file we are reading the current tree from, without the path component
Definition: Worker.cxx:236
SH::MetaObject::castBool
bool castBool(const std::string &name, bool def_val=false, CastMode mode=CAST_ERROR_THROW) const
the meta-data boolean with the given name
EL::EventRange::m_url
std::string m_url
the location of the file
Definition: EventRange.h:24
FactoryPreloadModule.h
EL::OutputStream
Definition: OutputStream.h:34
EL::Worker::inputFile
TFile * inputFile() const override
description: the file we are reading the current tree from guarantee: no-fail
Definition: Worker.cxx:227
PlotCalibFromCool.label
label
Definition: PlotCalibFromCool.py:78
Execution.SkipEvents
SkipEvents
Definition: Execution.py:91
Job.h
PostClosedOutputsModule.h
EL::Worker::addModule
void addModule(std::unique_ptr< Detail::Module > module)
add the given module to this worker
Definition: Worker.cxx:721
RCU_REQUIRE
#define RCU_REQUIRE(x)
Definition: Assert.h:208
tree
TChain * tree
Definition: tile_monitor.h:30
EL::Worker::openInputFile
::StatusCode openInputFile(const std::string &inputFileUrl) override
open the given input file without processing it
Definition: Worker.cxx:599
OutputStream.h
python.AthDsoLogger.out
out
Definition: AthDsoLogger.py:71
EL::Worker::m_algorithmsInitialized
bool m_algorithmsInitialized
whether the algorithms are initialized
Definition: Worker.h:444
EL::EventRange::m_beginEvent
Long64_t m_beginEvent
the first event to process
Definition: EventRange.h:27
ANA_MSG_ERROR
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:294
ANA_CHECK
#define ANA_CHECK(EXP)
check whether the given expression was successful
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:324
EL::Detail::OutputStreamData::close
void close()
close this file
Definition: OutputStreamData.cxx:153
EL::Job::optFactoryPreload
static const std::string optFactoryPreload
a boolean flag for whether to perform a component factory preload
Definition: Job.h:214
FileExecutedModule.h
Assert.h
const
bool const RAWDATA *ch2 const
Definition: LArRodBlockPhysicsV0.cxx:562
MessageCheck.h
SH::MetaObject::setBool
void setBool(const std::string &name, bool value)
set the meta-data boolean with the given name
skel.nEventsPerJob
nEventsPerJob
Definition: skel.ABtoEVGEN.py:567
EL::Detail::ModuleData::m_inputFile
std::unique_ptr< TFile > m_inputFile
the input file pointer of the currently opened file
Definition: ModuleData.h:72
AthenaPoolTestWrite.stream
string stream
Definition: AthenaPoolTestWrite.py:12
EL::Worker::setFilterPassed
virtual void setFilterPassed(bool val_filterPassed) noexcept final override
set the value of filterPassed
Definition: Worker.cxx:315
EL::Detail::ModuleData::m_jobStats
std::unique_ptr< TTree > m_jobStats
Tree saving per-job statistics information.
Definition: ModuleData.h:94
AlgorithmStateModule.h
EL::Job::optXAODInput
static const std::string optXAODInput
the option to select whether our input is xAODs
Definition: Job.h:390
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:93
EL::Worker::processEvents
::StatusCode processEvents(EventRange &eventRange) override
process the given event range
Definition: Worker.cxx:505
EL::BatchSegment
Definition: BatchSegment.h:31
python.iconfTool.models.loaders.level
level
Definition: loaders.py:20
BatchSegment.h
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:56
python.SCT_ByteStreamErrorsTestAlgConfig.maxEvents
maxEvents
Definition: SCT_ByteStreamErrorsTestAlgConfig.py:43
EL::Worker::fileOpenErrorFilter
static bool fileOpenErrorFilter(int level, bool, const char *, const char *)
Error handler for file opening.
Definition: Worker.cxx:583
StopwatchModule.h
WorkerConfigModule.h
python.PyAthena.module
module
Definition: PyAthena.py:134
SamplePtr.h
MetaObject.h
OutputStreamData.h
MemoryMonitorModule.h
EL::Worker::metaData
const SH::MetaObject * metaData() const override
description: the sample meta-data we are working on guarantee: no-fail invariant: metaData !...
Definition: Worker.cxx:200
EL::Algorithm
Definition: Algorithm.h:22
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
FullCPAlgorithmsTest_eljob.sample
sample
Definition: FullCPAlgorithmsTest_eljob.py:100
RCU_REQUIRE_SOFT
#define RCU_REQUIRE_SOFT(x)
Definition: Assert.h:153
AlgorithmTimerModule.h
EL::Worker::getAlg
EL::Algorithm * getAlg(const std::string &name) const override
effects: returns the algorithms with the given name or NULL if there is none guarantee: strong failur...
Definition: Worker.cxx:283
ToolsOther.h
EL::Worker::setSegmentName
void setSegmentName(const std::string &val_segmentName)
set the segment name
Definition: Worker.cxx:355
EL::Worker::setOutputHist
void setOutputHist(const std::string &val_outputTarget)
set the histogram output list
Definition: Worker.cxx:345
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
python.sizes.location
string location
Definition: sizes.py:11
EL::Detail::ModuleData::m_tstore
xAOD::TStore * m_tstore
the TStore structure, if we use one
Definition: ModuleData.h:100
EL::Worker::triggerConfig
TTree * triggerConfig() const override
description: the trigger config tree from the input file, or NULL if we did not find it guarantee: st...
Definition: Worker.cxx:250
ANA_MSG_INFO
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:290
DiskWriter.h
EL::Worker::~Worker
virtual ~Worker()
effects: standard destructor guarantee: no-fail
Definition: Worker.cxx:80
BatchSample.h
file
TFile * file
Definition: tile_monitor.h:29
dumpFileToPlots.treeName
string treeName
Definition: dumpFileToPlots.py:20
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
EL
This module defines the arguments passed from the BATCH driver to the BATCH worker.
Definition: AlgorithmWorkerData.h:24
SH::openFile
std::unique_ptr< TFile > openFile(const std::string &name, const MetaObject &options)
open a file with the given options
Definition: ToolsOther.cxx:35
nEvents
int nEvents
Definition: fbtTestBasics.cxx:77
EL::EventRange::eof
static constexpr Long64_t eof
the special value to indicate that the range includes all events until the end of the file
Definition: EventRange.h:34
SH::MetaObject::castString
std::string castString(const std::string &name, const std::string &def_val="", CastMode mode=CAST_ERROR_THROW) const
the meta-data string with the given name
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.AtlRunQueryLib.options
options
Definition: AtlRunQueryLib.py:379
EL::Worker::getOutputHist
TObject * getOutputHist(const std::string &name) const final override
get the output histogram with the given name
Definition: Worker.cxx:119
EL::Worker::inputFileNumEntries
Long64_t inputFileNumEntries() const override
the number of events in the input file
Definition: Worker.cxx:698
SH::MetaFields::treeName
static const std::string treeName
the name of the tree in the sample
Definition: MetaFields.h:52
EL::Worker::setJobConfig
void setJobConfig(JobConfig &&jobConfig)
set the JobConfig
Definition: Worker.cxx:365
RCU_INVARIANT
#define RCU_INVARIANT(x)
Definition: Assert.h:201
beamspotman.jobConfig
dictionary jobConfig
Definition: beamspotman.py:1071
EL::Detail::ModuleData::m_outputs
std::map< std::string, Detail::OutputStreamData > m_outputs
the list of output files
Definition: ModuleData.h:109
EL::BatchJob
Definition: BatchJob.h:36
EL::Job::optAlgorithmTimer
static const std::string optAlgorithmTimer
a boolean flag for whether to add a timer for the algorithms
Definition: Job.h:205
ReadFromCoolCompare.os
os
Definition: ReadFromCoolCompare.py:231
merge.output
output
Definition: merge.py:17
DiskOutput.h
EL::Worker::skipEvent
void skipEvent() override
effects: skip the current event, i.e.
Definition: Worker.cxx:297
EL::Job::histogramStreamName
static const std::string histogramStreamName
the name of the histogram output stream
Definition: Job.h:602
EL::Worker::EC_BADINPUT
@ EC_BADINPUT
Definition: Worker.h:253
EL::Job::optAlgorithmMemoryMonitor
static const std::string optAlgorithmMemoryMonitor
a boolean flag for whether to add a memory monitor for the algorithms
Definition: Job.h:210
EL::Worker::addOutputStream
::StatusCode addOutputStream(const std::string &label, Detail::OutputStreamData output)
effects: add another output file guarantee: strong failures: low level errors II failures: label alre...
Definition: Worker.cxx:675
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
EL::Worker::batchExecute
::StatusCode batchExecute(unsigned job_id, const char *confFile)
effects: do what is needed to execute the given job segment guarantee: basic failures: job specific
Definition: Worker.cxx:777
RCU::SetDirectory
bool SetDirectory(TObject *object, TDirectory *directory)
effects: set the directory this object is associated with returns: whether the object type actively k...
Definition: RootUtils.cxx:28
MetaFields.h
ThrowMsg.h
RootUtils::WithRootErrorHandler
Run a MT piece of code with an alternate root error handler.
Definition: WithRootErrorHandler.h:56
EL::JobConfig
the job configuration that is independent of driver and dataset
Definition: JobConfig.h:39
BatchJob.h
TEventModule.h
EL::Detail::ModuleData::m_inputTreeEntry
uint64_t m_inputTreeEntry
the entry in the input tree we are currently looking at
Definition: ModuleData.h:79
SH::MetaObject::fetchDefaults
void fetchDefaults(const MetaObject &source)
fetch the meta-data from the given sample not present in this sample.
SH::SamplePtr
A smart pointer class that holds a single Sample object.
Definition: SamplePtr.h:35
EL::Worker::addTree
::StatusCode addTree(const TTree &tree, const std::string &stream) final override
effects: adds a tree to an output file specified by the stream/label failures: Incorrect stream/label...
Definition: Worker.cxx:155
EL::Worker::testInvariant
void testInvariant() const
effects: test the invariant of this object guarantee: no-fail
Definition: Worker.cxx:68
EL::Detail::ModuleData::m_algs
std::vector< Detail::AlgorithmData > m_algs
the list of algorithms
Definition: ModuleData.h:66
AlgorithmMemoryModule.h
EL::Worker::getOutputTree
TTree * getOutputTree(const std::string &name, const std::string &stream) const final override
effects: get the tree that was added to an output file earlier failures: Tree doesn't exist
Definition: Worker.cxx:177
WithRootErrorHandler.h
Run a MT piece of code with an alternate root error handler.
EL::Detail::ModuleData::m_inputFileUrl
std::string m_inputFileUrl
the input file url of the currently opened file
Definition: ModuleData.h:69
EL::Detail::ModuleData::m_histOutput
OutputStreamData * m_histOutput
the histogram output stream
Definition: ModuleData.h:91
EL::Detail::ModuleData::m_eventsProcessed
uint64_t m_eventsProcessed
the number of events that have been processed
Definition: ModuleData.h:88
xAOD::TStore
A relatively simple transient store for objects created in analysis.
Definition: TStore.h:44
EL::Worker::getOutputFile
TFile * getOutputFile(const std::string &label) const override
effects: get the output file that goes into the dataset with the given label.
Definition: Worker.cxx:131
RCU_DESTROY_INVARIANT
#define RCU_DESTROY_INVARIANT(x)
Definition: Assert.h:235
EL::Detail::ModuleData::m_tevent
xAOD::TEvent * m_tevent
the TEvent structure, if we use one
Definition: ModuleData.h:97
LeakCheckModule.h
DirectInputModule.h
Worker.h
SH::MetaObject::castDouble
double castDouble(const std::string &name, double def_val=0, CastMode mode=CAST_ERROR_THROW) const
the meta-data double with the given name
StatusCode.h
EL::Detail::ModuleData::m_skipEvent
bool m_skipEvent
whether we are skipping the current event
Definition: ModuleData.h:82
RCU_CHANGE_INVARIANT
#define RCU_CHANGE_INVARIANT(x)
Definition: Assert.h:231
EL::Worker::setMetaData
void setMetaData(const SH::MetaObject *val_metaData)
set the metaData
Definition: Worker.cxx:334
EL::Worker::xaodEvent
xAOD::TEvent * xaodEvent() const override
description: the xAOD event and store guarantee: strong failures: out of memory I failures: TEventSvc...
Definition: Worker.cxx:259
EL::Worker::treeEntry
Long64_t treeEntry() const override
description: the entry in the tree we are reading guarantee: no-fail
Definition: Worker.cxx:218
EL::Worker::gridExecute
::StatusCode gridExecute(const std::string &sampleName, Long64_t SkipEvents, Long64_t nEventsPerJob)
Definition: Worker.cxx:853
RCU_THROW_MSG
#define RCU_THROW_MSG(message)
Definition: PrintMsg.h:58
EL::Job::optSkipEvents
static const std::string optSkipEvents
description: the name of the option used for skipping a certain number of events in the beginning rat...
Definition: Job.h:230
EL::Worker::initialize
::StatusCode initialize()
initialize the worker
Definition: Worker.cxx:377
SH::MetaFields::treeName_default
static const std::string treeName_default
the default value of treeName
Definition: MetaFields.h:55
EL::Detail::OutputStreamData::saveOutput
void saveOutput()
write the list of output objects to disk and clear it
Definition: OutputStreamData.cxx:172
EL::Job
Definition: Job.h:51
test_interactive_athena.job
job
Definition: test_interactive_athena.py:6
EL::Detail::ModuleData::m_worker
Worker * m_worker
the worker (to pass on to the algorithms)
Definition: ModuleData.h:106
EL::Worker::xaodStore
xAOD::TStore * xaodStore() const override
Definition: Worker.cxx:271
python.PyAthena.obj
obj
Definition: PyAthena.py:135
RCU_ASSERT
#define RCU_ASSERT(x)
Definition: Assert.h:222
checkJobs.completed
completed
Definition: checkJobs.py:24
EL::Detail::ModuleData::m_metaData
const SH::MetaObject * m_metaData
the meta-data we use
Definition: ModuleData.h:85
EL::EventRange
a range of events in a given file
Definition: EventRange.h:22
RCU_READ_INVARIANT
#define RCU_READ_INVARIANT(x)
Definition: Assert.h:229
EL::Detail::report_exception
void report_exception(std::exception_ptr eptr)
print out the currently evaluated exception
Definition: PhysicsAnalysis/D3PDTools/EventLoop/Root/MessageCheck.cxx:27
Sample.h
IAlgorithmWrapper.h
EL::Worker::tree
TTree * tree() const override
description: the tree we are running on guarantee: no-fail
Definition: Worker.cxx:209
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
EL::Worker::addOutputList
void addOutputList(const std::string &name, TObject *output_swallow) override
effects: add a given object to the output.
Definition: Worker.cxx:102
xAOD::TEvent
Tool for accessing xAOD files outside of Athena.
Definition: Control/xAODRootAccess/xAODRootAccess/TEvent.h:81
EL::Worker::Worker
Worker()
standard constructor
Definition: Worker.cxx:324
EL::Worker::processInputs
::StatusCode processInputs()
process all the inputs
Definition: Worker.cxx:438
EventRange.h
EL::Job::optGridReporting
static const std::string optGridReporting
whether to use grid reporting even when not running on the grid
Definition: Job.h:494
EL::EventRange::m_endEvent
Long64_t m_endEvent
the event past the last event, or eof
Definition: EventRange.h:30
EL::Worker::m_segmentName
std::string m_segmentName
the name of the segment we are processing
Definition: Worker.h:439
NSWL1::PadTriggerAdapter::segment
Muon::NSW_PadTriggerSegment segment(const NSWL1::PadTrigger &data)
Definition: PadTriggerAdapter.cxx:5
RCU_NEW_INVARIANT
#define RCU_NEW_INVARIANT(x)
Definition: Assert.h:233