ATLAS Offline Software
Worker.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 
7 
8 //
9 // includes
10 //
11 
12 #include <EventLoop/Worker.h>
13 
19 #include <EventLoop/BatchJob.h>
20 #include <EventLoop/BatchSample.h>
21 #include <EventLoop/BatchSegment.h>
23 #include <EventLoop/Driver.h>
25 #include <EventLoop/EventRange.h>
29 #include <EventLoop/Job.h>
32 #include <EventLoop/MessageCheck.h>
33 #include <EventLoop/OutputStream.h>
35 #include <EventLoop/StatusCode.h>
38 #include <EventLoop/TEventModule.h>
40 #include <RootCoreUtils/Assert.h>
42 #include <RootCoreUtils/ThrowMsg.h>
48 #include <SampleHandler/Sample.h>
51 #include <TFile.h>
52 #include <TH1.h>
53 #include <TROOT.h>
54 #include <TSystem.h>
55 #include <TTree.h>
56 #include <TObjString.h>
57 #include <fstream>
58 #include <memory>
59 #include <exception>
60 
61 //
62 // method implementations
63 //
64 
65 namespace EL
66 {
67  void Worker ::
68  testInvariant () const
69  {
70  RCU_INVARIANT (this != nullptr);
71  for (std::size_t iter = 0, end = m_algs.size(); iter != end; ++ iter)
72  {
73  RCU_INVARIANT (m_algs[iter].m_algorithm != nullptr);
74  }
75  }
76 
77 
78 
80  ~Worker ()
81  {
82  RCU_DESTROY_INVARIANT (this);
83  }
84 
85 
86 
87  void Worker ::
88  addOutput (TObject *output_swallow)
89  {
90  std::unique_ptr<TObject> output (output_swallow);
91 
92  RCU_CHANGE_INVARIANT (this);
93  RCU_REQUIRE_SOFT (output_swallow != 0);
94 
95  RCU::SetDirectory (output_swallow, 0);
96  ModuleData::addOutput (std::move (output));
97  }
98 
99 
100 
102  addOutputList (const std::string& name, TObject *output_swallow)
103  {
104  std::unique_ptr<TObject> output (output_swallow);
105 
106  RCU_CHANGE_INVARIANT (this);
107  RCU_REQUIRE_SOFT (output_swallow != 0);
108 
109  RCU::SetDirectory (output_swallow, 0);
110  std::unique_ptr<TList> list (new TList);
111  list->SetName (name.c_str());
112  list->Add (output.release());
113  addOutput (list.release());
114  }
115 
116 
117 
118  TObject *Worker ::
119  getOutputHist (const std::string& name) const
120  {
121  RCU_READ_INVARIANT (this);
122 
123  TObject *result = m_histOutput->getOutputHist (name);
124  if (result == nullptr) RCU_THROW_MSG ("unknown output histogram: " + name);
125  return result;
126  }
127 
128 
129 
130  TFile *Worker ::
131  getOutputFile (const std::string& label) const
132  {
133  RCU_READ_INVARIANT (this);
134  TFile *result = getOutputFileNull (label);
135  if (result == 0)
136  RCU_THROW_MSG ("no output dataset defined with label: " + label);
137  return result;
138  }
139 
140 
141 
142  TFile *Worker ::
143  getOutputFileNull (const std::string& label) const
144  {
145  RCU_READ_INVARIANT (this);
146  auto iter = m_outputs.find (label);
147  if (iter == m_outputs.end())
148  return 0;
149  return iter->second.file();
150  }
151 
152 
153 
155  addTree( const TTree& tree, const std::string& stream )
156  {
157  using namespace msgEventLoop;
158  RCU_READ_INVARIANT( this );
159 
160  auto outputIter = m_outputs.find (stream);
161  if (outputIter == m_outputs.end())
162  {
163  ANA_MSG_ERROR ( "No output file with stream name \"" + stream +
164  "\" found" );
165  return ::StatusCode::FAILURE;
166  }
167 
168  outputIter->second.addClone (tree);
169 
170  // Return gracefully:
171  return ::StatusCode::SUCCESS;
172  }
173 
174 
175 
176  TTree *Worker::
177  getOutputTree( const std::string& name, const std::string& stream ) const
178  {
179  using namespace msgEventLoop;
180  RCU_READ_INVARIANT( this );
181 
182  auto outputIter = m_outputs.find (stream);
183  if (outputIter == m_outputs.end())
184  {
185  RCU_THROW_MSG ( "No output file with stream name \"" + stream
186  + "\" found" );
187  }
188 
189  TTree *result = outputIter->second.getOutputTree( name );
190  if( result == nullptr ) {
191  RCU_THROW_MSG ( "No tree with name \"" + name + "\" in stream \"" +
192  stream + "\"" );
193  }
194  return result;
195  }
196 
197 
198 
200  metaData () const
201  {
202  RCU_READ_INVARIANT (this);
203  return m_metaData;
204  }
205 
206 
207 
208  TTree *Worker ::
209  tree () const
210  {
211  RCU_READ_INVARIANT (this);
212  return m_inputTree;
213  }
214 
215 
216 
217  Long64_t Worker ::
218  treeEntry () const
219  {
220  RCU_READ_INVARIANT (this);
221  return m_inputTreeEntry;
222  }
223 
224 
225 
226  TFile *Worker ::
227  inputFile () const
228  {
229  RCU_READ_INVARIANT (this);
230  return m_inputFile.get();
231  }
232 
233 
234 
235  std::string Worker ::
236  inputFileName () const
237  {
238  // no invariant used
239  std::string path = inputFile()->GetName();
240  auto split = path.rfind ('/');
241  if (split != std::string::npos)
242  return path.substr (split + 1);
243  else
244  return path;
245  }
246 
247 
248 
249  TTree *Worker ::
250  triggerConfig () const
251  {
252  RCU_READ_INVARIANT (this);
253  return dynamic_cast<TTree*>(inputFile()->Get("physicsMeta/TrigConfTree"));
254  }
255 
256 
257 
259  xaodEvent () const
260  {
261  RCU_READ_INVARIANT (this);
262 
263  if (m_tevent == nullptr)
264  RCU_THROW_MSG ("Job not configured for xAOD support");
265  return m_tevent;
266  }
267 
268 
269 
271  xaodStore () const
272  {
273  RCU_READ_INVARIANT (this);
274 
275  if (m_tstore == nullptr)
276  RCU_THROW_MSG ("Job not configured for xAOD support");
277  return m_tstore;
278  }
279 
280 
281 
283  getAlg (const std::string& name) const
284  {
285  RCU_READ_INVARIANT (this);
286  for (auto& alg : m_algs)
287  {
288  if (alg->hasName (name))
289  return alg.m_algorithm->getLegacyAlg();
290  }
291  return 0;
292  }
293 
294 
295 
297  skipEvent ()
298  {
299  RCU_CHANGE_INVARIANT (this);
300  m_skipEvent = true;
301  }
302 
303 
304 
306  filterPassed () const noexcept
307  {
308  RCU_READ_INVARIANT (this);
309  return !m_skipEvent;
310  }
311 
312 
313 
315  setFilterPassed (bool val_filterPassed) noexcept
316  {
317  RCU_CHANGE_INVARIANT (this);
318  m_skipEvent = !val_filterPassed;
319  }
320 
321 
322 
324  Worker ()
325  {
326  m_worker = this;
327 
328  RCU_NEW_INVARIANT (this);
329  }
330 
331 
332 
334  setMetaData (const SH::MetaObject *val_metaData)
335  {
336  RCU_CHANGE_INVARIANT (this);
337  RCU_REQUIRE (val_metaData != 0);
338 
339  m_metaData = val_metaData;
340  }
341 
342 
343 
345  setOutputHist (const std::string& val_outputTarget)
346  {
347  RCU_CHANGE_INVARIANT (this);
348 
349  m_outputTarget = val_outputTarget;
350  }
351 
352 
353 
355  setSegmentName (const std::string& val_segmentName)
356  {
357  RCU_CHANGE_INVARIANT (this);
358 
359  m_segmentName = val_segmentName;
360  }
361 
362 
363 
366  {
367  RCU_CHANGE_INVARIANT (this);
368  for (std::unique_ptr<IAlgorithmWrapper>& alg : jobConfig.extractAlgorithms())
369  {
370  m_algs.push_back (std::move (alg));
371  }
372  }
373 
374 
375 
377  initialize ()
378  {
379  using namespace msgEventLoop;
380  RCU_CHANGE_INVARIANT (this);
381 
382  const bool xAODInput = m_metaData->castBool (Job::optXAODInput, false);
383 
384  ANA_MSG_INFO ("xAODInput = " << xAODInput);
385  if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
386  m_modules.push_back (std::make_unique<Detail::MemoryMonitorModule> ("EarlyMemoryMonitorModule"));
387  if (xAODInput)
388  m_modules.push_back (std::make_unique<Detail::TEventModule> ("TEventModule"));
389  auto factoryPreload = metaData()->castString (Job::optFactoryPreload, "");
390  if (!factoryPreload.empty())
391  {
392  auto module = std::make_unique<Detail::FactoryPreloadModule> ("FactoryPreloadModule");
393  module->preloader = factoryPreload;
394  m_modules.push_back (std::move (module));
395  }
396  m_modules.push_back (std::make_unique<Detail::LeakCheckModule> ("LeakCheckModule"));
397  m_modules.push_back (std::make_unique<Detail::StopwatchModule> ("StopwatchModule"));
398  if (metaData()->castBool (Job::optGridReporting, false))
399  m_modules.push_back (std::make_unique<Detail::GridReportingModule>("GridReportingModule"));
400  if (metaData()->castBool (Job::optAlgorithmTimer, false))
401  m_modules.push_back (std::make_unique<Detail::AlgorithmTimerModule> ("AlgorithmTimerModule"));
402  if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
403  m_modules.push_back (std::make_unique<Detail::AlgorithmMemoryModule> ("AlgorithmMemoryModule"));
404  m_modules.push_back (std::make_unique<Detail::FileExecutedModule> ("FileExecutedModule"));
405  m_modules.push_back (std::make_unique<Detail::EventCountModule> ("EventCountModule"));
406  m_modules.push_back (std::make_unique<Detail::WorkerConfigModule> ("WorkerConfigModule"));
407  m_modules.push_back (std::make_unique<Detail::AlgorithmStateModule> ("AlgorithmStateModule"));
408  m_modules.push_back (std::make_unique<Detail::PostClosedOutputsModule> ("PostClosedOutputsModule"));
409  if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
410  m_modules.push_back (std::make_unique<Detail::MemoryMonitorModule> ("LateMemoryMonitorModule"));
411 
412  if (m_outputs.find (Job::histogramStreamName) == m_outputs.end())
413  {
415  m_outputTarget + "/hist-" + m_segmentName + ".root", "RECREATE"};
417  }
420 
421  m_jobStats = std::make_unique<TTree>
422  ("EventLoop_JobStats", "EventLoop job statistics");
423  m_jobStats->SetDirectory (nullptr);
424 
425  ANA_MSG_INFO ("calling firstInitialize on all modules");
426  for (auto& module : m_modules)
427  ANA_CHECK (module->firstInitialize (*this));
428  ANA_MSG_INFO ("calling preFileInitialize on all modules");
429  for (auto& module : m_modules)
430  ANA_CHECK (module->preFileInitialize (*this));
431 
432  return ::StatusCode::SUCCESS;
433  }
434 
435 
436 
439  {
440  using namespace msgEventLoop;
441 
442  RCU_CHANGE_INVARIANT (this);
443 
444  for (auto& module : m_modules)
445  ANA_CHECK (module->processInputs (*this, *this));
446  return ::StatusCode::SUCCESS;
447  }
448 
449 
450 
452  finalize ()
453  {
454  using namespace msgEventLoop;
455 
456  RCU_CHANGE_INVARIANT (this);
457 
458  if (m_algorithmsInitialized == false)
459  {
460  ANA_MSG_ERROR ("algorithms never got initialized");
461  return StatusCode::FAILURE;
462  }
463 
464  ANA_CHECK (openInputFile (""));
465  for (auto& module : m_modules)
466  ANA_CHECK (module->onFinalize (*this));
467  for (auto& output : m_outputs)
468  {
469  if (output.first != Job::histogramStreamName)
470  {
471  output.second.saveOutput ();
472  output.second.close ();
473  std::string path = output.second.finalFileName ();
474  if (!path.empty())
475  addOutputList ("EventLoop_OutputStream_" + output.first, new TObjString (path.c_str()));
476  }
477  }
478  for (auto& module : m_modules)
479  ANA_CHECK (module->postFinalize (*this));
480  if (m_jobStats->GetListOfBranches()->GetEntries() > 0)
481  {
482  if (m_jobStats->Fill() <= 0)
483  {
484  ANA_MSG_ERROR ("failed to fill the job statistics tree");
485  return ::StatusCode::FAILURE;
486  }
487  ModuleData::addOutput (std::move (m_jobStats));
488  }
490  for (auto& module : m_modules)
491  ANA_CHECK (module->onWorkerEnd (*this));
493  m_histOutput->close ();
494 
495  for (auto& module : m_modules){
496  ANA_CHECK (module->postFileClose(*this));
497  }
498  ANA_MSG_INFO ("worker finished successfully");
499  return ::StatusCode::SUCCESS;
500  }
501 
502 
503 
505  processEvents (EventRange& eventRange)
506  {
507  using namespace msgEventLoop;
508 
509  RCU_CHANGE_INVARIANT (this);
510  RCU_REQUIRE (!eventRange.m_url.empty());
511  RCU_REQUIRE (eventRange.m_beginEvent >= 0);
512  RCU_REQUIRE (eventRange.m_endEvent == EventRange::eof || eventRange.m_endEvent >= eventRange.m_beginEvent);
513 
514  ANA_CHECK (openInputFile (eventRange.m_url));
515 
516  if (eventRange.m_beginEvent > inputFileNumEntries())
517  {
518  ANA_MSG_ERROR ("first event (" << eventRange.m_beginEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
519  return ::StatusCode::FAILURE;
520  }
521  if (eventRange.m_endEvent == EventRange::eof)
522  {
523  eventRange.m_endEvent = inputFileNumEntries();
524  } else if (eventRange.m_endEvent > inputFileNumEntries())
525  {
526  ANA_MSG_ERROR ("end event (" << eventRange.m_endEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
527  return ::StatusCode::FAILURE;
528  }
529 
530  m_inputTreeEntry = eventRange.m_beginEvent;
531 
532  if (m_algorithmsInitialized == false)
533  {
534  for (auto& module : m_modules)
535  ANA_CHECK (module->onInitialize (*this));
537  }
538 
539  if (m_newInputFile)
540  {
541  m_newInputFile = false;
542  for (auto& module : m_modules)
543  ANA_CHECK (module->onNewInputFile (*this));
544  }
545 
546  if (eventRange.m_beginEvent == 0)
547  {
548  for (auto& module : m_modules)
549  ANA_CHECK (module->onFileExecute (*this));
550  }
551 
552  ANA_MSG_INFO ("Processing events " << eventRange.m_beginEvent << "-" << eventRange.m_endEvent << " in file " << eventRange.m_url);
553 
554  for (uint64_t event = eventRange.m_beginEvent;
555  event != uint64_t (eventRange.m_endEvent);
556  ++ event)
557  {
559  for (auto& module : m_modules)
560  {
561  if (module->onExecute (*this).isFailure())
562  {
563  ANA_MSG_ERROR ("processing event " << treeEntry() << " on file " << inputFileName());
564  return ::StatusCode::FAILURE;
565  }
566  }
567  if (m_firstEvent)
568  {
569  m_firstEvent = false;
570  for (auto& module : m_modules)
571  ANA_CHECK (module->postFirstEvent (*this));
572  }
573  m_eventsProcessed += 1;
574  if (m_eventsProcessed % 10000 == 0)
575  ANA_MSG_INFO ("Processed " << m_eventsProcessed << " events");
576  }
577  return ::StatusCode::SUCCESS;
578  }
579 
580 
581 
583  fileOpenErrorFilter(int level, bool /*b1*/, const char* s1, const char * s2)
584  {
585  // Don't fail on missing dictionary messages.
586  if (strstr (s2, "no streamer or dictionary") != nullptr) {
587  return true;
588  }
589 
590  // For messages above warning level (SysError, Error, Fatal)
591  if( level > kWarning ) {
592  // We won't output further; ROOT should have already put something in the log file
593  std::string msg = "ROOT error detected in Worker.cxx: ";
594  msg += s1;
595  msg += " ";
596  msg += s2;
597  throw std::runtime_error(msg);
598 
599  // No need for further error handling
600  return false;
601  }
602 
603  // Pass to the default error handlers
604  return true;
605  }
606 
608  openInputFile (const std::string& inputFileUrl)
609  {
610  using namespace msgEventLoop;
611 
612  // Enable custom error handling in a nice way
614 
615  RCU_CHANGE_INVARIANT (this);
616 
617  if (m_inputFileUrl == inputFileUrl)
618  return ::StatusCode::SUCCESS;
619 
620  if (!m_inputFileUrl.empty())
621  {
622  if (m_newInputFile == false)
623  {
624  for (auto& module : m_modules)
625  ANA_CHECK (module->onCloseInputFile (*this));
626  for (auto& module : m_modules)
627  ANA_CHECK (module->postCloseInputFile (*this));
628  }
629  m_newInputFile = false;
630  m_inputTree = nullptr;
631  m_inputFile.reset ();
632  m_inputFileUrl.clear ();
633  }
634 
635  if (inputFileUrl.empty())
636  return ::StatusCode::SUCCESS;
637 
638  ANA_MSG_INFO ("Opening file " << inputFileUrl);
639  std::unique_ptr<TFile> inputFile;
640  try
641  {
642  inputFile = SH::openFile (inputFileUrl, *metaData());
643  } catch (...)
644  {
645  Detail::report_exception (std::current_exception());
646  }
647  if (inputFile.get() == 0)
648  {
649  ANA_MSG_ERROR ("failed to open file " << inputFileUrl);
650  for (auto& module : m_modules)
651  module->reportInputFailure (*this);
652  return ::StatusCode::FAILURE;
653  }
654  if (inputFile->IsZombie())
655  {
656  ANA_MSG_ERROR ("input file is a zombie: " << inputFileUrl);
657  for (auto& module : m_modules)
658  module->reportInputFailure (*this);
659  return ::StatusCode::FAILURE;
660  }
661 
662  TTree *tree = 0;
663  const std::string treeName
665  tree = dynamic_cast<TTree*>(inputFile->Get (treeName.c_str()));
666  if (tree == nullptr)
667  {
668  ANA_MSG_INFO ("tree " << treeName << " not found in input file: " << inputFileUrl);
669  ANA_MSG_INFO ("treating this like a tree with no events");
670  }
671 
672  m_newInputFile = true;
673  m_inputTree = tree;
674  m_inputTreeEntry = 0;
675  m_inputFile = std::move (inputFile);
676  m_inputFileUrl = std::move (inputFileUrl);
677 
678  return ::StatusCode::SUCCESS;
679  }
680 
681 
682 
684  addOutputStream (const std::string& label,
686  {
687  using namespace msgEventLoop;
688  RCU_CHANGE_INVARIANT (this);
689 
690  if (m_outputs.find (label) != m_outputs.end())
691  {
692  ANA_MSG_ERROR ("output file already defined for label: " + label);
693  return ::StatusCode::FAILURE;
694  }
695  if (data.file() == nullptr)
696  {
697  ANA_MSG_ERROR ("output stream does not have a file attached");
698  return ::StatusCode::FAILURE;
699  }
700  m_outputs.insert (std::make_pair (label, std::move (data)));
701  return ::StatusCode::SUCCESS;
702  }
703 
704 
705 
706  Long64_t Worker ::
707  inputFileNumEntries () const
708  {
709  RCU_READ_INVARIANT (this);
710  RCU_REQUIRE (inputFile() != 0);
711 
712  if (m_inputTree != 0)
713  return m_inputTree->GetEntries();
714  else
715  return 0;
716  }
717 
718 
719 
721  eventsProcessed () const noexcept
722  {
723  RCU_READ_INVARIANT (this);
724  return m_eventsProcessed;
725  }
726 
727 
728 
730  addModule (std::unique_ptr<Detail::Module> module)
731  {
732  RCU_CHANGE_INVARIANT (this);
733  RCU_REQUIRE (module != nullptr);
734  m_modules.push_back (std::move (module));
735  }
736 
737 
738 
740  directExecute (const SH::SamplePtr& sample, const Job& job,
741  const std::string& location, const SH::MetaObject& options)
742  {
743  using namespace msgEventLoop;
744  RCU_CHANGE_INVARIANT (this);
745 
746  SH::MetaObject meta (*sample->meta());
747  meta.fetchDefaults (options);
748 
749  setMetaData (&meta);
750  setOutputHist (location);
751  setSegmentName (sample->name());
752 
753  ANA_MSG_INFO ("Running sample: " << sample->name());
754 
755  setJobConfig (JobConfig (job.jobConfig()));
756 
757  for (Job::outputIter out = job.outputBegin(),
758  end = job.outputEnd(); out != end; ++ out)
759  {
761  out->output()->makeWriter (sample->name(), "", ".root")};
762  ANA_CHECK (addOutputStream (out->label(), std::move (data)));
763  }
764 
765  {
766  auto module = std::make_unique<Detail::DirectInputModule> ("DirectInputModule");
767  module->fileList = sample->makeFileList();
768  Long64_t maxEvents = metaData()->castDouble (Job::optMaxEvents, -1);
769  if (maxEvents != -1)
770  module->maxEvents = maxEvents;
772  if (skipEvents != 0)
773  module->skipEvents = skipEvents;
774  addModule (std::move (module));
775  }
776 
777  ANA_CHECK (initialize ());
779  ANA_CHECK (finalize ());
780  return ::StatusCode::SUCCESS;
781  }
782 
783 
784 
786  batchExecute (unsigned job_id, const char *confFile)
787  {
788  using namespace msgEventLoop;
789  RCU_CHANGE_INVARIANT (this);
790 
791  try
792  {
793  std::unique_ptr<TFile> file (TFile::Open (confFile, "READ"));
794  if (file.get() == nullptr || file->IsZombie())
795  {
796  ANA_MSG_ERROR ("failed to open file: " << confFile);
797  return ::StatusCode::FAILURE;
798  }
799 
800  std::unique_ptr<BatchJob> job (dynamic_cast<BatchJob*>(file->Get ("job")));
801  if (job.get() == nullptr)
802  {
803  ANA_MSG_ERROR ("failed to retrieve BatchJob object");
804  return ::StatusCode::FAILURE;
805  }
806 
807  if (job_id >= job->segments.size())
808  {
809  ANA_MSG_ERROR ("invalid job-id " << job_id << ", max is " << job->segments.size());
810  return ::StatusCode::FAILURE;
811  }
812  BatchSegment *segment = &job->segments[job_id];
813  RCU_ASSERT (segment->job_id == job_id);
814  RCU_ASSERT (segment->sample < job->samples.size());
815  BatchSample *sample = &job->samples[segment->sample];
816 
817  gSystem->Exec ("pwd");
818  gSystem->MakeDirectory ("output");
819 
820  setMetaData (&sample->meta);
821  setOutputHist (job->location + "/fetch");
822  setSegmentName (segment->fullName);
823 
824  setJobConfig (JobConfig (job->job.jobConfig()));
825 
826  for (Job::outputIter out = job->job.outputBegin(),
827  end = job->job.outputEnd(); out != end; ++ out)
828  {
830  out->output()->makeWriter (segment->sampleName, segment->segmentName, ".root")};
831  ANA_CHECK (addOutputStream (out->label(), std::move (data)));
832  }
833 
834  {
835  auto module = std::make_unique<Detail::BatchInputModule> ("BatchInputModule");
836  Long64_t maxEvents = metaData()->castDouble (Job::optMaxEvents, -1);
837  if (maxEvents != -1)
838  module->maxEvents = maxEvents;
839  module->sample = sample;
840  module->segment = segment;
841  addModule (std::move (module));
842  }
843 
844  ANA_CHECK (initialize ());
846  ANA_CHECK (finalize ());
847 
848  std::ostringstream job_name;
849  job_name << job_id;
850  std::ofstream completed ((job->location + "/status/completed-" + job_name.str()).c_str());
851  return ::StatusCode::SUCCESS;
852  } catch (...)
853  {
854  Detail::report_exception (std::current_exception());
855  return ::StatusCode::FAILURE;
856  }
857  }
858 
859 
860 
862  gridExecute (const std::string& sampleName, Long64_t SkipEvents, Long64_t nEventsPerJob)
863  {
864  using namespace msgEventLoop;
865  RCU_CHANGE_INVARIANT (this);
866 
867  ANA_MSG_INFO ("Running with ROOT version " << gROOT->GetVersion()
868  << " (" << gROOT->GetVersionDate() << ")");
869 
870  ANA_MSG_INFO ("Loading EventLoop grid job");
871 
872 
873  TList bigOutputs;
874  std::unique_ptr<JobConfig> jobConfig;
875  SH::MetaObject *mo = 0;
876 
877  std::unique_ptr<TFile> f (TFile::Open("jobdef.root"));
878  if (f == nullptr || f->IsZombie()) {
879  ANA_MSG_ERROR ("Could not read jobdef");
880  return ::StatusCode::FAILURE;
881  }
882 
883  mo = dynamic_cast<SH::MetaObject*>(f->Get(sampleName.c_str()));
884  if (!mo)
885  mo = dynamic_cast<SH::MetaObject*>(f->Get("defaultMetaObject"));
886  if (!mo) {
887  ANA_MSG_ERROR ("Could not read in sample meta object");
888  return ::StatusCode::FAILURE;
889  }
890 
891  jobConfig.reset (dynamic_cast<JobConfig*>(f->Get("jobConfig")));
892  if (jobConfig == nullptr)
893  {
894  ANA_MSG_ERROR ("failed to read jobConfig object");
895  return ::StatusCode::FAILURE;
896  }
897 
898  {
899  std::unique_ptr<TList> outs ((TList*)f->Get("outputs"));
900  if (outs == nullptr)
901  {
902  ANA_MSG_ERROR ("Could not read list of outputs");
903  return ::StatusCode::FAILURE;
904  }
905 
906  TIter itr(outs.get());
907  TObject *obj = 0;
908  while ((obj = itr())) {
909  EL::OutputStream * out = dynamic_cast<EL::OutputStream*>(obj);
910  if (out) {
911  bigOutputs.Add(out);
912  }
913  else {
914  ANA_MSG_ERROR ("Encountered unexpected entry in list of outputs");
915  return ::StatusCode::FAILURE;
916  }
917  }
918  }
919 
920  f->Close();
921  f.reset ();
922 
923  const std::string location = ".";
924 
925  mo->setBool (Job::optGridReporting, true);
926  setMetaData (mo);
927  setOutputHist (location);
928  setSegmentName ("output");
929 
930  ANA_MSG_INFO ("Starting EventLoop Grid worker");
931 
932  {//Create and register the "big" output files with base class
933  TIter itr(&bigOutputs);
934  TObject *obj = 0;
935  while ((obj = itr())) {
936  EL::OutputStream *os = dynamic_cast<EL::OutputStream*>(obj);
937  if (os == nullptr)
938  {
939  ANA_MSG_ERROR ("Bad input");
940  return ::StatusCode::FAILURE;
941  }
942  {
944  location + "/" + os->label() + ".root", "RECREATE"};
945  ANA_CHECK (addOutputStream (os->label(), std::move (data)));
946  }
947  }
948  }
949 
950  setJobConfig (std::move (*jobConfig));
951 
952  {
953  auto module = std::make_unique<Detail::DirectInputModule> ("DirectInputModule");
954  std::ifstream infile("input.txt");
955  while (infile) {
956  std::string sLine;
957  if (!getline(infile, sLine)) break;
958  std::istringstream ssLine(sLine);
959  while (ssLine) {
960  std::string sFile;
961  if (!getline(ssLine, sFile, ',')) break;
962  module->fileList.push_back(sFile);
963  }
964  }
965  if (module->fileList.size() == 0) {
966  ANA_MSG_ERROR ("no input files provided");
967  //User was expecting input after all.
968  gSystem->Exit(EC_BADINPUT);
969  }
970 
971  if (nEventsPerJob != -1)
972  module->maxEvents = nEventsPerJob;
973  if (SkipEvents != 0)
974  module->skipEvents = SkipEvents;
975  addModule (std::move (module));
976  }
977 
978  ANA_CHECK (initialize());
980  ANA_CHECK (finalize ());
981 
982  int nEvents = eventsProcessed();
983  ANA_MSG_INFO ("Loop finished.");
984  ANA_MSG_INFO ("Read/processed " << nEvents << " events.");
985 
986  ANA_MSG_INFO ("EventLoop Grid worker finished");
987  ANA_MSG_INFO ("Saving output");
988  return ::StatusCode::SUCCESS;
989  }
990 }
EL::Worker::finalize
::StatusCode finalize()
finalize the worker
Definition: Worker.cxx:452
EL::Job::optMaxEvents
static const std::string optMaxEvents
description: the name of the option used for setting the maximum number of events to process per samp...
Definition: Job.h:222
EL::Worker::m_newInputFile
bool m_newInputFile
whether this is a new input file (i.e.
Definition: Worker.h:429
EL::Worker::addOutput
void addOutput(TObject *output_swallow) final override
effects: add an object to the output.
Definition: Worker.cxx:88
EL::Worker::m_firstEvent
bool m_firstEvent
whether we are still to process the first event
Definition: Worker.h:449
EL::Worker::m_modules
std::vector< std::unique_ptr< Detail::Module > > m_modules
the list of modules we hold
Definition: Worker.h:423
EL::Detail::OutputStreamData
all data needed to manage a given output stream
Definition: OutputStreamData.h:30
EL::Worker::m_outputTarget
std::string m_outputTarget
the target file to which we will write the histogram output
Definition: Worker.h:434
BatchInputModule.h
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
EL::Worker::filterPassed
virtual bool filterPassed() const noexcept final override
whether the current algorithm passed its filter criterion for the current event
Definition: Worker.cxx:306
EL::Worker::getOutputFileNull
TFile * getOutputFileNull(const std::string &label) const override
effects: get the output file that goes into the dataset with the given label.
Definition: Worker.cxx:143
EL::Detail::OutputStreamData::getOutputHist
TObject * getOutputHist(const std::string &name) const noexcept
get the output histogram with the given name, or nullptr if there is no histogam with such a name
Definition: OutputStreamData.cxx:293
SGout2dot.alg
alg
Definition: SGout2dot.py:243
EL::Detail::ModuleData::m_inputTree
TTree * m_inputTree
the (main) tree in the input file
Definition: ModuleData.h:75
get_generator_info.result
result
Definition: get_generator_info.py:21
Driver.h
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
run.infile
string infile
Definition: run.py:13
EL::Worker::eventsProcessed
uint64_t eventsProcessed() const noexcept
the number of events that have been processed
Definition: Worker.cxx:721
EventCountModule.h
EL::Worker::directExecute
::StatusCode directExecute(const SH::SamplePtr &sample, const Job &job, const std::string &location, const SH::MetaObject &options)
run the job
Definition: Worker.cxx:740
SH::MetaObject
A class that manages meta-data to be associated with an object.
Definition: MetaObject.h:56
GridReportingModule.h
RootUtils.h
EL::BatchSample
Definition: BatchSample.h:32
EL::Worker::inputFileName
std::string inputFileName() const override
the name of the file we are reading the current tree from, without the path component
Definition: Worker.cxx:236
SH::MetaObject::castBool
bool castBool(const std::string &name, bool def_val=false, CastMode mode=CAST_ERROR_THROW) const
the meta-data boolean with the given name
EL::EventRange::m_url
std::string m_url
the location of the file
Definition: EventRange.h:24
FactoryPreloadModule.h
EL::OutputStream
Definition: OutputStream.h:34
EL::Worker::inputFile
TFile * inputFile() const override
description: the file we are reading the current tree from guarantee: no-fail
Definition: Worker.cxx:227
PlotCalibFromCool.label
label
Definition: PlotCalibFromCool.py:78
Execution.SkipEvents
SkipEvents
Definition: Execution.py:91
Job.h
PostClosedOutputsModule.h
EL::Worker::addModule
void addModule(std::unique_ptr< Detail::Module > module)
add the given module to this worker
Definition: Worker.cxx:730
RCU_REQUIRE
#define RCU_REQUIRE(x)
Definition: Assert.h:208
tree
TChain * tree
Definition: tile_monitor.h:30
EL::Worker::openInputFile
::StatusCode openInputFile(const std::string &inputFileUrl) override
open the given input file without processing it
Definition: Worker.cxx:608
OutputStream.h
python.AthDsoLogger.out
out
Definition: AthDsoLogger.py:71
EL::Worker::m_algorithmsInitialized
bool m_algorithmsInitialized
whether the algorithms are initialized
Definition: Worker.h:444
EL::EventRange::m_beginEvent
Long64_t m_beginEvent
the first event to process
Definition: EventRange.h:27
ANA_MSG_ERROR
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:294
ANA_CHECK
#define ANA_CHECK(EXP)
check whether the given expression was successful
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:324
EL::Detail::OutputStreamData::close
void close()
close this file
Definition: OutputStreamData.cxx:153
EL::Job::optFactoryPreload
static const std::string optFactoryPreload
a boolean flag for whether to perform a component factory preload
Definition: Job.h:214
FileExecutedModule.h
Assert.h
const
bool const RAWDATA *ch2 const
Definition: LArRodBlockPhysicsV0.cxx:560
MessageCheck.h
SH::MetaObject::setBool
void setBool(const std::string &name, bool value)
set the meta-data boolean with the given name
skel.nEventsPerJob
nEventsPerJob
Definition: skel.ABtoEVGEN.py:540
EL::Detail::ModuleData::m_inputFile
std::unique_ptr< TFile > m_inputFile
the input file pointer of the currently opened filed
Definition: ModuleData.h:72
AthenaPoolTestWrite.stream
string stream
Definition: AthenaPoolTestWrite.py:12
EL::Worker::setFilterPassed
virtual void setFilterPassed(bool val_filterPassed) noexcept final override
set the value of filterPassed
Definition: Worker.cxx:315
EL::Detail::ModuleData::m_jobStats
std::unique_ptr< TTree > m_jobStats
Tree saving per-job statistics information.
Definition: ModuleData.h:94
AlgorithmStateModule.h
EL::Job::optXAODInput
static const std::string optXAODInput
the option to select whether our input is xAODs
Definition: Job.h:390
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:93
EL::Worker::processEvents
::StatusCode processEvents(EventRange &eventRange) override
process the given event range
Definition: Worker.cxx:505
EL::BatchSegment
Definition: BatchSegment.h:31
python.iconfTool.models.loaders.level
level
Definition: loaders.py:20
BatchSegment.h
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:56
python.SCT_ByteStreamErrorsTestAlgConfig.maxEvents
maxEvents
Definition: SCT_ByteStreamErrorsTestAlgConfig.py:43
EL::Worker::fileOpenErrorFilter
static bool fileOpenErrorFilter(int level, bool, const char *, const char *)
Error handler for file opening.
Definition: Worker.cxx:583
StopwatchModule.h
WorkerConfigModule.h
python.PyAthena.module
module
Definition: PyAthena.py:131
SamplePtr.h
MetaObject.h
OutputStreamData.h
MemoryMonitorModule.h
EL::Worker::metaData
const SH::MetaObject * metaData() const override
description: the sample meta-data we are working on guarantee: no-fail invariant: metaData !...
Definition: Worker.cxx:200
EL::Algorithm
Definition: Algorithm.h:22
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
FullCPAlgorithmsTest_eljob.sample
sample
Definition: FullCPAlgorithmsTest_eljob.py:116
RCU_REQUIRE_SOFT
#define RCU_REQUIRE_SOFT(x)
Definition: Assert.h:153
AlgorithmTimerModule.h
EL::Worker::getAlg
EL::Algorithm * getAlg(const std::string &name) const override
effects: returns the algorithms with the given name or NULL if there is none guarantee: strong failur...
Definition: Worker.cxx:283
ToolsOther.h
EL::Worker::setSegmentName
void setSegmentName(const std::string &val_segmentName)
set the segment name
Definition: Worker.cxx:355
EL::Worker::setOutputHist
void setOutputHist(const std::string &val_outputTarget)
set the histogram output list
Definition: Worker.cxx:345
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
EL::Detail::ModuleData::m_tstore
xAOD::TStore * m_tstore
the TStore structure, if we use one
Definition: ModuleData.h:100
EL::Worker::triggerConfig
TTree * triggerConfig() const override
description: the trigger config tree from the input file, or NULL if we did not find it guarantee: st...
Definition: Worker.cxx:250
ANA_MSG_INFO
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
Definition: Control/AthToolSupport/AsgMessaging/AsgMessaging/MessageCheck.h:290
DiskWriter.h
EL::Worker::~Worker
virtual ~Worker()
effects: standard destructor guarantee: no-fail
Definition: Worker.cxx:80
BatchSample.h
file
TFile * file
Definition: tile_monitor.h:29
dumpFileToPlots.treeName
string treeName
Definition: dumpFileToPlots.py:20
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
EL
This module defines the arguments passed from the BATCH driver to the BATCH worker.
Definition: AlgorithmWorkerData.h:24
SH::openFile
std::unique_ptr< TFile > openFile(const std::string &name, const MetaObject &options)
open a file with the given options
Definition: ToolsOther.cxx:35
nEvents
int nEvents
Definition: fbtTestBasics.cxx:79
hist_file_dump.f
f
Definition: hist_file_dump.py:135
EL::EventRange::eof
static constexpr Long64_t eof
the special value to indicate that the range includes all events until the end of the file
Definition: EventRange.h:34
SH::MetaObject::castString
std::string castString(const std::string &name, const std::string &def_val="", CastMode mode=CAST_ERROR_THROW) const
the meta-data string with the given name
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.AtlRunQueryLib.options
options
Definition: AtlRunQueryLib.py:379
EL::Worker::getOutputHist
TObject * getOutputHist(const std::string &name) const final override
get the output histogram with the given name
Definition: Worker.cxx:119
EL::Worker::inputFileNumEntries
Long64_t inputFileNumEntries() const override
the number of events in the input file
Definition: Worker.cxx:707
SH::MetaFields::treeName
static const std::string treeName
the name of the tree in the sample
Definition: MetaFields.h:52
EL::Worker::setJobConfig
void setJobConfig(JobConfig &&jobConfig)
set the JobConfig
Definition: Worker.cxx:365
RCU_INVARIANT
#define RCU_INVARIANT(x)
Definition: Assert.h:201
beamspotman.jobConfig
dictionary jobConfig
Definition: beamspotman.py:1071
EL::Detail::ModuleData::m_outputs
std::map< std::string, Detail::OutputStreamData > m_outputs
the list of output files
Definition: ModuleData.h:109
EL::BatchJob
Definition: BatchJob.h:36
EL::Job::optAlgorithmTimer
static const std::string optAlgorithmTimer
a boolean flag for whether to add a timer for the algorithms
Definition: Job.h:205
ReadFromCoolCompare.os
os
Definition: ReadFromCoolCompare.py:231
merge.output
output
Definition: merge.py:17
DiskOutput.h
EL::Worker::skipEvent
void skipEvent() override
effects: skip the current event, i.e.
Definition: Worker.cxx:297
EL::Job::histogramStreamName
static const std::string histogramStreamName
the name of the histogram output stream
Definition: Job.h:602
EL::Worker::EC_BADINPUT
@ EC_BADINPUT
Definition: Worker.h:253
EL::Job::optAlgorithmMemoryMonitor
static const std::string optAlgorithmMemoryMonitor
a boolean flag for whether to add a memory monitor for the algorithms
Definition: Job.h:210
EL::Worker::addOutputStream
::StatusCode addOutputStream(const std::string &label, Detail::OutputStreamData output)
effects: add another output file guarantee: strong failures: low level errors II failures: label alre...
Definition: Worker.cxx:684
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
EL::Worker::batchExecute
::StatusCode batchExecute(unsigned job_id, const char *confFile)
effects: do what is needed to execute the given job segment guarantee: basic failures: job specific
Definition: Worker.cxx:786
RCU::SetDirectory
bool SetDirectory(TObject *object, TDirectory *directory)
effects: set the directory this object is associated with returns: whether the object type actively k...
Definition: RootUtils.cxx:28
MetaFields.h
ThrowMsg.h
RootUtils::WithRootErrorHandler
Run a MT piece of code with an alternate root error handler.
Definition: WithRootErrorHandler.h:56
EL::JobConfig
the job configuration that is independent of driver and dataset
Definition: JobConfig.h:39
BatchJob.h
TEventModule.h
EL::Detail::ModuleData::m_inputTreeEntry
uint64_t m_inputTreeEntry
the entry in the input tree we are currently looking at
Definition: ModuleData.h:79
SH::MetaObject::fetchDefaults
void fetchDefaults(const MetaObject &source)
fetch the meta-data from the given sample not present in this sample.
SH::SamplePtr
A smart pointer class that holds a single Sample object.
Definition: SamplePtr.h:35
EL::Worker::addTree
::StatusCode addTree(const TTree &tree, const std::string &stream) final override
effects: adds a tree to an output file specified by the stream/label failures: Incorrect stream/label...
Definition: Worker.cxx:155
EL::Worker::testInvariant
void testInvariant() const
effects: test the invariant of this object guarantee: no-fail
Definition: Worker.cxx:68
EL::Detail::ModuleData::m_algs
std::vector< Detail::AlgorithmData > m_algs
the list of algorithms
Definition: ModuleData.h:66
AlgorithmMemoryModule.h
EL::Worker::getOutputTree
TTree * getOutputTree(const std::string &name, const std::string &stream) const final override
effects: get the tree that was added to an output file earlier failures: Tree doesn't exist
Definition: Worker.cxx:177
WithRootErrorHandler.h
Run a MT piece of code with an alternate root error handler.
EL::Detail::ModuleData::m_inputFileUrl
std::string m_inputFileUrl
the input file url of the currently opened file
Definition: ModuleData.h:69
EL::Detail::ModuleData::m_histOutput
OutputStreamData * m_histOutput
the histogram output stream
Definition: ModuleData.h:91
EL::Detail::ModuleData::m_eventsProcessed
uint64_t m_eventsProcessed
the number of events that have been processed
Definition: ModuleData.h:88
xAOD::TStore
A relatively simple transient store for objects created in analysis.
Definition: TStore.h:44
EL::Worker::getOutputFile
TFile * getOutputFile(const std::string &label) const override
effects: get the output file that goes into the dataset with the given label.
Definition: Worker.cxx:131
RCU_DESTROY_INVARIANT
#define RCU_DESTROY_INVARIANT(x)
Definition: Assert.h:235
EL::Detail::ModuleData::m_tevent
xAOD::TEvent * m_tevent
the TEvent structure, if we use one
Definition: ModuleData.h:97
LeakCheckModule.h
DirectInputModule.h
Worker.h
ReadCellNoiseFromCoolCompare.s2
s2
Definition: ReadCellNoiseFromCoolCompare.py:379
SH::MetaObject::castDouble
double castDouble(const std::string &name, double def_val=0, CastMode mode=CAST_ERROR_THROW) const
the meta-data double with the given name
StatusCode.h
EL::Detail::ModuleData::m_skipEvent
bool m_skipEvent
whether we are skipping the current event
Definition: ModuleData.h:82
RCU_CHANGE_INVARIANT
#define RCU_CHANGE_INVARIANT(x)
Definition: Assert.h:231
EL::Worker::setMetaData
void setMetaData(const SH::MetaObject *val_metaData)
set the metaData
Definition: Worker.cxx:334
EL::Worker::xaodEvent
xAOD::TEvent * xaodEvent() const override
description: the xAOD event and store guarantee: strong failures: out of memory I failures: TEventSvc...
Definition: Worker.cxx:259
EL::Worker::treeEntry
Long64_t treeEntry() const override
description: the entry in the tree we are reading guarantee: no-fail
Definition: Worker.cxx:218
EL::Worker::gridExecute
::StatusCode gridExecute(const std::string &sampleName, Long64_t SkipEvents, Long64_t nEventsPerJob)
Definition: Worker.cxx:862
RCU_THROW_MSG
#define RCU_THROW_MSG(message)
Definition: PrintMsg.h:58
EL::Job::optSkipEvents
static const std::string optSkipEvents
description: the name of the option used for skipping a certain number of events in the beginning rat...
Definition: Job.h:230
EL::Worker::initialize
::StatusCode initialize()
initialize the worker
Definition: Worker.cxx:377
SH::MetaFields::treeName_default
static const std::string treeName_default
the default value of treeName
Definition: MetaFields.h:55
EL::Detail::OutputStreamData::saveOutput
void saveOutput()
write the list of output objects to disk and clear it
Definition: OutputStreamData.cxx:172
EL::Job
Definition: Job.h:51
test_interactive_athena.job
job
Definition: test_interactive_athena.py:6
EL::Detail::ModuleData::m_worker
Worker * m_worker
the worker (to pass on to the algorithms)
Definition: ModuleData.h:106
EL::Worker::xaodStore
xAOD::TStore * xaodStore() const override
Definition: Worker.cxx:271
python.PyAthena.obj
obj
Definition: PyAthena.py:132
RCU_ASSERT
#define RCU_ASSERT(x)
Definition: Assert.h:222
checkJobs.completed
completed
Definition: checkJobs.py:24
EL::Detail::ModuleData::m_metaData
const SH::MetaObject * m_metaData
the meta-data we use
Definition: ModuleData.h:85
EL::EventRange
a range of events in a given file
Definition: EventRange.h:22
RCU_READ_INVARIANT
#define RCU_READ_INVARIANT(x)
Definition: Assert.h:229
EL::Detail::report_exception
void report_exception(std::exception_ptr eptr)
print out the currently evaluated exception
Definition: PhysicsAnalysis/D3PDTools/EventLoop/Root/MessageCheck.cxx:27
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
Sample.h
IAlgorithmWrapper.h
EL::Worker::tree
TTree * tree() const override
description: the tree we are running on guarantee: no-fail
Definition: Worker.cxx:209
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
EL::Worker::addOutputList
void addOutputList(const std::string &name, TObject *output_swallow) override
effects: add a given object to the output.
Definition: Worker.cxx:102
xAOD::TEvent
Tool for accessing xAOD files outside of Athena.
Definition: Control/xAODRootAccess/xAODRootAccess/TEvent.h:84
EL::Worker::Worker
Worker()
standard constructor
Definition: Worker.cxx:324
EL::Worker::processInputs
::StatusCode processInputs()
process all the inputs
Definition: Worker.cxx:438
EventRange.h
EL::Job::optGridReporting
static const std::string optGridReporting
whether to use grid reporting even when not running on the grid
Definition: Job.h:494
EL::EventRange::m_endEvent
Long64_t m_endEvent
the event past the last event, or eof
Definition: EventRange.h:30
EL::Worker::m_segmentName
std::string m_segmentName
the name of the segment we are processing
Definition: Worker.h:439
NSWL1::PadTriggerAdapter::segment
Muon::NSW_PadTriggerSegment segment(const NSWL1::PadTrigger &data)
Definition: PadTriggerAdapter.cxx:5
RCU_NEW_INVARIANT
#define RCU_NEW_INVARIANT(x)
Definition: Assert.h:233