ATLAS Offline Software
Loading...
Searching...
No Matches
Worker.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7
8//
9// includes
10//
11
12#include <EventLoop/Worker.h>
13
16#include <EventLoop/BatchJob.h>
19#include <EventLoop/Driver.h>
21#include <EventLoop/Job.h>
23#include <EventLoop/Module.h>
38#include <TFile.h>
39#include <TH1.h>
40#include <TROOT.h>
41#include <TSystem.h>
42#include <TTree.h>
43#include <TObjString.h>
44#include <fstream>
45#include <memory>
46#include <exception>
47
48//
49// method implementations
50//
51
52namespace EL
53{
54 namespace
55 {
56 StatusCode make_module (std::unique_ptr<Detail::Module>& module, asg::AsgComponentConfig config)
57 {
58 using namespace msgEventLoop;
59 ANA_MSG_DEBUG ("making EventLoop module of type " + config.type());
60 ANA_CHECK (config.makeComponentExpert (module, "new %1% (\"%2%\")", false, "ELModule."));
61 ANA_MSG_DEBUG ("Created EventLoop module of type " << config.type());
62 return StatusCode::SUCCESS;
63 }
64 }
65
66
67
68 void Worker ::
69 testInvariant () const
70 {
71 RCU_INVARIANT (this != nullptr);
72 for (std::size_t iter = 0, end = m_algs.size(); iter != end; ++ iter)
73 {
74 RCU_INVARIANT (m_algs[iter].m_algorithm != nullptr);
75 }
76 }
77
78
79
// Destructor.
// NOTE(review): the embedded source numbering jumps from 82 to 84, so one
// statement of the body is not visible in this listing -- confirm against
// the repository before editing.
80 Worker ::
81 ~Worker ()
82 {
84 }
85
86
87
// effects: add an object to the output.
//
// `output_swallow` is wrapped in a unique_ptr on entry, so this function
// takes ownership even if the soft requirement below fails.  The object is
// passed to RCU::SetDirectory with a null directory and then handed to
// ModuleData::addOutput.
// NOTE(review): embedded source line 93 is missing from this listing.
88 void Worker ::
89 addOutput (TObject *output_swallow)
90 {
91 std::unique_ptr<TObject> output (output_swallow);
92
94 RCU_REQUIRE_SOFT (output_swallow != 0);
95
96 RCU::SetDirectory (output_swallow, 0);
97 ModuleData::addOutput (std::move (output));
98 }
99
100
101
// effects: add a given object to the output.
//
// The object is placed inside a freshly created TList named `name`, and
// that list is then registered through addOutput().  Ownership of
// `output_swallow` is taken on entry (unique_ptr) and released into the
// list; ownership of the list is released into addOutput().
// NOTE(review): embedded source line 107 is missing from this listing.
102 void Worker ::
103 addOutputList (const std::string& name, TObject *output_swallow)
104 {
105 std::unique_ptr<TObject> output (output_swallow);
106
108 RCU_REQUIRE_SOFT (output_swallow != 0);
109
110 RCU::SetDirectory (output_swallow, 0);
111 std::unique_ptr<TList> list (new TList);
112 list->SetName (name.c_str());
113 list->Add (output.release());
114 addOutput (list.release());
115 }
116
117
118
119 TObject *Worker ::
120 getOutputHist (const std::string& name) const
121 {
122 RCU_READ_INVARIANT (this);
123
124 TObject *result = m_histOutput->getOutputHist (name);
125 if (result == nullptr) RCU_THROW_MSG ("unknown output histogram: " + name);
126 return result;
127 }
128
129
130
131 TFile *Worker ::
132 getOutputFile (const std::string& label) const
133 {
134 RCU_READ_INVARIANT (this);
135 TFile *result = getOutputFileNull (label);
136 if (result == 0)
137 RCU_THROW_MSG ("no output dataset defined with label: " + label);
138 return result;
139 }
140
141
142
143 TFile *Worker ::
144 getOutputFileNull (const std::string& label) const
145 {
146 RCU_READ_INVARIANT (this);
147 auto iter = m_outputs.find (label);
148 if (iter == m_outputs.end())
149 return 0;
150 return iter->second->file();
151 }
152
153
154
// effects: adds a tree to an output file specified by the stream/label.
//
// Looks `stream` up in m_outputs; if it is unknown an error is logged and
// FAILURE is returned, otherwise the stream's addClone() is called with
// `tree` and SUCCESS is returned.
// NOTE(review): embedded source line 155 (the line carrying the return type
// and class qualifier of this definition) is missing from this listing.
156 addTree( const TTree& tree, const std::string& stream )
157 {
158 using namespace msgEventLoop;
159 RCU_READ_INVARIANT( this );
160
161 auto outputIter = m_outputs.find (stream);
162 if (outputIter == m_outputs.end())
163 {
164 ANA_MSG_ERROR ( "No output file with stream name \"" + stream +
165 "\" found" );
166 return ::StatusCode::FAILURE;
167 }
168
169 outputIter->second->addClone (tree);
170
171 // Return gracefully:
172 return ::StatusCode::SUCCESS;
173 }
174
175
176
177 TTree *Worker::
178 getOutputTree( const std::string& name, const std::string& stream ) const
179 {
180 using namespace msgEventLoop;
181 RCU_READ_INVARIANT( this );
182
183 auto outputIter = m_outputs.find (stream);
184 if (outputIter == m_outputs.end())
185 {
186 RCU_THROW_MSG ( "No output file with stream name \"" + stream
187 + "\" found" );
188 }
189
190 TTree *result = outputIter->second->getOutputTree( name );
191 if( result == nullptr ) {
192 RCU_THROW_MSG ( "No tree with name \"" + name + "\" in stream \"" +
193 stream + "\"" );
194 }
195 return result;
196 }
197
198
199
200 const SH::MetaObject *Worker ::
201 metaData () const
202 {
203 RCU_READ_INVARIANT (this);
204 return m_metaData;
205 }
206
207
208
209 TTree *Worker ::
210 tree () const
211 {
212 RCU_READ_INVARIANT (this);
213 return m_inputTree;
214 }
215
216
217
218 Long64_t Worker ::
219 treeEntry () const
220 {
221 RCU_READ_INVARIANT (this);
222 return m_inputEntry;
223 }
224
225
226
227 TFile *Worker ::
228 inputFile () const
229 {
230 RCU_READ_INVARIANT (this);
231 return m_inputFile.get();
232 }
233
234
235 bool Worker ::
236 hasInputEvents () const
237 {
238 // no invariant used
239 return m_hasInputEvents;
240 }
241
242
243 std::string Worker ::
244 inputFileName () const
245 {
246 // no invariant used
247 std::string path = inputFile()->GetName();
248 auto split = path.rfind ('/');
249 if (split != std::string::npos)
250 return path.substr (split + 1);
251 else
252 return path;
253 }
254
255
256
257 TTree *Worker ::
258 triggerConfig () const
259 {
260 RCU_READ_INVARIANT (this);
261 return dynamic_cast<TTree*>(inputFile()->Get("physicsMeta/TrigConfTree"));
262 }
263
264
265
266 xAOD::Event *Worker ::
267 xaodEvent () const
268 {
269 RCU_READ_INVARIANT (this);
270
271 if (m_event == nullptr)
272 RCU_THROW_MSG ("Job not configured for xAOD support");
273 return m_event;
274 }
275
276
277
278 xAOD::TStore *Worker ::
279 xaodStore () const
280 {
281 RCU_READ_INVARIANT (this);
282
283 if (m_tstore == nullptr)
284 RCU_THROW_MSG ("Job not configured for xAOD support");
285 return m_tstore;
286 }
287
288
289
290 Algorithm *Worker ::
291 getAlg (const std::string& name) const
292 {
293 RCU_READ_INVARIANT (this);
294 for (auto& alg : m_algs)
295 {
296 if (alg->hasName (name))
297 return alg.m_algorithm->getLegacyAlg();
298 }
299 return 0;
300 }
301
302
303
// Mark the current event as skipped by setting m_skipEvent; filterPassed()
// reports the negation of this flag.
// NOTE(review): embedded source line 307 is missing from this listing.
304 void Worker ::
305 skipEvent ()
306 {
308 m_skipEvent = true;
309 }
310
311
312
313 bool Worker ::
314 filterPassed () const noexcept
315 {
316 RCU_READ_INVARIANT (this);
317 return !m_skipEvent;
318 }
319
320
321
// Set the filter decision for the current event.  The value is stored
// inverted in m_skipEvent (passed == not skipped).
// NOTE(review): embedded source line 325 is missing from this listing.
322 void Worker ::
323 setFilterPassed (bool val_filterPassed) noexcept
324 {
326 m_skipEvent = !val_filterPassed;
327 }
328
329
330
331 Worker ::
332 Worker ()
333 {
334 m_worker = this;
335
336 RCU_NEW_INVARIANT (this);
337 }
338
339
340
// set the metaData
//
// Only the pointer is stored (m_metaData); the caller keeps ownership and
// must keep the object alive for as long as the worker uses it.
// `val_metaData` must not be null (RCU_REQUIRE).
// NOTE(review): embedded source line 344 is missing from this listing.
341 void Worker ::
342 setMetaData (const SH::MetaObject *val_metaData)
343 {
345 RCU_REQUIRE (val_metaData != 0);
346
347 m_metaData = val_metaData;
348 }
349
350
351
// set the histogram output list
//
// Stores the value in m_outputTarget, which initialize() uses as the
// directory prefix of the "hist-<segment>.root" file name.
// NOTE(review): embedded source line 355 is missing from this listing.
352 void Worker ::
353 setOutputHist (const std::string& val_outputTarget)
354 {
356
357 m_outputTarget = val_outputTarget;
358 }
359
360
361
// Set the name of the segment we are processing (stored in m_segmentName).
// NOTE(review): embedded source line 365 is missing from this listing.
362 void Worker ::
363 setSegmentName (const std::string& val_segmentName)
364 {
366
367 m_segmentName = val_segmentName;
368 }
369
370
371
// set the JobConfig
//
// Moves every algorithm returned by jobConfig.extractAlgorithms() to the
// end of m_algs.
// NOTE(review): embedded source line 375 is missing from this listing.
372 void Worker ::
373 setJobConfig (JobConfig&& jobConfig)
374 {
376 for (auto& alg : jobConfig.extractAlgorithms())
377 {
378 m_algs.push_back (std::move (alg));
379 }
380 }
381
382
383
// initialize the worker
//
// Steps visible in this listing:
//  1. append the standard module configurations to m_moduleConfig, some of
//     them conditional on meta-data options;
//  2. instantiate one module per configuration into m_modules;
//  3. make sure a histogram output stream exists;
//  4. register stream aliases from Job::optStreamAliases;
//  5. create the job statistics tree;
//  6. call firstInitialize and preFileInitialize on every module.
// Returns FAILURE (via ANA_CHECK or explicitly) on the first error.
// NOTE(review): embedded source lines 388, 446, 448 and 450-451 are missing
// from this listing, so the histogram-stream block below is incomplete here.
384 ::StatusCode Worker ::
385 initialize ()
386 {
387 using namespace msgEventLoop;
389
390 const bool xAODInput = m_metaData->castBool (Job::optXAODInput, false);
391
392 ANA_MSG_INFO ("xAODInput = " << xAODInput);
393
// The order of the emplace_back calls below fixes the order of m_modules,
// and therefore the order in which module callbacks are invoked.
394 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
395 m_moduleConfig.emplace_back ("EL::Detail::MemoryMonitorModule/EarlyMemoryMonitorModule");
396 if (auto cacheSize = metaData()->castDouble (Job::optCacheSize, 0); cacheSize > 0)
397 {
398 m_moduleConfig.emplace_back ("EL::Detail::TreeCacheModule/TreeCacheModule");
399 ANA_CHECK (m_moduleConfig.back().setProperty ("cacheSize", Long64_t (cacheSize)));
400 ANA_CHECK (m_moduleConfig.back().setProperty ("cacheLearnEntries", Long64_t (metaData()->castInteger (Job::optCacheLearnEntries, 0))));
401 ANA_CHECK (m_moduleConfig.back().setProperty ("printPerFileStats", metaData()->castBool (Job::optPrintPerFileStats, false)));
402 }
403 if (xAODInput)
404 {
405 m_moduleConfig.emplace_back ("EL::Detail::EventModule/EventModule");
406 if (metaData()->castDouble (Job::optXAODSummaryReport, 1) == 0)
407 ANA_CHECK (m_moduleConfig.back().setProperty ("summaryReport", false));
408 ANA_CHECK (m_moduleConfig.back().setProperty ("useStats", metaData()->castBool (Job::optXAODPerfStats, false)));
409 }
410 auto factoryPreload = metaData()->castString (Job::optFactoryPreload, "");
411 if (!factoryPreload.empty())
412 {
413 m_moduleConfig.emplace_back ("EL::Detail::FactoryPreloadModule/FactoryPreloadModule");
414 ANA_CHECK (m_moduleConfig.back().setProperty ("preloader", factoryPreload));
415 }
416 m_moduleConfig.emplace_back ("EL::Detail::LeakCheckModule/LeakCheckModule");
417 ANA_CHECK (m_moduleConfig.back().setProperty ("failOnLeak", metaData()->castBool (Job::optMemFailOnLeak, false)));
418 ANA_CHECK (m_moduleConfig.back().setProperty ("absResidentLimit", metaData()->castInteger (Job::optMemResidentIncreaseLimit, 10000)));
419 ANA_CHECK (m_moduleConfig.back().setProperty ("absVirtualLimit", metaData()->castInteger (Job::optMemVirtualIncreaseLimit, 0)));
420 ANA_CHECK (m_moduleConfig.back().setProperty ("perEvResidentLimit", metaData()->castInteger (Job::optMemResidentPerEventIncreaseLimit, 10)));
421 ANA_CHECK (m_moduleConfig.back().setProperty ("perEvVirtualLimit", metaData()->castInteger (Job::optMemVirtualPerEventIncreaseLimit, 0)));
422 m_moduleConfig.emplace_back ("EL::Detail::StopwatchModule/StopwatchModule");
423 if (metaData()->castBool (Job::optGridReporting, false))
424 m_moduleConfig.emplace_back ("EL::Detail::GridReportingModule/GridReportingModule");
425 if (metaData()->castBool (Job::optAlgorithmTimer, false))
426 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmTimerModule/AlgorithmTimerModule");
427 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
428 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmMemoryModule/AlgorithmMemoryModule");
429 m_moduleConfig.emplace_back ("EL::Detail::FileExecutedModule/FileExecutedModule");
430 m_moduleConfig.emplace_back ("EL::Detail::EventCountModule/EventCountModule");
431 m_moduleConfig.emplace_back ("EL::Detail::WorkerConfigModule/WorkerConfigModule");
432 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmStateModule/AlgorithmStateModule");
433 m_moduleConfig.emplace_back ("EL::Detail::PostClosedOutputsModule/PostClosedOutputsModule");
434 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
435 m_moduleConfig.emplace_back ("EL::Detail::MemoryMonitorModule/LateMemoryMonitorModule");
436
// Turn every configuration into an actual module instance.
437 for (const auto& config : m_moduleConfig)
438 {
439 std::unique_ptr<Detail::Module> module;
440 ANA_CHECK (make_module (module, config));
441 m_modules.push_back (std::move (module));
442 }
443
// Only runs when no histogram stream was registered beforehand.
// NOTE(review): the declaration that the next expression belongs to
// (embedded line 446) and the statements on lines 448, 450-451 are not
// visible here -- confirm against the repository.
444 if (m_outputs.find (Job::histogramStreamName) == m_outputs.end())
445 {
447 m_outputTarget + "/hist-" + m_segmentName + ".root", "RECREATE"};
449 }
452 if (auto aliases = metaData()->castString (Job::optStreamAliases, ""); !aliases.empty())
453 {
454 // the format of aliases is "alias1=realname1,alias2=realname2"
455 std::istringstream iss (aliases);
456 std::string alias;
457 while (std::getline (iss, alias, ','))
458 {
459 auto pos = alias.find ('=');
460 if (pos == std::string::npos)
461 {
462 ANA_MSG_ERROR ("Invalid alias format: " << alias);
463 return ::StatusCode::FAILURE;
464 }
465 auto aliasName = alias.substr (0, pos);
466 auto realName = alias.substr (pos + 1);
467 auto realOutput = m_outputs.find (realName);
468 if (realOutput == m_outputs.end())
469 {
470 ANA_MSG_ERROR ("output stream " << realName << " not found for alias " << aliasName);
471 return ::StatusCode::FAILURE;
472 }
// The alias entry stores the same mapped value as the real stream, so
// both names refer to one output stream object.
473 auto [aliasOutput, success] = m_outputs.emplace (aliasName, realOutput->second);
474 if (!success)
475 {
476 ANA_MSG_ERROR ("output stream " << aliasName << " already exists, can't make alias");
477 return ::StatusCode::FAILURE;
478 }
479 }
480 }
481
// The statistics tree is kept out of any ROOT directory; finalize() adds
// it to the outputs if it has any branches.
482 m_jobStats = std::make_unique<TTree>
483 ("EventLoop_JobStats", "EventLoop job statistics");
484 m_jobStats->SetDirectory (nullptr);
485
486 ANA_MSG_INFO ("calling firstInitialize on all modules");
487 for (auto& module : m_modules)
488 ANA_CHECK (module->firstInitialize (*this));
489 ANA_MSG_INFO ("calling preFileInitialize on all modules");
490 for (auto& module : m_modules)
491 ANA_CHECK (module->preFileInitialize (*this));
492
493 return ::StatusCode::SUCCESS;
494 }
495
496
497
// Ask every module, in m_modules order, to process inputs.  The worker
// passes itself as both arguments of Module::processInputs.  Stops and
// returns FAILURE at the first module that fails (ANA_CHECK).
// NOTE(review): embedded source line 503 is missing from this listing.
498 ::StatusCode Worker ::
499 processInputs ()
500 {
501 using namespace msgEventLoop;
502
504
505 for (auto& module : m_modules)
506 ANA_CHECK (module->processInputs (*this, *this));
507
508 return ::StatusCode::SUCCESS;
509 }
510
511
512
// finalize the worker
//
// Fails if the algorithms were never initialized.  Otherwise runs the
// module callbacks onFinalize, postFinalize, onWorkerEnd and postFileClose
// in that order, saving and closing the output streams in between.
// NOTE(review): embedded source lines 518 and 526 are missing from this
// listing.
513 ::StatusCode Worker ::
514 finalize ()
515 {
516 using namespace msgEventLoop;
517
519
520 if (m_algorithmsInitialized == false)
521 {
522 ANA_MSG_ERROR ("algorithms never got initialized");
523 return StatusCode::FAILURE;
524 }
525
527 for (auto& module : m_modules)
528 ANA_CHECK (module->onFinalize (*this));
// Save and close every non-histogram stream exactly once: alias entries
// are skipped because their key differs from mainStreamName().
529 for (auto& output : m_outputs)
530 {
531 if (output.first != Job::histogramStreamName && output.second->mainStreamName() == output.first)
532 {
533 output.second->saveOutput ();
534 output.second->close ();
535 std::string path = output.second->finalFileName ();
// Record the final file name in the outputs, keyed by stream name.
536 if (!path.empty())
537 addOutputList ("EventLoop_OutputStream_" + output.first, new TObjString (path.c_str()));
538 }
539 }
540 for (auto& module : m_modules)
541 ANA_CHECK (module->postFinalize (*this));
// The statistics tree is only written if some module added a branch.
542 if (m_jobStats->GetListOfBranches()->GetEntries() > 0)
543 {
544 if (m_jobStats->Fill() <= 0)
545 {
546 ANA_MSG_ERROR ("failed to fill the job statistics tree");
547 return ::StatusCode::FAILURE;
548 }
549 ModuleData::addOutput (std::move (m_jobStats));
550 }
// The histogram stream is saved both before and after onWorkerEnd.
551 m_histOutput->saveOutput ();
552 for (auto& module : m_modules)
553 ANA_CHECK (module->onWorkerEnd (*this));
554 m_histOutput->saveOutput ();
555 m_histOutput->close ();
556
557 for (auto& module : m_modules){
558 ANA_CHECK (module->postFileClose(*this));
559 }
560 ANA_MSG_INFO ("worker finished successfully");
561 return ::StatusCode::SUCCESS;
562 }
563
564
565
// Process the events of one event range.
//
// Opens eventRange.m_url, validates the range against the number of entries
// in the file, and runs the module callbacks for every event in
// [m_beginEvent, m_endEvent).  If m_endEvent is EventRange::eof on entry it
// is overwritten with the number of entries in the file.
// Returns FAILURE on an invalid range or on the first failing callback.
// NOTE(review): embedded source lines 571, 598 and 635 are missing from
// this listing; in particular the statements that update
// m_algorithmsInitialized and m_eventsProcessed are not visible here.
566 ::StatusCode Worker ::
567 processEvents (EventRange& eventRange)
568 {
569 using namespace msgEventLoop;
570
572 RCU_REQUIRE (!eventRange.m_url.empty());
573 RCU_REQUIRE (eventRange.m_beginEvent >= 0);
574 RCU_REQUIRE (eventRange.m_endEvent == EventRange::eof || eventRange.m_endEvent >= eventRange.m_beginEvent);
575
576 ANA_CHECK (openInputFile (eventRange.m_url));
577
// A begin index equal to the entry count is accepted (empty range).
578 if (eventRange.m_beginEvent > inputFileNumEntries())
579 {
580 ANA_MSG_ERROR ("first event (" << eventRange.m_beginEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
581 return ::StatusCode::FAILURE;
582 }
583 if (eventRange.m_endEvent == EventRange::eof)
584 {
585 eventRange.m_endEvent = inputFileNumEntries();
586 } else if (eventRange.m_endEvent > inputFileNumEntries())
587 {
588 ANA_MSG_ERROR ("end event (" << eventRange.m_endEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
589 return ::StatusCode::FAILURE;
590 }
591
592 m_inputEntry = eventRange.m_beginEvent;
593
594 if (m_algorithmsInitialized == false)
595 {
596 for (auto& module : m_modules)
597 ANA_CHECK (module->onInitialize (*this));
599 }
600
// m_newInputFile is set by openInputFile() when a file was newly opened.
601 if (m_newInputFile)
602 {
603 m_newInputFile = false;
604 for (auto& module : m_modules)
605 ANA_CHECK (module->onNewInputFile (*this));
606 }
607
// onFileExecute only runs for the range that starts at the first entry.
608 if (eventRange.m_beginEvent == 0)
609 {
610 for (auto& module : m_modules)
611 ANA_CHECK (module->onFileExecute (*this));
612 }
613
614 ANA_MSG_INFO ("Processing events " << eventRange.m_beginEvent << "-" << eventRange.m_endEvent << " in file " << eventRange.m_url);
615
616 for (uint64_t event = eventRange.m_beginEvent;
617 event != uint64_t (eventRange.m_endEvent);
618 ++ event)
619 {
620 m_inputEntry = event;
621 for (auto& module : m_modules)
622 {
623 if (module->onExecute (*this).isFailure())
624 {
625 ANA_MSG_ERROR ("processing event " << treeEntry() << " on file " << inputFileName());
626 return ::StatusCode::FAILURE;
627 }
628 }
629 if (m_firstEvent)
630 {
631 m_firstEvent = false;
632 for (auto& module : m_modules)
633 ANA_CHECK (module->postFirstEvent (*this));
634 }
// Progress message every 10000 processed events.
636 if (m_eventsProcessed % 10000 == 0)
637 ANA_MSG_INFO ("Processed " << m_eventsProcessed << " events");
638 }
639 return ::StatusCode::SUCCESS;
640 }
641
642
643
644 bool Worker ::
645 fileOpenErrorFilter(int level, bool /*b1*/, const char* s1, const char * s2)
646 {
647 // Don't fail on missing dictionary messages.
648 if (strstr (s2, "no streamer or dictionary") != nullptr) {
649 return true;
650 }
651
652 // For messages above warning level (SysError, Error, Fatal)
653 if( level > kWarning ) {
654 // We won't output further; ROOT should have already put something in the log file
655 std::string msg = "ROOT error detected in Worker.cxx: ";
656 msg += s1;
657 msg += " ";
658 msg += s2;
659 throw std::runtime_error(msg);
660
661 // No need for further error handling
662 return false;
663 }
664
665 // Pass to the default error handlers
666 return true;
667 }
668
// open the given input file without processing it
//
// Does nothing if `inputFileUrl` is already the current file.  Otherwise
// the current file (if any) is closed first, running the close callbacks
// only if the file had actually been used.  An empty `inputFileUrl` just
// closes the current file.  On open failure or a zombie file every module's
// reportInputFailure() is called and FAILURE is returned.
// NOTE(review): embedded source lines 675, 677, 728, 740 and 746 are
// missing from this listing (among them the initializer of `treeName` and
// the `if` that the onFirstInputFile block belongs to).
669 ::StatusCode Worker ::
670 openInputFile (const std::string& inputFileUrl)
671 {
672 using namespace msgEventLoop;
673
674 // Enable custom error handling in a nice way
676
678
679 if (m_inputFileUrl == inputFileUrl)
680 return ::StatusCode::SUCCESS;
681
682 if (!m_inputFileUrl.empty())
683 {
// m_newInputFile still being true means onNewInputFile was never run
// for the old file, so the close callbacks are skipped for it.
684 if (m_newInputFile == false)
685 {
686 for (auto& module : m_modules)
687 ANA_CHECK (module->onCloseInputFile (*this));
688 for (auto& module : m_modules)
689 ANA_CHECK (module->postCloseInputFile (*this));
690 }
691 m_newInputFile = false;
692 m_hasInputEvents = false;
693 m_inputTree = nullptr;
694 m_inputFile.reset ();
695 m_inputFileUrl.clear ();
696 }
697
698 if (inputFileUrl.empty())
699 return ::StatusCode::SUCCESS;
700
701 ANA_MSG_INFO ("Opening file " << inputFileUrl);
702 std::unique_ptr<TFile> inputFile;
// Exceptions from SH::openFile are reported, not rethrown; the null check
// below then turns them into a FAILURE return.
703 try
704 {
705 inputFile = SH::openFile (inputFileUrl, *metaData());
706 } catch (...)
707 {
708 Detail::report_exception (std::current_exception());
709 }
710 if (inputFile.get() == 0)
711 {
712 ANA_MSG_ERROR ("failed to open file " << inputFileUrl);
713 for (auto& module : m_modules)
714 module->reportInputFailure (*this);
715 return ::StatusCode::FAILURE;
716 }
717 if (inputFile->IsZombie())
718 {
719 ANA_MSG_ERROR ("input file is a zombie: " << inputFileUrl);
720 for (auto& module : m_modules)
721 module->reportInputFailure (*this);
722 return ::StatusCode::FAILURE;
723 }
724
725 // Direct TTree access
726 TTree *tree = 0;
727 const std::string treeName
729 tree = dynamic_cast<TTree*>(inputFile->Get (treeName.c_str()));
// A missing tree is not an error; the file is treated as having no events.
730 if (tree == nullptr)
731 {
732 ANA_MSG_INFO ("tree " << treeName << " not found in input file: " << inputFileUrl);
733 ANA_MSG_INFO ("treating this like a tree with no events");
734 }
735 else {
736 m_hasInputEvents = (tree->GetEntries() > 0);
737 }
738
739 m_newInputFile = true;
741 m_inputEntry = 0;
742 m_inputFile = std::move (inputFile);
// inputFileUrl is a const reference, so this std::move performs a copy.
743 m_inputFileUrl = std::move (inputFileUrl);
744
745 // onFirstInputFile to setup Event object
747 {
748 for (auto& module : m_modules)
749 ANA_CHECK (module->onFirstInputFile (*this));
750 m_firstInputFile = false;
751 }
752 else {
753 for (auto& module : m_modules)
754 ANA_CHECK (module->onNextInputFile (*this));
755 }
756
757 // Check if we have input events - done above for TTree
758 if (m_inputTree == nullptr) {
759 if (m_event) m_hasInputEvents = (m_event->getEntries() > 0);
760 }
761
762 return ::StatusCode::SUCCESS;
763 }
764
765
766
// effects: add another output file
//
// Registers `data` in m_outputs under `label`.  Fails if the label is
// already in use or if `data` has no file attached.  If `data` has no main
// stream name yet, `label` becomes its main stream name.
// NOTE(review): embedded source lines 769 (the second parameter, named
// `data` in the body) and 772 are missing from this listing.
767 ::StatusCode Worker ::
768 addOutputStream (const std::string& label,
770 {
771 using namespace msgEventLoop;
773
774 if (m_outputs.find (label) != m_outputs.end())
775 {
776 ANA_MSG_ERROR ("output file already defined for label: " + label);
777 return ::StatusCode::FAILURE;
778 }
779 if (data.file() == nullptr)
780 {
781 ANA_MSG_ERROR ("output stream does not have a file attached");
782 return ::StatusCode::FAILURE;
783 }
784 if (data.mainStreamName().empty())
785 data.setMainStreamName (label);
786 m_outputs.insert (std::make_pair (label, std::make_shared<Detail::OutputStreamData>(std::move (data))));
787 return ::StatusCode::SUCCESS;
788 }
789
790
791
792 Long64_t Worker ::
793 inputFileNumEntries () const
794 {
795 RCU_READ_INVARIANT (this);
796 RCU_REQUIRE (inputFile() != 0);
797
798 if (m_event) {
799 return m_event->getEntries();
800 }
801 else if (m_inputTree != 0)
802 return m_inputTree->GetEntries();
803 else
804 return 0;
805 }
806
807
808
809 uint64_t Worker ::
810 eventsProcessed () const noexcept
811 {
812 RCU_READ_INVARIANT (this);
813 return m_eventsProcessed;
814 }
815
816
817
// Run the job on one sample directly in this process.
//
// Builds the meta-data from the sample plus `options` defaults, configures
// output target, segment name, job configuration and output streams, adds a
// DirectInputModule fed with the sample's file list, and finalizes.
// NOTE(review): setMetaData() stores only the pointer, and `meta` is a
// local of this function, so m_metaData must not be used after this
// function returns.
// NOTE(review): embedded source lines 823, 839 and 855-856 are missing from
// this listing (among them the declaration of `data` in the output loop and
// the statements between the input-module block and finalize()).
818 ::StatusCode Worker ::
819 directExecute (const SH::Sample& sample, const Job& job,
820 const std::string& location, const SH::MetaObject& options)
821 {
822 using namespace msgEventLoop;
824
825 SH::MetaObject meta (*sample.meta());
826 meta.fetchDefaults (options);
827
828 setMetaData (&meta);
829 setOutputHist (location);
830 setSegmentName (sample.name());
831
832 ANA_MSG_INFO ("Running sample: " << sample.name());
833
834 setJobConfig (JobConfig (job.jobConfig()));
835
836 for (Job::outputIter out = job.outputBegin(),
837 end = job.outputEnd(); out != end; ++ out)
838 {
840 out->output()->makeWriter (sample.name(), "", ".root")};
841 ANA_CHECK (addOutputStream (out->label(), std::move (data)));
842 }
843
844 {
845 m_moduleConfig.emplace_back ("EL::Detail::DirectInputModule/DirectInputModule");
846 ANA_CHECK (m_moduleConfig.back().setProperty ("fileList", sample.makeFileList()));
// maxEvents/skipEvents are only forwarded when they differ from the
// defaults used here (-1 and 0).
847 Long64_t maxEvents = metaData()->castDouble (Job::optMaxEvents, -1);
848 if (maxEvents != -1)
849 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", maxEvents));
850 Long64_t skipEvents = metaData()->castDouble (Job::optSkipEvents, 0);
851 if (skipEvents != 0)
852 ANA_CHECK (m_moduleConfig.back().setProperty ("skipEvents", skipEvents));
853 }
854
857 ANA_CHECK (finalize ());
858 return ::StatusCode::SUCCESS;
859 }
860
861
862
// Run one segment of a batch job.
//
// job_id: index into the `segments` of the BatchJob read from `confFile`
// confFile: ROOT file holding the BatchJob under the key "job"
//
// Any exception is reported through Detail::report_exception and turned
// into a FAILURE return.  On success an empty file
// "<location>/status/completed-<job_id>" is created as a marker.
// NOTE(review): m_batchJob keeps a raw pointer to the object owned by the
// local unique_ptr `job`; nothing visible here resets it before `job` is
// destroyed at function exit.
// NOTE(review): embedded source lines 867, 908 and 921-922 are missing from
// this listing (among them the declaration of `data` in the output loop and
// the statements between the input-module block and finalize()).
863 ::StatusCode Worker ::
864 batchExecute (unsigned job_id, const char *confFile)
865 {
866 using namespace msgEventLoop;
868
869 try
870 {
871 std::unique_ptr<TFile> file (TFile::Open (confFile, "READ"));
872 if (file.get() == nullptr || file->IsZombie())
873 {
874 ANA_MSG_ERROR ("failed to open file: " << confFile);
875 return ::StatusCode::FAILURE;
876 }
877
878 std::unique_ptr<BatchJob> job (dynamic_cast<BatchJob*>(file->Get ("job")));
879 m_batchJob = job.get();
880 if (job.get() == nullptr)
881 {
882 ANA_MSG_ERROR ("failed to retrieve BatchJob object");
883 return ::StatusCode::FAILURE;
884 }
885
886 if (job_id >= job->segments.size())
887 {
888 ANA_MSG_ERROR ("invalid job-id " << job_id << ", max is " << job->segments.size());
889 return ::StatusCode::FAILURE;
890 }
891 BatchSegment *segment = &job->segments[job_id];
892 RCU_ASSERT (segment->job_id == job_id);
893 RCU_ASSERT (segment->sample < job->samples.size());
894 BatchSample *sample = &job->samples[segment->sample];
895
// Print the working directory and create the local "output" directory.
896 gSystem->Exec ("pwd");
897 gSystem->MakeDirectory ("output");
898
899 setMetaData (&sample->meta);
900 setOutputHist (job->location + "/fetch");
901 setSegmentName (segment->fullName);
902
903 setJobConfig (JobConfig (job->job.jobConfig()));
904
905 for (Job::outputIter out = job->job.outputBegin(),
906 end = job->job.outputEnd(); out != end; ++ out)
907 {
909 out->output()->makeWriter (segment->sampleName, segment->segmentName, ".root")};
910 ANA_CHECK (addOutputStream (out->label(), std::move (data)));
911 }
912
913 {
914 m_moduleConfig.emplace_back ("EL::Detail::BatchInputModule/BatchInputModule");
915 ANA_CHECK (m_moduleConfig.back().setProperty ("jobId", job_id));
916 Long64_t maxEvents = metaData()->castDouble (Job::optMaxEvents, -1);
917 if (maxEvents != -1)
918 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", maxEvents));
919 }
920
923 ANA_CHECK (finalize ());
924
// Constructing the ofstream creates the (empty) completion marker file.
925 std::ostringstream job_name;
926 job_name << job_id;
927 std::ofstream completed ((job->location + "/status/completed-" + job_name.str()).c_str());
928 return ::StatusCode::SUCCESS;
929 } catch (...)
930 {
931 Detail::report_exception (std::current_exception());
932 return ::StatusCode::FAILURE;
933 }
934 }
935
936
937
// Run the job as a grid worker.
//
// sampleName: key of the sample meta object in "jobdef.root"; if absent the
//   object stored as "defaultMetaObject" is used instead
// SkipEvents: forwarded as "skipEvents" to the input module when non-zero
// nEventsPerJob: forwarded as "maxEvents" to the input module unless -1
//
// Reads the job definition from "jobdef.root" and the list of input files
// from "input.txt" (comma- and newline-separated), both in the current
// directory, and writes outputs relative to ".".
// NOTE(review): embedded source lines 942, 1020 and 1056-1057 are missing
// from this listing (among them the declaration of `data` in the output
// loop and the statements between the input-module block and finalize()).
938 ::StatusCode Worker ::
939 gridExecute (const std::string& sampleName, Long64_t SkipEvents, Long64_t nEventsPerJob)
940 {
941 using namespace msgEventLoop;
943
944 ANA_MSG_INFO ("Running with ROOT version " << gROOT->GetVersion()
945 << " (" << gROOT->GetVersionDate() << ")");
946
947 ANA_MSG_INFO ("Loading EventLoop grid job");
948
949
950 TList bigOutputs;
951 std::unique_ptr<JobConfig> jobConfig;
952 SH::MetaObject *mo = 0;
953
954 std::unique_ptr<TFile> f (TFile::Open("jobdef.root"));
955 if (f == nullptr || f->IsZombie()) {
956 ANA_MSG_ERROR ("Could not read jobdef");
957 return ::StatusCode::FAILURE;
958 }
959
960 mo = dynamic_cast<SH::MetaObject*>(f->Get(sampleName.c_str()));
961 if (!mo)
962 mo = dynamic_cast<SH::MetaObject*>(f->Get("defaultMetaObject"));
963 if (!mo) {
964 ANA_MSG_ERROR ("Could not read in sample meta object");
965 return ::StatusCode::FAILURE;
966 }
967
968 jobConfig.reset (dynamic_cast<JobConfig*>(f->Get("jobConfig")));
969 if (jobConfig == nullptr)
970 {
971 ANA_MSG_ERROR ("failed to read jobConfig object");
972 return ::StatusCode::FAILURE;
973 }
974
975 {
976 std::unique_ptr<TList> outs ((TList*)f->Get("outputs"));
977 if (outs == nullptr)
978 {
979 ANA_MSG_ERROR ("Could not read list of outputs");
980 return ::StatusCode::FAILURE;
981 }
982
// Copy the pointers of all OutputStream entries into bigOutputs; any
// other entry type is an error.
983 TIter itr(outs.get());
984 TObject *obj = 0;
985 while ((obj = itr())) {
986 EL::OutputStream * out = dynamic_cast<EL::OutputStream*>(obj);
987 if (out) {
988 bigOutputs.Add(out);
989 }
990 else {
991 ANA_MSG_ERROR ("Encountered unexpected entry in list of outputs");
992 return ::StatusCode::FAILURE;
993 }
994 }
995 }
996
997 f->Close();
998 f.reset ();
999
1000 const std::string location = ".";
1001
// Grid reporting is always switched on for grid jobs.
1002 mo->setBool (Job::optGridReporting, true);
1003 setMetaData (mo);
1004 setOutputHist (location);
1005 setSegmentName ("output");
1006
1007 ANA_MSG_INFO ("Starting EventLoop Grid worker");
1008
1009 {//Create and register the "big" output files with base class
1010 TIter itr(&bigOutputs);
1011 TObject *obj = 0;
1012 while ((obj = itr())) {
1013 EL::OutputStream *os = dynamic_cast<EL::OutputStream*>(obj);
1014 if (os == nullptr)
1015 {
1016 ANA_MSG_ERROR ("Bad input");
1017 return ::StatusCode::FAILURE;
1018 }
1019 {
1021 location + "/" + os->label() + ".root", "RECREATE"};
1022 ANA_CHECK (addOutputStream (os->label(), std::move (data)));
1023 }
1024 }
1025 }
1026
1027 setJobConfig (std::move (*jobConfig));
1028
1029 {
// Each line of input.txt may hold several comma-separated file names.
1030 std::vector<std::string> fileList;
1031 std::ifstream infile("input.txt");
1032 while (infile) {
1033 std::string sLine;
1034 if (!getline(infile, sLine)) break;
1035 std::istringstream ssLine(sLine);
1036 while (ssLine) {
1037 std::string sFile;
1038 if (!getline(ssLine, sFile, ',')) break;
1039 fileList.push_back(sFile);
1040 }
1041 }
1042 if (fileList.size() == 0) {
1043 ANA_MSG_ERROR ("no input files provided");
1044 //User was expecting input after all.
1045 gSystem->Exit(EC_BADINPUT);
1046 }
1047 m_moduleConfig.emplace_back ("EL::Detail::DirectInputModule/DirectInputModule");
1048 ANA_CHECK (m_moduleConfig.back().setProperty ("fileList", fileList));
1049
1050 if (nEventsPerJob != -1)
1051 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", nEventsPerJob));
1052 if (SkipEvents != 0)
1053 ANA_CHECK (m_moduleConfig.back().setProperty ("skipEvents", SkipEvents));
1054 }
1055
1058 ANA_CHECK (finalize ());
1059
// NOTE(review): eventsProcessed() returns uint64_t; storing it in an int
// narrows the value (only used for the log message below).
1060 int nEvents = eventsProcessed();
1061 ANA_MSG_INFO ("Loop finished.");
1062 ANA_MSG_INFO ("Read/processed " << nEvents << " events.");
1063
1064 ANA_MSG_INFO ("EventLoop Grid worker finished");
1065 ANA_MSG_INFO ("Saving output");
1066 return ::StatusCode::SUCCESS;
1067 }
1068}
#define RCU_INVARIANT(x)
Definition Assert.h:196
#define RCU_ASSERT(x)
Definition Assert.h:217
#define RCU_DESTROY_INVARIANT(x)
Definition Assert.h:230
#define RCU_CHANGE_INVARIANT(x)
Definition Assert.h:226
#define RCU_NEW_INVARIANT(x)
Definition Assert.h:228
#define RCU_REQUIRE(x)
Definition Assert.h:203
#define RCU_REQUIRE_SOFT(x)
Definition Assert.h:148
#define RCU_READ_INVARIANT(x)
Definition Assert.h:224
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
#define ANA_MSG_DEBUG(xmsg)
Macro printing debug messages.
#define ANA_CHECK(EXP)
check whether the given expression was successful
#define RCU_THROW_MSG(message)
Definition PrintMsg.h:53
Run a MT piece of code with an alternate root error handler.
all data needed to manage a given output stream
the job configuration that is independent of driver and dataset
Definition JobConfig.h:45
Definition Job.h:42
static const std::string optPrintPerFileStats
description: the option to turn on printing of i/o statistics at the end of each file rationale: whil...
Definition Job.h:404
static const std::string optMemFailOnLeak
Failure behaviour of the code when a "significant memory leak" is found.
Definition Job.h:586
static const std::string optMaxEvents
description: the name of the option used for setting the maximum number of events to process per samp...
Definition Job.h:221
static const std::string optGridReporting
whether to use grid reporting even when not running on the grid
Definition Job.h:489
static const std::string optAlgorithmTimer
a boolean flag for whether to add a timer for the algorithms
Definition Job.h:201
static const std::string optMemResidentIncreaseLimit
The minimal resident memory increase necessary to trigger an error.
Definition Job.h:564
static const std::string optXAODPerfStats
description: the name of the option for turning on XAODPerfStats.
Definition Job.h:352
const OutputStream * outputIter
Definition Job.h:139
static const std::string optXAODSummaryReport
the option to turn on/off the xAOD summary reporting at the end of the job
Definition Job.h:394
static const std::string optCacheLearnEntries
description: this option allows to configure the number of tree entries used for learning cache behav...
Definition Job.h:323
static const std::string optCacheSize
description: this option allows to configure the TTreeCache size for this job.
Definition Job.h:308
static const std::string optAlgorithmMemoryMonitor
a boolean flag for whether to add a memory monitor for the algorithms
Definition Job.h:206
static const std::string optXAODInput
the option to select whether our input is xAODs
Definition Job.h:389
static const std::string optMemResidentPerEventIncreaseLimit
The minimal per-event resident memory increase for triggering an error.
Definition Job.h:546
static const std::string optMemVirtualIncreaseLimit
The minimal virtual memory increase necessary to trigger an error.
Definition Job.h:572
static const std::string optMemVirtualPerEventIncreaseLimit
The minimal per-event virtual memory increase for triggering an error.
Definition Job.h:556
static const std::string optSkipEvents
description: the name of the option used for skipping a certain number of events in the beginning rat...
Definition Job.h:229
static const std::string optStreamAliases
an option for stream aliases
Definition Job.h:213
static const std::string optFactoryPreload
a boolean flag for whether to perform a component factory preload
Definition Job.h:210
static const std::string histogramStreamName
the name of the histogram output stream
Definition Job.h:597
Long64_t treeEntry() const override
description: the entry in the tree we are reading guarantee: no-fail
Definition Worker.cxx:219
std::string inputFileName() const override
the name of the file we are reading the current tree from, without the path component
Definition Worker.cxx:244
@ EC_BADINPUT
Definition Worker.h:260
void addOutputList(const std::string &name, TObject *output_swallow) override
effects: add a given object to the output.
Definition Worker.cxx:103
::StatusCode addTree(const TTree &tree, const std::string &stream) final override
effects: adds a tree to an output file specified by the stream/label failures: Incorrect stream/label...
Definition Worker.cxx:156
TFile * inputFile() const override
description: the file we are reading the current tree from guarantee: no-fail
Definition Worker.cxx:228
TTree * getOutputTree(const std::string &name, const std::string &stream) const final override
effects: get the tree that was added to an output file earlier failures: Tree doesn't exist
Definition Worker.cxx:178
static bool fileOpenErrorFilter(int level, bool, const char *, const char *)
Error handler for file opening.
Definition Worker.cxx:645
void setOutputHist(const std::string &val_outputTarget)
set the histogram output list
Definition Worker.cxx:353
std::vector< asg::AsgComponentConfig > m_moduleConfig
the module configurations we use
Definition Worker.h:457
::StatusCode addOutputStream(const std::string &label, Detail::OutputStreamData output)
effects: add another output file guarantee: strong failures: low level errors II failures: label alre...
Definition Worker.cxx:768
const SH::MetaObject * metaData() const override
description: the sample meta-data we are working on guarantee: no-fail invariant: metaData !...
Definition Worker.cxx:201
std::string m_segmentName
the name of the segment we are processing
Definition Worker.h:442
::StatusCode finalize()
finalize the worker
Definition Worker.cxx:514
bool m_firstInputFile
whether this is the first input file
Definition Worker.h:426
void setJobConfig(JobConfig &&jobConfig)
set the JobConfig
Definition Worker.cxx:373
::StatusCode openInputFile(const std::string &inputFileUrl) override
open the given input file without processing it
Definition Worker.cxx:670
uint64_t eventsProcessed() const noexcept
the number of events that have been processed
Definition Worker.cxx:810
bool m_newInputFile
whether this is a new input file (i.e.
Definition Worker.h:432
TTree * tree() const override
description: the tree we are running on guarantee: no-fail
Definition Worker.cxx:210
void setMetaData(const SH::MetaObject *val_metaData)
set the metaData
Definition Worker.cxx:342
bool m_algorithmsInitialized
whether the algorithms are initialized
Definition Worker.h:447
std::vector< std::unique_ptr< Detail::Module > > m_modules
the list of modules we hold
Definition Worker.h:421
void addOutput(TObject *output_swallow) final override
effects: add an object to the output.
Definition Worker.cxx:89
::StatusCode initialize()
initialize the worker
Definition Worker.cxx:385
TFile * getOutputFileNull(const std::string &label) const override
effects: get the output file that goes into the dataset with the given label.
Definition Worker.cxx:144
void setSegmentName(const std::string &val_segmentName)
set the segment name
Definition Worker.cxx:363
Long64_t inputFileNumEntries() const override
the number of events in the input file
Definition Worker.cxx:793
::StatusCode processInputs()
process all the inputs
Definition Worker.cxx:499
bool m_firstEvent
whether we are still to process the first event
Definition Worker.h:452
std::string m_outputTarget
the target file to which we will write the histogram output
Definition Worker.h:437
Run an MT piece of code with an alternate ROOT error handler.
A class that manages meta-data to be associated with an object.
Definition MetaObject.h:48
void setBool(const std::string &name, bool value)
set the meta-data boolean with the given name
a base class that manages a set of files belonging to a particular data set and the associated meta-d...
Definition Sample.h:49
Base class for the event (xAOD::TEvent and xAOD::REvent) classes.
Definition Event.h:60
A relatively simple transient store for objects created in analysis.
Definition TStore.h:45
const int nEvents
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:179
std::string label(const std::string &format, int i)
Definition label.h:19
void report_exception(std::exception_ptr eptr)
print out the currently evaluated exception
This module defines the arguments passed from the BATCH driver to the BATCH worker.
::StatusCode StatusCode
StatusCode definition for legacy code.
bool SetDirectory(TObject *object, TDirectory *directory)
effects: set the directory this object is associated with returns: whether the object type actively k...
Definition RootUtils.cxx:25
std::unique_ptr< TFile > openFile(const std::string &name, const MetaObject &options)
open a file with the given options
-diff
void initialize()
UInt_t job_id
description: the job id of this segment
std::string segmentName
the name/id to use for this segment (not including the sample name)
std::string fullName
the name/id to use for this segment (including the sample name)
UInt_t sample
description: the index of the sample we are using
std::string sampleName
the name of the sample for this segment
std::string m_inputFileUrl
the input file url of the currently opened file
Definition ModuleData.h:70
const SH::MetaObject * m_metaData
the meta-data we use
Definition ModuleData.h:89
Worker * m_worker
the worker (to pass on to the algorithms)
Definition ModuleData.h:110
TTree * m_inputTree
the (main) tree in the input file
Definition ModuleData.h:76
uint64_t m_inputEntry
the entry in the input tree we are currently looking at
Definition ModuleData.h:80
BatchJob * m_batchJob
the BatchJob configuration (if used)
Definition ModuleData.h:116
xAOD::TStore * m_tstore
the TStore structure, if we use one
Definition ModuleData.h:104
bool m_skipEvent
whether we are skipping the current event
Definition ModuleData.h:86
OutputStreamData * m_histOutput
the histogram output stream
Definition ModuleData.h:95
std::unique_ptr< TTree > m_jobStats
Tree saving per-job statistics information.
Definition ModuleData.h:98
bool m_hasInputEvents
flag whether the most recently opened input file has events or not
Definition ModuleData.h:83
std::unique_ptr< TFile > m_inputFile
the input file pointer of the currently opened file
Definition ModuleData.h:73
std::map< std::string, std::shared_ptr< Detail::OutputStreamData > > m_outputs
the list of output files
Definition ModuleData.h:113
std::vector< Detail::AlgorithmData > m_algs
the list of algorithms
Definition ModuleData.h:67
uint64_t m_eventsProcessed
the number of events that have been processed
Definition ModuleData.h:92
xAOD::Event * m_event
the Event object, if we use one
Definition ModuleData.h:101
a range of events in a given file
Definition EventRange.h:22
std::string m_url
the location of the file
Definition EventRange.h:24
static constexpr Long64_t eof
the special value to indicate that the range includes all events until the end of the file
Definition EventRange.h:34
Long64_t m_beginEvent
the first event to process
Definition EventRange.h:27
Long64_t m_endEvent
the event past the last event, or eof
Definition EventRange.h:30
static const std::string treeName_default
the default value of treeName
Definition MetaFields.h:47
static const std::string treeName
the name of the tree in the sample
Definition MetaFields.h:44
MsgStream & msg
Definition testRead.cxx:32
TFile * file