ATLAS Offline Software
Loading...
Searching...
No Matches
Worker.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7
8//
9// includes
10//
11
12#include <EventLoop/Worker.h>
13
16#include <EventLoop/BatchJob.h>
19#include <EventLoop/Driver.h>
21#include <EventLoop/Job.h>
23#include <EventLoop/Module.h>
38#include <TFile.h>
39#include <TH1.h>
40#include <TROOT.h>
41#include <TSystem.h>
42#include <TTree.h>
43#include <TObjString.h>
44#include <fstream>
45#include <memory>
46#include <exception>
47
48//
49// method implementations
50//
51
52namespace EL
53{
54 namespace
55 {
56 StatusCode make_module (std::unique_ptr<Detail::Module>& module, asg::AsgComponentConfig config)
57 {
58 using namespace msgEventLoop;
59 ANA_MSG_DEBUG ("making EventLoop module of type " + config.type());
60 ANA_CHECK (config.makeComponentExpert (module, "new %1% (\"%2%\")", false, "ELModule."));
61 ANA_MSG_DEBUG ("Created EventLoop module of type " << config.type());
62 return StatusCode::SUCCESS;
63 }
64 }
65
66
67
68 void Worker ::
69 testInvariant () const
70 {
71 RCU_INVARIANT (this != nullptr);
72 for (std::size_t iter = 0, end = m_algs.size(); iter != end; ++ iter)
73 {
74 RCU_INVARIANT (m_algs[iter].m_algorithm != nullptr);
75 }
76 }
77
78
79
80 Worker ::
81 ~Worker ()
82 {
84 }
85
86
87
88 void Worker ::
89 addOutput (TObject *output_swallow)
90 {
91 std::unique_ptr<TObject> output (output_swallow);
92
94 RCU_REQUIRE_SOFT (output_swallow != 0);
95
96 RCU::SetDirectory (output_swallow, 0);
97 ModuleData::addOutput (std::move (output));
98 }
99
100
101
102 void Worker ::
103 addOutputList (const std::string& name, TObject *output_swallow)
104 {
105 std::unique_ptr<TObject> output (output_swallow);
106
108 RCU_REQUIRE_SOFT (output_swallow != 0);
109
110 RCU::SetDirectory (output_swallow, 0);
111 std::unique_ptr<TList> list (new TList);
112 list->SetName (name.c_str());
113 list->Add (output.release());
114 addOutput (list.release());
115 }
116
117
118
119 TObject *Worker ::
120 getOutputHist (const std::string& name) const
121 {
122 RCU_READ_INVARIANT (this);
123
124 TObject *result = m_histOutput->getOutputHist (name);
125 if (result == nullptr) RCU_THROW_MSG ("unknown output histogram: " + name);
126 return result;
127 }
128
129
130
131 TFile *Worker ::
132 getOutputFile (const std::string& label) const
133 {
134 RCU_READ_INVARIANT (this);
135 TFile *result = getOutputFileNull (label);
136 if (result == 0)
137 RCU_THROW_MSG ("no output dataset defined with label: " + label);
138 return result;
139 }
140
141
142
143 TFile *Worker ::
144 getOutputFileNull (const std::string& label) const
145 {
146 RCU_READ_INVARIANT (this);
147 auto iter = m_outputs.find (label);
148 if (iter == m_outputs.end())
149 return 0;
150 return iter->second->file();
151 }
152
153
154
156 addTree( const TTree& tree, const std::string& stream )
157 {
158 using namespace msgEventLoop;
159 RCU_READ_INVARIANT( this );
160
161 auto outputIter = m_outputs.find (stream);
162 if (outputIter == m_outputs.end())
163 {
164 ANA_MSG_ERROR ( "No output file with stream name \"" + stream +
165 "\" found" );
166 return ::StatusCode::FAILURE;
167 }
168
169 outputIter->second->addClone (tree);
170
171 // Return gracefully:
172 return ::StatusCode::SUCCESS;
173 }
174
175
176
177 TTree *Worker::
178 getOutputTree( const std::string& name, const std::string& stream ) const
179 {
180 using namespace msgEventLoop;
181 RCU_READ_INVARIANT( this );
182
183 auto outputIter = m_outputs.find (stream);
184 if (outputIter == m_outputs.end())
185 {
186 RCU_THROW_MSG ( "No output file with stream name \"" + stream
187 + "\" found" );
188 }
189
190 TTree *result = outputIter->second->getOutputTree( name );
191 if( result == nullptr ) {
192 RCU_THROW_MSG ( "No tree with name \"" + name + "\" in stream \"" +
193 stream + "\"" );
194 }
195 return result;
196 }
197
198
199
200 const SH::MetaObject *Worker ::
201 metaData () const
202 {
203 RCU_READ_INVARIANT (this);
204 return m_metaData;
205 }
206
207
208
209 TTree *Worker ::
210 tree () const
211 {
212 RCU_READ_INVARIANT (this);
213 return m_inputTree;
214 }
215
216
217
218 Long64_t Worker ::
219 treeEntry () const
220 {
221 RCU_READ_INVARIANT (this);
222 return m_inputTreeEntry;
223 }
224
225
226
227 TFile *Worker ::
228 inputFile () const
229 {
230 RCU_READ_INVARIANT (this);
231 return m_inputFile.get();
232 }
233
234
235
236 std::string Worker ::
237 inputFileName () const
238 {
239 // no invariant used
240 std::string path = inputFile()->GetName();
241 auto split = path.rfind ('/');
242 if (split != std::string::npos)
243 return path.substr (split + 1);
244 else
245 return path;
246 }
247
248
249
250 TTree *Worker ::
251 triggerConfig () const
252 {
253 RCU_READ_INVARIANT (this);
254 return dynamic_cast<TTree*>(inputFile()->Get("physicsMeta/TrigConfTree"));
255 }
256
257
258
259 xAOD::TEvent *Worker ::
260 xaodEvent () const
261 {
262 RCU_READ_INVARIANT (this);
263
264 if (m_tevent == nullptr)
265 RCU_THROW_MSG ("Job not configured for xAOD support");
266 return m_tevent;
267 }
268
269
270
271 xAOD::TStore *Worker ::
272 xaodStore () const
273 {
274 RCU_READ_INVARIANT (this);
275
276 if (m_tstore == nullptr)
277 RCU_THROW_MSG ("Job not configured for xAOD support");
278 return m_tstore;
279 }
280
281
282
283 Algorithm *Worker ::
284 getAlg (const std::string& name) const
285 {
286 RCU_READ_INVARIANT (this);
287 for (auto& alg : m_algs)
288 {
289 if (alg->hasName (name))
290 return alg.m_algorithm->getLegacyAlg();
291 }
292 return 0;
293 }
294
295
296
297 void Worker ::
298 skipEvent ()
299 {
301 m_skipEvent = true;
302 }
303
304
305
306 bool Worker ::
307 filterPassed () const noexcept
308 {
309 RCU_READ_INVARIANT (this);
310 return !m_skipEvent;
311 }
312
313
314
315 void Worker ::
316 setFilterPassed (bool val_filterPassed) noexcept
317 {
319 m_skipEvent = !val_filterPassed;
320 }
321
322
323
324 Worker ::
325 Worker ()
326 {
327 m_worker = this;
328
329 RCU_NEW_INVARIANT (this);
330 }
331
332
333
334 void Worker ::
335 setMetaData (const SH::MetaObject *val_metaData)
336 {
338 RCU_REQUIRE (val_metaData != 0);
339
340 m_metaData = val_metaData;
341 }
342
343
344
345 void Worker ::
346 setOutputHist (const std::string& val_outputTarget)
347 {
349
350 m_outputTarget = val_outputTarget;
351 }
352
353
354
355 void Worker ::
356 setSegmentName (const std::string& val_segmentName)
357 {
359
360 m_segmentName = val_segmentName;
361 }
362
363
364
365 void Worker ::
366 setJobConfig (JobConfig&& jobConfig)
367 {
369 for (std::unique_ptr<IAlgorithmWrapper>& alg : jobConfig.extractAlgorithms())
370 {
371 m_algs.push_back (std::move (alg));
372 }
373 }
374
375
376
377 ::StatusCode Worker ::
378 initialize ()
379 {
380 using namespace msgEventLoop;
382
383 const bool xAODInput = m_metaData->castBool (Job::optXAODInput, false);
384
385 ANA_MSG_INFO ("xAODInput = " << xAODInput);
386
387 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
388 m_moduleConfig.emplace_back ("EL::Detail::MemoryMonitorModule/EarlyMemoryMonitorModule");
389 if (auto cacheSize = metaData()->castDouble (Job::optCacheSize, 0); cacheSize > 0)
390 {
391 m_moduleConfig.emplace_back ("EL::Detail::TreeCacheModule/TreeCacheModule");
392 ANA_CHECK (m_moduleConfig.back().setProperty ("cacheSize", Long64_t (cacheSize)));
393 ANA_CHECK (m_moduleConfig.back().setProperty ("cacheLearnEntries", Long64_t (metaData()->castInteger (Job::optCacheLearnEntries, 0))));
394 ANA_CHECK (m_moduleConfig.back().setProperty ("printPerFileStats", metaData()->castBool (Job::optPrintPerFileStats, false)));
395 }
396 if (xAODInput)
397 {
398 m_moduleConfig.emplace_back ("EL::Detail::TEventModule/TEventModule");
399 ANA_CHECK (m_moduleConfig.back().setProperty ("accessMode", metaData()->castString (Job::optXaodAccessMode)));
400 ANA_CHECK (m_moduleConfig.back().setProperty ("otherMetaDataTreeNamePattern", metaData()->castString (Job::optOtherMetaDataTreeNamePattern)));
401 if (metaData()->castDouble (Job::optXAODSummaryReport, 1) == 0)
402 ANA_CHECK (m_moduleConfig.back().setProperty ("summaryReport", false));
403 ANA_CHECK (m_moduleConfig.back().setProperty ("useStats", metaData()->castBool (Job::optXAODPerfStats, false)));
404 }
405 auto factoryPreload = metaData()->castString (Job::optFactoryPreload, "");
406 if (!factoryPreload.empty())
407 {
408 m_moduleConfig.emplace_back ("EL::Detail::FactoryPreloadModule/FactoryPreloadModule");
409 ANA_CHECK (m_moduleConfig.back().setProperty ("preloader", factoryPreload));
410 }
411 m_moduleConfig.emplace_back ("EL::Detail::LeakCheckModule/LeakCheckModule");
412 ANA_CHECK (m_moduleConfig.back().setProperty ("failOnLeak", metaData()->castBool (Job::optMemFailOnLeak, false)));
413 ANA_CHECK (m_moduleConfig.back().setProperty ("absResidentLimit", metaData()->castInteger (Job::optMemResidentIncreaseLimit, 10000)));
414 ANA_CHECK (m_moduleConfig.back().setProperty ("absVirtualLimit", metaData()->castInteger (Job::optMemVirtualIncreaseLimit, 0)));
415 ANA_CHECK (m_moduleConfig.back().setProperty ("perEvResidentLimit", metaData()->castInteger (Job::optMemResidentPerEventIncreaseLimit, 10)));
416 ANA_CHECK (m_moduleConfig.back().setProperty ("perEvVirtualLimit", metaData()->castInteger (Job::optMemVirtualPerEventIncreaseLimit, 0)));
417 m_moduleConfig.emplace_back ("EL::Detail::StopwatchModule/StopwatchModule");
418 if (metaData()->castBool (Job::optGridReporting, false))
419 m_moduleConfig.emplace_back ("EL::Detail::GridReportingModule/GridReportingModule");
420 if (metaData()->castBool (Job::optAlgorithmTimer, false))
421 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmTimerModule/AlgorithmTimerModule");
422 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
423 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmMemoryModule/AlgorithmMemoryModule");
424 m_moduleConfig.emplace_back ("EL::Detail::FileExecutedModule/FileExecutedModule");
425 m_moduleConfig.emplace_back ("EL::Detail::EventCountModule/EventCountModule");
426 m_moduleConfig.emplace_back ("EL::Detail::WorkerConfigModule/WorkerConfigModule");
427 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmStateModule/AlgorithmStateModule");
428 m_moduleConfig.emplace_back ("EL::Detail::PostClosedOutputsModule/PostClosedOutputsModule");
429 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
430 m_moduleConfig.emplace_back ("EL::Detail::MemoryMonitorModule/LateMemoryMonitorModule");
431
432 for (const auto& config : m_moduleConfig)
433 {
434 std::unique_ptr<Detail::Module> module;
435 ANA_CHECK (make_module (module, config));
436 m_modules.push_back (std::move (module));
437 }
438
439 if (m_outputs.find (Job::histogramStreamName) == m_outputs.end())
440 {
442 m_outputTarget + "/hist-" + m_segmentName + ".root", "RECREATE"};
444 }
447 if (auto aliases = metaData()->castString (Job::optStreamAliases, ""); !aliases.empty())
448 {
449 // the format of aliases is "alias1=realname1,alias2=realname2"
450 std::istringstream iss (aliases);
451 std::string alias;
452 while (std::getline (iss, alias, ','))
453 {
454 auto pos = alias.find ('=');
455 if (pos == std::string::npos)
456 {
457 ANA_MSG_ERROR ("Invalid alias format: " << alias);
458 return ::StatusCode::FAILURE;
459 }
460 auto aliasName = alias.substr (0, pos);
461 auto realName = alias.substr (pos + 1);
462 auto realOutput = m_outputs.find (realName);
463 if (realOutput == m_outputs.end())
464 {
465 ANA_MSG_ERROR ("output stream " << realName << " not found for alias " << aliasName);
466 return ::StatusCode::FAILURE;
467 }
468 auto [aliasOutput, success] = m_outputs.emplace (aliasName, realOutput->second);
469 if (!success)
470 {
471 ANA_MSG_ERROR ("output stream " << aliasName << " already exists, can't make alias");
472 return ::StatusCode::FAILURE;
473 }
474 }
475 }
476
477 m_jobStats = std::make_unique<TTree>
478 ("EventLoop_JobStats", "EventLoop job statistics");
479 m_jobStats->SetDirectory (nullptr);
480
481 ANA_MSG_INFO ("calling firstInitialize on all modules");
482 for (auto& module : m_modules)
483 ANA_CHECK (module->firstInitialize (*this));
484 ANA_MSG_INFO ("calling preFileInitialize on all modules");
485 for (auto& module : m_modules)
486 ANA_CHECK (module->preFileInitialize (*this));
487
488 return ::StatusCode::SUCCESS;
489 }
490
491
492
493 ::StatusCode Worker ::
494 processInputs ()
495 {
496 using namespace msgEventLoop;
497
499
500 for (auto& module : m_modules)
501 ANA_CHECK (module->processInputs (*this, *this));
502 return ::StatusCode::SUCCESS;
503 }
504
505
506
507 ::StatusCode Worker ::
508 finalize ()
509 {
510 using namespace msgEventLoop;
511
513
514 if (m_algorithmsInitialized == false)
515 {
516 ANA_MSG_ERROR ("algorithms never got initialized");
517 return StatusCode::FAILURE;
518 }
519
521 for (auto& module : m_modules)
522 ANA_CHECK (module->onFinalize (*this));
523 for (auto& output : m_outputs)
524 {
525 if (output.first != Job::histogramStreamName && output.second->mainStreamName() == output.first)
526 {
527 output.second->saveOutput ();
528 output.second->close ();
529 std::string path = output.second->finalFileName ();
530 if (!path.empty())
531 addOutputList ("EventLoop_OutputStream_" + output.first, new TObjString (path.c_str()));
532 }
533 }
534 for (auto& module : m_modules)
535 ANA_CHECK (module->postFinalize (*this));
536 if (m_jobStats->GetListOfBranches()->GetEntries() > 0)
537 {
538 if (m_jobStats->Fill() <= 0)
539 {
540 ANA_MSG_ERROR ("failed to fill the job statistics tree");
541 return ::StatusCode::FAILURE;
542 }
543 ModuleData::addOutput (std::move (m_jobStats));
544 }
545 m_histOutput->saveOutput ();
546 for (auto& module : m_modules)
547 ANA_CHECK (module->onWorkerEnd (*this));
548 m_histOutput->saveOutput ();
549 m_histOutput->close ();
550
551 for (auto& module : m_modules){
552 ANA_CHECK (module->postFileClose(*this));
553 }
554 ANA_MSG_INFO ("worker finished successfully");
555 return ::StatusCode::SUCCESS;
556 }
557
558
559
560 ::StatusCode Worker ::
561 processEvents (EventRange& eventRange)
562 {
563 using namespace msgEventLoop;
564
566 RCU_REQUIRE (!eventRange.m_url.empty());
567 RCU_REQUIRE (eventRange.m_beginEvent >= 0);
568 RCU_REQUIRE (eventRange.m_endEvent == EventRange::eof || eventRange.m_endEvent >= eventRange.m_beginEvent);
569
570 ANA_CHECK (openInputFile (eventRange.m_url));
571
572 if (eventRange.m_beginEvent > inputFileNumEntries())
573 {
574 ANA_MSG_ERROR ("first event (" << eventRange.m_beginEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
575 return ::StatusCode::FAILURE;
576 }
577 if (eventRange.m_endEvent == EventRange::eof)
578 {
579 eventRange.m_endEvent = inputFileNumEntries();
580 } else if (eventRange.m_endEvent > inputFileNumEntries())
581 {
582 ANA_MSG_ERROR ("end event (" << eventRange.m_endEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
583 return ::StatusCode::FAILURE;
584 }
585
586 m_inputTreeEntry = eventRange.m_beginEvent;
587
588 if (m_algorithmsInitialized == false)
589 {
590 for (auto& module : m_modules)
591 ANA_CHECK (module->onInitialize (*this));
593 }
594
595 if (m_newInputFile)
596 {
597 m_newInputFile = false;
598 for (auto& module : m_modules)
599 ANA_CHECK (module->onNewInputFile (*this));
600 }
601
602 if (eventRange.m_beginEvent == 0)
603 {
604 for (auto& module : m_modules)
605 ANA_CHECK (module->onFileExecute (*this));
606 }
607
608 ANA_MSG_INFO ("Processing events " << eventRange.m_beginEvent << "-" << eventRange.m_endEvent << " in file " << eventRange.m_url);
609
610 for (uint64_t event = eventRange.m_beginEvent;
611 event != uint64_t (eventRange.m_endEvent);
612 ++ event)
613 {
614 m_inputTreeEntry = event;
615 for (auto& module : m_modules)
616 {
617 if (module->onExecute (*this).isFailure())
618 {
619 ANA_MSG_ERROR ("processing event " << treeEntry() << " on file " << inputFileName());
620 return ::StatusCode::FAILURE;
621 }
622 }
623 if (m_firstEvent)
624 {
625 m_firstEvent = false;
626 for (auto& module : m_modules)
627 ANA_CHECK (module->postFirstEvent (*this));
628 }
630 if (m_eventsProcessed % 10000 == 0)
631 ANA_MSG_INFO ("Processed " << m_eventsProcessed << " events");
632 }
633 return ::StatusCode::SUCCESS;
634 }
635
636
637
638 bool Worker ::
639 fileOpenErrorFilter(int level, bool /*b1*/, const char* s1, const char * s2)
640 {
641 // Don't fail on missing dictionary messages.
642 if (strstr (s2, "no streamer or dictionary") != nullptr) {
643 return true;
644 }
645
646 // For messages above warning level (SysError, Error, Fatal)
647 if( level > kWarning ) {
648 // We won't output further; ROOT should have already put something in the log file
649 std::string msg = "ROOT error detected in Worker.cxx: ";
650 msg += s1;
651 msg += " ";
652 msg += s2;
653 throw std::runtime_error(msg);
654
655 // No need for further error handling
656 return false;
657 }
658
659 // Pass to the default error handlers
660 return true;
661 }
662
663 ::StatusCode Worker ::
664 openInputFile (const std::string& inputFileUrl)
665 {
666 using namespace msgEventLoop;
667
668 // Enable custom error handling in a nice way
670
672
673 if (m_inputFileUrl == inputFileUrl)
674 return ::StatusCode::SUCCESS;
675
676 if (!m_inputFileUrl.empty())
677 {
678 if (m_newInputFile == false)
679 {
680 for (auto& module : m_modules)
681 ANA_CHECK (module->onCloseInputFile (*this));
682 for (auto& module : m_modules)
683 ANA_CHECK (module->postCloseInputFile (*this));
684 }
685 m_newInputFile = false;
686 m_inputTree = nullptr;
687 m_inputFile.reset ();
688 m_inputFileUrl.clear ();
689 }
690
691 if (inputFileUrl.empty())
692 return ::StatusCode::SUCCESS;
693
694 ANA_MSG_INFO ("Opening file " << inputFileUrl);
695 std::unique_ptr<TFile> inputFile;
696 try
697 {
698 inputFile = SH::openFile (inputFileUrl, *metaData());
699 } catch (...)
700 {
701 Detail::report_exception (std::current_exception());
702 }
703 if (inputFile.get() == 0)
704 {
705 ANA_MSG_ERROR ("failed to open file " << inputFileUrl);
706 for (auto& module : m_modules)
707 module->reportInputFailure (*this);
708 return ::StatusCode::FAILURE;
709 }
710 if (inputFile->IsZombie())
711 {
712 ANA_MSG_ERROR ("input file is a zombie: " << inputFileUrl);
713 for (auto& module : m_modules)
714 module->reportInputFailure (*this);
715 return ::StatusCode::FAILURE;
716 }
717
718 TTree *tree = 0;
719 const std::string treeName
721 tree = dynamic_cast<TTree*>(inputFile->Get (treeName.c_str()));
722 if (tree == nullptr)
723 {
724 ANA_MSG_INFO ("tree " << treeName << " not found in input file: " << inputFileUrl);
725 ANA_MSG_INFO ("treating this like a tree with no events");
726 }
727
728 m_newInputFile = true;
731 m_inputFile = std::move (inputFile);
732 m_inputFileUrl = std::move (inputFileUrl);
733
734 return ::StatusCode::SUCCESS;
735 }
736
737
738
739 ::StatusCode Worker ::
740 addOutputStream (const std::string& label,
742 {
743 using namespace msgEventLoop;
745
746 if (m_outputs.find (label) != m_outputs.end())
747 {
748 ANA_MSG_ERROR ("output file already defined for label: " + label);
749 return ::StatusCode::FAILURE;
750 }
751 if (data.file() == nullptr)
752 {
753 ANA_MSG_ERROR ("output stream does not have a file attached");
754 return ::StatusCode::FAILURE;
755 }
756 if (data.mainStreamName().empty())
757 data.setMainStreamName (label);
758 m_outputs.insert (std::make_pair (label, std::make_shared<Detail::OutputStreamData>(std::move (data))));
759 return ::StatusCode::SUCCESS;
760 }
761
762
763
764 Long64_t Worker ::
765 inputFileNumEntries () const
766 {
767 RCU_READ_INVARIANT (this);
768 RCU_REQUIRE (inputFile() != 0);
769
770 if (m_inputTree != 0)
771 return m_inputTree->GetEntries();
772 else
773 return 0;
774 }
775
776
777
778 uint64_t Worker ::
779 eventsProcessed () const noexcept
780 {
781 RCU_READ_INVARIANT (this);
782 return m_eventsProcessed;
783 }
784
785
786
787 ::StatusCode Worker ::
788 directExecute (const SH::SamplePtr& sample, const Job& job,
789 const std::string& location, const SH::MetaObject& options)
790 {
791 using namespace msgEventLoop;
793
794 SH::MetaObject meta (*sample->meta());
795 meta.fetchDefaults (options);
796
797 setMetaData (&meta);
798 setOutputHist (location);
799 setSegmentName (sample->name());
800
801 ANA_MSG_INFO ("Running sample: " << sample->name());
802
803 setJobConfig (JobConfig (job.jobConfig()));
804
805 for (Job::outputIter out = job.outputBegin(),
806 end = job.outputEnd(); out != end; ++ out)
807 {
809 out->output()->makeWriter (sample->name(), "", ".root")};
810 ANA_CHECK (addOutputStream (out->label(), std::move (data)));
811 }
812
813 {
814 m_moduleConfig.emplace_back ("EL::Detail::DirectInputModule/DirectInputModule");
815 ANA_CHECK (m_moduleConfig.back().setProperty ("fileList", sample->makeFileList()));
816 Long64_t maxEvents = metaData()->castDouble (Job::optMaxEvents, -1);
817 if (maxEvents != -1)
818 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", maxEvents));
819 Long64_t skipEvents = metaData()->castDouble (Job::optSkipEvents, 0);
820 if (skipEvents != 0)
821 ANA_CHECK (m_moduleConfig.back().setProperty ("skipEvents", skipEvents));
822 }
823
826 ANA_CHECK (finalize ());
827 return ::StatusCode::SUCCESS;
828 }
829
830
831
832 ::StatusCode Worker ::
833 batchExecute (unsigned job_id, const char *confFile)
834 {
835 using namespace msgEventLoop;
837
838 try
839 {
840 std::unique_ptr<TFile> file (TFile::Open (confFile, "READ"));
841 if (file.get() == nullptr || file->IsZombie())
842 {
843 ANA_MSG_ERROR ("failed to open file: " << confFile);
844 return ::StatusCode::FAILURE;
845 }
846
847 std::unique_ptr<BatchJob> job (dynamic_cast<BatchJob*>(file->Get ("job")));
848 m_batchJob = job.get();
849 if (job.get() == nullptr)
850 {
851 ANA_MSG_ERROR ("failed to retrieve BatchJob object");
852 return ::StatusCode::FAILURE;
853 }
854
855 if (job_id >= job->segments.size())
856 {
857 ANA_MSG_ERROR ("invalid job-id " << job_id << ", max is " << job->segments.size());
858 return ::StatusCode::FAILURE;
859 }
860 BatchSegment *segment = &job->segments[job_id];
861 RCU_ASSERT (segment->job_id == job_id);
862 RCU_ASSERT (segment->sample < job->samples.size());
863 BatchSample *sample = &job->samples[segment->sample];
864
865 gSystem->Exec ("pwd");
866 gSystem->MakeDirectory ("output");
867
868 setMetaData (&sample->meta);
869 setOutputHist (job->location + "/fetch");
870 setSegmentName (segment->fullName);
871
872 setJobConfig (JobConfig (job->job.jobConfig()));
873
874 for (Job::outputIter out = job->job.outputBegin(),
875 end = job->job.outputEnd(); out != end; ++ out)
876 {
878 out->output()->makeWriter (segment->sampleName, segment->segmentName, ".root")};
879 ANA_CHECK (addOutputStream (out->label(), std::move (data)));
880 }
881
882 {
883 m_moduleConfig.emplace_back ("EL::Detail::BatchInputModule/BatchInputModule");
884 ANA_CHECK (m_moduleConfig.back().setProperty ("jobId", job_id));
885 Long64_t maxEvents = metaData()->castDouble (Job::optMaxEvents, -1);
886 if (maxEvents != -1)
887 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", maxEvents));
888 }
889
892 ANA_CHECK (finalize ());
893
894 std::ostringstream job_name;
895 job_name << job_id;
896 std::ofstream completed ((job->location + "/status/completed-" + job_name.str()).c_str());
897 return ::StatusCode::SUCCESS;
898 } catch (...)
899 {
900 Detail::report_exception (std::current_exception());
901 return ::StatusCode::FAILURE;
902 }
903 }
904
905
906
907 ::StatusCode Worker ::
908 gridExecute (const std::string& sampleName, Long64_t SkipEvents, Long64_t nEventsPerJob)
909 {
910 using namespace msgEventLoop;
912
913 ANA_MSG_INFO ("Running with ROOT version " << gROOT->GetVersion()
914 << " (" << gROOT->GetVersionDate() << ")");
915
916 ANA_MSG_INFO ("Loading EventLoop grid job");
917
918
919 TList bigOutputs;
920 std::unique_ptr<JobConfig> jobConfig;
921 SH::MetaObject *mo = 0;
922
923 std::unique_ptr<TFile> f (TFile::Open("jobdef.root"));
924 if (f == nullptr || f->IsZombie()) {
925 ANA_MSG_ERROR ("Could not read jobdef");
926 return ::StatusCode::FAILURE;
927 }
928
929 mo = dynamic_cast<SH::MetaObject*>(f->Get(sampleName.c_str()));
930 if (!mo)
931 mo = dynamic_cast<SH::MetaObject*>(f->Get("defaultMetaObject"));
932 if (!mo) {
933 ANA_MSG_ERROR ("Could not read in sample meta object");
934 return ::StatusCode::FAILURE;
935 }
936
937 jobConfig.reset (dynamic_cast<JobConfig*>(f->Get("jobConfig")));
938 if (jobConfig == nullptr)
939 {
940 ANA_MSG_ERROR ("failed to read jobConfig object");
941 return ::StatusCode::FAILURE;
942 }
943
944 {
945 std::unique_ptr<TList> outs ((TList*)f->Get("outputs"));
946 if (outs == nullptr)
947 {
948 ANA_MSG_ERROR ("Could not read list of outputs");
949 return ::StatusCode::FAILURE;
950 }
951
952 TIter itr(outs.get());
953 TObject *obj = 0;
954 while ((obj = itr())) {
955 EL::OutputStream * out = dynamic_cast<EL::OutputStream*>(obj);
956 if (out) {
957 bigOutputs.Add(out);
958 }
959 else {
960 ANA_MSG_ERROR ("Encountered unexpected entry in list of outputs");
961 return ::StatusCode::FAILURE;
962 }
963 }
964 }
965
966 f->Close();
967 f.reset ();
968
969 const std::string location = ".";
970
971 mo->setBool (Job::optGridReporting, true);
972 setMetaData (mo);
973 setOutputHist (location);
974 setSegmentName ("output");
975
976 ANA_MSG_INFO ("Starting EventLoop Grid worker");
977
978 {//Create and register the "big" output files with base class
979 TIter itr(&bigOutputs);
980 TObject *obj = 0;
981 while ((obj = itr())) {
982 EL::OutputStream *os = dynamic_cast<EL::OutputStream*>(obj);
983 if (os == nullptr)
984 {
985 ANA_MSG_ERROR ("Bad input");
986 return ::StatusCode::FAILURE;
987 }
988 {
990 location + "/" + os->label() + ".root", "RECREATE"};
991 ANA_CHECK (addOutputStream (os->label(), std::move (data)));
992 }
993 }
994 }
995
996 setJobConfig (std::move (*jobConfig));
997
998 {
999 std::vector<std::string> fileList;
1000 std::ifstream infile("input.txt");
1001 while (infile) {
1002 std::string sLine;
1003 if (!getline(infile, sLine)) break;
1004 std::istringstream ssLine(sLine);
1005 while (ssLine) {
1006 std::string sFile;
1007 if (!getline(ssLine, sFile, ',')) break;
1008 fileList.push_back(sFile);
1009 }
1010 }
1011 if (fileList.size() == 0) {
1012 ANA_MSG_ERROR ("no input files provided");
1013 //User was expecting input after all.
1014 gSystem->Exit(EC_BADINPUT);
1015 }
1016 m_moduleConfig.emplace_back ("EL::Detail::DirectInputModule/DirectInputModule");
1017 ANA_CHECK (m_moduleConfig.back().setProperty ("fileList", fileList));
1018
1019 if (nEventsPerJob != -1)
1020 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", nEventsPerJob));
1021 if (SkipEvents != 0)
1022 ANA_CHECK (m_moduleConfig.back().setProperty ("skipEvents", SkipEvents));
1023 }
1024
1027 ANA_CHECK (finalize ());
1028
1029 int nEvents = eventsProcessed();
1030 ANA_MSG_INFO ("Loop finished.");
1031 ANA_MSG_INFO ("Read/processed " << nEvents << " events.");
1032
1033 ANA_MSG_INFO ("EventLoop Grid worker finished");
1034 ANA_MSG_INFO ("Saving output");
1035 return ::StatusCode::SUCCESS;
1036 }
1037}
#define RCU_INVARIANT(x)
Definition Assert.h:201
#define RCU_ASSERT(x)
Definition Assert.h:222
#define RCU_DESTROY_INVARIANT(x)
Definition Assert.h:235
#define RCU_CHANGE_INVARIANT(x)
Definition Assert.h:231
#define RCU_NEW_INVARIANT(x)
Definition Assert.h:233
#define RCU_REQUIRE(x)
Definition Assert.h:208
#define RCU_REQUIRE_SOFT(x)
Definition Assert.h:153
#define RCU_READ_INVARIANT(x)
Definition Assert.h:229
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
#define ANA_MSG_DEBUG(xmsg)
Macro printing debug messages.
#define ANA_CHECK(EXP)
check whether the given expression was successful
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
#define RCU_THROW_MSG(message)
Definition PrintMsg.h:58
Run an MT piece of code with an alternate ROOT error handler.
all data needed to manage a given output stream
the job configuration that is independent of driver and dataset
Definition JobConfig.h:39
Definition Job.h:51
static const std::string optPrintPerFileStats
description: the option to turn on printing of i/o statistics at the end of each file rationale: whil...
Definition Job.h:425
static const std::string optMemFailOnLeak
Failure behaviour of the code when a "significant memory leak" is found.
Definition Job.h:607
static const std::string optMaxEvents
description: the name of the option used for setting the maximum number of events to process per samp...
Definition Job.h:225
static const std::string optGridReporting
whether to use grid reporting even when not running on the grid
Definition Job.h:510
static const std::string optAlgorithmTimer
a boolean flag for whether to add a timer for the algorithms
Definition Job.h:205
static const std::string optMemResidentIncreaseLimit
The minimal resident memory increase necessary to trigger an error.
Definition Job.h:585
static const std::string optXAODPerfStats
description: the name of the option for turning on XAODPerfStats.
Definition Job.h:356
const OutputStream * outputIter
Definition Job.h:143
static const std::string optXAODSummaryReport
the option to turn on/off the xAOD summary reporting at the end of the job
Definition Job.h:415
static const std::string optXaodAccessMode
description: the option to select the access mode for xAODs.
Definition Job.h:400
static const std::string optCacheLearnEntries
description: this option allows to configure the number of tree entries used for learning cache behav...
Definition Job.h:327
static const std::string optCacheSize
description: this option allows to configure the TTreeCache size for this job.
Definition Job.h:312
static const std::string optAlgorithmMemoryMonitor
a boolean flag for whether to add a memory monitor for the algorithms
Definition Job.h:210
static const std::string optXAODInput
the option to select whether our input is xAODs
Definition Job.h:393
static const std::string optMemResidentPerEventIncreaseLimit
The minimal per-event resident memory increase for triggering an error.
Definition Job.h:567
static const std::string optMemVirtualIncreaseLimit
The minimal virtual memory increase necessary to trigger an error.
Definition Job.h:593
static const std::string optMemVirtualPerEventIncreaseLimit
The minimal per-event virtual memory increase for triggering an error.
Definition Job.h:577
static const std::string optSkipEvents
description: the name of the option used for skipping a certain number of events in the beginning rat...
Definition Job.h:233
static const std::string optStreamAliases
an option for stream aliases
Definition Job.h:217
static const std::string optFactoryPreload
the name of the component factory preloader to run; an empty string disables the preload
Definition Job.h:214
static const std::string optOtherMetaDataTreeNamePattern
Pattern for other MetaData tree name in input xAODs Can be useful for augmented file reading or exclu...
Definition Job.h:409
static const std::string histogramStreamName
the name of the histogram output stream
Definition Job.h:618
Long64_t treeEntry() const override
description: the entry in the tree we are reading guarantee: no-fail
Definition Worker.cxx:219
std::string inputFileName() const override
the name of the file we are reading the current tree from, without the path component
Definition Worker.cxx:237
@ EC_BADINPUT
Definition Worker.h:254
void addOutputList(const std::string &name, TObject *output_swallow) override
effects: add a given object to the output.
Definition Worker.cxx:103
::StatusCode addTree(const TTree &tree, const std::string &stream) final override
effects: adds a tree to an output file specified by the stream/label failures: Incorrect stream/label...
Definition Worker.cxx:156
TFile * inputFile() const override
description: the file we are reading the current tree from guarantee: no-fail
Definition Worker.cxx:228
TTree * getOutputTree(const std::string &name, const std::string &stream) const final override
effects: get the tree that was added to an output file earlier failures: Tree doesn't exist
Definition Worker.cxx:178
static bool fileOpenErrorFilter(int level, bool, const char *, const char *)
Error handler for file opening.
Definition Worker.cxx:639
void setOutputHist(const std::string &val_outputTarget)
set the histogram output list
Definition Worker.cxx:346
std::vector< asg::AsgComponentConfig > m_moduleConfig
the module configurations we use
Definition Worker.h:446
::StatusCode addOutputStream(const std::string &label, Detail::OutputStreamData output)
effects: add another output file guarantee: strong failures: low level errors II failures: label alre...
Definition Worker.cxx:740
const SH::MetaObject * metaData() const override
description: the sample meta-data we are working on guarantee: no-fail invariant: metaData !...
Definition Worker.cxx:201
std::string m_segmentName
the name of the segment we are processing
Definition Worker.h:431
::StatusCode finalize()
finalize the worker
Definition Worker.cxx:508
void setJobConfig(JobConfig &&jobConfig)
set the JobConfig
Definition Worker.cxx:366
::StatusCode openInputFile(const std::string &inputFileUrl) override
open the given input file without processing it
Definition Worker.cxx:664
uint64_t eventsProcessed() const noexcept
the number of events that have been processed
Definition Worker.cxx:779
bool m_newInputFile
whether this is a new input file (i.e.
Definition Worker.h:421
TTree * tree() const override
description: the tree we are running on guarantee: no-fail
Definition Worker.cxx:210
void setMetaData(const SH::MetaObject *val_metaData)
set the metaData
Definition Worker.cxx:335
bool m_algorithmsInitialized
whether the algorithms are initialized
Definition Worker.h:436
std::vector< std::unique_ptr< Detail::Module > > m_modules
the list of modules we hold
Definition Worker.h:415
void addOutput(TObject *output_swallow) final override
effects: add an object to the output.
Definition Worker.cxx:89
::StatusCode initialize()
initialize the worker
Definition Worker.cxx:378
TFile * getOutputFileNull(const std::string &label) const override
effects: get the output file that goes into the dataset with the given label.
Definition Worker.cxx:144
void setSegmentName(const std::string &val_segmentName)
set the segment name
Definition Worker.cxx:356
Long64_t inputFileNumEntries() const override
the number of events in the input file
Definition Worker.cxx:765
::StatusCode processInputs()
process all the inputs
Definition Worker.cxx:494
bool m_firstEvent
whether we are still to process the first event
Definition Worker.h:441
std::string m_outputTarget
the target file to which we will write the histogram output
Definition Worker.h:426
Run a MT piece of code with an alternate root error handler.
A class that manages meta-data to be associated with an object.
Definition MetaObject.h:56
void setBool(const std::string &name, bool value)
set the meta-data boolean with the given name
A smart pointer class that holds a single Sample object.
Definition SamplePtr.h:35
Tool for accessing xAOD files outside of Athena.
A relatively simple transient store for objects created in analysis.
Definition TStore.h:45
const int nEvents
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
std::string label(const std::string &format, int i)
Definition label.h:19
void report_exception(std::exception_ptr eptr)
print out the currently evaluated exception
This module defines the arguments passed from the BATCH driver to the BATCH worker.
::StatusCode StatusCode
StatusCode definition for legacy code.
bool SetDirectory(TObject *object, TDirectory *directory)
effects: set the directory this object is associated with returns: whether the object type actively k...
Definition RootUtils.cxx:28
std::unique_ptr< TFile > openFile(const std::string &name, const MetaObject &options)
open a file with the given options
-diff
void initialize()
UInt_t job_id
description: the job id of this segment
std::string segmentName
the name/id to use for this segment (not including the sample name)
std::string fullName
the name/id to use for this segment (including the sample name)
UInt_t sample
description: the index of the sample we are using
std::string sampleName
the name of the sample for this segment
std::string m_inputFileUrl
the input file url of the currently opened file
Definition ModuleData.h:69
const SH::MetaObject * m_metaData
the meta-data we use
Definition ModuleData.h:85
Worker * m_worker
the worker (to pass on to the algorithms)
Definition ModuleData.h:106
TTree * m_inputTree
the (main) tree in the input file
Definition ModuleData.h:75
uint64_t m_inputTreeEntry
the entry in the input tree we are currently looking at
Definition ModuleData.h:79
BatchJob * m_batchJob
the BatchJob configuration (if used)
Definition ModuleData.h:112
xAOD::TStore * m_tstore
the TStore structure, if we use one
Definition ModuleData.h:100
bool m_skipEvent
whether we are skipping the current event
Definition ModuleData.h:82
OutputStreamData * m_histOutput
the histogram output stream
Definition ModuleData.h:91
std::unique_ptr< TTree > m_jobStats
Tree saving per-job statistics information.
Definition ModuleData.h:94
xAOD::TEvent * m_tevent
the TEvent structure, if we use one
Definition ModuleData.h:97
std::unique_ptr< TFile > m_inputFile
the input file pointer of the currently opened filed
Definition ModuleData.h:72
std::map< std::string, std::shared_ptr< Detail::OutputStreamData > > m_outputs
the list of output files
Definition ModuleData.h:109
std::vector< Detail::AlgorithmData > m_algs
the list of algorithms
Definition ModuleData.h:66
uint64_t m_eventsProcessed
the number of events that have been processed
Definition ModuleData.h:88
a range of events in a given file
Definition EventRange.h:22
std::string m_url
the location of the file
Definition EventRange.h:24
static constexpr Long64_t eof
the special value to indicate that the range includes all events until the end of the file
Definition EventRange.h:34
Long64_t m_beginEvent
the first event to process
Definition EventRange.h:27
Long64_t m_endEvent
the event past the last event, or eof
Definition EventRange.h:30
static const std::string treeName_default
the default value of treeName
Definition MetaFields.h:55
static const std::string treeName
the name of the tree in the sample
Definition MetaFields.h:52
MsgStream & msg
Definition testRead.cxx:32
TFile * file